• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12343072627

15 Dec 2024 11:09PM UTC coverage: 57.504% (-1.1%) from 58.636%
12343072627

Pull #9315

github

yyforyongyu
contractcourt: offer outgoing htlc one block earlier before its expiry

We need to offer the outgoing htlc one block earlier to make sure when
the expiry height hits, the sweeper will not miss sweeping it in the
same block. This also means the outgoing contest resolver now only does
one thing - watch for preimage spend till height expiry-1, which can
easily be moved into the timeout resolver instead in the future.
Pull Request #9315: Implement `blockbeat`

1445 of 2007 new or added lines in 26 files covered. (72.0%)

19246 existing lines in 249 files now uncovered.

102342 of 177975 relevant lines covered (57.5%)

24772.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.34
/graph/db/graph_cache.go
1
package graphdb
2

3
import (
4
        "fmt"
5
        "sync"
6

7
        "github.com/btcsuite/btcd/btcutil"
8
        "github.com/lightningnetwork/lnd/graph/db/models"
9
        "github.com/lightningnetwork/lnd/kvdb"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
        "github.com/lightningnetwork/lnd/routing/route"
12
)
13

14
// GraphCacheNode is an interface for all the information the cache needs to know
15
// about a lightning node.
16
type GraphCacheNode interface {
17
        // PubKey is the node's public identity key.
18
        PubKey() route.Vertex
19

20
        // Features returns the node's p2p features.
21
        Features() *lnwire.FeatureVector
22

23
        // ForEachChannel iterates through all channels of a given node,
24
        // executing the passed callback with an edge info structure and the
25
        // policies of each end of the channel. The first edge policy is the
26
        // outgoing edge *to* the connecting node, while the second is the
27
        // incoming edge *from* the connecting node. If the callback returns an
28
        // error, then the iteration is halted with the error propagated back up
29
        // to the caller.
30
        ForEachChannel(kvdb.RTx,
31
                func(kvdb.RTx, *models.ChannelEdgeInfo,
32
                        *models.ChannelEdgePolicy,
33
                        *models.ChannelEdgePolicy) error) error
34
}
35

36
// DirectedChannel is a type that stores the channel information as seen from
37
// one side of the channel.
38
type DirectedChannel struct {
39
        // ChannelID is the unique identifier of this channel.
40
        ChannelID uint64
41

42
        // IsNode1 indicates if this is the node with the smaller public key.
43
        IsNode1 bool
44

45
        // OtherNode is the public key of the node on the other end of this
46
        // channel.
47
        OtherNode route.Vertex
48

49
        // Capacity is the announced capacity of this channel in satoshis.
50
        Capacity btcutil.Amount
51

52
        // OutPolicySet is a boolean that indicates whether the node has an
53
        // outgoing policy set. For pathfinding only the existence of the policy
54
        // is important to know, not the actual content.
55
        OutPolicySet bool
56

57
        // InPolicy is the incoming policy *from* the other node to this node.
58
        // In path finding, we're walking backward from the destination to the
59
        // source, so we're always interested in the edge that arrives to us
60
        // from the other node.
61
        InPolicy *models.CachedEdgePolicy
62

63
        // Inbound fees of this node.
64
        InboundFee lnwire.Fee
65
}
66

67
// DeepCopy creates a deep copy of the channel, including the incoming policy.
68
func (c *DirectedChannel) DeepCopy() *DirectedChannel {
1,336✔
69
        channelCopy := *c
1,336✔
70

1,336✔
71
        if channelCopy.InPolicy != nil {
2,646✔
72
                inPolicyCopy := *channelCopy.InPolicy
1,310✔
73
                channelCopy.InPolicy = &inPolicyCopy
1,310✔
74

1,310✔
75
                // The fields for the ToNode can be overwritten by the path
1,310✔
76
                // finding algorithm, which is why we need a deep copy in the
1,310✔
77
                // first place. So we always start out with nil values, just to
1,310✔
78
                // be sure they don't contain any old data.
1,310✔
79
                channelCopy.InPolicy.ToNodePubKey = nil
1,310✔
80
                channelCopy.InPolicy.ToNodeFeatures = nil
1,310✔
81
        }
1,310✔
82

83
        return &channelCopy
1,336✔
84
}
85

86
// GraphCache is a type that holds a minimal set of information of the public
87
// channel graph that can be used for pathfinding.
88
type GraphCache struct {
89
        nodeChannels map[route.Vertex]map[uint64]*DirectedChannel
90
        nodeFeatures map[route.Vertex]*lnwire.FeatureVector
91

92
        mtx sync.RWMutex
93
}
94

95
// NewGraphCache creates a new graphCache.
96
func NewGraphCache(preAllocNumNodes int) *GraphCache {
142✔
97
        return &GraphCache{
142✔
98
                nodeChannels: make(
142✔
99
                        map[route.Vertex]map[uint64]*DirectedChannel,
142✔
100
                        // A channel connects two nodes, so we can look it up
142✔
101
                        // from both sides, meaning we get double the number of
142✔
102
                        // entries.
142✔
103
                        preAllocNumNodes*2,
142✔
104
                ),
142✔
105
                nodeFeatures: make(
142✔
106
                        map[route.Vertex]*lnwire.FeatureVector,
142✔
107
                        preAllocNumNodes,
142✔
108
                ),
142✔
109
        }
142✔
110
}
142✔
111

112
// Stats returns statistics about the current cache size.
113
func (c *GraphCache) Stats() string {
372✔
114
        c.mtx.RLock()
372✔
115
        defer c.mtx.RUnlock()
372✔
116

372✔
117
        numChannels := 0
372✔
118
        for node := range c.nodeChannels {
757✔
119
                numChannels += len(c.nodeChannels[node])
385✔
120
        }
385✔
121
        return fmt.Sprintf("num_node_features=%d, num_nodes=%d, "+
372✔
122
                "num_channels=%d", len(c.nodeFeatures), len(c.nodeChannels),
372✔
123
                numChannels)
372✔
124
}
125

126
// AddNodeFeatures adds a graph node and its features to the cache.
127
func (c *GraphCache) AddNodeFeatures(node GraphCacheNode) {
714✔
128
        nodePubKey := node.PubKey()
714✔
129

714✔
130
        // Only hold the lock for a short time. The `ForEachChannel()` below is
714✔
131
        // possibly slow as it has to go to the backend, so we can unlock
714✔
132
        // between the calls. And the AddChannel() method will acquire its own
714✔
133
        // lock anyway.
714✔
134
        c.mtx.Lock()
714✔
135
        c.nodeFeatures[nodePubKey] = node.Features()
714✔
136
        c.mtx.Unlock()
714✔
137
}
714✔
138

139
// AddNode adds a graph node, including all the (directed) channels of that
140
// node.
141
func (c *GraphCache) AddNode(tx kvdb.RTx, node GraphCacheNode) error {
614✔
142
        c.AddNodeFeatures(node)
614✔
143

614✔
144
        return node.ForEachChannel(
614✔
145
                tx, func(tx kvdb.RTx, info *models.ChannelEdgeInfo,
614✔
146
                        outPolicy *models.ChannelEdgePolicy,
614✔
147
                        inPolicy *models.ChannelEdgePolicy) error {
625✔
148

11✔
149
                        c.AddChannel(info, outPolicy, inPolicy)
11✔
150

11✔
151
                        return nil
11✔
152
                },
11✔
153
        )
154
}
155

156
// AddChannel adds a non-directed channel, meaning that the order of policy 1
157
// and policy 2 does not matter, the directionality is extracted from the info
158
// and policy flags automatically. The policy will be set as the outgoing policy
159
// on one node and the incoming policy on the peer's side.
160
func (c *GraphCache) AddChannel(info *models.ChannelEdgeInfo,
161
        policy1 *models.ChannelEdgePolicy, policy2 *models.ChannelEdgePolicy) {
1,700✔
162

1,700✔
163
        if info == nil {
1,700✔
164
                return
×
165
        }
×
166

167
        if policy1 != nil && policy1.IsDisabled() &&
1,700✔
168
                policy2 != nil && policy2.IsDisabled() {
1,700✔
UNCOV
169

×
UNCOV
170
                return
×
UNCOV
171
        }
×
172

173
        // Create the edge entry for both nodes.
174
        c.mtx.Lock()
1,700✔
175
        c.updateOrAddEdge(info.NodeKey1Bytes, &DirectedChannel{
1,700✔
176
                ChannelID: info.ChannelID,
1,700✔
177
                IsNode1:   true,
1,700✔
178
                OtherNode: info.NodeKey2Bytes,
1,700✔
179
                Capacity:  info.Capacity,
1,700✔
180
        })
1,700✔
181
        c.updateOrAddEdge(info.NodeKey2Bytes, &DirectedChannel{
1,700✔
182
                ChannelID: info.ChannelID,
1,700✔
183
                IsNode1:   false,
1,700✔
184
                OtherNode: info.NodeKey1Bytes,
1,700✔
185
                Capacity:  info.Capacity,
1,700✔
186
        })
1,700✔
187
        c.mtx.Unlock()
1,700✔
188

1,700✔
189
        // The policy's node is always the to_node. So if policy 1 has to_node
1,700✔
190
        // of node 2 then we have the policy 1 as seen from node 1.
1,700✔
191
        if policy1 != nil {
2,101✔
192
                fromNode, toNode := info.NodeKey1Bytes, info.NodeKey2Bytes
401✔
193
                if policy1.ToNode != info.NodeKey2Bytes {
403✔
194
                        fromNode, toNode = toNode, fromNode
2✔
195
                }
2✔
196
                isEdge1 := policy1.ChannelFlags&lnwire.ChanUpdateDirection == 0
401✔
197
                c.UpdatePolicy(policy1, fromNode, toNode, isEdge1)
401✔
198
        }
199
        if policy2 != nil {
2,101✔
200
                fromNode, toNode := info.NodeKey2Bytes, info.NodeKey1Bytes
401✔
201
                if policy2.ToNode != info.NodeKey1Bytes {
403✔
202
                        fromNode, toNode = toNode, fromNode
2✔
203
                }
2✔
204
                isEdge1 := policy2.ChannelFlags&lnwire.ChanUpdateDirection == 0
401✔
205
                c.UpdatePolicy(policy2, fromNode, toNode, isEdge1)
401✔
206
        }
207
}
208

209
// updateOrAddEdge makes sure the edge information for a node is either updated
210
// if it already exists or is added to that node's list of channels.
211
func (c *GraphCache) updateOrAddEdge(node route.Vertex, edge *DirectedChannel) {
3,400✔
212
        if len(c.nodeChannels[node]) == 0 {
4,239✔
213
                c.nodeChannels[node] = make(map[uint64]*DirectedChannel)
839✔
214
        }
839✔
215

216
        c.nodeChannels[node][edge.ChannelID] = edge
3,400✔
217
}
218

219
// UpdatePolicy updates a single policy on both the from and to node. The order
220
// of the from and to node is not strictly important. But we assume that a
221
// channel edge was added beforehand so that the directed channel struct already
222
// exists in the cache.
223
func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode,
224
        toNode route.Vertex, edge1 bool) {
3,076✔
225

3,076✔
226
        // Extract inbound fee if possible and available. If there is a decoding
3,076✔
227
        // error, ignore this policy.
3,076✔
228
        var inboundFee lnwire.Fee
3,076✔
229
        _, err := policy.ExtraOpaqueData.ExtractRecords(&inboundFee)
3,076✔
230
        if err != nil {
3,076✔
231
                return
×
232
        }
×
233

234
        c.mtx.Lock()
3,076✔
235
        defer c.mtx.Unlock()
3,076✔
236

3,076✔
237
        updatePolicy := func(nodeKey route.Vertex) {
9,228✔
238
                if len(c.nodeChannels[nodeKey]) == 0 {
6,152✔
239
                        return
×
240
                }
×
241

242
                channel, ok := c.nodeChannels[nodeKey][policy.ChannelID]
6,152✔
243
                if !ok {
6,152✔
244
                        return
×
245
                }
×
246

247
                // Edge 1 is defined as the policy for the direction of node1 to
248
                // node2.
249
                switch {
6,152✔
250
                // This is node 1, and it is edge 1, so this is the outgoing
251
                // policy for node 1.
252
                case channel.IsNode1 && edge1:
1,540✔
253
                        channel.OutPolicySet = true
1,540✔
254
                        channel.InboundFee = inboundFee
1,540✔
255

256
                // This is node 2, and it is edge 2, so this is the outgoing
257
                // policy for node 2.
258
                case !channel.IsNode1 && !edge1:
1,536✔
259
                        channel.OutPolicySet = true
1,536✔
260
                        channel.InboundFee = inboundFee
1,536✔
261

262
                // The other two cases left mean it's the inbound policy for the
263
                // node.
264
                default:
3,076✔
265
                        channel.InPolicy = models.NewCachedPolicy(policy)
3,076✔
266
                }
267
        }
268

269
        updatePolicy(fromNode)
3,076✔
270
        updatePolicy(toNode)
3,076✔
271
}
272

273
// RemoveNode completely removes a node and all its channels (including the
274
// peer's side).
275
func (c *GraphCache) RemoveNode(node route.Vertex) {
65✔
276
        c.mtx.Lock()
65✔
277
        defer c.mtx.Unlock()
65✔
278

65✔
279
        delete(c.nodeFeatures, node)
65✔
280

65✔
281
        // First remove all channels from the other nodes' lists.
65✔
282
        for _, channel := range c.nodeChannels[node] {
65✔
283
                c.removeChannelIfFound(channel.OtherNode, channel.ChannelID)
×
284
        }
×
285

286
        // Then remove our whole node completely.
287
        delete(c.nodeChannels, node)
65✔
288
}
289

290
// RemoveChannel removes a single channel between two nodes.
291
func (c *GraphCache) RemoveChannel(node1, node2 route.Vertex, chanID uint64) {
275✔
292
        c.mtx.Lock()
275✔
293
        defer c.mtx.Unlock()
275✔
294

275✔
295
        // Remove that one channel from both sides.
275✔
296
        c.removeChannelIfFound(node1, chanID)
275✔
297
        c.removeChannelIfFound(node2, chanID)
275✔
298
}
275✔
299

300
// removeChannelIfFound removes a single channel from one side.
301
func (c *GraphCache) removeChannelIfFound(node route.Vertex, chanID uint64) {
550✔
302
        if len(c.nodeChannels[node]) == 0 {
732✔
303
                return
182✔
304
        }
182✔
305

306
        delete(c.nodeChannels[node], chanID)
368✔
307
}
308

309
// UpdateChannel updates the channel edge information for a specific edge. We
310
// expect the edge to already exist and be known. If it does not yet exist, this
311
// call is a no-op.
312
func (c *GraphCache) UpdateChannel(info *models.ChannelEdgeInfo) {
1✔
313
        c.mtx.Lock()
1✔
314
        defer c.mtx.Unlock()
1✔
315

1✔
316
        if len(c.nodeChannels[info.NodeKey1Bytes]) == 0 ||
1✔
317
                len(c.nodeChannels[info.NodeKey2Bytes]) == 0 {
1✔
318

×
319
                return
×
320
        }
×
321

322
        channel, ok := c.nodeChannels[info.NodeKey1Bytes][info.ChannelID]
1✔
323
        if ok {
2✔
324
                // We only expect to be called when the channel is already
1✔
325
                // known.
1✔
326
                channel.Capacity = info.Capacity
1✔
327
                channel.OtherNode = info.NodeKey2Bytes
1✔
328
        }
1✔
329

330
        channel, ok = c.nodeChannels[info.NodeKey2Bytes][info.ChannelID]
1✔
331
        if ok {
2✔
332
                channel.Capacity = info.Capacity
1✔
333
                channel.OtherNode = info.NodeKey1Bytes
1✔
334
        }
1✔
335
}
336

337
// getChannels returns a copy of the passed node's channels or nil if there
338
// isn't any.
339
func (c *GraphCache) getChannels(node route.Vertex) []*DirectedChannel {
501✔
340
        c.mtx.RLock()
501✔
341
        defer c.mtx.RUnlock()
501✔
342

501✔
343
        channels, ok := c.nodeChannels[node]
501✔
344
        if !ok {
508✔
345
                return nil
7✔
346
        }
7✔
347

348
        features, ok := c.nodeFeatures[node]
494✔
349
        if !ok {
508✔
350
                // If the features were set to nil explicitly, that's fine here.
14✔
351
                // The router will overwrite the features of the destination
14✔
352
                // node with those found in the invoice if necessary. But if we
14✔
353
                // didn't yet get a node announcement we want to mimic the
14✔
354
                // behavior of the old DB based code that would always set an
14✔
355
                // empty feature vector instead of leaving it nil.
14✔
356
                features = lnwire.EmptyFeatureVector()
14✔
357
        }
14✔
358

359
        toNodeCallback := func() route.Vertex {
942✔
360
                return node
448✔
361
        }
448✔
362

363
        i := 0
494✔
364
        channelsCopy := make([]*DirectedChannel, len(channels))
494✔
365
        for _, channel := range channels {
1,826✔
366
                // We need to copy the channel and policy to avoid it being
1,332✔
367
                // updated in the cache if the path finding algorithm sets
1,332✔
368
                // fields on it (currently only the ToNodeFeatures of the
1,332✔
369
                // policy).
1,332✔
370
                channelCopy := channel.DeepCopy()
1,332✔
371
                if channelCopy.InPolicy != nil {
2,639✔
372
                        channelCopy.InPolicy.ToNodePubKey = toNodeCallback
1,307✔
373
                        channelCopy.InPolicy.ToNodeFeatures = features
1,307✔
374
                }
1,307✔
375

376
                channelsCopy[i] = channelCopy
1,332✔
377
                i++
1,332✔
378
        }
379

380
        return channelsCopy
494✔
381
}
382

383
// ForEachChannel invokes the given callback for each channel of the given node.
384
func (c *GraphCache) ForEachChannel(node route.Vertex,
385
        cb func(channel *DirectedChannel) error) error {
501✔
386

501✔
387
        // Obtain a copy of the node's channels. We need do this in order to
501✔
388
        // avoid deadlocks caused by interaction with the graph cache, channel
501✔
389
        // state and the graph database from multiple goroutines. This snapshot
501✔
390
        // is only used for path finding where being stale is acceptable since
501✔
391
        // the real world graph and our representation may always become
501✔
392
        // slightly out of sync for a short time and the actual channel state
501✔
393
        // is stored separately.
501✔
394
        channels := c.getChannels(node)
501✔
395
        for _, channel := range channels {
1,833✔
396
                if err := cb(channel); err != nil {
1,332✔
397
                        return err
×
398
                }
×
399
        }
400

401
        return nil
501✔
402
}
403

404
// ForEachNode iterates over the adjacency list of the graph, executing the
405
// call back for each node and the set of channels that emanate from the given
406
// node.
407
//
408
// NOTE: This method should be considered _read only_, the channels or nodes
409
// passed in MUST NOT be modified.
410
func (c *GraphCache) ForEachNode(cb func(node route.Vertex,
411
        channels map[uint64]*DirectedChannel) error) error {
2✔
412

2✔
413
        c.mtx.RLock()
2✔
414
        defer c.mtx.RUnlock()
2✔
415

2✔
416
        for node, channels := range c.nodeChannels {
6✔
417
                // We don't make a copy here since this is a read-only RPC
4✔
418
                // call. We also don't need the node features either for this
4✔
419
                // call.
4✔
420
                if err := cb(node, channels); err != nil {
4✔
421
                        return err
×
422
                }
×
423
        }
424

425
        return nil
2✔
426
}
427

428
// GetFeatures returns the features of the node with the given ID. If no
429
// features are known for the node, an empty feature vector is returned.
430
func (c *GraphCache) GetFeatures(node route.Vertex) *lnwire.FeatureVector {
459✔
431
        c.mtx.RLock()
459✔
432
        defer c.mtx.RUnlock()
459✔
433

459✔
434
        features, ok := c.nodeFeatures[node]
459✔
435
        if !ok || features == nil {
469✔
436
                // The router expects the features to never be nil, so we return
10✔
437
                // an empty feature set instead.
10✔
438
                return lnwire.EmptyFeatureVector()
10✔
439
        }
10✔
440

441
        return features
449✔
442
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc