• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17685638204

12 Sep 2025 08:45PM UTC coverage: 66.624% (-0.03%) from 66.651%
17685638204

Pull #10015

github

web-flow
Merge 0f62b1831 into 5082566ed
Pull Request #10015: graph/db: add zombie channels cleanup routine

59 of 67 new or added lines in 2 files covered. (88.06%)

126 existing lines in 24 files now uncovered.

136275 of 204543 relevant lines covered (66.62%)

21395.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.5
/graph/db/graph_cache.go
1
package graphdb
2

3
import (
4
        "fmt"
5
        "sync"
6
        "time"
7

8
        "github.com/btcsuite/btcd/btcutil"
9
        "github.com/lightningnetwork/lnd/graph/db/models"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
        "github.com/lightningnetwork/lnd/routing/route"
12
)
13

14
// DirectedChannel is a type that stores the channel information as seen from
15
// one side of the channel.
16
type DirectedChannel struct {
17
        // ChannelID is the unique identifier of this channel.
18
        ChannelID uint64
19

20
        // IsNode1 indicates if this is the node with the smaller public key.
21
        IsNode1 bool
22

23
        // OtherNode is the public key of the node on the other end of this
24
        // channel.
25
        OtherNode route.Vertex
26

27
        // Capacity is the announced capacity of this channel in satoshis.
28
        Capacity btcutil.Amount
29

30
        // OutPolicySet is a boolean that indicates whether the node has an
31
        // outgoing policy set. For pathfinding only the existence of the policy
32
        // is important to know, not the actual content.
33
        OutPolicySet bool
34

35
        // InPolicy is the incoming policy *from* the other node to this node.
36
        // In path finding, we're walking backward from the destination to the
37
        // source, so we're always interested in the edge that arrives to us
38
        // from the other node.
39
        InPolicy *models.CachedEdgePolicy
40

41
        // Inbound fees of this node.
42
        InboundFee lnwire.Fee
43
}
44

45
// DeepCopy creates a deep copy of the channel, including the incoming policy.
46
func (c *DirectedChannel) DeepCopy() *DirectedChannel {
1,597✔
47
        channelCopy := *c
1,597✔
48

1,597✔
49
        if channelCopy.InPolicy != nil {
3,168✔
50
                inPolicyCopy := *channelCopy.InPolicy
1,571✔
51
                channelCopy.InPolicy = &inPolicyCopy
1,571✔
52

1,571✔
53
                // The fields for the ToNode can be overwritten by the path
1,571✔
54
                // finding algorithm, which is why we need a deep copy in the
1,571✔
55
                // first place. So we always start out with nil values, just to
1,571✔
56
                // be sure they don't contain any old data.
1,571✔
57
                channelCopy.InPolicy.ToNodePubKey = nil
1,571✔
58
                channelCopy.InPolicy.ToNodeFeatures = nil
1,571✔
59
        }
1,571✔
60

61
        return &channelCopy
1,597✔
62
}
63

64
// GraphCache is a type that holds a minimal set of information of the public
65
// channel graph that can be used for pathfinding.
66
type GraphCache struct {
67
        nodeChannels map[route.Vertex]map[uint64]*DirectedChannel
68
        nodeFeatures map[route.Vertex]*lnwire.FeatureVector
69

70
        // zombieIndex tracks channels that may be leaked during the removal
71
        // process. Since the remover could not have the node ID, these channels
72
        // are stored here and will be removed later in a separate loop.
73
        zombieIndex map[uint64]struct{}
74

75
        // zombieCleanerInterval is the interval at which the zombie cleaner
76
        // runs to clean up channels that are still missing their nodes.
77
        zombieCleanerInterval time.Duration
78

79
        mtx  sync.RWMutex
80
        quit chan struct{}
81
        wg   sync.WaitGroup
82
}
83

84
// NewGraphCache creates a new graphCache.
85
func NewGraphCache(preAllocNumNodes int) *GraphCache {
145✔
86
        oneHour := time.Hour
145✔
87

145✔
88
        return &GraphCache{
145✔
89
                nodeChannels: make(
145✔
90
                        map[route.Vertex]map[uint64]*DirectedChannel,
145✔
91
                        // A channel connects two nodes, so we can look it up
145✔
92
                        // from both sides, meaning we get double the number of
145✔
93
                        // entries.
145✔
94
                        preAllocNumNodes*2,
145✔
95
                ),
145✔
96
                nodeFeatures: make(
145✔
97
                        map[route.Vertex]*lnwire.FeatureVector,
145✔
98
                        preAllocNumNodes,
145✔
99
                ),
145✔
100
                zombieIndex:           make(map[uint64]struct{}),
145✔
101
                zombieCleanerInterval: oneHour,
145✔
102
                quit:                  make(chan struct{}),
145✔
103
        }
145✔
104
}
145✔
105

106
// Stats returns statistics about the current cache size.
107
func (c *GraphCache) Stats() string {
386✔
108
        c.mtx.RLock()
386✔
109
        defer c.mtx.RUnlock()
386✔
110

386✔
111
        numChannels := 0
386✔
112
        for node := range c.nodeChannels {
792✔
113
                numChannels += len(c.nodeChannels[node])
406✔
114
        }
406✔
115
        return fmt.Sprintf("num_node_features=%d, num_nodes=%d, "+
386✔
116
                "num_channels=%d", len(c.nodeFeatures), len(c.nodeChannels),
386✔
117
                numChannels)
386✔
118
}
119

120
// Start launches the background goroutine that periodically cleans up zombie
121
// channels.
122
func (c *GraphCache) Start() {
142✔
123
        c.wg.Add(1)
142✔
124
        go c.zombieCleaner()
142✔
125
}
142✔
126

127
// Stop signals the background cleaner to shut down and waits for it to exit.
128
func (c *GraphCache) Stop() {
140✔
129
        close(c.quit)
140✔
130
        c.wg.Wait()
140✔
131
}
140✔
132

133
// zombieCleaner periodically iterates over the zombie index and removes
134
// channels that are still missing their nodes.
135
//
136
// NOTE: must be run as a goroutine.
137
func (c *GraphCache) zombieCleaner() {
142✔
138
        defer c.wg.Done()
142✔
139

142✔
140
        ticker := time.NewTicker(c.zombieCleanerInterval)
142✔
141
        defer ticker.Stop()
142✔
142

142✔
143
        for {
285✔
144
                select {
143✔
145
                case <-ticker.C:
1✔
146
                        c.cleanupZombies()
1✔
147
                case <-c.quit:
140✔
148
                        return
140✔
149
                }
150
        }
151
}
152

153
// cleanupZombies attempts to prune channels tracked in the zombie index. If the
154
// nodes for a channel still cannot be resolved, the channel is deleted from the
155
// cache.
156
func (c *GraphCache) cleanupZombies() {
2✔
157
        c.mtx.Lock()
2✔
158
        defer c.mtx.Unlock()
2✔
159

2✔
160
        if len(c.zombieIndex) == 0 {
2✔
NEW
161
                log.Debug("no zombie channels to clean up in GraphCache")
×
NEW
162
                return
×
NEW
163
        }
×
164

165
        // Go through all nodes and their channels once to check if any are
166
        // marked as zombies. This is faster than checking every node for each
167
        // zombie channel, since there are usually many more nodes than zombie
168
        // channels.
169
        for node, chans := range c.nodeChannels {
5✔
170
                for cid, ch := range chans {
6✔
171
                        // if the channel isn't a zombie, we can skip it.
3✔
172
                        if _, ok := c.zombieIndex[cid]; !ok {
5✔
173
                                continue
2✔
174
                        }
175

176
                        // delete peer's side channel if it exists.
177
                        c.removeChannelIfFound(ch.OtherNode, cid)
1✔
178

1✔
179
                        // delete the channel from our side.
1✔
180
                        delete(chans, cid)
1✔
181
                }
182

183
                // If all channels were deleted for this node, clean up the map
184
                // entry entirely.
185
                if len(chans) == 0 {
4✔
186
                        delete(c.nodeChannels, node)
1✔
187
                }
1✔
188
        }
189

190
        // Now that we have removed all channels that were zombies, we can
191
        // clear the zombie index.
192
        c.zombieIndex = make(map[uint64]struct{})
2✔
193
}
194

195
// AddNodeFeatures adds a graph node and its features to the cache.
196
func (c *GraphCache) AddNodeFeatures(node route.Vertex,
197
        features *lnwire.FeatureVector) {
663✔
198

663✔
199
        c.mtx.Lock()
663✔
200
        defer c.mtx.Unlock()
663✔
201

663✔
202
        c.nodeFeatures[node] = features
663✔
203
}
663✔
204

205
// AddChannel adds a non-directed channel, meaning that the order of policy 1
206
// and policy 2 does not matter, the directionality is extracted from the info
207
// and policy flags automatically. The policy will be set as the outgoing policy
208
// on one node and the incoming policy on the peer's side.
209
func (c *GraphCache) AddChannel(info *models.CachedEdgeInfo,
210
        policy1, policy2 *models.CachedEdgePolicy) {
1,696✔
211

1,696✔
212
        if info == nil {
1,696✔
213
                return
×
214
        }
×
215

216
        if policy1 != nil && policy1.IsDisabled() &&
1,696✔
217
                policy2 != nil && policy2.IsDisabled() {
1,699✔
218

3✔
219
                return
3✔
220
        }
3✔
221

222
        // Create the edge entry for both nodes.
223
        c.mtx.Lock()
1,696✔
224
        c.updateOrAddEdge(info.NodeKey1Bytes, &DirectedChannel{
1,696✔
225
                ChannelID: info.ChannelID,
1,696✔
226
                IsNode1:   true,
1,696✔
227
                OtherNode: info.NodeKey2Bytes,
1,696✔
228
                Capacity:  info.Capacity,
1,696✔
229
        })
1,696✔
230
        c.updateOrAddEdge(info.NodeKey2Bytes, &DirectedChannel{
1,696✔
231
                ChannelID: info.ChannelID,
1,696✔
232
                IsNode1:   false,
1,696✔
233
                OtherNode: info.NodeKey1Bytes,
1,696✔
234
                Capacity:  info.Capacity,
1,696✔
235
        })
1,696✔
236
        c.mtx.Unlock()
1,696✔
237

1,696✔
238
        // The policy's node is always the to_node. So if policy 1 has to_node
1,696✔
239
        // of node 2 then we have the policy 1 as seen from node 1.
1,696✔
240
        if policy1 != nil {
2,097✔
241
                fromNode, toNode := info.NodeKey1Bytes, info.NodeKey2Bytes
401✔
242
                if !policy1.IsNode1() {
402✔
243
                        fromNode, toNode = toNode, fromNode
1✔
244
                }
1✔
245
                c.UpdatePolicy(policy1, fromNode, toNode)
401✔
246
        }
247
        if policy2 != nil {
2,097✔
248
                fromNode, toNode := info.NodeKey2Bytes, info.NodeKey1Bytes
401✔
249
                if policy2.IsNode1() {
402✔
250
                        fromNode, toNode = toNode, fromNode
1✔
251
                }
1✔
252
                c.UpdatePolicy(policy2, fromNode, toNode)
401✔
253
        }
254
}
255

256
// updateOrAddEdge makes sure the edge information for a node is either updated
257
// if it already exists or is added to that node's list of channels.
258
func (c *GraphCache) updateOrAddEdge(node route.Vertex, edge *DirectedChannel) {
3,389✔
259
        if len(c.nodeChannels[node]) == 0 {
4,246✔
260
                c.nodeChannels[node] = make(map[uint64]*DirectedChannel)
857✔
261
        }
857✔
262

263
        c.nodeChannels[node][edge.ChannelID] = edge
3,389✔
264
}
265

266
// UpdatePolicy updates a single policy on both the from and to node. The order
267
// of the from and to node is not strictly important. But we assume that a
268
// channel edge was added beforehand so that the directed channel struct already
269
// exists in the cache.
270
func (c *GraphCache) UpdatePolicy(policy *models.CachedEdgePolicy, fromNode,
271
        toNode route.Vertex) {
3,081✔
272

3,081✔
273
        c.mtx.Lock()
3,081✔
274
        defer c.mtx.Unlock()
3,081✔
275

3,081✔
276
        updatePolicy := func(nodeKey route.Vertex) {
9,240✔
277
                if len(c.nodeChannels[nodeKey]) == 0 {
6,159✔
278
                        log.Warnf("Node=%v not found in graph cache", nodeKey)
×
279

×
280
                        return
×
281
                }
×
282

283
                channel, ok := c.nodeChannels[nodeKey][policy.ChannelID]
6,159✔
284
                if !ok {
6,159✔
285
                        log.Warnf("Channel=%v not found in graph cache",
×
286
                                policy.ChannelID)
×
287

×
288
                        return
×
289
                }
×
290

291
                // Edge 1 is defined as the policy for the direction of node1 to
292
                // node2.
293
                switch {
6,159✔
294
                // This is node 1, and it is edge 1, so this is the outgoing
295
                // policy for node 1.
296
                case channel.IsNode1 && policy.IsNode1():
1,545✔
297
                        channel.OutPolicySet = true
1,545✔
298
                        policy.InboundFee.WhenSome(func(fee lnwire.Fee) {
1,710✔
299
                                channel.InboundFee = fee
165✔
300
                        })
165✔
301

302
                // This is node 2, and it is edge 2, so this is the outgoing
303
                // policy for node 2.
304
                case !channel.IsNode1 && !policy.IsNode1():
1,539✔
305
                        channel.OutPolicySet = true
1,539✔
306
                        policy.InboundFee.WhenSome(func(fee lnwire.Fee) {
1,703✔
307
                                channel.InboundFee = fee
164✔
308
                        })
164✔
309

310
                // The other two cases left mean it's the inbound policy for the
311
                // node.
312
                default:
3,081✔
313
                        channel.InPolicy = policy
3,081✔
314
                }
315
        }
316

317
        updatePolicy(fromNode)
3,081✔
318
        updatePolicy(toNode)
3,081✔
319
}
320

321
// RemoveNode completely removes a node and all its channels (including the
322
// peer's side).
323
func (c *GraphCache) RemoveNode(node route.Vertex) {
70✔
324
        c.mtx.Lock()
70✔
325
        defer c.mtx.Unlock()
70✔
326

70✔
327
        delete(c.nodeFeatures, node)
70✔
328

70✔
329
        // First remove all channels from the other nodes' lists.
70✔
330
        for _, channel := range c.nodeChannels[node] {
71✔
331
                c.removeChannelIfFound(channel.OtherNode, channel.ChannelID)
1✔
332
        }
1✔
333

334
        // Then remove our whole node completely.
335
        delete(c.nodeChannels, node)
70✔
336
}
337

338
// RemoveChannel removes a single channel between two nodes.
339
func (c *GraphCache) RemoveChannel(node1, node2 route.Vertex, chanID uint64) {
263✔
340
        c.mtx.Lock()
263✔
341
        defer c.mtx.Unlock()
263✔
342

263✔
343
        // Remove that one channel from both sides.
263✔
344
        c.removeChannelIfFound(node1, chanID)
263✔
345
        c.removeChannelIfFound(node2, chanID)
263✔
346

263✔
347
        zeroVertex := route.Vertex{}
263✔
348
        if node1 == zeroVertex || node2 == zeroVertex {
267✔
349
                // If one of the nodes is the zero vertex, it means that we will
4✔
350
                // leak the channel in the memory cache, since we don't have the
4✔
351
                // node ID to remove, so we add it to the zombie index to post
4✔
352
                // removal.
4✔
353
                c.zombieIndex[chanID] = struct{}{}
4✔
354
        }
4✔
355
}
356

357
// removeChannelIfFound removes a single channel from one side.
358
func (c *GraphCache) removeChannelIfFound(node route.Vertex, chanID uint64) {
525✔
359
        if len(c.nodeChannels[node]) == 0 {
710✔
360
                return
185✔
361
        }
185✔
362

363
        delete(c.nodeChannels[node], chanID)
343✔
364
}
365

366
// getChannels returns a copy of the passed node's channels or nil if there
367
// isn't any.
368
func (c *GraphCache) getChannels(node route.Vertex) []*DirectedChannel {
548✔
369
        c.mtx.RLock()
548✔
370
        defer c.mtx.RUnlock()
548✔
371

548✔
372
        channels, ok := c.nodeChannels[node]
548✔
373
        if !ok {
558✔
374
                return nil
10✔
375
        }
10✔
376

377
        features, ok := c.nodeFeatures[node]
541✔
378
        if !ok {
696✔
379
                // If the features were set to nil explicitly, that's fine here.
155✔
380
                // The router will overwrite the features of the destination
155✔
381
                // node with those found in the invoice if necessary. But if we
155✔
382
                // didn't yet get a node announcement we want to mimic the
155✔
383
                // behavior of the old DB based code that would always set an
155✔
384
                // empty feature vector instead of leaving it nil.
155✔
385
                features = lnwire.EmptyFeatureVector()
155✔
386
        }
155✔
387

388
        toNodeCallback := func() route.Vertex {
992✔
389
                return node
451✔
390
        }
451✔
391

392
        i := 0
541✔
393
        channelsCopy := make([]*DirectedChannel, len(channels))
541✔
394
        for cid, channel := range channels {
2,134✔
395
                if _, ok := c.zombieIndex[cid]; ok {
1,593✔
NEW
396
                        // If this channel is a zombie, we don't want to return
×
NEW
397
                        // it, so we skip it. We can't delete it here since
×
NEW
398
                        // we're holding a mutex in read mode and changing to
×
NEW
399
                        // write mode will degrade parallel reads.
×
NEW
400
                        continue
×
401
                }
402

403
                // We need to copy the channel and policy to avoid it being
404
                // updated in the cache if the path finding algorithm sets
405
                // fields on it (currently only the ToNodeFeatures of the
406
                // policy).
407
                channelCopy := channel.DeepCopy()
1,593✔
408
                if channelCopy.InPolicy != nil {
3,161✔
409
                        channelCopy.InPolicy.ToNodePubKey = toNodeCallback
1,568✔
410
                        channelCopy.InPolicy.ToNodeFeatures = features
1,568✔
411
                }
1,568✔
412

413
                channelsCopy[i] = channelCopy
1,593✔
414
                i++
1,593✔
415
        }
416

417
        return channelsCopy[:i]
541✔
418
}
419

420
// ForEachChannel invokes the given callback for each channel of the given node.
421
func (c *GraphCache) ForEachChannel(node route.Vertex,
422
        cb func(channel *DirectedChannel) error) error {
548✔
423

548✔
424
        // Obtain a copy of the node's channels. We need do this in order to
548✔
425
        // avoid deadlocks caused by interaction with the graph cache, channel
548✔
426
        // state and the graph database from multiple goroutines. This snapshot
548✔
427
        // is only used for path finding where being stale is acceptable since
548✔
428
        // the real world graph and our representation may always become
548✔
429
        // slightly out of sync for a short time and the actual channel state
548✔
430
        // is stored separately.
548✔
431
        channels := c.getChannels(node)
548✔
432
        for _, channel := range channels {
2,129✔
433
                if err := cb(channel); err != nil {
1,595✔
434
                        return err
14✔
435
                }
14✔
436
        }
437

438
        return nil
537✔
439
}
440

441
// ForEachNode iterates over the adjacency list of the graph, executing the
442
// call back for each node and the set of channels that emanate from the given
443
// node.
444
//
445
// NOTE: This method should be considered _read only_, the channels or nodes
446
// passed in MUST NOT be modified.
447
func (c *GraphCache) ForEachNode(cb func(node route.Vertex,
448
        channels map[uint64]*DirectedChannel) error) error {
2✔
449

2✔
450
        c.mtx.RLock()
2✔
451
        defer c.mtx.RUnlock()
2✔
452

2✔
453
        for node, channels := range c.nodeChannels {
6✔
454
                // We don't make a copy here since this is a read-only RPC
4✔
455
                // call. We also don't need the node features either for this
4✔
456
                // call.
4✔
457
                if err := cb(node, channels); err != nil {
4✔
458
                        return err
×
459
                }
×
460
        }
461

462
        return nil
2✔
463
}
464

465
// GetFeatures returns the features of the node with the given ID. If no
466
// features are known for the node, an empty feature vector is returned.
467
func (c *GraphCache) GetFeatures(node route.Vertex) *lnwire.FeatureVector {
472✔
468
        c.mtx.RLock()
472✔
469
        defer c.mtx.RUnlock()
472✔
470

472✔
471
        features, ok := c.nodeFeatures[node]
472✔
472
        if !ok || features == nil {
555✔
473
                // The router expects the features to never be nil, so we return
83✔
474
                // an empty feature set instead.
83✔
475
                return lnwire.EmptyFeatureVector()
83✔
476
        }
83✔
477

478
        return features
392✔
479
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc