• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13543384853

26 Feb 2025 11:57AM UTC coverage: 58.865% (+0.03%) from 58.834%
13543384853

Pull #9551

github

ellemouton
graph/db: move cache writes for Prune methods

This commit moves the cache writes for PruneGraphNodes and PruneGraph
from the KVStore to the ChannelGraph.
Pull Request #9551: graph: extract cache from CRUD [4]

113 of 135 new or added lines in 2 files covered. (83.7%)

275 existing lines in 12 files now uncovered.

136522 of 231922 relevant lines covered (58.87%)

19149.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.41
/graph/db/graph.go
1
package graphdb
2

3
import (
4
        "sync"
5
        "time"
6

7
        "github.com/btcsuite/btcd/chaincfg/chainhash"
8
        "github.com/btcsuite/btcd/wire"
9
        "github.com/lightningnetwork/lnd/batch"
10
        "github.com/lightningnetwork/lnd/graph/db/models"
11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/lnwire"
13
        "github.com/lightningnetwork/lnd/routing/route"
14
)
15

16
// Config is a struct that holds all the necessary dependencies for a
17
// ChannelGraph.
18
type Config struct {
19
        // KVDB is the kvdb.Backend that will be used for initializing the
20
        // KVStore CRUD layer.
21
        KVDB kvdb.Backend
22

23
        // KVStoreOpts is a list of functional options that will be used when
24
        // initializing the KVStore.
25
        KVStoreOpts []KVStoreOptionModifier
26
}
27

28
// ChannelGraph is a layer above the graph's CRUD layer.
29
//
30
// NOTE: currently, this is purely a pass-through layer directly to the backing
31
// KVStore. Upcoming commits will move the graph cache out of the KVStore and
32
// into this layer so that the KVStore is only responsible for CRUD operations.
33
type ChannelGraph struct {
34
        // cacheMu guards any writes to the graphCache. It should be held
35
        // across the DB write call and the graphCache update to make the
36
        // two updates as atomic as possible.
37
        cacheMu    sync.Mutex
38
        graphCache *GraphCache
39

40
        *KVStore
41
}
42

43
// NewChannelGraph creates a new ChannelGraph instance with the given backend.
44
func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
45
        error) {
176✔
46

176✔
47
        opts := defaultChanGraphOptions()
176✔
48
        for _, o := range options {
281✔
49
                o(opts)
105✔
50
        }
105✔
51

52
        store, err := NewKVStore(cfg.KVDB, cfg.KVStoreOpts...)
176✔
53
        if err != nil {
176✔
54
                return nil, err
×
55
        }
×
56

57
        if !opts.useGraphCache {
212✔
58
                return &ChannelGraph{
36✔
59
                        KVStore: store,
36✔
60
                }, nil
36✔
61
        }
36✔
62

63
        // The graph cache can be turned off (e.g. for mobile users) for a
64
        // speed/memory usage tradeoff.
65
        graphCache := NewGraphCache(opts.preAllocCacheNumNodes)
143✔
66
        startTime := time.Now()
143✔
67
        log.Debugf("Populating in-memory channel graph, this might take a " +
143✔
68
                "while...")
143✔
69

143✔
70
        err = store.ForEachNodeCacheable(func(node route.Vertex,
143✔
71
                features *lnwire.FeatureVector) error {
246✔
72

103✔
73
                graphCache.AddNodeFeatures(node, features)
103✔
74

103✔
75
                return nil
103✔
76
        })
103✔
77
        if err != nil {
143✔
78
                return nil, err
×
79
        }
×
80

81
        err = store.ForEachChannel(func(info *models.ChannelEdgeInfo,
143✔
82
                policy1, policy2 *models.ChannelEdgePolicy) error {
542✔
83

399✔
84
                graphCache.AddChannel(info, policy1, policy2)
399✔
85

399✔
86
                return nil
399✔
87
        })
399✔
88
        if err != nil {
143✔
89
                return nil, err
×
90
        }
×
91

92
        log.Debugf("Finished populating in-memory channel graph (took %v, %s)",
143✔
93
                time.Since(startTime), graphCache.Stats())
143✔
94

143✔
95
        store.setGraphCache(graphCache)
143✔
96

143✔
97
        return &ChannelGraph{
143✔
98
                KVStore:    store,
143✔
99
                graphCache: graphCache,
143✔
100
        }, nil
143✔
101
}
102

103
// ForEachNodeDirectedChannel iterates through all channels of a given node,
104
// executing the passed callback on the directed edge representing the channel
105
// and its incoming policy. If the callback returns an error, then the iteration
106
// is halted with the error propagated back up to the caller. If the graphCache
107
// is available, then it will be used to retrieve the node's channels instead
108
// of the database.
109
//
110
// Unknown policies are passed into the callback as nil values.
111
//
112
// NOTE: this is part of the graphdb.NodeTraverser interface.
113
func (c *ChannelGraph) ForEachNodeDirectedChannel(node route.Vertex,
114
        cb func(channel *DirectedChannel) error) error {
467✔
115

467✔
116
        if c.graphCache != nil {
931✔
117
                return c.graphCache.ForEachChannel(node, cb)
464✔
118
        }
464✔
119

120
        return c.KVStore.ForEachNodeDirectedChannel(node, cb)
6✔
121
}
122

123
// FetchNodeFeatures returns the features of the given node. If no features are
124
// known for the node, an empty feature vector is returned.
125
// If the graphCache is available, then it will be used to retrieve the node's
126
// features instead of the database.
127
//
128
// NOTE: this is part of the graphdb.NodeTraverser interface.
129
func (c *ChannelGraph) FetchNodeFeatures(node route.Vertex) (
130
        *lnwire.FeatureVector, error) {
456✔
131

456✔
132
        if c.graphCache != nil {
912✔
133
                return c.graphCache.GetFeatures(node), nil
456✔
134
        }
456✔
135

136
        return c.KVStore.FetchNodeFeatures(node)
3✔
137
}
138

139
// GraphSession will provide the call-back with access to a NodeTraverser
140
// instance which can be used to perform queries against the channel graph. If
141
// the graph cache is not enabled, then the call-back will be provided with
142
// access to the graph via a consistent read-only transaction.
143
func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error {
136✔
144
        if c.graphCache != nil {
218✔
145
                return cb(c)
82✔
146
        }
82✔
147

148
        return c.KVStore.GraphSession(cb)
54✔
149
}
150

151
// ForEachNodeCached iterates through all the stored vertices/nodes in the
152
// graph, executing the passed callback with each node encountered.
153
//
154
// NOTE: The callback contents MUST not be modified.
155
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
156
        chans map[uint64]*DirectedChannel) error) error {
1✔
157

1✔
158
        if c.graphCache != nil {
1✔
159
                return c.graphCache.ForEachNode(cb)
×
160
        }
×
161

162
        return c.KVStore.ForEachNodeCached(cb)
1✔
163
}
164

165
// AddLightningNode adds a vertex/node to the graph database. If the node is not
166
// in the database from before, this will add a new, unconnected one to the
167
// graph. If it is present from before, this will update that node's
168
// information. Note that this method is expected to only be called to update an
169
// already present node from a node announcement, or to insert a node found in a
170
// channel update.
171
func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
172
        op ...batch.SchedulerOption) error {
803✔
173

803✔
174
        c.cacheMu.Lock()
803✔
175
        defer c.cacheMu.Unlock()
803✔
176

803✔
177
        err := c.KVStore.AddLightningNode(node, op...)
803✔
178
        if err != nil {
803✔
179
                return err
×
180
        }
×
181

182
        if c.graphCache != nil {
1,419✔
183
                c.graphCache.AddNodeFeatures(
616✔
184
                        node.PubKeyBytes, node.Features,
616✔
185
                )
616✔
186
        }
616✔
187

188
        return nil
803✔
189
}
190

191
// DeleteLightningNode starts a new database transaction to remove a vertex/node
192
// from the database according to the node's public key.
193
func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
3✔
194
        c.cacheMu.Lock()
3✔
195
        defer c.cacheMu.Unlock()
3✔
196

3✔
197
        err := c.KVStore.DeleteLightningNode(nodePub)
3✔
198
        if err != nil {
3✔
199
                return err
×
200
        }
×
201

202
        if c.graphCache != nil {
6✔
203
                c.graphCache.RemoveNode(nodePub)
3✔
204
        }
3✔
205

206
        return nil
3✔
207
}
208

209
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
210
// undirected edge from the two target nodes are created. The information stored
211
// denotes the static attributes of the channel, such as the channelID, the keys
212
// involved in creation of the channel, and the set of features that the channel
213
// supports. The chanPoint and chanID are used to uniquely identify the edge
214
// globally within the database.
215
func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
216
        op ...batch.SchedulerOption) error {
1,718✔
217

1,718✔
218
        c.cacheMu.Lock()
1,718✔
219
        defer c.cacheMu.Unlock()
1,718✔
220

1,718✔
221
        err := c.KVStore.AddChannelEdge(edge, op...)
1,718✔
222
        if err != nil {
1,952✔
223
                return err
234✔
224
        }
234✔
225

226
        if c.graphCache != nil {
2,778✔
227
                c.graphCache.AddChannel(edge, nil, nil)
1,294✔
228
        }
1,294✔
229

230
        return nil
1,484✔
231
}
232

233
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
234
// If the cache is enabled, the edge will be added back to the graph cache if
235
// we still have a record of this channel in the DB.
236
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
2✔
237
        c.cacheMu.Lock()
2✔
238
        defer c.cacheMu.Unlock()
2✔
239

2✔
240
        err := c.KVStore.MarkEdgeLive(chanID)
2✔
241
        if err != nil {
3✔
242
                return err
1✔
243
        }
1✔
244

245
        if c.graphCache != nil {
2✔
246
                // We need to add the channel back into our graph cache,
1✔
247
                // otherwise we won't use it for path finding.
1✔
248
                infos, err := c.KVStore.FetchChanInfos([]uint64{chanID})
1✔
249
                if err != nil {
1✔
250
                        return err
×
251
                }
×
252

253
                if len(infos) == 0 {
2✔
254
                        return nil
1✔
255
                }
1✔
256

257
                info := infos[0]
×
258

×
259
                c.graphCache.AddChannel(info.Info, info.Policy1, info.Policy2)
×
260
        }
261

262
        return nil
×
263
}
264

265
// DeleteChannelEdges removes edges with the given channel IDs from the
266
// database and marks them as zombies. This ensures that we're unable to re-add
267
// it to our database once again. If an edge does not exist within the
268
// database, then ErrEdgeNotFound will be returned. If strictZombiePruning is
269
// true, then when we mark these edges as zombies, we'll set up the keys such
270
// that we require the node that failed to send the fresh update to be the one
271
// that resurrects the channel from its zombie state. The markZombie bool
272
// denotes whether to mark the channel as a zombie.
273
func (c *ChannelGraph) DeleteChannelEdges(strictZombiePruning, markZombie bool,
274
        chanIDs ...uint64) error {
149✔
275

149✔
276
        c.cacheMu.Lock()
149✔
277
        defer c.cacheMu.Unlock()
149✔
278

149✔
279
        infos, err := c.KVStore.DeleteChannelEdges(
149✔
280
                strictZombiePruning, markZombie, chanIDs...,
149✔
281
        )
149✔
282
        if err != nil {
210✔
283
                return err
61✔
284
        }
61✔
285

286
        if c.graphCache != nil {
176✔
287
                for _, info := range infos {
116✔
288
                        c.graphCache.RemoveChannel(
28✔
289
                                info.NodeKey1Bytes, info.NodeKey2Bytes,
28✔
290
                                info.ChannelID,
28✔
291
                        )
28✔
292
                }
28✔
293
        }
294

295
        return err
88✔
296
}
297

298
// DisconnectBlockAtHeight is used to indicate that the block specified
299
// by the passed height has been disconnected from the main chain. This
300
// will "rewind" the graph back to the height below, deleting channels
301
// that are no longer confirmed from the graph. The prune log will be
302
// set to the last prune height valid for the remaining chain.
303
// Channels that were removed from the graph resulting from the
304
// disconnected block are returned.
305
func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) (
306
        []*models.ChannelEdgeInfo, error) {
161✔
307

161✔
308
        c.cacheMu.Lock()
161✔
309
        defer c.cacheMu.Unlock()
161✔
310

161✔
311
        edges, err := c.KVStore.DisconnectBlockAtHeight(height)
161✔
312
        if err != nil {
161✔
NEW
313
                return nil, err
×
NEW
314
        }
×
315

316
        if c.graphCache != nil {
322✔
317
                for _, edge := range edges {
257✔
318
                        c.graphCache.RemoveChannel(
96✔
319
                                edge.NodeKey1Bytes, edge.NodeKey2Bytes,
96✔
320
                                edge.ChannelID,
96✔
321
                        )
96✔
322
                }
96✔
323
        }
324

325
        return edges, nil
161✔
326
}
327

328
// PruneGraph prunes newly closed channels from the channel graph in response
329
// to a new block being solved on the network. Any transactions which spend the
330
// funding output of any known channels within he graph will be deleted.
331
// Additionally, the "prune tip", or the last block which has been used to
332
// prune the graph is stored so callers can ensure the graph is fully in sync
333
// with the current UTXO state. A slice of channels that have been closed by
334
// the target block are returned if the function succeeds without error.
335
func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
336
        blockHash *chainhash.Hash, blockHeight uint32) (
337
        []*models.ChannelEdgeInfo, error) {
242✔
338

242✔
339
        c.cacheMu.Lock()
242✔
340
        defer c.cacheMu.Unlock()
242✔
341

242✔
342
        edges, nodes, err := c.KVStore.PruneGraph(
242✔
343
                spentOutputs, blockHash, blockHeight,
242✔
344
        )
242✔
345
        if err != nil {
242✔
NEW
346
                return nil, err
×
NEW
347
        }
×
348

349
        if c.graphCache != nil {
484✔
350
                for _, edge := range edges {
266✔
351
                        c.graphCache.RemoveChannel(
24✔
352
                                edge.NodeKey1Bytes, edge.NodeKey2Bytes,
24✔
353
                                edge.ChannelID,
24✔
354
                        )
24✔
355
                }
24✔
356

357
                for _, node := range nodes {
298✔
358
                        c.graphCache.RemoveNode(node)
56✔
359
                }
56✔
360

361
                log.Debugf("Pruned graph, cache now has %s",
242✔
362
                        c.graphCache.Stats())
242✔
363
        }
364

365
        return edges, nil
242✔
366
}
367

368
// PruneGraphNodes is a garbage collection method which attempts to prune out
369
// any nodes from the channel graph that are currently unconnected. This ensure
370
// that we only maintain a graph of reachable nodes. In the event that a pruned
371
// node gains more channels, it will be re-added back to the graph.
372
func (c *ChannelGraph) PruneGraphNodes() error {
26✔
373
        c.cacheMu.Lock()
26✔
374
        defer c.cacheMu.Unlock()
26✔
375

26✔
376
        nodes, err := c.KVStore.PruneGraphNodes()
26✔
377
        if err != nil {
26✔
NEW
378
                return err
×
NEW
379
        }
×
380

381
        if c.graphCache != nil {
52✔
382
                for _, node := range nodes {
33✔
383
                        c.graphCache.RemoveNode(node)
7✔
384
                }
7✔
385
        }
386

387
        return nil
26✔
388
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc