• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13541816950

26 Feb 2025 10:30AM UTC coverage: 58.836% (+0.02%) from 58.815%
13541816950

Pull #9550

github

ellemouton
graph/db: move various cache write calls to ChannelGraph

Here, we move the graph cache writes for AddLightningNode,
DeleteLightningNode, AddChannelEdge and MarkEdgeLive to the
ChannelGraph. Since these are writes, the cache is only updated if the
DB write is successful.
Pull Request #9550: graph: extract cache from CRUD [3]

73 of 85 new or added lines in 1 file covered. (85.88%)

288 existing lines in 17 files now uncovered.

136413 of 231851 relevant lines covered (58.84%)

19233.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.57
/graph/db/graph.go
1
package graphdb
2

3
import (
4
        "sync"
5
        "time"
6

7
        "github.com/lightningnetwork/lnd/batch"
8
        "github.com/lightningnetwork/lnd/graph/db/models"
9
        "github.com/lightningnetwork/lnd/kvdb"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
        "github.com/lightningnetwork/lnd/routing/route"
12
)
13

14
// Config is a struct that holds all the necessary dependencies for a
15
// ChannelGraph.
16
type Config struct {
17
        // KVDB is the kvdb.Backend that will be used for initializing the
18
        // KVStore CRUD layer.
19
        KVDB kvdb.Backend
20

21
        // KVStoreOpts is a list of functional options that will be used when
22
        // initializing the KVStore.
23
        KVStoreOpts []KVStoreOptionModifier
24
}
25

26
// ChannelGraph is a layer above the graph's CRUD layer.
27
//
28
// NOTE: currently, this is purely a pass-through layer directly to the backing
29
// KVStore. Upcoming commits will move the graph cache out of the KVStore and
30
// into this layer so that the KVStore is only responsible for CRUD operations.
31
type ChannelGraph struct {
32
        // cacheMu guards any writes to the graphCache. It should be held
33
        // across the DB write call and the graphCache update to make the
34
        // two updates as atomic as possible.
35
        cacheMu    sync.Mutex
36
        graphCache *GraphCache
37

38
        *KVStore
39
}
40

41
// NewChannelGraph creates a new ChannelGraph instance with the given backend.
42
func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
43
        error) {
176✔
44

176✔
45
        opts := defaultChanGraphOptions()
176✔
46
        for _, o := range options {
281✔
47
                o(opts)
105✔
48
        }
105✔
49

50
        store, err := NewKVStore(cfg.KVDB, cfg.KVStoreOpts...)
176✔
51
        if err != nil {
176✔
52
                return nil, err
×
53
        }
×
54

55
        if !opts.useGraphCache {
212✔
56
                return &ChannelGraph{
36✔
57
                        KVStore: store,
36✔
58
                }, nil
36✔
59
        }
36✔
60

61
        // The graph cache can be turned off (e.g. for mobile users) for a
62
        // speed/memory usage tradeoff.
63
        graphCache := NewGraphCache(opts.preAllocCacheNumNodes)
143✔
64
        startTime := time.Now()
143✔
65
        log.Debugf("Populating in-memory channel graph, this might take a " +
143✔
66
                "while...")
143✔
67

143✔
68
        err = store.ForEachNodeCacheable(func(node route.Vertex,
143✔
69
                features *lnwire.FeatureVector) error {
246✔
70

103✔
71
                graphCache.AddNodeFeatures(node, features)
103✔
72

103✔
73
                return nil
103✔
74
        })
103✔
75
        if err != nil {
143✔
76
                return nil, err
×
77
        }
×
78

79
        err = store.ForEachChannel(func(info *models.ChannelEdgeInfo,
143✔
80
                policy1, policy2 *models.ChannelEdgePolicy) error {
542✔
81

399✔
82
                graphCache.AddChannel(info, policy1, policy2)
399✔
83

399✔
84
                return nil
399✔
85
        })
399✔
86
        if err != nil {
143✔
87
                return nil, err
×
88
        }
×
89

90
        log.Debugf("Finished populating in-memory channel graph (took %v, %s)",
143✔
91
                time.Since(startTime), graphCache.Stats())
143✔
92

143✔
93
        store.setGraphCache(graphCache)
143✔
94

143✔
95
        return &ChannelGraph{
143✔
96
                KVStore:    store,
143✔
97
                graphCache: graphCache,
143✔
98
        }, nil
143✔
99
}
100

101
// ForEachNodeDirectedChannel iterates through all channels of a given node,
102
// executing the passed callback on the directed edge representing the channel
103
// and its incoming policy. If the callback returns an error, then the iteration
104
// is halted with the error propagated back up to the caller. If the graphCache
105
// is available, then it will be used to retrieve the node's channels instead
106
// of the database.
107
//
108
// Unknown policies are passed into the callback as nil values.
109
//
110
// NOTE: this is part of the graphdb.NodeTraverser interface.
111
func (c *ChannelGraph) ForEachNodeDirectedChannel(node route.Vertex,
112
        cb func(channel *DirectedChannel) error) error {
467✔
113

467✔
114
        if c.graphCache != nil {
931✔
115
                return c.graphCache.ForEachChannel(node, cb)
464✔
116
        }
464✔
117

118
        return c.KVStore.ForEachNodeDirectedChannel(node, cb)
6✔
119
}
120

121
// FetchNodeFeatures returns the features of the given node. If no features are
122
// known for the node, an empty feature vector is returned.
123
// If the graphCache is available, then it will be used to retrieve the node's
124
// features instead of the database.
125
//
126
// NOTE: this is part of the graphdb.NodeTraverser interface.
127
func (c *ChannelGraph) FetchNodeFeatures(node route.Vertex) (
128
        *lnwire.FeatureVector, error) {
456✔
129

456✔
130
        if c.graphCache != nil {
912✔
131
                return c.graphCache.GetFeatures(node), nil
456✔
132
        }
456✔
133

134
        return c.KVStore.FetchNodeFeatures(node)
3✔
135
}
136

137
// GraphSession will provide the call-back with access to a NodeTraverser
138
// instance which can be used to perform queries against the channel graph. If
139
// the graph cache is not enabled, then the call-back will be provided with
140
// access to the graph via a consistent read-only transaction.
141
func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error {
136✔
142
        if c.graphCache != nil {
218✔
143
                return cb(c)
82✔
144
        }
82✔
145

146
        return c.KVStore.GraphSession(cb)
54✔
147
}
148

149
// ForEachNodeCached iterates through all the stored vertices/nodes in the
150
// graph, executing the passed callback with each node encountered.
151
//
152
// NOTE: The callback contents MUST not be modified.
153
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
154
        chans map[uint64]*DirectedChannel) error) error {
1✔
155

1✔
156
        if c.graphCache != nil {
1✔
NEW
157
                return c.graphCache.ForEachNode(cb)
×
NEW
158
        }
×
159

160
        return c.KVStore.ForEachNodeCached(cb)
1✔
161
}
162

163
// AddLightningNode adds a vertex/node to the graph database. If the node is not
164
// in the database from before, this will add a new, unconnected one to the
165
// graph. If it is present from before, this will update that node's
166
// information. Note that this method is expected to only be called to update an
167
// already present node from a node announcement, or to insert a node found in a
168
// channel update.
169
func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
170
        op ...batch.SchedulerOption) error {
803✔
171

803✔
172
        c.cacheMu.Lock()
803✔
173
        defer c.cacheMu.Unlock()
803✔
174

803✔
175
        err := c.KVStore.AddLightningNode(node, op...)
803✔
176
        if err != nil {
803✔
NEW
177
                return err
×
NEW
178
        }
×
179

180
        if c.graphCache != nil {
1,419✔
181
                c.graphCache.AddNodeFeatures(
616✔
182
                        node.PubKeyBytes, node.Features,
616✔
183
                )
616✔
184
        }
616✔
185

186
        return nil
803✔
187
}
188

189
// DeleteLightningNode starts a new database transaction to remove a vertex/node
190
// from the database according to the node's public key.
191
func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
3✔
192
        c.cacheMu.Lock()
3✔
193
        defer c.cacheMu.Unlock()
3✔
194

3✔
195
        err := c.KVStore.DeleteLightningNode(nodePub)
3✔
196
        if err != nil {
3✔
NEW
197
                return err
×
NEW
198
        }
×
199

200
        if c.graphCache != nil {
6✔
201
                c.graphCache.RemoveNode(nodePub)
3✔
202
        }
3✔
203

204
        return nil
3✔
205
}
206

207
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
208
// undirected edge from the two target nodes are created. The information stored
209
// denotes the static attributes of the channel, such as the channelID, the keys
210
// involved in creation of the channel, and the set of features that the channel
211
// supports. The chanPoint and chanID are used to uniquely identify the edge
212
// globally within the database.
213
func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
214
        op ...batch.SchedulerOption) error {
1,717✔
215

1,717✔
216
        c.cacheMu.Lock()
1,717✔
217
        defer c.cacheMu.Unlock()
1,717✔
218

1,717✔
219
        err := c.KVStore.AddChannelEdge(edge, op...)
1,717✔
220
        if err != nil {
1,951✔
221
                return err
234✔
222
        }
234✔
223

224
        if c.graphCache != nil {
2,776✔
225
                c.graphCache.AddChannel(edge, nil, nil)
1,293✔
226
        }
1,293✔
227

228
        return nil
1,483✔
229
}
230

231
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
232
// If the cache is enabled, the edge will be added back to the graph cache if
233
// we still have a record of this channel in the DB.
234
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
2✔
235
        c.cacheMu.Lock()
2✔
236
        defer c.cacheMu.Unlock()
2✔
237

2✔
238
        err := c.KVStore.MarkEdgeLive(chanID)
2✔
239
        if err != nil {
3✔
240
                return err
1✔
241
        }
1✔
242

243
        if c.graphCache != nil {
2✔
244
                // We need to add the channel back into our graph cache,
1✔
245
                // otherwise we won't use it for path finding.
1✔
246
                infos, err := c.KVStore.FetchChanInfos([]uint64{chanID})
1✔
247
                if err != nil {
1✔
NEW
248
                        return err
×
NEW
249
                }
×
250

251
                if len(infos) == 0 {
2✔
252
                        return nil
1✔
253
                }
1✔
254

NEW
255
                info := infos[0]
×
NEW
256

×
NEW
257
                c.graphCache.AddChannel(info.Info, info.Policy1, info.Policy2)
×
258
        }
259

NEW
260
        return nil
×
261
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc