• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13566028875

27 Feb 2025 12:09PM UTC coverage: 49.396% (-9.4%) from 58.748%
13566028875

Pull #9555

github

ellemouton
graph/db: populate the graph cache in Start instead of during construction

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.
Pull Request #9555: graph: extract cache from CRUD [6]

34 of 54 new or added lines in 4 files covered. (62.96%)

27464 existing lines in 436 files now uncovered.

101095 of 204664 relevant lines covered (49.4%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.54
/graph/db/graph.go
1
package graphdb
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/lightningnetwork/lnd/batch"
13
        "github.com/lightningnetwork/lnd/graph/db/models"
14
        "github.com/lightningnetwork/lnd/kvdb"
15
        "github.com/lightningnetwork/lnd/lnwire"
16
        "github.com/lightningnetwork/lnd/routing/route"
17
)
18

19
// Config is a struct that holds all the necessary dependencies for a
20
// ChannelGraph.
21
type Config struct {
22
        // KVDB is the kvdb.Backend that will be used for initializing the
23
        // KVStore CRUD layer.
24
        KVDB kvdb.Backend
25

26
        // KVStoreOpts is a list of functional options that will be used when
27
        // initializing the KVStore.
28
        KVStoreOpts []KVStoreOptionModifier
29
}
30

31
// ChannelGraph is a layer above the graph's CRUD layer.
32
//
33
// NOTE: currently, this is purely a pass-through layer directly to the backing
34
// KVStore. Upcoming commits will move the graph cache out of the KVStore and
35
// into this layer so that the KVStore is only responsible for CRUD operations.
36
type ChannelGraph struct {
37
        started atomic.Bool
38
        stopped atomic.Bool
39

40
        // cacheMu guards any writes to the graphCache. It should be held
41
        // across the DB write call and the graphCache update to make the
42
        // two updates as atomic as possible.
43
        cacheMu sync.Mutex
44

45
        graphCache *GraphCache
46

47
        *KVStore
48

49
        quit chan struct{}
50
        wg   sync.WaitGroup
51
}
52

53
// NewChannelGraph creates a new ChannelGraph instance with the given backend.
54
func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
55
        error) {
3✔
56

3✔
57
        opts := defaultChanGraphOptions()
3✔
58
        for _, o := range options {
6✔
59
                o(opts)
3✔
60
        }
3✔
61

62
        store, err := NewKVStore(cfg.KVDB, cfg.KVStoreOpts...)
3✔
63
        if err != nil {
3✔
64
                return nil, err
×
65
        }
×
66

67
        g := &ChannelGraph{
3✔
68
                KVStore: store,
3✔
69
                quit:    make(chan struct{}),
3✔
70
        }
3✔
71

3✔
72
        // The graph cache can be turned off (e.g. for mobile users) for a
3✔
73
        // speed/memory usage tradeoff.
3✔
74
        if opts.useGraphCache {
6✔
75
                g.graphCache = NewGraphCache(opts.preAllocCacheNumNodes)
3✔
76
        }
3✔
77

78
        return g, nil
3✔
79
}
80

81
// Start kicks off any goroutines required for the ChannelGraph to function.
82
// If the graph cache is enabled, then it will be populated with the contents of
83
// the database.
84
func (c *ChannelGraph) Start() error {
3✔
85
        if !c.started.CompareAndSwap(false, true) {
3✔
NEW
86
                return nil
×
NEW
87
        }
×
88
        log.Debugf("ChannelGraph starting")
3✔
89
        defer log.Debug("ChannelGraph started")
3✔
90

3✔
91
        if c.graphCache != nil {
6✔
92
                if err := c.populateCache(); err != nil {
3✔
NEW
93
                        return fmt.Errorf("could not populate the graph "+
×
NEW
94
                                "cache: %w", err)
×
NEW
95
                }
×
96
        }
97

98
        return nil
3✔
99
}
100

101
// Stop signals any active goroutines for a graceful closure.
102
func (c *ChannelGraph) Stop() error {
3✔
103
        if !c.stopped.CompareAndSwap(false, true) {
3✔
NEW
104
                return nil
×
NEW
105
        }
×
106

107
        log.Debugf("ChannelGraph shutting down...")
3✔
108
        defer log.Debug("Builder shutdown complete")
3✔
109

3✔
110
        close(c.quit)
3✔
111
        c.wg.Wait()
3✔
112

3✔
113
        return nil
3✔
114
}
115

116
// populateCache loads the entire channel graph into the in-memory graph cache.
117
//
118
// NOTE: This should only be called if the graphCache has been constructed.
119
func (c *ChannelGraph) populateCache() error {
3✔
120
        startTime := time.Now()
3✔
121
        log.Debugf("Populating in-memory channel graph, this might take a " +
3✔
122
                "while...")
3✔
123

3✔
124
        err := c.KVStore.ForEachNodeCacheable(func(node route.Vertex,
3✔
125
                features *lnwire.FeatureVector) error {
6✔
126

3✔
127
                c.graphCache.AddNodeFeatures(node, features)
3✔
128

3✔
129
                return nil
3✔
130
        })
3✔
131
        if err != nil {
3✔
NEW
132
                return err
×
133
        }
×
134

135
        err = c.KVStore.ForEachChannel(func(info *models.ChannelEdgeInfo,
3✔
136
                policy1, policy2 *models.ChannelEdgePolicy) error {
6✔
137

3✔
138
                c.graphCache.AddChannel(info, policy1, policy2)
3✔
139

3✔
140
                return nil
3✔
141
        })
3✔
142
        if err != nil {
3✔
NEW
143
                return err
×
144
        }
×
145

146
        log.Debugf("Finished populating in-memory channel graph (took %v, %s)",
3✔
147
                time.Since(startTime), c.graphCache.Stats())
3✔
148

3✔
149
        return nil
3✔
150
}
151

152
// ForEachNodeDirectedChannel iterates through all channels of a given node,
153
// executing the passed callback on the directed edge representing the channel
154
// and its incoming policy. If the callback returns an error, then the iteration
155
// is halted with the error propagated back up to the caller. If the graphCache
156
// is available, then it will be used to retrieve the node's channels instead
157
// of the database.
158
//
159
// Unknown policies are passed into the callback as nil values.
160
//
161
// NOTE: this is part of the graphdb.NodeTraverser interface.
162
func (c *ChannelGraph) ForEachNodeDirectedChannel(node route.Vertex,
163
        cb func(channel *DirectedChannel) error) error {
3✔
164

3✔
165
        if c.graphCache != nil {
6✔
166
                return c.graphCache.ForEachChannel(node, cb)
3✔
167
        }
3✔
168

169
        return c.KVStore.ForEachNodeDirectedChannel(node, cb)
3✔
170
}
171

172
// FetchNodeFeatures returns the features of the given node. If no features are
173
// known for the node, an empty feature vector is returned.
174
// If the graphCache is available, then it will be used to retrieve the node's
175
// features instead of the database.
176
//
177
// NOTE: this is part of the graphdb.NodeTraverser interface.
178
func (c *ChannelGraph) FetchNodeFeatures(node route.Vertex) (
179
        *lnwire.FeatureVector, error) {
3✔
180

3✔
181
        if c.graphCache != nil {
6✔
182
                return c.graphCache.GetFeatures(node), nil
3✔
183
        }
3✔
184

185
        return c.KVStore.FetchNodeFeatures(node)
3✔
186
}
187

188
// GraphSession will provide the call-back with access to a NodeTraverser
189
// instance which can be used to perform queries against the channel graph. If
190
// the graph cache is not enabled, then the call-back will be provided with
191
// access to the graph via a consistent read-only transaction.
192
func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error {
3✔
193
        if c.graphCache != nil {
6✔
194
                return cb(c)
3✔
195
        }
3✔
196

UNCOV
197
        return c.KVStore.GraphSession(cb)
×
198
}
199

200
// ForEachNodeCached iterates through all the stored vertices/nodes in the
201
// graph, executing the passed callback with each node encountered.
202
//
203
// NOTE: The callback contents MUST not be modified.
204
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
UNCOV
205
        chans map[uint64]*DirectedChannel) error) error {
×
UNCOV
206

×
UNCOV
207
        if c.graphCache != nil {
×
208
                return c.graphCache.ForEachNode(cb)
×
209
        }
×
210

UNCOV
211
        return c.KVStore.ForEachNodeCached(cb)
×
212
}
213

214
// AddLightningNode adds a vertex/node to the graph database. If the node is not
215
// in the database from before, this will add a new, unconnected one to the
216
// graph. If it is present from before, this will update that node's
217
// information. Note that this method is expected to only be called to update an
218
// already present node from a node announcement, or to insert a node found in a
219
// channel update.
220
func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
221
        op ...batch.SchedulerOption) error {
3✔
222

3✔
223
        c.cacheMu.Lock()
3✔
224
        defer c.cacheMu.Unlock()
3✔
225

3✔
226
        err := c.KVStore.AddLightningNode(node, op...)
3✔
227
        if err != nil {
3✔
228
                return err
×
229
        }
×
230

231
        if c.graphCache != nil {
6✔
232
                c.graphCache.AddNodeFeatures(
3✔
233
                        node.PubKeyBytes, node.Features,
3✔
234
                )
3✔
235
        }
3✔
236

237
        return nil
3✔
238
}
239

240
// DeleteLightningNode starts a new database transaction to remove a vertex/node
241
// from the database according to the node's public key.
UNCOV
242
func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
×
UNCOV
243
        c.cacheMu.Lock()
×
UNCOV
244
        defer c.cacheMu.Unlock()
×
UNCOV
245

×
UNCOV
246
        err := c.KVStore.DeleteLightningNode(nodePub)
×
UNCOV
247
        if err != nil {
×
248
                return err
×
249
        }
×
250

UNCOV
251
        if c.graphCache != nil {
×
UNCOV
252
                c.graphCache.RemoveNode(nodePub)
×
UNCOV
253
        }
×
254

UNCOV
255
        return nil
×
256
}
257

258
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
259
// undirected edge from the two target nodes are created. The information stored
260
// denotes the static attributes of the channel, such as the channelID, the keys
261
// involved in creation of the channel, and the set of features that the channel
262
// supports. The chanPoint and chanID are used to uniquely identify the edge
263
// globally within the database.
264
func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
265
        op ...batch.SchedulerOption) error {
3✔
266

3✔
267
        c.cacheMu.Lock()
3✔
268
        defer c.cacheMu.Unlock()
3✔
269

3✔
270
        err := c.KVStore.AddChannelEdge(edge, op...)
3✔
271
        if err != nil {
3✔
UNCOV
272
                return err
×
UNCOV
273
        }
×
274

275
        if c.graphCache != nil {
6✔
276
                c.graphCache.AddChannel(edge, nil, nil)
3✔
277
        }
3✔
278

279
        return nil
3✔
280
}
281

282
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
283
// If the cache is enabled, the edge will be added back to the graph cache if
284
// we still have a record of this channel in the DB.
UNCOV
285
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
×
UNCOV
286
        c.cacheMu.Lock()
×
UNCOV
287
        defer c.cacheMu.Unlock()
×
UNCOV
288

×
UNCOV
289
        err := c.KVStore.MarkEdgeLive(chanID)
×
UNCOV
290
        if err != nil {
×
UNCOV
291
                return err
×
UNCOV
292
        }
×
293

UNCOV
294
        if c.graphCache != nil {
×
UNCOV
295
                // We need to add the channel back into our graph cache,
×
UNCOV
296
                // otherwise we won't use it for path finding.
×
UNCOV
297
                infos, err := c.KVStore.FetchChanInfos([]uint64{chanID})
×
UNCOV
298
                if err != nil {
×
299
                        return err
×
300
                }
×
301

UNCOV
302
                if len(infos) == 0 {
×
UNCOV
303
                        return nil
×
UNCOV
304
                }
×
305

306
                info := infos[0]
×
307

×
308
                c.graphCache.AddChannel(info.Info, info.Policy1, info.Policy2)
×
309
        }
310

311
        return nil
×
312
}
313

314
// DeleteChannelEdges removes edges with the given channel IDs from the
315
// database and marks them as zombies. This ensures that we're unable to re-add
316
// it to our database once again. If an edge does not exist within the
317
// database, then ErrEdgeNotFound will be returned. If strictZombiePruning is
318
// true, then when we mark these edges as zombies, we'll set up the keys such
319
// that we require the node that failed to send the fresh update to be the one
320
// that resurrects the channel from its zombie state. The markZombie bool
321
// denotes whether to mark the channel as a zombie.
322
func (c *ChannelGraph) DeleteChannelEdges(strictZombiePruning, markZombie bool,
323
        chanIDs ...uint64) error {
3✔
324

3✔
325
        c.cacheMu.Lock()
3✔
326
        defer c.cacheMu.Unlock()
3✔
327

3✔
328
        infos, err := c.KVStore.DeleteChannelEdges(
3✔
329
                strictZombiePruning, markZombie, chanIDs...,
3✔
330
        )
3✔
331
        if err != nil {
3✔
UNCOV
332
                return err
×
UNCOV
333
        }
×
334

335
        if c.graphCache != nil {
6✔
336
                for _, info := range infos {
6✔
337
                        c.graphCache.RemoveChannel(
3✔
338
                                info.NodeKey1Bytes, info.NodeKey2Bytes,
3✔
339
                                info.ChannelID,
3✔
340
                        )
3✔
341
                }
3✔
342
        }
343

344
        return err
3✔
345
}
346

347
// DisconnectBlockAtHeight is used to indicate that the block specified
348
// by the passed height has been disconnected from the main chain. This
349
// will "rewind" the graph back to the height below, deleting channels
350
// that are no longer confirmed from the graph. The prune log will be
351
// set to the last prune height valid for the remaining chain.
352
// Channels that were removed from the graph resulting from the
353
// disconnected block are returned.
354
func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) (
355
        []*models.ChannelEdgeInfo, error) {
2✔
356

2✔
357
        c.cacheMu.Lock()
2✔
358
        defer c.cacheMu.Unlock()
2✔
359

2✔
360
        edges, err := c.KVStore.DisconnectBlockAtHeight(height)
2✔
361
        if err != nil {
2✔
362
                return nil, err
×
363
        }
×
364

365
        if c.graphCache != nil {
4✔
366
                for _, edge := range edges {
4✔
367
                        c.graphCache.RemoveChannel(
2✔
368
                                edge.NodeKey1Bytes, edge.NodeKey2Bytes,
2✔
369
                                edge.ChannelID,
2✔
370
                        )
2✔
371
                }
2✔
372
        }
373

374
        return edges, nil
2✔
375
}
376

377
// PruneGraph prunes newly closed channels from the channel graph in response
378
// to a new block being solved on the network. Any transactions which spend the
379
// funding output of any known channels within he graph will be deleted.
380
// Additionally, the "prune tip", or the last block which has been used to
381
// prune the graph is stored so callers can ensure the graph is fully in sync
382
// with the current UTXO state. A slice of channels that have been closed by
383
// the target block are returned if the function succeeds without error.
384
func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
385
        blockHash *chainhash.Hash, blockHeight uint32) (
386
        []*models.ChannelEdgeInfo, error) {
3✔
387

3✔
388
        c.cacheMu.Lock()
3✔
389
        defer c.cacheMu.Unlock()
3✔
390

3✔
391
        edges, nodes, err := c.KVStore.PruneGraph(
3✔
392
                spentOutputs, blockHash, blockHeight,
3✔
393
        )
3✔
394
        if err != nil {
3✔
395
                return nil, err
×
396
        }
×
397

398
        if c.graphCache != nil {
6✔
399
                for _, edge := range edges {
6✔
400
                        c.graphCache.RemoveChannel(
3✔
401
                                edge.NodeKey1Bytes, edge.NodeKey2Bytes,
3✔
402
                                edge.ChannelID,
3✔
403
                        )
3✔
404
                }
3✔
405

406
                for _, node := range nodes {
6✔
407
                        c.graphCache.RemoveNode(node)
3✔
408
                }
3✔
409

410
                log.Debugf("Pruned graph, cache now has %s",
3✔
411
                        c.graphCache.Stats())
3✔
412
        }
413

414
        return edges, nil
3✔
415
}
416

417
// PruneGraphNodes is a garbage collection method which attempts to prune out
418
// any nodes from the channel graph that are currently unconnected. This ensure
419
// that we only maintain a graph of reachable nodes. In the event that a pruned
420
// node gains more channels, it will be re-added back to the graph.
421
func (c *ChannelGraph) PruneGraphNodes() error {
3✔
422
        c.cacheMu.Lock()
3✔
423
        defer c.cacheMu.Unlock()
3✔
424

3✔
425
        nodes, err := c.KVStore.PruneGraphNodes()
3✔
426
        if err != nil {
3✔
427
                return err
×
428
        }
×
429

430
        if c.graphCache != nil {
6✔
431
                for _, node := range nodes {
3✔
UNCOV
432
                        c.graphCache.RemoveNode(node)
×
UNCOV
433
                }
×
434
        }
435

436
        return nil
3✔
437
}
438

439
// FilterKnownChanIDs takes a set of channel IDs and return the subset of chan
440
// ID's that we don't know and are not known zombies of the passed set. In other
441
// words, we perform a set difference of our set of chan ID's and the ones
442
// passed in. This method can be used by callers to determine the set of
443
// channels another peer knows of that we don't.
444
func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
445
        isZombieChan func(time.Time, time.Time) bool) ([]uint64, error) {
3✔
446

3✔
447
        unknown, knownZombies, err := c.KVStore.FilterKnownChanIDs(chansInfo)
3✔
448
        if err != nil {
3✔
449
                return nil, err
×
450
        }
×
451

452
        for _, info := range knownZombies {
3✔
UNCOV
453
                // TODO(ziggie): Make sure that for the strict pruning case we
×
UNCOV
454
                // compare the pubkeys and whether the right timestamp is not
×
UNCOV
455
                // older than the `ChannelPruneExpiry`.
×
UNCOV
456
                //
×
UNCOV
457
                // NOTE: The timestamp data has no verification attached to it
×
UNCOV
458
                // in the `ReplyChannelRange` msg so we are trusting this data
×
UNCOV
459
                // at this point. However it is not critical because we are just
×
UNCOV
460
                // removing the channel from the db when the timestamps are more
×
UNCOV
461
                // recent. During the querying of the gossip msg verification
×
UNCOV
462
                // happens as usual. However we should start punishing peers
×
UNCOV
463
                // when they don't provide us honest data ?
×
UNCOV
464
                isStillZombie := isZombieChan(
×
UNCOV
465
                        info.Node1UpdateTimestamp, info.Node2UpdateTimestamp,
×
UNCOV
466
                )
×
UNCOV
467

×
UNCOV
468
                if isStillZombie {
×
UNCOV
469
                        continue
×
470
                }
471

472
                // If we have marked it as a zombie but the latest update
473
                // timestamps could bring it back from the dead, then we mark it
474
                // alive, and we let it be added to the set of IDs to query our
475
                // peer for.
UNCOV
476
                err := c.KVStore.MarkEdgeLive(
×
UNCOV
477
                        info.ShortChannelID.ToUint64(),
×
UNCOV
478
                )
×
UNCOV
479
                // Since there is a chance that the edge could have been marked
×
UNCOV
480
                // as "live" between the FilterKnownChanIDs call and the
×
UNCOV
481
                // MarkEdgeLive call, we ignore the error if the edge is already
×
UNCOV
482
                // marked as live.
×
UNCOV
483
                if err != nil && !errors.Is(err, ErrZombieEdgeNotFound) {
×
484
                        return nil, err
×
485
                }
×
486
        }
487

488
        return unknown, nil
3✔
489
}
490

491
// MarkEdgeZombie attempts to mark a channel identified by its channel ID as a
492
// zombie. This method is used on an ad-hoc basis, when channels need to be
493
// marked as zombies outside the normal pruning cycle.
494
func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
UNCOV
495
        pubKey1, pubKey2 [33]byte) error {
×
UNCOV
496

×
UNCOV
497
        c.cacheMu.Lock()
×
UNCOV
498
        defer c.cacheMu.Unlock()
×
UNCOV
499

×
UNCOV
500
        err := c.KVStore.MarkEdgeZombie(chanID, pubKey1, pubKey2)
×
UNCOV
501
        if err != nil {
×
502
                return err
×
503
        }
×
504

UNCOV
505
        if c.graphCache != nil {
×
UNCOV
506
                c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
×
UNCOV
507
        }
×
508

UNCOV
509
        return nil
×
510
}
511

512
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
513
// within the database for the referenced channel. The `flags` attribute within
514
// the ChannelEdgePolicy determines which of the directed edges are being
515
// updated. If the flag is 1, then the first node's information is being
516
// updated, otherwise it's the second node's information. The node ordering is
517
// determined by the lexicographical ordering of the identity public keys of the
518
// nodes on either side of the channel.
519
func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
520
        op ...batch.SchedulerOption) error {
3✔
521

3✔
522
        c.cacheMu.Lock()
3✔
523
        defer c.cacheMu.Unlock()
3✔
524

3✔
525
        from, to, err := c.KVStore.UpdateEdgePolicy(edge, op...)
3✔
526
        if err != nil {
3✔
UNCOV
527
                return err
×
UNCOV
528
        }
×
529

530
        if c.graphCache == nil {
6✔
531
                return nil
3✔
532
        }
3✔
533

534
        var isUpdate1 bool
3✔
535
        if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
6✔
536
                isUpdate1 = true
3✔
537
        }
3✔
538

539
        c.graphCache.UpdatePolicy(edge, from, to, isUpdate1)
3✔
540

3✔
541
        return nil
3✔
542
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc