• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16353982365

17 Jul 2025 07:18PM UTC coverage: 57.581% (+0.004%) from 57.577%
16353982365

Pull #10015

github

web-flow
Merge 4f05f0d60 into 47e9f05dd
Pull Request #10015: graph/db: add zombie channels cleanup routine

32 of 64 new or added lines in 2 files covered. (50.0%)

20 existing lines in 6 files now uncovered.

98718 of 171442 relevant lines covered (57.58%)

1.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.7
/graph/db/graph_cache.go
1
package graphdb
2

3
import (
4
        "fmt"
5
        "sync"
6
        "time"
7

8
        "github.com/btcsuite/btcd/btcutil"
9
        "github.com/lightningnetwork/lnd/graph/db/models"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
        "github.com/lightningnetwork/lnd/routing/route"
12
)
13

14
// DirectedChannel is a type that stores the channel information as seen from
15
// one side of the channel.
16
type DirectedChannel struct {
17
        // ChannelID is the unique identifier of this channel.
18
        ChannelID uint64
19

20
        // IsNode1 indicates if this is the node with the smaller public key.
21
        IsNode1 bool
22

23
        // OtherNode is the public key of the node on the other end of this
24
        // channel.
25
        OtherNode route.Vertex
26

27
        // Capacity is the announced capacity of this channel in satoshis.
28
        Capacity btcutil.Amount
29

30
        // OutPolicySet is a boolean that indicates whether the node has an
31
        // outgoing policy set. For pathfinding only the existence of the policy
32
        // is important to know, not the actual content.
33
        OutPolicySet bool
34

35
        // InPolicy is the incoming policy *from* the other node to this node.
36
        // In path finding, we're walking backward from the destination to the
37
        // source, so we're always interested in the edge that arrives to us
38
        // from the other node.
39
        InPolicy *models.CachedEdgePolicy
40

41
        // Inbound fees of this node.
42
        InboundFee lnwire.Fee
43
}
44

45
// DeepCopy creates a deep copy of the channel, including the incoming policy.
46
func (c *DirectedChannel) DeepCopy() *DirectedChannel {
3✔
47
        channelCopy := *c
3✔
48

3✔
49
        if channelCopy.InPolicy != nil {
6✔
50
                inPolicyCopy := *channelCopy.InPolicy
3✔
51
                channelCopy.InPolicy = &inPolicyCopy
3✔
52

3✔
53
                // The fields for the ToNode can be overwritten by the path
3✔
54
                // finding algorithm, which is why we need a deep copy in the
3✔
55
                // first place. So we always start out with nil values, just to
3✔
56
                // be sure they don't contain any old data.
3✔
57
                channelCopy.InPolicy.ToNodePubKey = nil
3✔
58
                channelCopy.InPolicy.ToNodeFeatures = nil
3✔
59
        }
3✔
60

61
        return &channelCopy
3✔
62
}
63

64
// GraphCache is a type that holds a minimal set of information of the public
65
// channel graph that can be used for pathfinding.
66
type GraphCache struct {
67
        nodeChannels map[route.Vertex]map[uint64]*DirectedChannel
68
        nodeFeatures map[route.Vertex]*lnwire.FeatureVector
69

70
        // zombieIndex tracks channels that may be leaked during the removal
71
        // process. Since the remover could not have the node ID, these channels
72
        // are stored here and will be removed later in a separate loop.
73
        zombieIndex map[uint64]struct{}
74

75
        // zombieCleanerInterval is the interval at which the zombie cleaner
76
        // runs to clean up channels that are still missing their nodes.
77
        zombieCleanerInterval time.Duration
78

79
        mtx  sync.RWMutex
80
        quit chan struct{}
81
        wg   sync.WaitGroup
82
}
83

84
// NewGraphCache creates a new graphCache.
85
func NewGraphCache(preAllocNumNodes int) *GraphCache {
3✔
86
        return &GraphCache{
3✔
87
                nodeChannels: make(
3✔
88
                        map[route.Vertex]map[uint64]*DirectedChannel,
3✔
89
                        // A channel connects two nodes, so we can look it up
3✔
90
                        // from both sides, meaning we get double the number of
3✔
91
                        // entries.
3✔
92
                        preAllocNumNodes*2,
3✔
93
                ),
3✔
94
                nodeFeatures: make(
3✔
95
                        map[route.Vertex]*lnwire.FeatureVector,
3✔
96
                        preAllocNumNodes,
3✔
97
                ),
3✔
98
                zombieIndex:           make(map[uint64]struct{}),
3✔
99
                zombieCleanerInterval: time.Hour,
3✔
100
                quit:                  make(chan struct{}),
3✔
101
        }
3✔
102
}
3✔
103

104
// Stats returns statistics about the current cache size.
105
func (c *GraphCache) Stats() string {
3✔
106
        c.mtx.RLock()
3✔
107
        defer c.mtx.RUnlock()
3✔
108

3✔
109
        numChannels := 0
3✔
110
        for node := range c.nodeChannels {
6✔
111
                numChannels += len(c.nodeChannels[node])
3✔
112
        }
3✔
113
        return fmt.Sprintf("num_node_features=%d, num_nodes=%d, "+
3✔
114
                "num_channels=%d", len(c.nodeFeatures), len(c.nodeChannels),
3✔
115
                numChannels)
3✔
116
}
117

118
// Start launches the background goroutine that periodically cleans up zombie
119
// channels.
120
func (c *GraphCache) Start() {
3✔
121
        c.wg.Add(1)
3✔
122
        go c.zombieCleaner()
3✔
123
}
3✔
124

125
// Stop signals the background cleaner to shut down and waits for it to exit.
126
func (c *GraphCache) Stop() {
3✔
127
        close(c.quit)
3✔
128
        c.wg.Wait()
3✔
129
}
3✔
130

131
// zombieCleaner periodically iterates over the zombie index and removes
132
// channels that are still missing their nodes.
133
//
134
// NOTE: must be run as a goroutine.
135
func (c *GraphCache) zombieCleaner() {
3✔
136
        defer c.wg.Done()
3✔
137

3✔
138
        ticker := time.NewTicker(c.zombieCleanerInterval)
3✔
139
        defer ticker.Stop()
3✔
140

3✔
141
        for {
6✔
142
                select {
3✔
NEW
143
                case <-ticker.C:
×
NEW
144
                        c.cleanupZombies()
×
145
                case <-c.quit:
3✔
146
                        return
3✔
147
                }
148
        }
149
}
150

151
// cleanupZombies attempts to prune channels tracked in the zombie index. If the
152
// nodes for a channel still cannot be resolved, the channel is deleted from the
153
// cache.
NEW
154
func (c *GraphCache) cleanupZombies() {
×
NEW
155
        c.mtx.Lock()
×
NEW
156
        defer c.mtx.Unlock()
×
NEW
157

×
NEW
158
        if len(c.zombieIndex) == 0 {
×
NEW
159
                log.Debug("no zombie channels to clean up")
×
NEW
160
                return
×
NEW
161
        }
×
162

163
        // Go through all nodes and their channels once to check if any are
164
        // marked as zombies. This is faster than checking every node for each
165
        // zombie channel, since there are usually many more nodes than zombie
166
        // channels.
NEW
167
        for node, chans := range c.nodeChannels {
×
NEW
168
                for cid, ch := range chans {
×
NEW
169
                        // if the channel isn't a zombie, we can skip it.
×
NEW
170
                        if _, ok := c.zombieIndex[cid]; !ok {
×
NEW
171
                                continue
×
172
                        }
173

174
                        // delete peer's side channel if it exists.
NEW
175
                        c.removeChannelIfFound(ch.OtherNode, cid)
×
NEW
176

×
NEW
177
                        // delete the channel from our side.
×
NEW
178
                        delete(chans, cid)
×
179
                }
180

181
                // If all channels were deleted for this node, clean up the map
182
                // entry entirely.
NEW
183
                if len(chans) == 0 {
×
NEW
184
                        delete(c.nodeChannels, node)
×
NEW
185
                }
×
186
        }
187

188
        // Now that we have removed all channels that were zombies, we can
189
        // clear the zombie index.
NEW
190
        c.zombieIndex = make(map[uint64]struct{})
×
191
}
192

193
// AddNodeFeatures adds a graph node and its features to the cache.
194
func (c *GraphCache) AddNodeFeatures(node route.Vertex,
195
        features *lnwire.FeatureVector) {
3✔
196

3✔
197
        c.mtx.Lock()
3✔
198
        defer c.mtx.Unlock()
3✔
199

3✔
200
        c.nodeFeatures[node] = features
3✔
201
}
3✔
202

203
// AddChannel adds a non-directed channel, meaning that the order of policy 1
204
// and policy 2 does not matter, the directionality is extracted from the info
205
// and policy flags automatically. The policy will be set as the outgoing policy
206
// on one node and the incoming policy on the peer's side.
207
func (c *GraphCache) AddChannel(info *models.CachedEdgeInfo,
208
        policy1, policy2 *models.CachedEdgePolicy) {
3✔
209

3✔
210
        if info == nil {
3✔
211
                return
×
212
        }
×
213

214
        if policy1 != nil && policy1.IsDisabled() &&
3✔
215
                policy2 != nil && policy2.IsDisabled() {
6✔
216

3✔
217
                return
3✔
218
        }
3✔
219

220
        // Create the edge entry for both nodes.
221
        c.mtx.Lock()
3✔
222
        c.updateOrAddEdge(info.NodeKey1Bytes, &DirectedChannel{
3✔
223
                ChannelID: info.ChannelID,
3✔
224
                IsNode1:   true,
3✔
225
                OtherNode: info.NodeKey2Bytes,
3✔
226
                Capacity:  info.Capacity,
3✔
227
        })
3✔
228
        c.updateOrAddEdge(info.NodeKey2Bytes, &DirectedChannel{
3✔
229
                ChannelID: info.ChannelID,
3✔
230
                IsNode1:   false,
3✔
231
                OtherNode: info.NodeKey1Bytes,
3✔
232
                Capacity:  info.Capacity,
3✔
233
        })
3✔
234
        c.mtx.Unlock()
3✔
235

3✔
236
        // The policy's node is always the to_node. So if policy 1 has to_node
3✔
237
        // of node 2 then we have the policy 1 as seen from node 1.
3✔
238
        if policy1 != nil {
6✔
239
                fromNode, toNode := info.NodeKey1Bytes, info.NodeKey2Bytes
3✔
240
                if !policy1.IsNode1() {
3✔
241
                        fromNode, toNode = toNode, fromNode
×
242
                }
×
243
                c.UpdatePolicy(policy1, fromNode, toNode)
3✔
244
        }
245
        if policy2 != nil {
6✔
246
                fromNode, toNode := info.NodeKey2Bytes, info.NodeKey1Bytes
3✔
247
                if policy2.IsNode1() {
3✔
248
                        fromNode, toNode = toNode, fromNode
×
249
                }
×
250
                c.UpdatePolicy(policy2, fromNode, toNode)
3✔
251
        }
252
}
253

254
// updateOrAddEdge makes sure the edge information for a node is either updated
255
// if it already exists or is added to that node's list of channels.
256
func (c *GraphCache) updateOrAddEdge(node route.Vertex, edge *DirectedChannel) {
3✔
257
        if len(c.nodeChannels[node]) == 0 {
6✔
258
                c.nodeChannels[node] = make(map[uint64]*DirectedChannel)
3✔
259
        }
3✔
260

261
        c.nodeChannels[node][edge.ChannelID] = edge
3✔
262
}
263

264
// UpdatePolicy updates a single policy on both the from and to node. The order
265
// of the from and to node is not strictly important. But we assume that a
266
// channel edge was added beforehand so that the directed channel struct already
267
// exists in the cache.
268
func (c *GraphCache) UpdatePolicy(policy *models.CachedEdgePolicy, fromNode,
269
        toNode route.Vertex) {
3✔
270

3✔
271
        c.mtx.Lock()
3✔
272
        defer c.mtx.Unlock()
3✔
273

3✔
274
        updatePolicy := func(nodeKey route.Vertex) {
6✔
275
                if len(c.nodeChannels[nodeKey]) == 0 {
3✔
276
                        log.Warnf("Node=%v not found in graph cache", nodeKey)
×
277

×
278
                        return
×
279
                }
×
280

281
                channel, ok := c.nodeChannels[nodeKey][policy.ChannelID]
3✔
282
                if !ok {
3✔
283
                        log.Warnf("Channel=%v not found in graph cache",
×
284
                                policy.ChannelID)
×
285

×
286
                        return
×
287
                }
×
288

289
                // Edge 1 is defined as the policy for the direction of node1 to
290
                // node2.
291
                switch {
3✔
292
                // This is node 1, and it is edge 1, so this is the outgoing
293
                // policy for node 1.
294
                case channel.IsNode1 && policy.IsNode1():
3✔
295
                        channel.OutPolicySet = true
3✔
296
                        policy.InboundFee.WhenSome(func(fee lnwire.Fee) {
6✔
297
                                channel.InboundFee = fee
3✔
298
                        })
3✔
299

300
                // This is node 2, and it is edge 2, so this is the outgoing
301
                // policy for node 2.
302
                case !channel.IsNode1 && !policy.IsNode1():
3✔
303
                        channel.OutPolicySet = true
3✔
304
                        policy.InboundFee.WhenSome(func(fee lnwire.Fee) {
6✔
305
                                channel.InboundFee = fee
3✔
306
                        })
3✔
307

308
                // The other two cases left mean it's the inbound policy for the
309
                // node.
310
                default:
3✔
311
                        channel.InPolicy = policy
3✔
312
                }
313
        }
314

315
        updatePolicy(fromNode)
3✔
316
        updatePolicy(toNode)
3✔
317
}
318

319
// RemoveNode completely removes a node and all its channels (including the
320
// peer's side).
321
func (c *GraphCache) RemoveNode(node route.Vertex) {
3✔
322
        c.mtx.Lock()
3✔
323
        defer c.mtx.Unlock()
3✔
324

3✔
325
        delete(c.nodeFeatures, node)
3✔
326

3✔
327
        // First remove all channels from the other nodes' lists.
3✔
328
        for _, channel := range c.nodeChannels[node] {
3✔
329
                c.removeChannelIfFound(channel.OtherNode, channel.ChannelID)
×
330
        }
×
331

332
        // Then remove our whole node completely.
333
        delete(c.nodeChannels, node)
3✔
334
}
335

336
// RemoveChannel removes a single channel between two nodes.
337
func (c *GraphCache) RemoveChannel(node1, node2 route.Vertex, chanID uint64) {
3✔
338
        c.mtx.Lock()
3✔
339
        defer c.mtx.Unlock()
3✔
340

3✔
341
        // Remove that one channel from both sides.
3✔
342
        c.removeChannelIfFound(node1, chanID)
3✔
343
        c.removeChannelIfFound(node2, chanID)
3✔
344

3✔
345
        zeroVertex := route.Vertex{}
3✔
346
        if node1 == zeroVertex || node2 == zeroVertex {
3✔
NEW
347
                // If one of the nodes is the zero vertex, it means that we will
×
NEW
348
                // leak the channel in the memory cache, since we don't have the
×
NEW
349
                // node ID to remove, so we add it to the zombie index to post
×
NEW
350
                // removal.
×
NEW
351
                c.zombieIndex[chanID] = struct{}{}
×
NEW
352
        }
×
353
}
354

355
// removeChannelIfFound removes a single channel from one side.
356
func (c *GraphCache) removeChannelIfFound(node route.Vertex, chanID uint64) {
3✔
357
        if len(c.nodeChannels[node]) == 0 {
6✔
358
                return
3✔
359
        }
3✔
360

361
        delete(c.nodeChannels[node], chanID)
3✔
362
}
363

364
// getChannels returns a copy of the passed node's channels or nil if there
365
// isn't any.
366
func (c *GraphCache) getChannels(node route.Vertex) []*DirectedChannel {
3✔
367
        c.mtx.RLock()
3✔
368
        defer c.mtx.RUnlock()
3✔
369

3✔
370
        channels, ok := c.nodeChannels[node]
3✔
371
        if !ok {
6✔
372
                return nil
3✔
373
        }
3✔
374

375
        features, ok := c.nodeFeatures[node]
3✔
376
        if !ok {
6✔
377
                // If the features were set to nil explicitly, that's fine here.
3✔
378
                // The router will overwrite the features of the destination
3✔
379
                // node with those found in the invoice if necessary. But if we
3✔
380
                // didn't yet get a node announcement we want to mimic the
3✔
381
                // behavior of the old DB based code that would always set an
3✔
382
                // empty feature vector instead of leaving it nil.
3✔
383
                features = lnwire.EmptyFeatureVector()
3✔
384
        }
3✔
385

386
        toNodeCallback := func() route.Vertex {
6✔
387
                return node
3✔
388
        }
3✔
389

390
        i := 0
3✔
391
        channelsCopy := make([]*DirectedChannel, len(channels))
3✔
392
        for cid, channel := range channels {
6✔
393
                if _, ok := c.zombieIndex[cid]; ok {
3✔
NEW
394
                        // If this channel is a zombie, we don't want to return
×
NEW
395
                        // it, so we skip it.
×
NEW
396
                        continue
×
397
                }
398

399
                // We need to copy the channel and policy to avoid it being
400
                // updated in the cache if the path finding algorithm sets
401
                // fields on it (currently only the ToNodeFeatures of the
402
                // policy).
403
                channelCopy := channel.DeepCopy()
3✔
404
                if channelCopy.InPolicy != nil {
6✔
405
                        channelCopy.InPolicy.ToNodePubKey = toNodeCallback
3✔
406
                        channelCopy.InPolicy.ToNodeFeatures = features
3✔
407
                }
3✔
408

409
                channelsCopy[i] = channelCopy
3✔
410
                i++
3✔
411
        }
412

413
        // Copy the slice to clean up the unused pre allocated tail entries.
414
        copy(channelsCopy, channelsCopy[:i])
3✔
415

3✔
416
        return channelsCopy
3✔
417
}
418

419
// ForEachChannel invokes the given callback for each channel of the given node.
420
func (c *GraphCache) ForEachChannel(node route.Vertex,
421
        cb func(channel *DirectedChannel) error) error {
3✔
422

3✔
423
        // Obtain a copy of the node's channels. We need do this in order to
3✔
424
        // avoid deadlocks caused by interaction with the graph cache, channel
3✔
425
        // state and the graph database from multiple goroutines. This snapshot
3✔
426
        // is only used for path finding where being stale is acceptable since
3✔
427
        // the real world graph and our representation may always become
3✔
428
        // slightly out of sync for a short time and the actual channel state
3✔
429
        // is stored separately.
3✔
430
        channels := c.getChannels(node)
3✔
431
        for _, channel := range channels {
6✔
432
                if err := cb(channel); err != nil {
6✔
433
                        return err
3✔
434
                }
3✔
435
        }
436

437
        return nil
3✔
438
}
439

440
// ForEachNode iterates over the adjacency list of the graph, executing the
441
// call back for each node and the set of channels that emanate from the given
442
// node.
443
//
444
// NOTE: This method should be considered _read only_, the channels or nodes
445
// passed in MUST NOT be modified.
446
func (c *GraphCache) ForEachNode(cb func(node route.Vertex,
447
        channels map[uint64]*DirectedChannel) error) error {
×
448

×
449
        c.mtx.RLock()
×
450
        defer c.mtx.RUnlock()
×
451

×
452
        for node, channels := range c.nodeChannels {
×
453
                // We don't make a copy here since this is a read-only RPC
×
454
                // call. We also don't need the node features either for this
×
455
                // call.
×
456
                if err := cb(node, channels); err != nil {
×
457
                        return err
×
458
                }
×
459
        }
460

461
        return nil
×
462
}
463

464
// GetFeatures returns the features of the node with the given ID. If no
465
// features are known for the node, an empty feature vector is returned.
466
func (c *GraphCache) GetFeatures(node route.Vertex) *lnwire.FeatureVector {
3✔
467
        c.mtx.RLock()
3✔
468
        defer c.mtx.RUnlock()
3✔
469

3✔
470
        features, ok := c.nodeFeatures[node]
3✔
471
        if !ok || features == nil {
6✔
472
                // The router expects the features to never be nil, so we return
3✔
473
                // an empty feature set instead.
3✔
474
                return lnwire.EmptyFeatureVector()
3✔
475
        }
3✔
476

477
        return features
3✔
478
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc