• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13583650550

28 Feb 2025 07:47AM UTC coverage: 58.867% (+0.009%) from 58.858%
13583650550

Pull #9555

github

ellemouton
graph/db: populate the graph cache in Start instead of during construction

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.
Pull Request #9555: graph: extract cache from CRUD [6]

42 of 56 new or added lines in 4 files covered. (75.0%)

38 existing lines in 12 files now uncovered.

136653 of 232137 relevant lines covered (58.87%)

19232.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.63
/graph/db/graph.go
1
package graphdb
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/lightningnetwork/lnd/batch"
13
        "github.com/lightningnetwork/lnd/graph/db/models"
14
        "github.com/lightningnetwork/lnd/kvdb"
15
        "github.com/lightningnetwork/lnd/lnwire"
16
        "github.com/lightningnetwork/lnd/routing/route"
17
)
18

19
// Config is a struct that holds all the necessary dependencies for a
20
// ChannelGraph.
21
type Config struct {
22
        // KVDB is the kvdb.Backend that will be used for initializing the
23
        // KVStore CRUD layer.
24
        KVDB kvdb.Backend
25

26
        // KVStoreOpts is a list of functional options that will be used when
27
        // initializing the KVStore.
28
        KVStoreOpts []KVStoreOptionModifier
29
}
30

31
// ChannelGraph is a layer above the graph's CRUD layer.
32
//
33
// NOTE: currently, this is purely a pass-through layer directly to the backing
34
// KVStore. Upcoming commits will move the graph cache out of the KVStore and
35
// into this layer so that the KVStore is only responsible for CRUD operations.
36
type ChannelGraph struct {
37
        started atomic.Bool
38
        stopped atomic.Bool
39

40
        // cacheMu guards any writes to the graphCache. It should be held
41
        // across the DB write call and the graphCache update to make the
42
        // two updates as atomic as possible.
43
        cacheMu sync.Mutex
44

45
        graphCache *GraphCache
46

47
        *KVStore
48

49
        quit chan struct{}
50
        wg   sync.WaitGroup
51
}
52

53
// NewChannelGraph creates a new ChannelGraph instance with the given backend.
54
func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
55
        error) {
177✔
56

177✔
57
        opts := defaultChanGraphOptions()
177✔
58
        for _, o := range options {
282✔
59
                o(opts)
105✔
60
        }
105✔
61

62
        store, err := NewKVStore(cfg.KVDB, cfg.KVStoreOpts...)
177✔
63
        if err != nil {
177✔
64
                return nil, err
×
65
        }
×
66

67
        g := &ChannelGraph{
177✔
68
                KVStore: store,
177✔
69
                quit:    make(chan struct{}),
177✔
70
        }
177✔
71

177✔
72
        // The graph cache can be turned off (e.g. for mobile users) for a
177✔
73
        // speed/memory usage tradeoff.
177✔
74
        if opts.useGraphCache {
321✔
75
                g.graphCache = NewGraphCache(opts.preAllocCacheNumNodes)
144✔
76
        }
144✔
77

78
        return g, nil
177✔
79
}
80

81
// Start kicks off any goroutines required for the ChannelGraph to function.
82
// If the graph cache is enabled, then it will be populated with the contents of
83
// the database.
84
func (c *ChannelGraph) Start() error {
177✔
85
        if !c.started.CompareAndSwap(false, true) {
177✔
NEW
86
                return nil
×
NEW
87
        }
×
88
        log.Debugf("ChannelGraph starting")
177✔
89
        defer log.Debug("ChannelGraph started")
177✔
90

177✔
91
        if c.graphCache != nil {
321✔
92
                if err := c.populateCache(); err != nil {
144✔
NEW
93
                        return fmt.Errorf("could not populate the graph "+
×
NEW
94
                                "cache: %w", err)
×
NEW
95
                }
×
96
        }
97

98
        return nil
177✔
99
}
100

101
// Stop signals any active goroutines for a graceful closure.
102
func (c *ChannelGraph) Stop() error {
177✔
103
        if !c.stopped.CompareAndSwap(false, true) {
177✔
NEW
104
                return nil
×
NEW
105
        }
×
106

107
        log.Debugf("ChannelGraph shutting down...")
177✔
108
        defer log.Debug("ChannelGraph shutdown complete")
177✔
109

177✔
110
        close(c.quit)
177✔
111
        c.wg.Wait()
177✔
112

177✔
113
        return nil
177✔
114
}
115

116
// populateCache loads the entire channel graph into the in-memory graph cache.
117
//
118
// NOTE: This should only be called if the graphCache has been constructed.
119
func (c *ChannelGraph) populateCache() error {
144✔
120
        startTime := time.Now()
144✔
121
        log.Info("Populating in-memory channel graph, this might take a " +
144✔
122
                "while...")
144✔
123

144✔
124
        err := c.KVStore.ForEachNodeCacheable(func(node route.Vertex,
144✔
125
                features *lnwire.FeatureVector) error {
247✔
126

103✔
127
                c.graphCache.AddNodeFeatures(node, features)
103✔
128

103✔
129
                return nil
103✔
130
        })
103✔
131
        if err != nil {
144✔
NEW
132
                return err
×
133
        }
×
134

135
        err = c.KVStore.ForEachChannel(func(info *models.ChannelEdgeInfo,
144✔
136
                policy1, policy2 *models.ChannelEdgePolicy) error {
543✔
137

399✔
138
                c.graphCache.AddChannel(info, policy1, policy2)
399✔
139

399✔
140
                return nil
399✔
141
        })
399✔
142
        if err != nil {
144✔
NEW
143
                return err
×
144
        }
×
145

146
        log.Infof("Finished populating in-memory channel graph (took %v, %s)",
144✔
147
                time.Since(startTime), c.graphCache.Stats())
144✔
148

144✔
149
        return nil
144✔
150
}
151

152
// ForEachNodeDirectedChannel iterates through all channels of a given node,
153
// executing the passed callback on the directed edge representing the channel
154
// and its incoming policy. If the callback returns an error, then the iteration
155
// is halted with the error propagated back up to the caller. If the graphCache
156
// is available, then it will be used to retrieve the node's channels instead
157
// of the database.
158
//
159
// Unknown policies are passed into the callback as nil values.
160
//
161
// NOTE: this is part of the graphdb.NodeTraverser interface.
162
func (c *ChannelGraph) ForEachNodeDirectedChannel(node route.Vertex,
163
        cb func(channel *DirectedChannel) error) error {
467✔
164

467✔
165
        if c.graphCache != nil {
931✔
166
                return c.graphCache.ForEachChannel(node, cb)
464✔
167
        }
464✔
168

169
        return c.KVStore.ForEachNodeDirectedChannel(node, cb)
6✔
170
}
171

172
// FetchNodeFeatures returns the features of the given node. If no features are
173
// known for the node, an empty feature vector is returned.
174
// If the graphCache is available, then it will be used to retrieve the node's
175
// features instead of the database.
176
//
177
// NOTE: this is part of the graphdb.NodeTraverser interface.
178
func (c *ChannelGraph) FetchNodeFeatures(node route.Vertex) (
179
        *lnwire.FeatureVector, error) {
456✔
180

456✔
181
        if c.graphCache != nil {
912✔
182
                return c.graphCache.GetFeatures(node), nil
456✔
183
        }
456✔
184

185
        return c.KVStore.FetchNodeFeatures(node)
3✔
186
}
187

188
// GraphSession will provide the call-back with access to a NodeTraverser
189
// instance which can be used to perform queries against the channel graph. If
190
// the graph cache is not enabled, then the call-back will be provided with
191
// access to the graph via a consistent read-only transaction.
192
func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error {
136✔
193
        if c.graphCache != nil {
218✔
194
                return cb(c)
82✔
195
        }
82✔
196

197
        return c.KVStore.GraphSession(cb)
54✔
198
}
199

200
// ForEachNodeCached iterates through all the stored vertices/nodes in the
201
// graph, executing the passed callback with each node encountered.
202
//
203
// NOTE: The callback contents MUST not be modified.
204
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
205
        chans map[uint64]*DirectedChannel) error) error {
1✔
206

1✔
207
        if c.graphCache != nil {
1✔
208
                return c.graphCache.ForEachNode(cb)
×
209
        }
×
210

211
        return c.KVStore.ForEachNodeCached(cb)
1✔
212
}
213

214
// AddLightningNode adds a vertex/node to the graph database. If the node is not
215
// in the database from before, this will add a new, unconnected one to the
216
// graph. If it is present from before, this will update that node's
217
// information. Note that this method is expected to only be called to update an
218
// already present node from a node announcement, or to insert a node found in a
219
// channel update.
220
func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
221
        op ...batch.SchedulerOption) error {
803✔
222

803✔
223
        c.cacheMu.Lock()
803✔
224
        defer c.cacheMu.Unlock()
803✔
225

803✔
226
        err := c.KVStore.AddLightningNode(node, op...)
803✔
227
        if err != nil {
803✔
228
                return err
×
229
        }
×
230

231
        if c.graphCache != nil {
1,419✔
232
                c.graphCache.AddNodeFeatures(
616✔
233
                        node.PubKeyBytes, node.Features,
616✔
234
                )
616✔
235
        }
616✔
236

237
        return nil
803✔
238
}
239

240
// DeleteLightningNode starts a new database transaction to remove a vertex/node
241
// from the database according to the node's public key.
242
func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
3✔
243
        c.cacheMu.Lock()
3✔
244
        defer c.cacheMu.Unlock()
3✔
245

3✔
246
        err := c.KVStore.DeleteLightningNode(nodePub)
3✔
247
        if err != nil {
3✔
248
                return err
×
249
        }
×
250

251
        if c.graphCache != nil {
6✔
252
                c.graphCache.RemoveNode(nodePub)
3✔
253
        }
3✔
254

255
        return nil
3✔
256
}
257

258
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
259
// undirected edge from the two target nodes are created. The information stored
260
// denotes the static attributes of the channel, such as the channelID, the keys
261
// involved in creation of the channel, and the set of features that the channel
262
// supports. The chanPoint and chanID are used to uniquely identify the edge
263
// globally within the database.
264
func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
265
        op ...batch.SchedulerOption) error {
1,723✔
266

1,723✔
267
        c.cacheMu.Lock()
1,723✔
268
        defer c.cacheMu.Unlock()
1,723✔
269

1,723✔
270
        err := c.KVStore.AddChannelEdge(edge, op...)
1,723✔
271
        if err != nil {
1,957✔
272
                return err
234✔
273
        }
234✔
274

275
        if c.graphCache != nil {
2,788✔
276
                c.graphCache.AddChannel(edge, nil, nil)
1,299✔
277
        }
1,299✔
278

279
        return nil
1,489✔
280
}
281

282
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
283
// If the cache is enabled, the edge will be added back to the graph cache if
284
// we still have a record of this channel in the DB.
285
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
2✔
286
        c.cacheMu.Lock()
2✔
287
        defer c.cacheMu.Unlock()
2✔
288

2✔
289
        err := c.KVStore.MarkEdgeLive(chanID)
2✔
290
        if err != nil {
3✔
291
                return err
1✔
292
        }
1✔
293

294
        if c.graphCache != nil {
2✔
295
                // We need to add the channel back into our graph cache,
1✔
296
                // otherwise we won't use it for path finding.
1✔
297
                infos, err := c.KVStore.FetchChanInfos([]uint64{chanID})
1✔
298
                if err != nil {
1✔
299
                        return err
×
300
                }
×
301

302
                if len(infos) == 0 {
2✔
303
                        return nil
1✔
304
                }
1✔
305

306
                info := infos[0]
×
307

×
308
                c.graphCache.AddChannel(info.Info, info.Policy1, info.Policy2)
×
309
        }
310

311
        return nil
×
312
}
313

314
// DeleteChannelEdges removes edges with the given channel IDs from the
315
// database and marks them as zombies. This ensures that we're unable to re-add
316
// it to our database once again. If an edge does not exist within the
317
// database, then ErrEdgeNotFound will be returned. If strictZombiePruning is
318
// true, then when we mark these edges as zombies, we'll set up the keys such
319
// that we require the node that failed to send the fresh update to be the one
320
// that resurrects the channel from its zombie state. The markZombie bool
321
// denotes whether to mark the channel as a zombie.
322
func (c *ChannelGraph) DeleteChannelEdges(strictZombiePruning, markZombie bool,
323
        chanIDs ...uint64) error {
146✔
324

146✔
325
        c.cacheMu.Lock()
146✔
326
        defer c.cacheMu.Unlock()
146✔
327

146✔
328
        infos, err := c.KVStore.DeleteChannelEdges(
146✔
329
                strictZombiePruning, markZombie, chanIDs...,
146✔
330
        )
146✔
331
        if err != nil {
216✔
332
                return err
70✔
333
        }
70✔
334

335
        if c.graphCache != nil {
152✔
336
                for _, info := range infos {
101✔
337
                        c.graphCache.RemoveChannel(
25✔
338
                                info.NodeKey1Bytes, info.NodeKey2Bytes,
25✔
339
                                info.ChannelID,
25✔
340
                        )
25✔
341
                }
25✔
342
        }
343

344
        return err
76✔
345
}
346

347
// DisconnectBlockAtHeight is used to indicate that the block specified
348
// by the passed height has been disconnected from the main chain. This
349
// will "rewind" the graph back to the height below, deleting channels
350
// that are no longer confirmed from the graph. The prune log will be
351
// set to the last prune height valid for the remaining chain.
352
// Channels that were removed from the graph resulting from the
353
// disconnected block are returned.
354
func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) (
355
        []*models.ChannelEdgeInfo, error) {
157✔
356

157✔
357
        c.cacheMu.Lock()
157✔
358
        defer c.cacheMu.Unlock()
157✔
359

157✔
360
        edges, err := c.KVStore.DisconnectBlockAtHeight(height)
157✔
361
        if err != nil {
157✔
362
                return nil, err
×
363
        }
×
364

365
        if c.graphCache != nil {
314✔
366
                for _, edge := range edges {
263✔
367
                        c.graphCache.RemoveChannel(
106✔
368
                                edge.NodeKey1Bytes, edge.NodeKey2Bytes,
106✔
369
                                edge.ChannelID,
106✔
370
                        )
106✔
371
                }
106✔
372
        }
373

374
        return edges, nil
157✔
375
}
376

377
// PruneGraph prunes newly closed channels from the channel graph in response
378
// to a new block being solved on the network. Any transactions which spend the
379
// funding output of any known channels within he graph will be deleted.
380
// Additionally, the "prune tip", or the last block which has been used to
381
// prune the graph is stored so callers can ensure the graph is fully in sync
382
// with the current UTXO state. A slice of channels that have been closed by
383
// the target block are returned if the function succeeds without error.
384
func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
385
        blockHash *chainhash.Hash, blockHeight uint32) (
386
        []*models.ChannelEdgeInfo, error) {
233✔
387

233✔
388
        c.cacheMu.Lock()
233✔
389
        defer c.cacheMu.Unlock()
233✔
390

233✔
391
        edges, nodes, err := c.KVStore.PruneGraph(
233✔
392
                spentOutputs, blockHash, blockHeight,
233✔
393
        )
233✔
394
        if err != nil {
233✔
395
                return nil, err
×
396
        }
×
397

398
        if c.graphCache != nil {
466✔
399
                for _, edge := range edges {
253✔
400
                        c.graphCache.RemoveChannel(
20✔
401
                                edge.NodeKey1Bytes, edge.NodeKey2Bytes,
20✔
402
                                edge.ChannelID,
20✔
403
                        )
20✔
404
                }
20✔
405

406
                for _, node := range nodes {
289✔
407
                        c.graphCache.RemoveNode(node)
56✔
408
                }
56✔
409

410
                log.Debugf("Pruned graph, cache now has %s",
233✔
411
                        c.graphCache.Stats())
233✔
412
        }
413

414
        return edges, nil
233✔
415
}
416

417
// PruneGraphNodes is a garbage collection method which attempts to prune out
418
// any nodes from the channel graph that are currently unconnected. This ensure
419
// that we only maintain a graph of reachable nodes. In the event that a pruned
420
// node gains more channels, it will be re-added back to the graph.
421
func (c *ChannelGraph) PruneGraphNodes() error {
26✔
422
        c.cacheMu.Lock()
26✔
423
        defer c.cacheMu.Unlock()
26✔
424

26✔
425
        nodes, err := c.KVStore.PruneGraphNodes()
26✔
426
        if err != nil {
26✔
427
                return err
×
428
        }
×
429

430
        if c.graphCache != nil {
52✔
431
                for _, node := range nodes {
33✔
432
                        c.graphCache.RemoveNode(node)
7✔
433
                }
7✔
434
        }
435

436
        return nil
26✔
437
}
438

439
// FilterKnownChanIDs takes a set of channel IDs and return the subset of chan
440
// ID's that we don't know and are not known zombies of the passed set. In other
441
// words, we perform a set difference of our set of chan ID's and the ones
442
// passed in. This method can be used by callers to determine the set of
443
// channels another peer knows of that we don't.
444
func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
445
        isZombieChan func(time.Time, time.Time) bool) ([]uint64, error) {
127✔
446

127✔
447
        unknown, knownZombies, err := c.KVStore.FilterKnownChanIDs(chansInfo)
127✔
448
        if err != nil {
127✔
449
                return nil, err
×
450
        }
×
451

452
        for _, info := range knownZombies {
172✔
453
                // TODO(ziggie): Make sure that for the strict pruning case we
45✔
454
                // compare the pubkeys and whether the right timestamp is not
45✔
455
                // older than the `ChannelPruneExpiry`.
45✔
456
                //
45✔
457
                // NOTE: The timestamp data has no verification attached to it
45✔
458
                // in the `ReplyChannelRange` msg so we are trusting this data
45✔
459
                // at this point. However it is not critical because we are just
45✔
460
                // removing the channel from the db when the timestamps are more
45✔
461
                // recent. During the querying of the gossip msg verification
45✔
462
                // happens as usual. However we should start punishing peers
45✔
463
                // when they don't provide us honest data ?
45✔
464
                isStillZombie := isZombieChan(
45✔
465
                        info.Node1UpdateTimestamp, info.Node2UpdateTimestamp,
45✔
466
                )
45✔
467

45✔
468
                if isStillZombie {
69✔
469
                        continue
24✔
470
                }
471

472
                // If we have marked it as a zombie but the latest update
473
                // timestamps could bring it back from the dead, then we mark it
474
                // alive, and we let it be added to the set of IDs to query our
475
                // peer for.
476
                err := c.KVStore.MarkEdgeLive(
21✔
477
                        info.ShortChannelID.ToUint64(),
21✔
478
                )
21✔
479
                // Since there is a chance that the edge could have been marked
21✔
480
                // as "live" between the FilterKnownChanIDs call and the
21✔
481
                // MarkEdgeLive call, we ignore the error if the edge is already
21✔
482
                // marked as live.
21✔
483
                if err != nil && !errors.Is(err, ErrZombieEdgeNotFound) {
21✔
484
                        return nil, err
×
485
                }
×
486
        }
487

488
        return unknown, nil
127✔
489
}
490

491
// MarkEdgeZombie attempts to mark a channel identified by its channel ID as a
492
// zombie. This method is used on an ad-hoc basis, when channels need to be
493
// marked as zombies outside the normal pruning cycle.
494
func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
495
        pubKey1, pubKey2 [33]byte) error {
134✔
496

134✔
497
        c.cacheMu.Lock()
134✔
498
        defer c.cacheMu.Unlock()
134✔
499

134✔
500
        err := c.KVStore.MarkEdgeZombie(chanID, pubKey1, pubKey2)
134✔
501
        if err != nil {
134✔
502
                return err
×
503
        }
×
504

505
        if c.graphCache != nil {
268✔
506
                c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
134✔
507
        }
134✔
508

509
        return nil
134✔
510
}
511

512
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
513
// within the database for the referenced channel. The `flags` attribute within
514
// the ChannelEdgePolicy determines which of the directed edges are being
515
// updated. If the flag is 1, then the first node's information is being
516
// updated, otherwise it's the second node's information. The node ordering is
517
// determined by the lexicographical ordering of the identity public keys of the
518
// nodes on either side of the channel.
519
func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
520
        op ...batch.SchedulerOption) error {
2,666✔
521

2,666✔
522
        c.cacheMu.Lock()
2,666✔
523
        defer c.cacheMu.Unlock()
2,666✔
524

2,666✔
525
        from, to, err := c.KVStore.UpdateEdgePolicy(edge, op...)
2,666✔
526
        if err != nil {
2,669✔
527
                return err
3✔
528
        }
3✔
529

530
        if c.graphCache == nil {
3,052✔
531
                return nil
389✔
532
        }
389✔
533

534
        var isUpdate1 bool
2,277✔
535
        if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
3,419✔
536
                isUpdate1 = true
1,142✔
537
        }
1,142✔
538

539
        c.graphCache.UpdatePolicy(edge, from, to, isUpdate1)
2,277✔
540

2,277✔
541
        return nil
2,277✔
542
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc