• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13566028875

27 Feb 2025 12:09PM UTC coverage: 49.396% (-9.4%) from 58.748%
13566028875

Pull #9555

github

ellemouton
graph/db: populate the graph cache in Start instead of during construction

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.
Pull Request #9555: graph: extract cache from CRUD [6]

34 of 54 new or added lines in 4 files covered. (62.96%)

27464 existing lines in 436 files now uncovered.

101095 of 204664 relevant lines covered (49.4%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.08
/graph/db/graph_cache.go
1
package graphdb
2

3
import (
4
        "fmt"
5
        "sync"
6

7
        "github.com/btcsuite/btcd/btcutil"
8
        "github.com/lightningnetwork/lnd/graph/db/models"
9
        "github.com/lightningnetwork/lnd/lnwire"
10
        "github.com/lightningnetwork/lnd/routing/route"
11
)
12

13
// DirectedChannel is a type that stores the channel information as seen from
14
// one side of the channel.
15
type DirectedChannel struct {
16
        // ChannelID is the unique identifier of this channel.
17
        ChannelID uint64
18

19
        // IsNode1 indicates if this is the node with the smaller public key.
20
        IsNode1 bool
21

22
        // OtherNode is the public key of the node on the other end of this
23
        // channel.
24
        OtherNode route.Vertex
25

26
        // Capacity is the announced capacity of this channel in satoshis.
27
        Capacity btcutil.Amount
28

29
        // OutPolicySet is a boolean that indicates whether the node has an
30
        // outgoing policy set. For pathfinding only the existence of the policy
31
        // is important to know, not the actual content.
32
        OutPolicySet bool
33

34
        // InPolicy is the incoming policy *from* the other node to this node.
35
        // In path finding, we're walking backward from the destination to the
36
        // source, so we're always interested in the edge that arrives to us
37
        // from the other node.
38
        InPolicy *models.CachedEdgePolicy
39

40
        // Inbound fees of this node.
41
        InboundFee lnwire.Fee
42
}
43

44
// DeepCopy creates a deep copy of the channel, including the incoming policy.
45
func (c *DirectedChannel) DeepCopy() *DirectedChannel {
3✔
46
        channelCopy := *c
3✔
47

3✔
48
        if channelCopy.InPolicy != nil {
6✔
49
                inPolicyCopy := *channelCopy.InPolicy
3✔
50
                channelCopy.InPolicy = &inPolicyCopy
3✔
51

3✔
52
                // The fields for the ToNode can be overwritten by the path
3✔
53
                // finding algorithm, which is why we need a deep copy in the
3✔
54
                // first place. So we always start out with nil values, just to
3✔
55
                // be sure they don't contain any old data.
3✔
56
                channelCopy.InPolicy.ToNodePubKey = nil
3✔
57
                channelCopy.InPolicy.ToNodeFeatures = nil
3✔
58
        }
3✔
59

60
        return &channelCopy
3✔
61
}
62

63
// GraphCache is a type that holds a minimal set of information of the public
64
// channel graph that can be used for pathfinding.
65
type GraphCache struct {
66
        nodeChannels map[route.Vertex]map[uint64]*DirectedChannel
67
        nodeFeatures map[route.Vertex]*lnwire.FeatureVector
68

69
        mtx sync.RWMutex
70
}
71

72
// NewGraphCache creates a new graphCache.
73
func NewGraphCache(preAllocNumNodes int) *GraphCache {
3✔
74
        return &GraphCache{
3✔
75
                nodeChannels: make(
3✔
76
                        map[route.Vertex]map[uint64]*DirectedChannel,
3✔
77
                        // A channel connects two nodes, so we can look it up
3✔
78
                        // from both sides, meaning we get double the number of
3✔
79
                        // entries.
3✔
80
                        preAllocNumNodes*2,
3✔
81
                ),
3✔
82
                nodeFeatures: make(
3✔
83
                        map[route.Vertex]*lnwire.FeatureVector,
3✔
84
                        preAllocNumNodes,
3✔
85
                ),
3✔
86
        }
3✔
87
}
3✔
88

89
// Stats returns statistics about the current cache size.
90
func (c *GraphCache) Stats() string {
3✔
91
        c.mtx.RLock()
3✔
92
        defer c.mtx.RUnlock()
3✔
93

3✔
94
        numChannels := 0
3✔
95
        for node := range c.nodeChannels {
6✔
96
                numChannels += len(c.nodeChannels[node])
3✔
97
        }
3✔
98
        return fmt.Sprintf("num_node_features=%d, num_nodes=%d, "+
3✔
99
                "num_channels=%d", len(c.nodeFeatures), len(c.nodeChannels),
3✔
100
                numChannels)
3✔
101
}
102

103
// AddNodeFeatures adds a graph node and its features to the cache.
104
func (c *GraphCache) AddNodeFeatures(node route.Vertex,
105
        features *lnwire.FeatureVector) {
3✔
106

3✔
107
        c.mtx.Lock()
3✔
108
        defer c.mtx.Unlock()
3✔
109

3✔
110
        c.nodeFeatures[node] = features
3✔
111
}
3✔
112

113
// AddChannel adds a non-directed channel, meaning that the order of policy 1
114
// and policy 2 does not matter, the directionality is extracted from the info
115
// and policy flags automatically. The policy will be set as the outgoing policy
116
// on one node and the incoming policy on the peer's side.
117
func (c *GraphCache) AddChannel(info *models.ChannelEdgeInfo,
118
        policy1 *models.ChannelEdgePolicy, policy2 *models.ChannelEdgePolicy) {
3✔
119

3✔
120
        if info == nil {
3✔
121
                return
×
122
        }
×
123

124
        if policy1 != nil && policy1.IsDisabled() &&
3✔
125
                policy2 != nil && policy2.IsDisabled() {
6✔
126

3✔
127
                return
3✔
128
        }
3✔
129

130
        // Create the edge entry for both nodes.
131
        c.mtx.Lock()
3✔
132
        c.updateOrAddEdge(info.NodeKey1Bytes, &DirectedChannel{
3✔
133
                ChannelID: info.ChannelID,
3✔
134
                IsNode1:   true,
3✔
135
                OtherNode: info.NodeKey2Bytes,
3✔
136
                Capacity:  info.Capacity,
3✔
137
        })
3✔
138
        c.updateOrAddEdge(info.NodeKey2Bytes, &DirectedChannel{
3✔
139
                ChannelID: info.ChannelID,
3✔
140
                IsNode1:   false,
3✔
141
                OtherNode: info.NodeKey1Bytes,
3✔
142
                Capacity:  info.Capacity,
3✔
143
        })
3✔
144
        c.mtx.Unlock()
3✔
145

3✔
146
        // The policy's node is always the to_node. So if policy 1 has to_node
3✔
147
        // of node 2 then we have the policy 1 as seen from node 1.
3✔
148
        if policy1 != nil {
6✔
149
                fromNode, toNode := info.NodeKey1Bytes, info.NodeKey2Bytes
3✔
150
                if policy1.ToNode != info.NodeKey2Bytes {
3✔
UNCOV
151
                        fromNode, toNode = toNode, fromNode
×
UNCOV
152
                }
×
153
                isEdge1 := policy1.ChannelFlags&lnwire.ChanUpdateDirection == 0
3✔
154
                c.UpdatePolicy(policy1, fromNode, toNode, isEdge1)
3✔
155
        }
156
        if policy2 != nil {
6✔
157
                fromNode, toNode := info.NodeKey2Bytes, info.NodeKey1Bytes
3✔
158
                if policy2.ToNode != info.NodeKey1Bytes {
3✔
UNCOV
159
                        fromNode, toNode = toNode, fromNode
×
UNCOV
160
                }
×
161
                isEdge1 := policy2.ChannelFlags&lnwire.ChanUpdateDirection == 0
3✔
162
                c.UpdatePolicy(policy2, fromNode, toNode, isEdge1)
3✔
163
        }
164
}
165

166
// updateOrAddEdge makes sure the edge information for a node is either updated
167
// if it already exists or is added to that node's list of channels.
168
func (c *GraphCache) updateOrAddEdge(node route.Vertex, edge *DirectedChannel) {
3✔
169
        if len(c.nodeChannels[node]) == 0 {
6✔
170
                c.nodeChannels[node] = make(map[uint64]*DirectedChannel)
3✔
171
        }
3✔
172

173
        c.nodeChannels[node][edge.ChannelID] = edge
3✔
174
}
175

176
// UpdatePolicy updates a single policy on both the from and to node. The order
177
// of the from and to node is not strictly important. But we assume that a
178
// channel edge was added beforehand so that the directed channel struct already
179
// exists in the cache.
180
func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode,
181
        toNode route.Vertex, edge1 bool) {
3✔
182

3✔
183
        // Extract inbound fee if possible and available. If there is a decoding
3✔
184
        // error, ignore this policy.
3✔
185
        var inboundFee lnwire.Fee
3✔
186
        _, err := policy.ExtraOpaqueData.ExtractRecords(&inboundFee)
3✔
187
        if err != nil {
3✔
188
                log.Errorf("Failed to extract records from edge policy %v: %v",
×
189
                        policy.ChannelID, err)
×
190

×
191
                return
×
192
        }
×
193

194
        c.mtx.Lock()
3✔
195
        defer c.mtx.Unlock()
3✔
196

3✔
197
        updatePolicy := func(nodeKey route.Vertex) {
6✔
198
                if len(c.nodeChannels[nodeKey]) == 0 {
3✔
199
                        log.Warnf("Node=%v not found in graph cache", nodeKey)
×
200

×
201
                        return
×
202
                }
×
203

204
                channel, ok := c.nodeChannels[nodeKey][policy.ChannelID]
3✔
205
                if !ok {
3✔
206
                        log.Warnf("Channel=%v not found in graph cache",
×
207
                                policy.ChannelID)
×
208

×
209
                        return
×
210
                }
×
211

212
                // Edge 1 is defined as the policy for the direction of node1 to
213
                // node2.
214
                switch {
3✔
215
                // This is node 1, and it is edge 1, so this is the outgoing
216
                // policy for node 1.
217
                case channel.IsNode1 && edge1:
3✔
218
                        channel.OutPolicySet = true
3✔
219
                        channel.InboundFee = inboundFee
3✔
220

221
                // This is node 2, and it is edge 2, so this is the outgoing
222
                // policy for node 2.
223
                case !channel.IsNode1 && !edge1:
3✔
224
                        channel.OutPolicySet = true
3✔
225
                        channel.InboundFee = inboundFee
3✔
226

227
                // The other two cases left mean it's the inbound policy for the
228
                // node.
229
                default:
3✔
230
                        channel.InPolicy = models.NewCachedPolicy(policy)
3✔
231
                }
232
        }
233

234
        updatePolicy(fromNode)
3✔
235
        updatePolicy(toNode)
3✔
236
}
237

238
// RemoveNode completely removes a node and all its channels (including the
239
// peer's side).
240
func (c *GraphCache) RemoveNode(node route.Vertex) {
3✔
241
        c.mtx.Lock()
3✔
242
        defer c.mtx.Unlock()
3✔
243

3✔
244
        delete(c.nodeFeatures, node)
3✔
245

3✔
246
        // First remove all channels from the other nodes' lists.
3✔
247
        for _, channel := range c.nodeChannels[node] {
3✔
248
                c.removeChannelIfFound(channel.OtherNode, channel.ChannelID)
×
249
        }
×
250

251
        // Then remove our whole node completely.
252
        delete(c.nodeChannels, node)
3✔
253
}
254

255
// RemoveChannel removes a single channel between two nodes.
256
func (c *GraphCache) RemoveChannel(node1, node2 route.Vertex, chanID uint64) {
3✔
257
        c.mtx.Lock()
3✔
258
        defer c.mtx.Unlock()
3✔
259

3✔
260
        // Remove that one channel from both sides.
3✔
261
        c.removeChannelIfFound(node1, chanID)
3✔
262
        c.removeChannelIfFound(node2, chanID)
3✔
263
}
3✔
264

265
// removeChannelIfFound removes a single channel from one side.
266
func (c *GraphCache) removeChannelIfFound(node route.Vertex, chanID uint64) {
3✔
267
        if len(c.nodeChannels[node]) == 0 {
6✔
268
                return
3✔
269
        }
3✔
270

271
        delete(c.nodeChannels[node], chanID)
3✔
272
}
273

274
// UpdateChannel updates the channel edge information for a specific edge. We
275
// expect the edge to already exist and be known. If it does not yet exist, this
276
// call is a no-op.
277
func (c *GraphCache) UpdateChannel(info *models.ChannelEdgeInfo) {
×
278
        c.mtx.Lock()
×
279
        defer c.mtx.Unlock()
×
280

×
281
        if len(c.nodeChannels[info.NodeKey1Bytes]) == 0 ||
×
282
                len(c.nodeChannels[info.NodeKey2Bytes]) == 0 {
×
283

×
284
                return
×
285
        }
×
286

287
        channel, ok := c.nodeChannels[info.NodeKey1Bytes][info.ChannelID]
×
288
        if ok {
×
289
                // We only expect to be called when the channel is already
×
290
                // known.
×
291
                channel.Capacity = info.Capacity
×
292
                channel.OtherNode = info.NodeKey2Bytes
×
293
        }
×
294

295
        channel, ok = c.nodeChannels[info.NodeKey2Bytes][info.ChannelID]
×
296
        if ok {
×
297
                channel.Capacity = info.Capacity
×
298
                channel.OtherNode = info.NodeKey1Bytes
×
299
        }
×
300
}
301

302
// getChannels returns a copy of the passed node's channels or nil if there
303
// isn't any.
304
func (c *GraphCache) getChannels(node route.Vertex) []*DirectedChannel {
3✔
305
        c.mtx.RLock()
3✔
306
        defer c.mtx.RUnlock()
3✔
307

3✔
308
        channels, ok := c.nodeChannels[node]
3✔
309
        if !ok {
6✔
310
                return nil
3✔
311
        }
3✔
312

313
        features, ok := c.nodeFeatures[node]
3✔
314
        if !ok {
6✔
315
                // If the features were set to nil explicitly, that's fine here.
3✔
316
                // The router will overwrite the features of the destination
3✔
317
                // node with those found in the invoice if necessary. But if we
3✔
318
                // didn't yet get a node announcement we want to mimic the
3✔
319
                // behavior of the old DB based code that would always set an
3✔
320
                // empty feature vector instead of leaving it nil.
3✔
321
                features = lnwire.EmptyFeatureVector()
3✔
322
        }
3✔
323

324
        toNodeCallback := func() route.Vertex {
6✔
325
                return node
3✔
326
        }
3✔
327

328
        i := 0
3✔
329
        channelsCopy := make([]*DirectedChannel, len(channels))
3✔
330
        for _, channel := range channels {
6✔
331
                // We need to copy the channel and policy to avoid it being
3✔
332
                // updated in the cache if the path finding algorithm sets
3✔
333
                // fields on it (currently only the ToNodeFeatures of the
3✔
334
                // policy).
3✔
335
                channelCopy := channel.DeepCopy()
3✔
336
                if channelCopy.InPolicy != nil {
6✔
337
                        channelCopy.InPolicy.ToNodePubKey = toNodeCallback
3✔
338
                        channelCopy.InPolicy.ToNodeFeatures = features
3✔
339
                }
3✔
340

341
                channelsCopy[i] = channelCopy
3✔
342
                i++
3✔
343
        }
344

345
        return channelsCopy
3✔
346
}
347

348
// ForEachChannel invokes the given callback for each channel of the given node.
349
func (c *GraphCache) ForEachChannel(node route.Vertex,
350
        cb func(channel *DirectedChannel) error) error {
3✔
351

3✔
352
        // Obtain a copy of the node's channels. We need do this in order to
3✔
353
        // avoid deadlocks caused by interaction with the graph cache, channel
3✔
354
        // state and the graph database from multiple goroutines. This snapshot
3✔
355
        // is only used for path finding where being stale is acceptable since
3✔
356
        // the real world graph and our representation may always become
3✔
357
        // slightly out of sync for a short time and the actual channel state
3✔
358
        // is stored separately.
3✔
359
        channels := c.getChannels(node)
3✔
360
        for _, channel := range channels {
6✔
361
                if err := cb(channel); err != nil {
3✔
362
                        return err
×
363
                }
×
364
        }
365

366
        return nil
3✔
367
}
368

369
// ForEachNode iterates over the adjacency list of the graph, executing the
370
// call back for each node and the set of channels that emanate from the given
371
// node.
372
//
373
// NOTE: This method should be considered _read only_, the channels or nodes
374
// passed in MUST NOT be modified.
375
func (c *GraphCache) ForEachNode(cb func(node route.Vertex,
UNCOV
376
        channels map[uint64]*DirectedChannel) error) error {
×
UNCOV
377

×
UNCOV
378
        c.mtx.RLock()
×
UNCOV
379
        defer c.mtx.RUnlock()
×
UNCOV
380

×
UNCOV
381
        for node, channels := range c.nodeChannels {
×
UNCOV
382
                // We don't make a copy here since this is a read-only RPC
×
UNCOV
383
                // call. We also don't need the node features either for this
×
UNCOV
384
                // call.
×
UNCOV
385
                if err := cb(node, channels); err != nil {
×
386
                        return err
×
387
                }
×
388
        }
389

UNCOV
390
        return nil
×
391
}
392

393
// GetFeatures returns the features of the node with the given ID. If no
394
// features are known for the node, an empty feature vector is returned.
395
func (c *GraphCache) GetFeatures(node route.Vertex) *lnwire.FeatureVector {
3✔
396
        c.mtx.RLock()
3✔
397
        defer c.mtx.RUnlock()
3✔
398

3✔
399
        features, ok := c.nodeFeatures[node]
3✔
400
        if !ok || features == nil {
6✔
401
                // The router expects the features to never be nil, so we return
3✔
402
                // an empty feature set instead.
3✔
403
                return lnwire.EmptyFeatureVector()
3✔
404
        }
3✔
405

406
        return features
3✔
407
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc