• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13543448815

26 Feb 2025 12:04PM UTC coverage: 49.359% (-9.5%) from 58.834%
13543448815

Pull #9552

github

ellemouton
graph/db: move cache write for UpdateEdgePolicy

To the ChannelGraph.
Pull Request #9552: graph: extract cache from CRUD [5]

15 of 19 new or added lines in 1 file covered. (78.95%)

27509 existing lines in 434 files now uncovered.

100968 of 204557 relevant lines covered (49.36%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.74
/graph/db/graph.go
1
package graphdb
2

3
import (
4
        "errors"
5
        "sync"
6
        "time"
7

8
        "github.com/lightningnetwork/lnd/batch"
9
        "github.com/lightningnetwork/lnd/graph/db/models"
10
        "github.com/lightningnetwork/lnd/kvdb"
11
        "github.com/lightningnetwork/lnd/lnwire"
12
        "github.com/lightningnetwork/lnd/routing/route"
13
)
14

15
// Config is a struct that holds all the necessary dependencies for a
16
// ChannelGraph.
17
type Config struct {
18
        // KVDB is the kvdb.Backend that will be used for initializing the
19
        // KVStore CRUD layer.
20
        KVDB kvdb.Backend
21

22
        // KVStoreOpts is a list of functional options that will be used when
23
        // initializing the KVStore.
24
        KVStoreOpts []KVStoreOptionModifier
25
}
26

27
// ChannelGraph is a layer above the graph's CRUD layer.
28
//
29
// NOTE: currently, this is purely a pass-through layer directly to the backing
30
// KVStore. Upcoming commits will move the graph cache out of the KVStore and
31
// into this layer so that the KVStore is only responsible for CRUD operations.
32
type ChannelGraph struct {
33
        // cacheMu guards any writes to the graphCache. It should be held
34
        // across the DB write call and the graphCache update to make the
35
        // two updates as atomic as possible.
36
        cacheMu    sync.Mutex
37
        graphCache *GraphCache
38

39
        *KVStore
40
}
41

42
// NewChannelGraph creates a new ChannelGraph instance with the given backend.
43
func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
44
        error) {
3✔
45

3✔
46
        opts := defaultChanGraphOptions()
3✔
47
        for _, o := range options {
6✔
48
                o(opts)
3✔
49
        }
3✔
50

51
        store, err := NewKVStore(cfg.KVDB, cfg.KVStoreOpts...)
3✔
52
        if err != nil {
3✔
53
                return nil, err
×
54
        }
×
55

56
        if !opts.useGraphCache {
6✔
57
                return &ChannelGraph{
3✔
58
                        KVStore: store,
3✔
59
                }, nil
3✔
60
        }
3✔
61

62
        // The graph cache can be turned off (e.g. for mobile users) for a
63
        // speed/memory usage tradeoff.
64
        graphCache := NewGraphCache(opts.preAllocCacheNumNodes)
3✔
65
        startTime := time.Now()
3✔
66
        log.Debugf("Populating in-memory channel graph, this might take a " +
3✔
67
                "while...")
3✔
68

3✔
69
        err = store.ForEachNodeCacheable(func(node route.Vertex,
3✔
70
                features *lnwire.FeatureVector) error {
6✔
71

3✔
72
                graphCache.AddNodeFeatures(node, features)
3✔
73

3✔
74
                return nil
3✔
75
        })
3✔
76
        if err != nil {
3✔
77
                return nil, err
×
78
        }
×
79

80
        err = store.ForEachChannel(func(info *models.ChannelEdgeInfo,
3✔
81
                policy1, policy2 *models.ChannelEdgePolicy) error {
6✔
82

3✔
83
                graphCache.AddChannel(info, policy1, policy2)
3✔
84

3✔
85
                return nil
3✔
86
        })
3✔
87
        if err != nil {
3✔
88
                return nil, err
×
89
        }
×
90

91
        log.Debugf("Finished populating in-memory channel graph (took %v, %s)",
3✔
92
                time.Since(startTime), graphCache.Stats())
3✔
93

3✔
94
        store.setGraphCache(graphCache)
3✔
95

3✔
96
        return &ChannelGraph{
3✔
97
                KVStore:    store,
3✔
98
                graphCache: graphCache,
3✔
99
        }, nil
3✔
100
}
101

102
// ForEachNodeDirectedChannel iterates through all channels of a given node,
103
// executing the passed callback on the directed edge representing the channel
104
// and its incoming policy. If the callback returns an error, then the iteration
105
// is halted with the error propagated back up to the caller. If the graphCache
106
// is available, then it will be used to retrieve the node's channels instead
107
// of the database.
108
//
109
// Unknown policies are passed into the callback as nil values.
110
//
111
// NOTE: this is part of the graphdb.NodeTraverser interface.
112
func (c *ChannelGraph) ForEachNodeDirectedChannel(node route.Vertex,
113
        cb func(channel *DirectedChannel) error) error {
3✔
114

3✔
115
        if c.graphCache != nil {
6✔
116
                return c.graphCache.ForEachChannel(node, cb)
3✔
117
        }
3✔
118

119
        return c.KVStore.ForEachNodeDirectedChannel(node, cb)
3✔
120
}
121

122
// FetchNodeFeatures returns the features of the given node. If no features are
123
// known for the node, an empty feature vector is returned.
124
// If the graphCache is available, then it will be used to retrieve the node's
125
// features instead of the database.
126
//
127
// NOTE: this is part of the graphdb.NodeTraverser interface.
128
func (c *ChannelGraph) FetchNodeFeatures(node route.Vertex) (
129
        *lnwire.FeatureVector, error) {
3✔
130

3✔
131
        if c.graphCache != nil {
6✔
132
                return c.graphCache.GetFeatures(node), nil
3✔
133
        }
3✔
134

135
        return c.KVStore.FetchNodeFeatures(node)
3✔
136
}
137

138
// GraphSession will provide the call-back with access to a NodeTraverser
139
// instance which can be used to perform queries against the channel graph. If
140
// the graph cache is not enabled, then the call-back will be provided with
141
// access to the graph via a consistent read-only transaction.
142
func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error {
3✔
143
        if c.graphCache != nil {
6✔
144
                return cb(c)
3✔
145
        }
3✔
146

UNCOV
147
        return c.KVStore.GraphSession(cb)
×
148
}
149

150
// ForEachNodeCached iterates through all the stored vertices/nodes in the
151
// graph, executing the passed callback with each node encountered.
152
//
153
// NOTE: The callback contents MUST not be modified.
154
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
UNCOV
155
        chans map[uint64]*DirectedChannel) error) error {
×
UNCOV
156

×
UNCOV
157
        if c.graphCache != nil {
×
158
                return c.graphCache.ForEachNode(cb)
×
159
        }
×
160

UNCOV
161
        return c.KVStore.ForEachNodeCached(cb)
×
162
}
163

164
// AddLightningNode adds a vertex/node to the graph database. If the node is not
165
// in the database from before, this will add a new, unconnected one to the
166
// graph. If it is present from before, this will update that node's
167
// information. Note that this method is expected to only be called to update an
168
// already present node from a node announcement, or to insert a node found in a
169
// channel update.
170
func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
171
        op ...batch.SchedulerOption) error {
3✔
172

3✔
173
        c.cacheMu.Lock()
3✔
174
        defer c.cacheMu.Unlock()
3✔
175

3✔
176
        err := c.KVStore.AddLightningNode(node, op...)
3✔
177
        if err != nil {
3✔
178
                return err
×
179
        }
×
180

181
        if c.graphCache != nil {
6✔
182
                c.graphCache.AddNodeFeatures(
3✔
183
                        node.PubKeyBytes, node.Features,
3✔
184
                )
3✔
185
        }
3✔
186

187
        return nil
3✔
188
}
189

190
// DeleteLightningNode starts a new database transaction to remove a vertex/node
191
// from the database according to the node's public key.
UNCOV
192
func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
×
UNCOV
193
        c.cacheMu.Lock()
×
UNCOV
194
        defer c.cacheMu.Unlock()
×
UNCOV
195

×
UNCOV
196
        err := c.KVStore.DeleteLightningNode(nodePub)
×
UNCOV
197
        if err != nil {
×
198
                return err
×
199
        }
×
200

UNCOV
201
        if c.graphCache != nil {
×
UNCOV
202
                c.graphCache.RemoveNode(nodePub)
×
UNCOV
203
        }
×
204

UNCOV
205
        return nil
×
206
}
207

208
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
209
// undirected edge from the two target nodes are created. The information stored
210
// denotes the static attributes of the channel, such as the channelID, the keys
211
// involved in creation of the channel, and the set of features that the channel
212
// supports. The chanPoint and chanID are used to uniquely identify the edge
213
// globally within the database.
214
func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
215
        op ...batch.SchedulerOption) error {
3✔
216

3✔
217
        c.cacheMu.Lock()
3✔
218
        defer c.cacheMu.Unlock()
3✔
219

3✔
220
        err := c.KVStore.AddChannelEdge(edge, op...)
3✔
221
        if err != nil {
3✔
UNCOV
222
                return err
×
UNCOV
223
        }
×
224

225
        if c.graphCache != nil {
6✔
226
                c.graphCache.AddChannel(edge, nil, nil)
3✔
227
        }
3✔
228

229
        return nil
3✔
230
}
231

232
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
233
// If the cache is enabled, the edge will be added back to the graph cache if
234
// we still have a record of this channel in the DB.
UNCOV
235
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
×
UNCOV
236
        c.cacheMu.Lock()
×
UNCOV
237
        defer c.cacheMu.Unlock()
×
UNCOV
238

×
UNCOV
239
        err := c.KVStore.MarkEdgeLive(chanID)
×
UNCOV
240
        if err != nil {
×
UNCOV
241
                return err
×
UNCOV
242
        }
×
243

UNCOV
244
        if c.graphCache != nil {
×
UNCOV
245
                // We need to add the channel back into our graph cache,
×
UNCOV
246
                // otherwise we won't use it for path finding.
×
UNCOV
247
                infos, err := c.KVStore.FetchChanInfos([]uint64{chanID})
×
UNCOV
248
                if err != nil {
×
249
                        return err
×
250
                }
×
251

UNCOV
252
                if len(infos) == 0 {
×
UNCOV
253
                        return nil
×
UNCOV
254
                }
×
255

256
                info := infos[0]
×
257

×
258
                c.graphCache.AddChannel(info.Info, info.Policy1, info.Policy2)
×
259
        }
260

261
        return nil
×
262
}
263

264
// FilterKnownChanIDs takes a set of channel IDs and return the subset of chan
265
// ID's that we don't know and are not known zombies of the passed set. In other
266
// words, we perform a set difference of our set of chan ID's and the ones
267
// passed in. This method can be used by callers to determine the set of
268
// channels another peer knows of that we don't.
269
func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
270
        isZombieChan func(time.Time, time.Time) bool) ([]uint64, error) {
3✔
271

3✔
272
        unknown, knownZombies, err := c.KVStore.FilterKnownChanIDs(chansInfo)
3✔
273
        if err != nil {
3✔
UNCOV
274
                return nil, err
×
UNCOV
275
        }
×
276

277
        for _, info := range knownZombies {
3✔
UNCOV
278
                // TODO(ziggie): Make sure that for the strict pruning case we
×
UNCOV
279
                // compare the pubkeys and whether the right timestamp is not
×
UNCOV
280
                // older than the `ChannelPruneExpiry`.
×
UNCOV
281
                //
×
UNCOV
282
                // NOTE: The timestamp data has no verification attached to it
×
UNCOV
283
                // in the `ReplyChannelRange` msg so we are trusting this data
×
UNCOV
284
                // at this point. However it is not critical because we are just
×
UNCOV
285
                // removing the channel from the db when the timestamps are more
×
UNCOV
286
                // recent. During the querying of the gossip msg verification
×
UNCOV
287
                // happens as usual. However we should start punishing peers
×
UNCOV
288
                // when they don't provide us honest data ?
×
UNCOV
289
                isStillZombie := isZombieChan(
×
UNCOV
290
                        info.Node1UpdateTimestamp, info.Node2UpdateTimestamp,
×
UNCOV
291
                )
×
UNCOV
292

×
UNCOV
293
                if isStillZombie {
×
UNCOV
294
                        continue
×
295
                }
296

297
                // If we have marked it as a zombie but the latest update
298
                // timestamps could bring it back from the dead, then we mark it
299
                // alive, and we let it be added to the set of IDs to query our
300
                // peer for.
UNCOV
301
                err := c.KVStore.MarkEdgeLive(
×
UNCOV
302
                        info.ShortChannelID.ToUint64(),
×
UNCOV
303
                )
×
UNCOV
304
                // Since there is a chance that the edge could have been marked
×
UNCOV
305
                // as "live" between the FilterKnownChanIDs call and the
×
UNCOV
306
                // MarkEdgeLive call, we ignore the error if the edge is already
×
UNCOV
307
                // marked as live.
×
UNCOV
308
                if err != nil && !errors.Is(err, ErrZombieEdgeNotFound) {
×
UNCOV
309
                        return nil, err
×
UNCOV
310
                }
×
311
        }
312

313
        return unknown, nil
3✔
314
}
315

316
// MarkEdgeZombie attempts to mark a channel identified by its channel ID as a
317
// zombie. This method is used on an ad-hoc basis, when channels need to be
318
// marked as zombies outside the normal pruning cycle.
319
func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
UNCOV
320
        pubKey1, pubKey2 [33]byte) error {
×
UNCOV
321

×
UNCOV
322
        c.cacheMu.Lock()
×
UNCOV
323
        defer c.cacheMu.Unlock()
×
UNCOV
324

×
UNCOV
325
        err := c.KVStore.MarkEdgeZombie(chanID, pubKey1, pubKey2)
×
UNCOV
326
        if err != nil {
×
UNCOV
327
                return err
×
UNCOV
328
        }
×
329

UNCOV
330
        if c.graphCache != nil {
×
UNCOV
331
                c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
×
UNCOV
332
        }
×
333

UNCOV
334
        return nil
×
335
}
336

337
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
338
// within the database for the referenced channel. The `flags` attribute within
339
// the ChannelEdgePolicy determines which of the directed edges are being
340
// updated. If the flag is 1, then the first node's information is being
341
// updated, otherwise it's the second node's information. The node ordering is
342
// determined by the lexicographical ordering of the identity public keys of the
343
// nodes on either side of the channel.
344
func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
345
        op ...batch.SchedulerOption) error {
3✔
346

3✔
347
        c.cacheMu.Lock()
3✔
348
        defer c.cacheMu.Unlock()
3✔
349

3✔
350
        from, to, err := c.KVStore.UpdateEdgePolicy(edge, op...)
3✔
351
        if err != nil {
3✔
UNCOV
352
                return err
×
UNCOV
353
        }
×
354

355
        if c.graphCache == nil {
6✔
356
                return nil
3✔
357
        }
3✔
358

359
        var isUpdate1 bool
3✔
360
        if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
6✔
361
                isUpdate1 = true
3✔
362
        }
3✔
363

364
        c.graphCache.UpdatePolicy(edge, from, to, isUpdate1)
3✔
365

3✔
366
        return nil
3✔
367
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc