• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16011060102

01 Jul 2025 10:00PM UTC coverage: 67.601% (+0.04%) from 67.561%
16011060102

Pull #10015

github

web-flow
Merge 0142868ea into 538723e33
Pull Request #10015: graph/db: add zombie channels cleanup routine

57 of 62 new or added lines in 2 files covered. (91.94%)

38 existing lines in 9 files now uncovered.

135225 of 200034 relevant lines covered (67.6%)

21862.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.48
/graph/db/graph_cache.go
1
package graphdb
2

3
import (
4
        "fmt"
5
        "sync"
6
        "time"
7

8
        "github.com/btcsuite/btcd/btcutil"
9
        "github.com/lightningnetwork/lnd/graph/db/models"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
        "github.com/lightningnetwork/lnd/routing/route"
12
)
13

14
// DirectedChannel is a type that stores the channel information as seen from
15
// one side of the channel.
16
type DirectedChannel struct {
17
        // ChannelID is the unique identifier of this channel.
18
        ChannelID uint64
19

20
        // IsNode1 indicates if this is the node with the smaller public key.
21
        IsNode1 bool
22

23
        // OtherNode is the public key of the node on the other end of this
24
        // channel.
25
        OtherNode route.Vertex
26

27
        // Capacity is the announced capacity of this channel in satoshis.
28
        Capacity btcutil.Amount
29

30
        // OutPolicySet is a boolean that indicates whether the node has an
31
        // outgoing policy set. For pathfinding only the existence of the policy
32
        // is important to know, not the actual content.
33
        OutPolicySet bool
34

35
        // InPolicy is the incoming policy *from* the other node to this node.
36
        // In path finding, we're walking backward from the destination to the
37
        // source, so we're always interested in the edge that arrives to us
38
        // from the other node.
39
        InPolicy *models.CachedEdgePolicy
40

41
        // Inbound fees of this node.
42
        InboundFee lnwire.Fee
43
}
44

45
// DeepCopy creates a deep copy of the channel, including the incoming policy.
46
func (c *DirectedChannel) DeepCopy() *DirectedChannel {
1,597✔
47
        channelCopy := *c
1,597✔
48

1,597✔
49
        if channelCopy.InPolicy != nil {
3,168✔
50
                inPolicyCopy := *channelCopy.InPolicy
1,571✔
51
                channelCopy.InPolicy = &inPolicyCopy
1,571✔
52

1,571✔
53
                // The fields for the ToNode can be overwritten by the path
1,571✔
54
                // finding algorithm, which is why we need a deep copy in the
1,571✔
55
                // first place. So we always start out with nil values, just to
1,571✔
56
                // be sure they don't contain any old data.
1,571✔
57
                channelCopy.InPolicy.ToNodePubKey = nil
1,571✔
58
                channelCopy.InPolicy.ToNodeFeatures = nil
1,571✔
59
        }
1,571✔
60

61
        return &channelCopy
1,597✔
62
}
63

64
// GraphCache is a type that holds a minimal set of information of the public
65
// channel graph that can be used for pathfinding.
66
type GraphCache struct {
67
        nodeChannels map[route.Vertex]map[uint64]*DirectedChannel
68
        nodeFeatures map[route.Vertex]*lnwire.FeatureVector
69
        // zombieIndex tracks channels that may be leaked during the removal
70
        // process. Since the remover could not have the node ID, these channels
71
        // are stored here and will be removed later in a separate loop.
72
        zombieIndex map[uint64]struct{}
73

74
        // zombieCleanerInterval is the interval at which the zombie cleaner
75
        // runs to clean up channels that are still missing their nodes.
76
        zombieCleanerInterval time.Duration
77

78
        mtx  sync.RWMutex
79
        quit chan struct{}
80
        wg   sync.WaitGroup
81
}
82

83
// NewGraphCache creates a new graphCache.
84
func NewGraphCache(preAllocNumNodes int) *GraphCache {
145✔
85
        return &GraphCache{
145✔
86
                nodeChannels: make(
145✔
87
                        map[route.Vertex]map[uint64]*DirectedChannel,
145✔
88
                        // A channel connects two nodes, so we can look it up
145✔
89
                        // from both sides, meaning we get double the number of
145✔
90
                        // entries.
145✔
91
                        preAllocNumNodes*2,
145✔
92
                ),
145✔
93
                nodeFeatures: make(
145✔
94
                        map[route.Vertex]*lnwire.FeatureVector,
145✔
95
                        preAllocNumNodes,
145✔
96
                ),
145✔
97
                zombieIndex:           make(map[uint64]struct{}),
145✔
98
                zombieCleanerInterval: 24 * time.Hour,
145✔
99
                quit:                  make(chan struct{}),
145✔
100
        }
145✔
101
}
145✔
102

103
// Stats returns statistics about the current cache size.
104
func (c *GraphCache) Stats() string {
383✔
105
        c.mtx.RLock()
383✔
106
        defer c.mtx.RUnlock()
383✔
107

383✔
108
        numChannels := 0
383✔
109
        for node := range c.nodeChannels {
793✔
110
                numChannels += len(c.nodeChannels[node])
410✔
111
        }
410✔
112
        return fmt.Sprintf("num_node_features=%d, num_nodes=%d, "+
383✔
113
                "num_channels=%d", len(c.nodeFeatures), len(c.nodeChannels),
383✔
114
                numChannels)
383✔
115
}
116

117
// Start launches the background goroutine that periodically cleans up zombie
118
// channels.
119
func (c *GraphCache) Start() {
142✔
120
        c.wg.Add(1)
142✔
121
        go c.zombieCleaner()
142✔
122
}
142✔
123

124
// Stop signals the background cleaner to shut down and waits for it to exit.
125
func (c *GraphCache) Stop() {
140✔
126
        close(c.quit)
140✔
127
        c.wg.Wait()
140✔
128
}
140✔
129

130
// zombieCleaner periodically iterates over the zombie index and removes
131
// channels that are still missing their nodes.
132
func (c *GraphCache) zombieCleaner() {
142✔
133
        ticker := time.NewTicker(c.zombieCleanerInterval)
142✔
134
        defer func() {
282✔
135
                ticker.Stop()
140✔
136
                c.wg.Done()
140✔
137
        }()
140✔
138

139
        for {
285✔
140
                select {
143✔
141
                case <-ticker.C:
1✔
142
                        c.cleanupZombies()
1✔
143
                case <-c.quit:
140✔
144
                        return
140✔
145
                }
146
        }
147
}
148

149
// cleanupZombies attempts to prune channels tracked in the zombie index. If the
150
// nodes for a channel still cannot be resolved, the channel is deleted from the
151
// cache.
152
func (c *GraphCache) cleanupZombies() {
2✔
153
        c.mtx.Lock()
2✔
154
        defer c.mtx.Unlock()
2✔
155

2✔
156
        if len(c.zombieIndex) == 0 {
2✔
NEW
157
                return
×
NEW
158
        }
×
159

160
        // Go through all nodes and their channels once to check if any are
161
        // marked as zombies. This is faster than checking every node for each
162
        // zombie channel, since there are usually many more nodes than zombie
163
        // channels.
164
        for node, chans := range c.nodeChannels {
5✔
165
                for cid, ch := range chans {
6✔
166
                        // if the channel isn't a zombie, we can skip it.
3✔
167
                        if _, ok := c.zombieIndex[cid]; !ok {
5✔
168
                                continue
2✔
169
                        }
170

171
                        // delete peer's side channel if it exists.
172
                        c.removeChannelIfFound(ch.OtherNode, cid)
1✔
173
                        // delete the channel from our side.
1✔
174
                        delete(chans, cid)
1✔
175
                }
176

177
                // If all channels were deleted for this node, clean up the map
178
                // entry entirely.
179
                if len(chans) == 0 {
4✔
180
                        delete(c.nodeChannels, node)
1✔
181
                }
1✔
182
        }
183

184
        // Now that we have removed all channels that were zombies, we can
185
        // clear the zombie index.
186
        c.zombieIndex = make(map[uint64]struct{})
2✔
187
}
188

189
// AddNodeFeatures adds a graph node and its features to the cache.
190
func (c *GraphCache) AddNodeFeatures(node route.Vertex,
191
        features *lnwire.FeatureVector) {
663✔
192

663✔
193
        c.mtx.Lock()
663✔
194
        defer c.mtx.Unlock()
663✔
195

663✔
196
        c.nodeFeatures[node] = features
663✔
197
}
663✔
198

199
// AddChannel adds a non-directed channel, meaning that the order of policy 1
200
// and policy 2 does not matter, the directionality is extracted from the info
201
// and policy flags automatically. The policy will be set as the outgoing policy
202
// on one node and the incoming policy on the peer's side.
203
func (c *GraphCache) AddChannel(info *models.CachedEdgeInfo,
204
        policy1, policy2 *models.CachedEdgePolicy) {
1,698✔
205

1,698✔
206
        if info == nil {
1,698✔
207
                return
×
208
        }
×
209

210
        if policy1 != nil && policy1.IsDisabled() &&
1,698✔
211
                policy2 != nil && policy2.IsDisabled() {
1,701✔
212

3✔
213
                return
3✔
214
        }
3✔
215

216
        // Create the edge entry for both nodes.
217
        c.mtx.Lock()
1,698✔
218
        c.updateOrAddEdge(info.NodeKey1Bytes, &DirectedChannel{
1,698✔
219
                ChannelID: info.ChannelID,
1,698✔
220
                IsNode1:   true,
1,698✔
221
                OtherNode: info.NodeKey2Bytes,
1,698✔
222
                Capacity:  info.Capacity,
1,698✔
223
        })
1,698✔
224
        c.updateOrAddEdge(info.NodeKey2Bytes, &DirectedChannel{
1,698✔
225
                ChannelID: info.ChannelID,
1,698✔
226
                IsNode1:   false,
1,698✔
227
                OtherNode: info.NodeKey1Bytes,
1,698✔
228
                Capacity:  info.Capacity,
1,698✔
229
        })
1,698✔
230
        c.mtx.Unlock()
1,698✔
231

1,698✔
232
        // The policy's node is always the to_node. So if policy 1 has to_node
1,698✔
233
        // of node 2 then we have the policy 1 as seen from node 1.
1,698✔
234
        if policy1 != nil {
2,099✔
235
                fromNode, toNode := info.NodeKey1Bytes, info.NodeKey2Bytes
401✔
236
                if !policy1.IsNode1() {
402✔
237
                        fromNode, toNode = toNode, fromNode
1✔
238
                }
1✔
239
                c.UpdatePolicy(policy1, fromNode, toNode)
401✔
240
        }
241
        if policy2 != nil {
2,099✔
242
                fromNode, toNode := info.NodeKey2Bytes, info.NodeKey1Bytes
401✔
243
                if policy2.IsNode1() {
402✔
244
                        fromNode, toNode = toNode, fromNode
1✔
245
                }
1✔
246
                c.UpdatePolicy(policy2, fromNode, toNode)
401✔
247
        }
248
}
249

250
// updateOrAddEdge makes sure the edge information for a node is either updated
251
// if it already exists or is added to that node's list of channels.
252
func (c *GraphCache) updateOrAddEdge(node route.Vertex, edge *DirectedChannel) {
3,393✔
253
        if len(c.nodeChannels[node]) == 0 {
4,244✔
254
                c.nodeChannels[node] = make(map[uint64]*DirectedChannel)
851✔
255
        }
851✔
256

257
        c.nodeChannels[node][edge.ChannelID] = edge
3,393✔
258
}
259

260
// UpdatePolicy updates a single policy on both the from and to node. The order
261
// of the from and to node is not strictly important. But we assume that a
262
// channel edge was added beforehand so that the directed channel struct already
263
// exists in the cache.
264
func (c *GraphCache) UpdatePolicy(policy *models.CachedEdgePolicy, fromNode,
265
        toNode route.Vertex) {
3,081✔
266

3,081✔
267
        c.mtx.Lock()
3,081✔
268
        defer c.mtx.Unlock()
3,081✔
269

3,081✔
270
        updatePolicy := func(nodeKey route.Vertex) {
9,240✔
271
                if len(c.nodeChannels[nodeKey]) == 0 {
6,159✔
272
                        log.Warnf("Node=%v not found in graph cache", nodeKey)
×
273

×
274
                        return
×
275
                }
×
276

277
                channel, ok := c.nodeChannels[nodeKey][policy.ChannelID]
6,159✔
278
                if !ok {
6,159✔
279
                        log.Warnf("Channel=%v not found in graph cache",
×
280
                                policy.ChannelID)
×
281

×
282
                        return
×
283
                }
×
284

285
                // Edge 1 is defined as the policy for the direction of node1 to
286
                // node2.
287
                switch {
6,159✔
288
                // This is node 1, and it is edge 1, so this is the outgoing
289
                // policy for node 1.
290
                case channel.IsNode1 && policy.IsNode1():
1,543✔
291
                        channel.OutPolicySet = true
1,543✔
292
                        policy.InboundFee.WhenSome(func(fee lnwire.Fee) {
1,708✔
293
                                channel.InboundFee = fee
165✔
294
                        })
165✔
295

296
                // This is node 2, and it is edge 2, so this is the outgoing
297
                // policy for node 2.
298
                case !channel.IsNode1 && !policy.IsNode1():
1,541✔
299
                        channel.OutPolicySet = true
1,541✔
300
                        policy.InboundFee.WhenSome(func(fee lnwire.Fee) {
1,705✔
301
                                channel.InboundFee = fee
164✔
302
                        })
164✔
303

304
                // The other two cases left mean it's the inbound policy for the
305
                // node.
306
                default:
3,081✔
307
                        channel.InPolicy = policy
3,081✔
308
                }
309
        }
310

311
        updatePolicy(fromNode)
3,081✔
312
        updatePolicy(toNode)
3,081✔
313
}
314

315
// RemoveNode completely removes a node and all its channels (including the
316
// peer's side).
317
func (c *GraphCache) RemoveNode(node route.Vertex) {
66✔
318
        c.mtx.Lock()
66✔
319
        defer c.mtx.Unlock()
66✔
320

66✔
321
        delete(c.nodeFeatures, node)
66✔
322

66✔
323
        // First remove all channels from the other nodes' lists.
66✔
324
        for _, channel := range c.nodeChannels[node] {
68✔
325
                c.removeChannelIfFound(channel.OtherNode, channel.ChannelID)
2✔
326
        }
2✔
327

328
        // Then remove our whole node completely.
329
        delete(c.nodeChannels, node)
66✔
330
}
331

332
// RemoveChannel removes a single channel between two nodes.
333
func (c *GraphCache) RemoveChannel(node1, node2 route.Vertex, chanID uint64) {
266✔
334
        c.mtx.Lock()
266✔
335
        defer c.mtx.Unlock()
266✔
336

266✔
337
        // Remove that one channel from both sides.
266✔
338
        c.removeChannelIfFound(node1, chanID)
266✔
339
        c.removeChannelIfFound(node2, chanID)
266✔
340

266✔
341
        zeroVertex := route.Vertex{}
266✔
342
        if node1 == zeroVertex || node2 == zeroVertex {
269✔
343
                // If one of the nodes is the zero vertex, it means that we will
3✔
344
                // leak the channel in the memory cache, since we don't have the
3✔
345
                // node ID to remove, so we add it to the zombie index to post
3✔
346
                // removal.
3✔
347
                c.zombieIndex[chanID] = struct{}{}
3✔
348
        }
3✔
349
}
350

351
// removeChannelIfFound removes a single channel from one side.
352
func (c *GraphCache) removeChannelIfFound(node route.Vertex, chanID uint64) {
532✔
353
        if len(c.nodeChannels[node]) == 0 {
703✔
354
                return
171✔
355
        }
171✔
356

357
        delete(c.nodeChannels[node], chanID)
364✔
358
}
359

360
// getChannels returns a copy of the passed node's channels or nil if there
361
// isn't any.
362
func (c *GraphCache) getChannels(node route.Vertex) []*DirectedChannel {
548✔
363
        c.mtx.RLock()
548✔
364
        defer c.mtx.RUnlock()
548✔
365

548✔
366
        channels, ok := c.nodeChannels[node]
548✔
367
        if !ok {
558✔
368
                return nil
10✔
369
        }
10✔
370

371
        features, ok := c.nodeFeatures[node]
541✔
372
        if !ok {
696✔
373
                // If the features were set to nil explicitly, that's fine here.
155✔
374
                // The router will overwrite the features of the destination
155✔
375
                // node with those found in the invoice if necessary. But if we
155✔
376
                // didn't yet get a node announcement we want to mimic the
155✔
377
                // behavior of the old DB based code that would always set an
155✔
378
                // empty feature vector instead of leaving it nil.
155✔
379
                features = lnwire.EmptyFeatureVector()
155✔
380
        }
155✔
381

382
        toNodeCallback := func() route.Vertex {
992✔
383
                return node
451✔
384
        }
451✔
385

386
        i := 0
541✔
387
        channelsCopy := make([]*DirectedChannel, len(channels))
541✔
388
        for cid, channel := range channels {
2,134✔
389
                if _, ok := c.zombieIndex[cid]; ok {
1,593✔
NEW
390
                        // If this channel is a zombie, we don't want to return
×
NEW
391
                        // it, so we skip it.
×
NEW
392
                        continue
×
393
                }
394

395
                // We need to copy the channel and policy to avoid it being
396
                // updated in the cache if the path finding algorithm sets
397
                // fields on it (currently only the ToNodeFeatures of the
398
                // policy).
399
                channelCopy := channel.DeepCopy()
1,593✔
400
                if channelCopy.InPolicy != nil {
3,161✔
401
                        channelCopy.InPolicy.ToNodePubKey = toNodeCallback
1,568✔
402
                        channelCopy.InPolicy.ToNodeFeatures = features
1,568✔
403
                }
1,568✔
404

405
                channelsCopy[i] = channelCopy
1,593✔
406
                i++
1,593✔
407
        }
408

409
        // Copy the slice to clean up the unused pre allocated tail entries.
410
        copy(channelsCopy, channelsCopy[:i])
541✔
411

541✔
412
        return channelsCopy
541✔
413
}
414

415
// ForEachChannel invokes the given callback for each channel of the given node.
416
func (c *GraphCache) ForEachChannel(node route.Vertex,
417
        cb func(channel *DirectedChannel) error) error {
548✔
418

548✔
419
        // Obtain a copy of the node's channels. We need do this in order to
548✔
420
        // avoid deadlocks caused by interaction with the graph cache, channel
548✔
421
        // state and the graph database from multiple goroutines. This snapshot
548✔
422
        // is only used for path finding where being stale is acceptable since
548✔
423
        // the real world graph and our representation may always become
548✔
424
        // slightly out of sync for a short time and the actual channel state
548✔
425
        // is stored separately.
548✔
426
        channels := c.getChannels(node)
548✔
427
        for _, channel := range channels {
2,123✔
428
                if err := cb(channel); err != nil {
1,589✔
429
                        return err
14✔
430
                }
14✔
431
        }
432

433
        return nil
537✔
434
}
435

436
// ForEachNode iterates over the adjacency list of the graph, executing the
437
// call back for each node and the set of channels that emanate from the given
438
// node.
439
//
440
// NOTE: This method should be considered _read only_, the channels or nodes
441
// passed in MUST NOT be modified.
442
func (c *GraphCache) ForEachNode(cb func(node route.Vertex,
443
        channels map[uint64]*DirectedChannel) error) error {
2✔
444

2✔
445
        c.mtx.RLock()
2✔
446
        defer c.mtx.RUnlock()
2✔
447

2✔
448
        for node, channels := range c.nodeChannels {
6✔
449
                // We don't make a copy here since this is a read-only RPC
4✔
450
                // call. We also don't need the node features either for this
4✔
451
                // call.
4✔
452
                if err := cb(node, channels); err != nil {
4✔
453
                        return err
×
454
                }
×
455
        }
456

457
        return nil
2✔
458
}
459

460
// GetFeatures returns the features of the node with the given ID. If no
461
// features are known for the node, an empty feature vector is returned.
462
func (c *GraphCache) GetFeatures(node route.Vertex) *lnwire.FeatureVector {
472✔
463
        c.mtx.RLock()
472✔
464
        defer c.mtx.RUnlock()
472✔
465

472✔
466
        features, ok := c.nodeFeatures[node]
472✔
467
        if !ok || features == nil {
555✔
468
                // The router expects the features to never be nil, so we return
83✔
469
                // an empty feature set instead.
83✔
470
                return lnwire.EmptyFeatureVector()
83✔
471
        }
83✔
472

473
        return features
392✔
474
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc