• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13565753400

27 Feb 2025 11:53AM UTC coverage: 58.748% (-0.08%) from 58.825%
13565753400

Pull #9544

github

web-flow
Merge pull request #9552 from ellemouton/graph16

graph: extract cache from CRUD [5]
Pull Request #9544: graph: move graph cache out of CRUD layer

2638 of 3370 new or added lines in 5 files covered. (78.28%)

592 existing lines in 29 files now uncovered.

136355 of 232101 relevant lines covered (58.75%)

19262.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.05
/graph/db/graph.go
1
package graphdb
2

3
import (
4
        "errors"
5
        "sync"
6
        "time"
7

8
        "github.com/btcsuite/btcd/chaincfg/chainhash"
9
        "github.com/btcsuite/btcd/wire"
10
        "github.com/lightningnetwork/lnd/batch"
11
        "github.com/lightningnetwork/lnd/graph/db/models"
12
        "github.com/lightningnetwork/lnd/kvdb"
13
        "github.com/lightningnetwork/lnd/lnwire"
14
        "github.com/lightningnetwork/lnd/routing/route"
15
)
16

17
// Config is a struct that holds all the necessary dependencies for a
18
// ChannelGraph.
19
type Config struct {
20
        // KVDB is the kvdb.Backend that will be used for initializing the
21
        // KVStore CRUD layer.
22
        KVDB kvdb.Backend
23

24
        // KVStoreOpts is a list of functional options that will be used when
25
        // initializing the KVStore.
26
        KVStoreOpts []KVStoreOptionModifier
27
}
28

29
// ChannelGraph is a layer above the graph's CRUD layer.
30
//
31
// NOTE: currently, this is purely a pass-through layer directly to the backing
32
// KVStore. Upcoming commits will move the graph cache out of the KVStore and
33
// into this layer so that the KVStore is only responsible for CRUD operations.
34
type ChannelGraph struct {
35
        // cacheMu guards any writes to the graphCache. It should be held
36
        // across the DB write call and the graphCache update to make the
37
        // two updates as atomic as possible.
38
        cacheMu    sync.Mutex
39
        graphCache *GraphCache
40

41
        *KVStore
42
}
43

44
// NewChannelGraph creates a new ChannelGraph instance with the given backend.
45
func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
46
        error) {
176✔
47

176✔
48
        opts := defaultChanGraphOptions()
176✔
49
        for _, o := range options {
280✔
50
                o(opts)
104✔
51
        }
104✔
52

53
        store, err := NewKVStore(cfg.KVDB, cfg.KVStoreOpts...)
176✔
54
        if err != nil {
176✔
NEW
55
                return nil, err
×
UNCOV
56
        }
×
57

58
        if !opts.useGraphCache {
211✔
59
                return &ChannelGraph{
35✔
60
                        KVStore: store,
35✔
61
                }, nil
35✔
62
        }
35✔
63

64
        // The graph cache can be turned off (e.g. for mobile users) for a
65
        // speed/memory usage tradeoff.
66
        graphCache := NewGraphCache(opts.preAllocCacheNumNodes)
143✔
67
        startTime := time.Now()
143✔
68
        log.Debugf("Populating in-memory channel graph, this might take a " +
143✔
69
                "while...")
143✔
70

143✔
71
        err = store.ForEachNodeCacheable(func(node route.Vertex,
143✔
72
                features *lnwire.FeatureVector) error {
245✔
73

102✔
74
                graphCache.AddNodeFeatures(node, features)
102✔
75

102✔
76
                return nil
102✔
77
        })
102✔
78
        if err != nil {
143✔
79
                return nil, err
×
80
        }
×
81

82
        err = store.ForEachChannel(func(info *models.ChannelEdgeInfo,
143✔
83
                policy1, policy2 *models.ChannelEdgePolicy) error {
541✔
84

398✔
85
                graphCache.AddChannel(info, policy1, policy2)
398✔
86

398✔
87
                return nil
398✔
88
        })
398✔
89
        if err != nil {
143✔
NEW
90
                return nil, err
×
91
        }
×
92

93
        log.Debugf("Finished populating in-memory channel graph (took %v, %s)",
143✔
94
                time.Since(startTime), graphCache.Stats())
143✔
95

143✔
96
        store.setGraphCache(graphCache)
143✔
97

143✔
98
        return &ChannelGraph{
143✔
99
                KVStore:    store,
143✔
100
                graphCache: graphCache,
143✔
101
        }, nil
143✔
102
}
103

104
// ForEachNodeDirectedChannel iterates through all channels of a given node,
105
// executing the passed callback on the directed edge representing the channel
106
// and its incoming policy. If the callback returns an error, then the iteration
107
// is halted with the error propagated back up to the caller. If the graphCache
108
// is available, then it will be used to retrieve the node's channels instead
109
// of the database.
110
//
111
// Unknown policies are passed into the callback as nil values.
112
//
113
// NOTE: this is part of the graphdb.NodeTraverser interface.
114
func (c *ChannelGraph) ForEachNodeDirectedChannel(node route.Vertex,
115
        cb func(channel *DirectedChannel) error) error {
466✔
116

466✔
117
        if c.graphCache != nil {
929✔
118
                return c.graphCache.ForEachChannel(node, cb)
463✔
119
        }
463✔
120

121
        return c.KVStore.ForEachNodeDirectedChannel(node, cb)
5✔
122
}
123

124
// FetchNodeFeatures returns the features of the given node. If no features are
125
// known for the node, an empty feature vector is returned.
126
// If the graphCache is available, then it will be used to retrieve the node's
127
// features instead of the database.
128
//
129
// NOTE: this is part of the graphdb.NodeTraverser interface.
130
func (c *ChannelGraph) FetchNodeFeatures(node route.Vertex) (
131
        *lnwire.FeatureVector, error) {
455✔
132

455✔
133
        if c.graphCache != nil {
910✔
134
                return c.graphCache.GetFeatures(node), nil
455✔
135
        }
455✔
136

137
        return c.KVStore.FetchNodeFeatures(node)
2✔
138
}
139

140
// GraphSession will provide the call-back with access to a NodeTraverser
141
// instance which can be used to perform queries against the channel graph. If
142
// the graph cache is not enabled, then the call-back will be provided with
143
// access to the graph via a consistent read-only transaction.
144
func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error {
135✔
145
        if c.graphCache != nil {
216✔
146
                return cb(c)
81✔
147
        }
81✔
148

149
        return c.KVStore.GraphSession(cb)
54✔
150
}
151

152
// ForEachNodeCached iterates through all the stored vertices/nodes in the
153
// graph, executing the passed callback with each node encountered.
154
//
155
// NOTE: The callback contents MUST not be modified.
156
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
157
        chans map[uint64]*DirectedChannel) error) error {
1✔
158

1✔
159
        if c.graphCache != nil {
1✔
NEW
160
                return c.graphCache.ForEachNode(cb)
×
UNCOV
161
        }
×
162

163
        return c.KVStore.ForEachNodeCached(cb)
1✔
164
}
165

166
// AddLightningNode adds a vertex/node to the graph database. If the node is not
167
// in the database from before, this will add a new, unconnected one to the
168
// graph. If it is present from before, this will update that node's
169
// information. Note that this method is expected to only be called to update an
170
// already present node from a node announcement, or to insert a node found in a
171
// channel update.
172
func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
173
        op ...batch.SchedulerOption) error {
802✔
174

802✔
175
        c.cacheMu.Lock()
802✔
176
        defer c.cacheMu.Unlock()
802✔
177

802✔
178
        err := c.KVStore.AddLightningNode(node, op...)
802✔
179
        if err != nil {
802✔
NEW
180
                return err
×
181
        }
×
182

183
        if c.graphCache != nil {
1,417✔
184
                c.graphCache.AddNodeFeatures(
615✔
185
                        node.PubKeyBytes, node.Features,
615✔
186
                )
615✔
187
        }
615✔
188

189
        return nil
802✔
190
}
191

192
// DeleteLightningNode starts a new database transaction to remove a vertex/node
193
// from the database according to the node's public key.
194
func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
3✔
195
        c.cacheMu.Lock()
3✔
196
        defer c.cacheMu.Unlock()
3✔
197

3✔
198
        err := c.KVStore.DeleteLightningNode(nodePub)
3✔
199
        if err != nil {
3✔
200
                return err
×
201
        }
×
202

203
        if c.graphCache != nil {
6✔
204
                c.graphCache.RemoveNode(nodePub)
3✔
205
        }
3✔
206

207
        return nil
3✔
208
}
209

210
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
211
// undirected edge from the two target nodes are created. The information stored
212
// denotes the static attributes of the channel, such as the channelID, the keys
213
// involved in creation of the channel, and the set of features that the channel
214
// supports. The chanPoint and chanID are used to uniquely identify the edge
215
// globally within the database.
216
func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
217
        op ...batch.SchedulerOption) error {
1,718✔
218

1,718✔
219
        c.cacheMu.Lock()
1,718✔
220
        defer c.cacheMu.Unlock()
1,718✔
221

1,718✔
222
        err := c.KVStore.AddChannelEdge(edge, op...)
1,718✔
223
        if err != nil {
1,952✔
224
                return err
234✔
225
        }
234✔
226

227
        if c.graphCache != nil {
2,778✔
228
                c.graphCache.AddChannel(edge, nil, nil)
1,294✔
229
        }
1,294✔
230

231
        return nil
1,484✔
232
}
233

234
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
235
// If the cache is enabled, the edge will be added back to the graph cache if
236
// we still have a record of this channel in the DB.
237
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
2✔
238
        c.cacheMu.Lock()
2✔
239
        defer c.cacheMu.Unlock()
2✔
240

2✔
241
        err := c.KVStore.MarkEdgeLive(chanID)
2✔
242
        if err != nil {
3✔
243
                return err
1✔
244
        }
1✔
245

246
        if c.graphCache != nil {
2✔
247
                // We need to add the channel back into our graph cache,
1✔
248
                // otherwise we won't use it for path finding.
1✔
249
                infos, err := c.KVStore.FetchChanInfos([]uint64{chanID})
1✔
250
                if err != nil {
1✔
251
                        return err
×
252
                }
×
253

254
                if len(infos) == 0 {
2✔
255
                        return nil
1✔
256
                }
1✔
257

NEW
258
                info := infos[0]
×
UNCOV
259

×
NEW
260
                c.graphCache.AddChannel(info.Info, info.Policy1, info.Policy2)
×
261
        }
262

NEW
263
        return nil
×
264
}
265

266
// DeleteChannelEdges removes edges with the given channel IDs from the
267
// database and marks them as zombies. This ensures that we're unable to re-add
268
// it to our database once again. If an edge does not exist within the
269
// database, then ErrEdgeNotFound will be returned. If strictZombiePruning is
270
// true, then when we mark these edges as zombies, we'll set up the keys such
271
// that we require the node that failed to send the fresh update to be the one
272
// that resurrects the channel from its zombie state. The markZombie bool
273
// denotes whether to mark the channel as a zombie.
274
func (c *ChannelGraph) DeleteChannelEdges(strictZombiePruning, markZombie bool,
275
        chanIDs ...uint64) error {
146✔
276

146✔
277
        c.cacheMu.Lock()
146✔
278
        defer c.cacheMu.Unlock()
146✔
279

146✔
280
        infos, err := c.KVStore.DeleteChannelEdges(
146✔
281
                strictZombiePruning, markZombie, chanIDs...,
146✔
282
        )
146✔
283
        if err != nil {
207✔
284
                return err
61✔
285
        }
61✔
286

287
        if c.graphCache != nil {
170✔
288
                for _, info := range infos {
113✔
289
                        c.graphCache.RemoveChannel(
28✔
290
                                info.NodeKey1Bytes, info.NodeKey2Bytes,
28✔
291
                                info.ChannelID,
28✔
292
                        )
28✔
293
                }
28✔
294
        }
295

296
        return err
85✔
297
}
298

299
// DisconnectBlockAtHeight is used to indicate that the block specified
300
// by the passed height has been disconnected from the main chain. This
301
// will "rewind" the graph back to the height below, deleting channels
302
// that are no longer confirmed from the graph. The prune log will be
303
// set to the last prune height valid for the remaining chain.
304
// Channels that were removed from the graph resulting from the
305
// disconnected block are returned.
306
func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) (
307
        []*models.ChannelEdgeInfo, error) {
150✔
308

150✔
309
        c.cacheMu.Lock()
150✔
310
        defer c.cacheMu.Unlock()
150✔
311

150✔
312
        edges, err := c.KVStore.DisconnectBlockAtHeight(height)
150✔
313
        if err != nil {
150✔
314
                return nil, err
×
315
        }
×
316

317
        if c.graphCache != nil {
300✔
318
                for _, edge := range edges {
243✔
319
                        c.graphCache.RemoveChannel(
93✔
320
                                edge.NodeKey1Bytes, edge.NodeKey2Bytes,
93✔
321
                                edge.ChannelID,
93✔
322
                        )
93✔
323
                }
93✔
324
        }
325

326
        return edges, nil
150✔
327
}
328

329
// PruneGraph prunes newly closed channels from the channel graph in response
330
// to a new block being solved on the network. Any transactions which spend the
331
// funding output of any known channels within he graph will be deleted.
332
// Additionally, the "prune tip", or the last block which has been used to
333
// prune the graph is stored so callers can ensure the graph is fully in sync
334
// with the current UTXO state. A slice of channels that have been closed by
335
// the target block are returned if the function succeeds without error.
336
func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
337
        blockHash *chainhash.Hash, blockHeight uint32) (
338
        []*models.ChannelEdgeInfo, error) {
249✔
339

249✔
340
        c.cacheMu.Lock()
249✔
341
        defer c.cacheMu.Unlock()
249✔
342

249✔
343
        edges, nodes, err := c.KVStore.PruneGraph(
249✔
344
                spentOutputs, blockHash, blockHeight,
249✔
345
        )
249✔
346
        if err != nil {
249✔
347
                return nil, err
×
UNCOV
348
        }
×
349

350
        if c.graphCache != nil {
498✔
351
                for _, edge := range edges {
272✔
352
                        c.graphCache.RemoveChannel(
23✔
353
                                edge.NodeKey1Bytes, edge.NodeKey2Bytes,
23✔
354
                                edge.ChannelID,
23✔
355
                        )
23✔
356
                }
23✔
357

358
                for _, node := range nodes {
302✔
359
                        c.graphCache.RemoveNode(node)
53✔
360
                }
53✔
361

362
                log.Debugf("Pruned graph, cache now has %s",
249✔
363
                        c.graphCache.Stats())
249✔
364
        }
365

366
        return edges, nil
249✔
367
}
368

369
// PruneGraphNodes is a garbage collection method which attempts to prune out
370
// any nodes from the channel graph that are currently unconnected. This ensure
371
// that we only maintain a graph of reachable nodes. In the event that a pruned
372
// node gains more channels, it will be re-added back to the graph.
373
func (c *ChannelGraph) PruneGraphNodes() error {
25✔
374
        c.cacheMu.Lock()
25✔
375
        defer c.cacheMu.Unlock()
25✔
376

25✔
377
        nodes, err := c.KVStore.PruneGraphNodes()
25✔
378
        if err != nil {
25✔
NEW
379
                return err
×
NEW
380
        }
×
381

382
        if c.graphCache != nil {
50✔
383
                for _, node := range nodes {
32✔
384
                        c.graphCache.RemoveNode(node)
7✔
385
                }
7✔
386
        }
387

388
        return nil
25✔
389
}
390

391
// FilterKnownChanIDs takes a set of channel IDs and return the subset of chan
392
// ID's that we don't know and are not known zombies of the passed set. In other
393
// words, we perform a set difference of our set of chan ID's and the ones
394
// passed in. This method can be used by callers to determine the set of
395
// channels another peer knows of that we don't.
396
func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
397
        isZombieChan func(time.Time, time.Time) bool) ([]uint64, error) {
131✔
398

131✔
399
        unknown, knownZombies, err := c.KVStore.FilterKnownChanIDs(chansInfo)
131✔
400
        if err != nil {
131✔
401
                return nil, err
×
402
        }
×
403

404
        for _, info := range knownZombies {
175✔
405
                // TODO(ziggie): Make sure that for the strict pruning case we
44✔
406
                // compare the pubkeys and whether the right timestamp is not
44✔
407
                // older than the `ChannelPruneExpiry`.
44✔
408
                //
44✔
409
                // NOTE: The timestamp data has no verification attached to it
44✔
410
                // in the `ReplyChannelRange` msg so we are trusting this data
44✔
411
                // at this point. However it is not critical because we are just
44✔
412
                // removing the channel from the db when the timestamps are more
44✔
413
                // recent. During the querying of the gossip msg verification
44✔
414
                // happens as usual. However we should start punishing peers
44✔
415
                // when they don't provide us honest data ?
44✔
416
                isStillZombie := isZombieChan(
44✔
417
                        info.Node1UpdateTimestamp, info.Node2UpdateTimestamp,
44✔
418
                )
44✔
419

44✔
420
                if isStillZombie {
73✔
421
                        continue
29✔
422
                }
423

424
                // If we have marked it as a zombie but the latest update
425
                // timestamps could bring it back from the dead, then we mark it
426
                // alive, and we let it be added to the set of IDs to query our
427
                // peer for.
428
                err := c.KVStore.MarkEdgeLive(
15✔
429
                        info.ShortChannelID.ToUint64(),
15✔
430
                )
15✔
431
                // Since there is a chance that the edge could have been marked
15✔
432
                // as "live" between the FilterKnownChanIDs call and the
15✔
433
                // MarkEdgeLive call, we ignore the error if the edge is already
15✔
434
                // marked as live.
15✔
435
                if err != nil && !errors.Is(err, ErrZombieEdgeNotFound) {
15✔
NEW
436
                        return nil, err
×
NEW
437
                }
×
438
        }
439

440
        return unknown, nil
131✔
441
}
442

443
// MarkEdgeZombie attempts to mark a channel identified by its channel ID as a
444
// zombie. This method is used on an ad-hoc basis, when channels need to be
445
// marked as zombies outside the normal pruning cycle.
446
func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
447
        pubKey1, pubKey2 [33]byte) error {
128✔
448

128✔
449
        c.cacheMu.Lock()
128✔
450
        defer c.cacheMu.Unlock()
128✔
451

128✔
452
        err := c.KVStore.MarkEdgeZombie(chanID, pubKey1, pubKey2)
128✔
453
        if err != nil {
128✔
NEW
454
                return err
×
NEW
455
        }
×
456

457
        if c.graphCache != nil {
256✔
458
                c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
128✔
459
        }
128✔
460

461
        return nil
128✔
462
}
463

464
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
465
// within the database for the referenced channel. The `flags` attribute within
466
// the ChannelEdgePolicy determines which of the directed edges are being
467
// updated. If the flag is 1, then the first node's information is being
468
// updated, otherwise it's the second node's information. The node ordering is
469
// determined by the lexicographical ordering of the identity public keys of the
470
// nodes on either side of the channel.
471
func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
472
        op ...batch.SchedulerOption) error {
2,665✔
473

2,665✔
474
        c.cacheMu.Lock()
2,665✔
475
        defer c.cacheMu.Unlock()
2,665✔
476

2,665✔
477
        from, to, err := c.KVStore.UpdateEdgePolicy(edge, op...)
2,665✔
478
        if err != nil {
2,668✔
479
                return err
3✔
480
        }
3✔
481

482
        if c.graphCache == nil {
3,050✔
483
                return nil
388✔
484
        }
388✔
485

486
        var isUpdate1 bool
2,276✔
487
        if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
3,417✔
488
                isUpdate1 = true
1,141✔
489
        }
1,141✔
490

491
        c.graphCache.UpdatePolicy(edge, from, to, isUpdate1)
2,276✔
492

2,276✔
493
        return nil
2,276✔
494
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc