• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.62
/graph/db/graph_cache.go
1
package graphdb
2

3
import (
4
        "fmt"
5
        "sync"
6

7
        "github.com/btcsuite/btcd/btcutil"
8
        "github.com/lightningnetwork/lnd/graph/db/models"
9
        "github.com/lightningnetwork/lnd/kvdb"
10
        "github.com/lightningnetwork/lnd/lnwire"
11
        "github.com/lightningnetwork/lnd/routing/route"
12
)
13

14
// GraphCacheNode is an interface for all the information the cache needs to know
15
// about a lightning node.
16
type GraphCacheNode interface {
17
        // PubKey is the node's public identity key.
18
        PubKey() route.Vertex
19

20
        // Features returns the node's p2p features.
21
        Features() *lnwire.FeatureVector
22

23
        // ForEachChannel iterates through all channels of a given node,
24
        // executing the passed callback with an edge info structure and the
25
        // policies of each end of the channel. The first edge policy is the
26
        // outgoing edge *to* the connecting node, while the second is the
27
        // incoming edge *from* the connecting node. If the callback returns an
28
        // error, then the iteration is halted with the error propagated back up
29
        // to the caller.
30
        ForEachChannel(kvdb.RTx,
31
                func(kvdb.RTx, *models.ChannelEdgeInfo,
32
                        *models.ChannelEdgePolicy,
33
                        *models.ChannelEdgePolicy) error) error
34
}
35

36
// DirectedChannel is a type that stores the channel information as seen from
37
// one side of the channel.
38
type DirectedChannel struct {
39
        // ChannelID is the unique identifier of this channel.
40
        ChannelID uint64
41

42
        // IsNode1 indicates if this is the node with the smaller public key.
43
        IsNode1 bool
44

45
        // OtherNode is the public key of the node on the other end of this
46
        // channel.
47
        OtherNode route.Vertex
48

49
        // Capacity is the announced capacity of this channel in satoshis.
50
        Capacity btcutil.Amount
51

52
        // OutPolicySet is a boolean that indicates whether the node has an
53
        // outgoing policy set. For pathfinding only the existence of the policy
54
        // is important to know, not the actual content.
55
        OutPolicySet bool
56

57
        // InPolicy is the incoming policy *from* the other node to this node.
58
        // In path finding, we're walking backward from the destination to the
59
        // source, so we're always interested in the edge that arrives to us
60
        // from the other node.
61
        InPolicy *models.CachedEdgePolicy
62

63
        // Inbound fees of this node.
64
        InboundFee lnwire.Fee
65
}
66

67
// DeepCopy creates a deep copy of the channel, including the incoming policy.
68
func (c *DirectedChannel) DeepCopy() *DirectedChannel {
3✔
69
        channelCopy := *c
3✔
70

3✔
71
        if channelCopy.InPolicy != nil {
6✔
72
                inPolicyCopy := *channelCopy.InPolicy
3✔
73
                channelCopy.InPolicy = &inPolicyCopy
3✔
74

3✔
75
                // The fields for the ToNode can be overwritten by the path
3✔
76
                // finding algorithm, which is why we need a deep copy in the
3✔
77
                // first place. So we always start out with nil values, just to
3✔
78
                // be sure they don't contain any old data.
3✔
79
                channelCopy.InPolicy.ToNodePubKey = nil
3✔
80
                channelCopy.InPolicy.ToNodeFeatures = nil
3✔
81
        }
3✔
82

83
        return &channelCopy
3✔
84
}
85

86
// GraphCache is a type that holds a minimal set of information of the public
87
// channel graph that can be used for pathfinding.
88
type GraphCache struct {
89
        nodeChannels map[route.Vertex]map[uint64]*DirectedChannel
90
        nodeFeatures map[route.Vertex]*lnwire.FeatureVector
91

92
        mtx sync.RWMutex
93
}
94

95
// NewGraphCache creates a new graphCache.
96
func NewGraphCache(preAllocNumNodes int) *GraphCache {
3✔
97
        return &GraphCache{
3✔
98
                nodeChannels: make(
3✔
99
                        map[route.Vertex]map[uint64]*DirectedChannel,
3✔
100
                        // A channel connects two nodes, so we can look it up
3✔
101
                        // from both sides, meaning we get double the number of
3✔
102
                        // entries.
3✔
103
                        preAllocNumNodes*2,
3✔
104
                ),
3✔
105
                nodeFeatures: make(
3✔
106
                        map[route.Vertex]*lnwire.FeatureVector,
3✔
107
                        preAllocNumNodes,
3✔
108
                ),
3✔
109
        }
3✔
110
}
3✔
111

112
// Stats returns statistics about the current cache size.
113
func (c *GraphCache) Stats() string {
3✔
114
        c.mtx.RLock()
3✔
115
        defer c.mtx.RUnlock()
3✔
116

3✔
117
        numChannels := 0
3✔
118
        for node := range c.nodeChannels {
6✔
119
                numChannels += len(c.nodeChannels[node])
3✔
120
        }
3✔
121
        return fmt.Sprintf("num_node_features=%d, num_nodes=%d, "+
3✔
122
                "num_channels=%d", len(c.nodeFeatures), len(c.nodeChannels),
3✔
123
                numChannels)
3✔
124
}
125

126
// AddNodeFeatures adds a graph node and its features to the cache.
127
func (c *GraphCache) AddNodeFeatures(node GraphCacheNode) {
3✔
128
        nodePubKey := node.PubKey()
3✔
129

3✔
130
        // Only hold the lock for a short time. The `ForEachChannel()` below is
3✔
131
        // possibly slow as it has to go to the backend, so we can unlock
3✔
132
        // between the calls. And the AddChannel() method will acquire its own
3✔
133
        // lock anyway.
3✔
134
        c.mtx.Lock()
3✔
135
        c.nodeFeatures[nodePubKey] = node.Features()
3✔
136
        c.mtx.Unlock()
3✔
137
}
3✔
138

139
// AddNode adds a graph node, including all the (directed) channels of that
140
// node.
141
func (c *GraphCache) AddNode(tx kvdb.RTx, node GraphCacheNode) error {
3✔
142
        c.AddNodeFeatures(node)
3✔
143

3✔
144
        return node.ForEachChannel(
3✔
145
                tx, func(tx kvdb.RTx, info *models.ChannelEdgeInfo,
3✔
146
                        outPolicy *models.ChannelEdgePolicy,
3✔
147
                        inPolicy *models.ChannelEdgePolicy) error {
6✔
148

3✔
149
                        c.AddChannel(info, outPolicy, inPolicy)
3✔
150

3✔
151
                        return nil
3✔
152
                },
3✔
153
        )
154
}
155

156
// AddChannel adds a non-directed channel, meaning that the order of policy 1
157
// and policy 2 does not matter, the directionality is extracted from the info
158
// and policy flags automatically. The policy will be set as the outgoing policy
159
// on one node and the incoming policy on the peer's side.
160
func (c *GraphCache) AddChannel(info *models.ChannelEdgeInfo,
161
        policy1 *models.ChannelEdgePolicy, policy2 *models.ChannelEdgePolicy) {
3✔
162

3✔
163
        if info == nil {
3✔
164
                return
×
165
        }
×
166

167
        if policy1 != nil && policy1.IsDisabled() &&
3✔
168
                policy2 != nil && policy2.IsDisabled() {
6✔
169

3✔
170
                return
3✔
171
        }
3✔
172

173
        // Create the edge entry for both nodes.
174
        c.mtx.Lock()
3✔
175
        c.updateOrAddEdge(info.NodeKey1Bytes, &DirectedChannel{
3✔
176
                ChannelID: info.ChannelID,
3✔
177
                IsNode1:   true,
3✔
178
                OtherNode: info.NodeKey2Bytes,
3✔
179
                Capacity:  info.Capacity,
3✔
180
        })
3✔
181
        c.updateOrAddEdge(info.NodeKey2Bytes, &DirectedChannel{
3✔
182
                ChannelID: info.ChannelID,
3✔
183
                IsNode1:   false,
3✔
184
                OtherNode: info.NodeKey1Bytes,
3✔
185
                Capacity:  info.Capacity,
3✔
186
        })
3✔
187
        c.mtx.Unlock()
3✔
188

3✔
189
        // The policy's node is always the to_node. So if policy 1 has to_node
3✔
190
        // of node 2 then we have the policy 1 as seen from node 1.
3✔
191
        if policy1 != nil {
6✔
192
                fromNode, toNode := info.NodeKey1Bytes, info.NodeKey2Bytes
3✔
193
                if policy1.ToNode != info.NodeKey2Bytes {
6✔
194
                        fromNode, toNode = toNode, fromNode
3✔
195
                }
3✔
196
                isEdge1 := policy1.ChannelFlags&lnwire.ChanUpdateDirection == 0
3✔
197
                c.UpdatePolicy(policy1, fromNode, toNode, isEdge1)
3✔
198
        }
199
        if policy2 != nil {
6✔
200
                fromNode, toNode := info.NodeKey2Bytes, info.NodeKey1Bytes
3✔
201
                if policy2.ToNode != info.NodeKey1Bytes {
6✔
202
                        fromNode, toNode = toNode, fromNode
3✔
203
                }
3✔
204
                isEdge1 := policy2.ChannelFlags&lnwire.ChanUpdateDirection == 0
3✔
205
                c.UpdatePolicy(policy2, fromNode, toNode, isEdge1)
3✔
206
        }
207
}
208

209
// updateOrAddEdge makes sure the edge information for a node is either updated
210
// if it already exists or is added to that node's list of channels.
211
func (c *GraphCache) updateOrAddEdge(node route.Vertex, edge *DirectedChannel) {
3✔
212
        if len(c.nodeChannels[node]) == 0 {
6✔
213
                c.nodeChannels[node] = make(map[uint64]*DirectedChannel)
3✔
214
        }
3✔
215

216
        c.nodeChannels[node][edge.ChannelID] = edge
3✔
217
}
218

219
// UpdatePolicy updates a single policy on both the from and to node. The order
220
// of the from and to node is not strictly important. But we assume that a
221
// channel edge was added beforehand so that the directed channel struct already
222
// exists in the cache.
223
func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode,
224
        toNode route.Vertex, edge1 bool) {
3✔
225

3✔
226
        // Extract inbound fee if possible and available. If there is a decoding
3✔
227
        // error, ignore this policy.
3✔
228
        var inboundFee lnwire.Fee
3✔
229
        _, err := policy.ExtraOpaqueData.ExtractRecords(&inboundFee)
3✔
230
        if err != nil {
3✔
231
                log.Errorf("Failed to extract records from edge policy %v: %v",
×
232
                        policy.ChannelID, err)
×
233

×
234
                return
×
235
        }
×
236

237
        c.mtx.Lock()
3✔
238
        defer c.mtx.Unlock()
3✔
239

3✔
240
        updatePolicy := func(nodeKey route.Vertex) {
6✔
241
                if len(c.nodeChannels[nodeKey]) == 0 {
3✔
242
                        log.Warnf("Node=%v not found in graph cache", nodeKey)
×
243

×
244
                        return
×
245
                }
×
246

247
                channel, ok := c.nodeChannels[nodeKey][policy.ChannelID]
3✔
248
                if !ok {
3✔
249
                        log.Warnf("Channel=%v not found in graph cache",
×
250
                                policy.ChannelID)
×
251

×
252
                        return
×
253
                }
×
254

255
                // Edge 1 is defined as the policy for the direction of node1 to
256
                // node2.
257
                switch {
3✔
258
                // This is node 1, and it is edge 1, so this is the outgoing
259
                // policy for node 1.
260
                case channel.IsNode1 && edge1:
3✔
261
                        channel.OutPolicySet = true
3✔
262
                        channel.InboundFee = inboundFee
3✔
263

264
                // This is node 2, and it is edge 2, so this is the outgoing
265
                // policy for node 2.
266
                case !channel.IsNode1 && !edge1:
3✔
267
                        channel.OutPolicySet = true
3✔
268
                        channel.InboundFee = inboundFee
3✔
269

270
                // The other two cases left mean it's the inbound policy for the
271
                // node.
272
                default:
3✔
273
                        channel.InPolicy = models.NewCachedPolicy(policy)
3✔
274
                }
275
        }
276

277
        updatePolicy(fromNode)
3✔
278
        updatePolicy(toNode)
3✔
279
}
280

281
// RemoveNode completely removes a node and all its channels (including the
282
// peer's side).
283
func (c *GraphCache) RemoveNode(node route.Vertex) {
3✔
284
        c.mtx.Lock()
3✔
285
        defer c.mtx.Unlock()
3✔
286

3✔
287
        delete(c.nodeFeatures, node)
3✔
288

3✔
289
        // First remove all channels from the other nodes' lists.
3✔
290
        for _, channel := range c.nodeChannels[node] {
3✔
291
                c.removeChannelIfFound(channel.OtherNode, channel.ChannelID)
×
292
        }
×
293

294
        // Then remove our whole node completely.
295
        delete(c.nodeChannels, node)
3✔
296
}
297

298
// RemoveChannel removes a single channel between two nodes.
299
func (c *GraphCache) RemoveChannel(node1, node2 route.Vertex, chanID uint64) {
3✔
300
        c.mtx.Lock()
3✔
301
        defer c.mtx.Unlock()
3✔
302

3✔
303
        // Remove that one channel from both sides.
3✔
304
        c.removeChannelIfFound(node1, chanID)
3✔
305
        c.removeChannelIfFound(node2, chanID)
3✔
306
}
3✔
307

308
// removeChannelIfFound removes a single channel from one side.
309
func (c *GraphCache) removeChannelIfFound(node route.Vertex, chanID uint64) {
3✔
310
        if len(c.nodeChannels[node]) == 0 {
6✔
311
                return
3✔
312
        }
3✔
313

314
        delete(c.nodeChannels[node], chanID)
3✔
315
}
316

317
// UpdateChannel updates the channel edge information for a specific edge. We
318
// expect the edge to already exist and be known. If it does not yet exist, this
319
// call is a no-op.
320
func (c *GraphCache) UpdateChannel(info *models.ChannelEdgeInfo) {
3✔
321
        c.mtx.Lock()
3✔
322
        defer c.mtx.Unlock()
3✔
323

3✔
324
        if len(c.nodeChannels[info.NodeKey1Bytes]) == 0 ||
3✔
325
                len(c.nodeChannels[info.NodeKey2Bytes]) == 0 {
3✔
326

×
327
                return
×
328
        }
×
329

330
        channel, ok := c.nodeChannels[info.NodeKey1Bytes][info.ChannelID]
3✔
331
        if ok {
6✔
332
                // We only expect to be called when the channel is already
3✔
333
                // known.
3✔
334
                channel.Capacity = info.Capacity
3✔
335
                channel.OtherNode = info.NodeKey2Bytes
3✔
336
        }
3✔
337

338
        channel, ok = c.nodeChannels[info.NodeKey2Bytes][info.ChannelID]
3✔
339
        if ok {
6✔
340
                channel.Capacity = info.Capacity
3✔
341
                channel.OtherNode = info.NodeKey1Bytes
3✔
342
        }
3✔
343
}
344

345
// getChannels returns a copy of the passed node's channels or nil if there
346
// isn't any.
347
func (c *GraphCache) getChannels(node route.Vertex) []*DirectedChannel {
3✔
348
        c.mtx.RLock()
3✔
349
        defer c.mtx.RUnlock()
3✔
350

3✔
351
        channels, ok := c.nodeChannels[node]
3✔
352
        if !ok {
6✔
353
                return nil
3✔
354
        }
3✔
355

356
        features, ok := c.nodeFeatures[node]
3✔
357
        if !ok {
6✔
358
                // If the features were set to nil explicitly, that's fine here.
3✔
359
                // The router will overwrite the features of the destination
3✔
360
                // node with those found in the invoice if necessary. But if we
3✔
361
                // didn't yet get a node announcement we want to mimic the
3✔
362
                // behavior of the old DB based code that would always set an
3✔
363
                // empty feature vector instead of leaving it nil.
3✔
364
                features = lnwire.EmptyFeatureVector()
3✔
365
        }
3✔
366

367
        toNodeCallback := func() route.Vertex {
6✔
368
                return node
3✔
369
        }
3✔
370

371
        i := 0
3✔
372
        channelsCopy := make([]*DirectedChannel, len(channels))
3✔
373
        for _, channel := range channels {
6✔
374
                // We need to copy the channel and policy to avoid it being
3✔
375
                // updated in the cache if the path finding algorithm sets
3✔
376
                // fields on it (currently only the ToNodeFeatures of the
3✔
377
                // policy).
3✔
378
                channelCopy := channel.DeepCopy()
3✔
379
                if channelCopy.InPolicy != nil {
6✔
380
                        channelCopy.InPolicy.ToNodePubKey = toNodeCallback
3✔
381
                        channelCopy.InPolicy.ToNodeFeatures = features
3✔
382
                }
3✔
383

384
                channelsCopy[i] = channelCopy
3✔
385
                i++
3✔
386
        }
387

388
        return channelsCopy
3✔
389
}
390

391
// ForEachChannel invokes the given callback for each channel of the given node.
392
func (c *GraphCache) ForEachChannel(node route.Vertex,
393
        cb func(channel *DirectedChannel) error) error {
3✔
394

3✔
395
        // Obtain a copy of the node's channels. We need do this in order to
3✔
396
        // avoid deadlocks caused by interaction with the graph cache, channel
3✔
397
        // state and the graph database from multiple goroutines. This snapshot
3✔
398
        // is only used for path finding where being stale is acceptable since
3✔
399
        // the real world graph and our representation may always become
3✔
400
        // slightly out of sync for a short time and the actual channel state
3✔
401
        // is stored separately.
3✔
402
        channels := c.getChannels(node)
3✔
403
        for _, channel := range channels {
6✔
404
                if err := cb(channel); err != nil {
3✔
405
                        return err
×
406
                }
×
407
        }
408

409
        return nil
3✔
410
}
411

412
// ForEachNode iterates over the adjacency list of the graph, executing the
413
// call back for each node and the set of channels that emanate from the given
414
// node.
415
//
416
// NOTE: This method should be considered _read only_, the channels or nodes
417
// passed in MUST NOT be modified.
418
func (c *GraphCache) ForEachNode(cb func(node route.Vertex,
UNCOV
419
        channels map[uint64]*DirectedChannel) error) error {
×
UNCOV
420

×
UNCOV
421
        c.mtx.RLock()
×
UNCOV
422
        defer c.mtx.RUnlock()
×
UNCOV
423

×
UNCOV
424
        for node, channels := range c.nodeChannels {
×
UNCOV
425
                // We don't make a copy here since this is a read-only RPC
×
UNCOV
426
                // call. We also don't need the node features either for this
×
UNCOV
427
                // call.
×
UNCOV
428
                if err := cb(node, channels); err != nil {
×
429
                        return err
×
430
                }
×
431
        }
432

UNCOV
433
        return nil
×
434
}
435

436
// GetFeatures returns the features of the node with the given ID. If no
437
// features are known for the node, an empty feature vector is returned.
438
func (c *GraphCache) GetFeatures(node route.Vertex) *lnwire.FeatureVector {
3✔
439
        c.mtx.RLock()
3✔
440
        defer c.mtx.RUnlock()
3✔
441

3✔
442
        features, ok := c.nodeFeatures[node]
3✔
443
        if !ok || features == nil {
6✔
444
                // The router expects the features to never be nil, so we return
3✔
445
                // an empty feature set instead.
3✔
446
                return lnwire.EmptyFeatureVector()
3✔
447
        }
3✔
448

449
        return features
3✔
450
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc