lightningnetwork / lnd / 13522615507

25 Feb 2025 01:40PM UTC coverage: 58.836% (+0.02%) from 58.815%

Pull #9550 (github): ellemouton
graph/db: move various cache write calls to ChannelGraph

Here, we move the graph cache writes for AddLightningNode,
DeleteLightningNode, AddChannelEdge and MarkEdgeLive to the
ChannelGraph. Since these are writes, the cache is only updated if the
DB write is successful.
Pull Request #9550: graph: extract cache from CRUD

73 of 85 new or added lines in 1 file covered. (85.88%)

275 existing lines in 12 files now uncovered.

136412 of 231851 relevant lines covered (58.84%)

19316.27 hits per line
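To make the change concrete, here is a minimal, self-contained Go sketch of the write pattern the commit message describes: the wrapper persists to the backing store first and touches the in-memory cache only once that write has succeeded. The ChannelGraph, KVStore and GraphCache types below are simplified stand-ins for the lnd types of the same names; the bodies are illustrative only, not the actual implementation.

package main

import (
        "errors"
        "fmt"
)

// LightningNode is a trimmed-down stand-in for models.LightningNode.
type LightningNode struct {
        PubKeyBytes [33]byte
}

// KVStore stands in for the persistent layer: it only writes to the
// database and knows nothing about the in-memory graph cache.
type KVStore struct {
        nodes   map[[33]byte]LightningNode
        failing bool // simulate a failed DB transaction
}

func (s *KVStore) AddLightningNode(node *LightningNode) error {
        if s.failing {
                return errors.New("db write failed")
        }
        s.nodes[node.PubKeyBytes] = *node

        return nil
}

// GraphCache stands in for the in-memory cache.
type GraphCache struct {
        nodes map[[33]byte]struct{}
}

func (c *GraphCache) AddNode(node *LightningNode) {
        c.nodes[node.PubKeyBytes] = struct{}{}
}

// ChannelGraph owns both layers. The cache write lives here, and it only
// happens after the DB write returns without an error, so the cache can
// never get ahead of the database.
type ChannelGraph struct {
        db    *KVStore
        cache *GraphCache
}

func (g *ChannelGraph) AddLightningNode(node *LightningNode) error {
        // Persist first; bail out on failure so the cache and the DB
        // cannot diverge.
        if err := g.db.AddLightningNode(node); err != nil {
                return err
        }

        // Only a successful write reaches this point, so it is now safe
        // to update the cache.
        g.cache.AddNode(node)

        return nil
}

func main() {
        g := &ChannelGraph{
                db:    &KVStore{nodes: make(map[[33]byte]LightningNode)},
                cache: &GraphCache{nodes: make(map[[33]byte]struct{})},
        }

        _ = g.AddLightningNode(&LightningNode{PubKeyBytes: [33]byte{1}})
        fmt.Println(len(g.cache.nodes)) // 1: DB write succeeded, cache updated

        g.db.failing = true
        err := g.AddLightningNode(&LightningNode{PubKeyBytes: [33]byte{2}})
        fmt.Println(err, len(g.cache.nodes)) // db write failed 1: cache untouched
}

Keeping the cache update on the ChannelGraph side, rather than inside the store's CRUD methods, is what lets the DB layer stay cache-agnostic, which is the direction the PR above moves in.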

Source File: /graph/db/kv_store.go (77.32% covered)
1
package graphdb
2

3
import (
4
        "bytes"
5
        "crypto/sha256"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "math"
11
        "net"
12
        "sort"
13
        "sync"
14
        "testing"
15
        "time"
16

17
        "github.com/btcsuite/btcd/btcec/v2"
18
        "github.com/btcsuite/btcd/chaincfg/chainhash"
19
        "github.com/btcsuite/btcd/txscript"
20
        "github.com/btcsuite/btcd/wire"
21
        "github.com/btcsuite/btcwallet/walletdb"
22
        "github.com/lightningnetwork/lnd/aliasmgr"
23
        "github.com/lightningnetwork/lnd/batch"
24
        "github.com/lightningnetwork/lnd/graph/db/models"
25
        "github.com/lightningnetwork/lnd/input"
26
        "github.com/lightningnetwork/lnd/kvdb"
27
        "github.com/lightningnetwork/lnd/lnwire"
28
        "github.com/lightningnetwork/lnd/routing/route"
29
)
30

31
var (
32
        // nodeBucket is a bucket which houses all the vertices or nodes within
33
        // the channel graph. This bucket has a single sub-bucket which adds an
34
        // additional index from pubkey -> alias. Within the top-level of this
35
        // bucket, the key space maps a node's compressed public key to the
36
        // serialized information for that node. Additionally, there's a
37
        // special key "source" which stores the pubkey of the source node. The
38
        // source node is used as the starting point for all graph queries and
39
        // traversals. The graph is formed as a star-graph with the source node
40
        // at the center.
41
        //
42
        // maps: pubKey -> nodeInfo
43
        // maps: source -> selfPubKey
44
        nodeBucket = []byte("graph-node")
45

46
        // nodeUpdateIndexBucket is a sub-bucket of the nodeBucket. This bucket
47
        // will be used to quickly look up the "freshness" of a node's last
48
        // update to the network. The bucket only contains keys and no values;
49
        // its mapping is:
50
        //
51
        // maps: updateTime || nodeID -> nil
52
        nodeUpdateIndexBucket = []byte("graph-node-update-index")
53

54
        // sourceKey is a special key that resides within the nodeBucket. The
55
        // sourceKey maps a key to the public key of the "self node".
56
        sourceKey = []byte("source")
57

58
        // aliasIndexBucket is a sub-bucket that's nested within the main
59
        // nodeBucket. This bucket maps the public key of a node to its
60
        // current alias. This bucket is provided as it can be used within a
61
        // future UI layer to add an additional degree of confirmation.
62
        aliasIndexBucket = []byte("alias")
63

64
        // edgeBucket is a bucket which houses all of the edge or channel
65
        // information within the channel graph. This bucket essentially acts
66
        // as an adjacency list, which in conjunction with a range scan, can be
67
        // used to iterate over all the incoming and outgoing edges for a
68
        // particular node. Key in the bucket use a prefix scheme which leads
69
        // with the node's public key and sends with the compact edge ID.
70
        // For each chanID, there will be two entries within the bucket, as the
71
        // graph is directed: nodes may have different policies w.r.t to fees
72
        // for their respective directions.
73
        //
74
        // maps: pubKey || chanID -> channel edge policy for node
75
        edgeBucket = []byte("graph-edge")
76

77
        // unknownPolicy is represented as an empty slice. It is
78
        // used as the value in edgeBucket for unknown channel edge policies.
79
        // Unknown policies are still stored in the database to enable efficient
80
        // lookup of incoming channel edges.
81
        unknownPolicy = []byte{}
82

83
        // chanStart is an array of all zero bytes which is used to perform
84
        // range scans within the edgeBucket to obtain all of the outgoing
85
        // edges for a particular node.
86
        chanStart [8]byte
87

88
        // edgeIndexBucket is an index which can be used to iterate all edges
89
        // in the bucket, grouping them according to their in/out nodes.
90
        // Additionally, the items in this bucket also contain the complete
91
        // edge information for a channel. The edge information includes the
92
        // capacity of the channel, the nodes that made the channel, etc. This
93
        // bucket resides within the edgeBucket above. Creation of an edge
94
        // proceeds in two phases: first the edge is added to the edge index,
95
        // afterwards the edgeBucket can be updated with the latest details of
96
        // the edge as they are announced on the network.
97
        //
98
        // maps: chanID -> pubKey1 || pubKey2 || restofEdgeInfo
99
        edgeIndexBucket = []byte("edge-index")
100

101
        // edgeUpdateIndexBucket is a sub-bucket of the main edgeBucket. This
102
        // bucket contains an index which allows us to gauge the "freshness" of
103
        // a channel's last updates.
104
        //
105
        // maps: updateTime || chanID -> nil
106
        edgeUpdateIndexBucket = []byte("edge-update-index")
107

108
        // channelPointBucket maps a channel's full outpoint (txid:index) to
109
        // its short 8-byte channel ID. This bucket resides within the
110
        // edgeBucket above, and can be used to quickly remove an edge due to
111
        // the outpoint being spent, or to query for existence of a channel.
112
        //
113
        // maps: outPoint -> chanID
114
        channelPointBucket = []byte("chan-index")
115

116
        // zombieBucket is a sub-bucket of the main edgeBucket bucket
117
        // responsible for maintaining an index of zombie channels. Each entry
118
        // exists within the bucket as follows:
119
        //
120
        // maps: chanID -> pubKey1 || pubKey2
121
        //
122
        // The chanID represents the channel ID of the edge that is marked as a
123
        // zombie and is used as the key, which maps to the public keys of the
124
        // edge's participants.
125
        zombieBucket = []byte("zombie-index")
126

127
        // disabledEdgePolicyBucket is a sub-bucket of the main edgeBucket
128
        // bucket responsible for maintaining an index of disabled edge
129
        // policies. Each entry exists within the bucket as follows:
130
        //
131
        // maps: <chanID><direction> -> []byte{}
132
        //
133
        // The chanID represents the channel ID of the edge and the direction is
134
        // one byte representing the direction of the edge. The main purpose of
135
        // this index is to allow pruning disabled channels in a fast way
136
        // without the need to iterate all over the graph.
137
        disabledEdgePolicyBucket = []byte("disabled-edge-policy-index")
138

139
        // graphMetaBucket is a top-level bucket which stores various meta-deta
140
        // related to the on-disk channel graph. Data stored in this bucket
141
        // includes the block to which the graph has been synced to, the total
142
        // number of channels, etc.
143
        graphMetaBucket = []byte("graph-meta")
144

145
        // pruneLogBucket is a bucket within the graphMetaBucket that stores
146
        // a mapping from the block height to the hash for the blocks used to
147
        // prune the graph.
148
        // Once a new block is discovered, any channels that have been closed
149
        // (by spending the outpoint) can safely be removed from the graph, and
150
        // the block is added to the prune log. We need to keep such a log for
151
        // the case where a reorg happens, and we must "rewind" the state of the
152
        // graph by removing channels that were previously confirmed. In such a
153
        // case we'll remove all entries from the prune log with a block height
154
        // that no longer exists.
155
        pruneLogBucket = []byte("prune-log")
156

157
        // closedScidBucket is a top-level bucket that stores scids for
158
        // channels that we know to be closed. This is used so that we don't
159
        // need to perform expensive validation checks if we receive a channel
160
        // announcement for the channel again.
161
        //
162
        // maps: scid -> []byte{}
163
        closedScidBucket = []byte("closed-scid")
164
)
165

166
const (
167
        // MaxAllowedExtraOpaqueBytes is the largest amount of opaque bytes that
168
        // we'll permit to be written to disk. We limit this as otherwise, it
169
        // would be possible for a node to create a ton of updates and slowly
170
        // fill our disk, and also waste bandwidth due to relaying.
171
        MaxAllowedExtraOpaqueBytes = 10000
172
)
173

174
// KVStore is a persistent, on-disk graph representation of the Lightning
175
// Network. This struct can be used to implement path finding algorithms on top
176
// of, and also to update a node's view based on information received from the
177
// p2p network. Internally, the graph is stored using a modified adjacency list
178
// representation with some added object interaction possible with each
179
// serialized edge/node. The stored graph is directed, meaning that there are two
180
// edges stored for each channel: an inbound/outbound edge for each node pair.
181
// Nodes, edges, and edge information can all be added to the graph
182
// independently. Edge removal results in the deletion of all edge information
183
// for that edge.
184
type KVStore struct {
185
        db kvdb.Backend
186

187
        // cacheMu guards all caches (rejectCache, chanCache, graphCache). If
188
        // this mutex is to be acquired at the same time as the DB mutex then
189
        // the cacheMu MUST be acquired first to prevent deadlock.
190
        cacheMu     sync.RWMutex
191
        rejectCache *rejectCache
192
        chanCache   *channelCache
193
        graphCache  *GraphCache
194

195
        chanScheduler batch.Scheduler
196
        nodeScheduler batch.Scheduler
197
}
198

199
// NewKVStore allocates a new KVStore backed by a DB instance. The
200
// returned instance has its own unique reject cache and channel cache.
201
func NewKVStore(db kvdb.Backend, options ...KVStoreOptionModifier) (*KVStore,
202
        error) {
176✔
203

176✔
204
        opts := DefaultOptions()
176✔
205
        for _, o := range options {
179✔
206
                o(opts)
3✔
207
        }
3✔
208

209
        if !opts.NoMigration {
352✔
210
                if err := initKVStore(db); err != nil {
176✔
211
                        return nil, err
×
212
                }
×
213
        }
214

215
        g := &KVStore{
176✔
216
                db:          db,
176✔
217
                rejectCache: newRejectCache(opts.RejectCacheSize),
176✔
218
                chanCache:   newChannelCache(opts.ChannelCacheSize),
176✔
219
        }
176✔
220
        g.chanScheduler = batch.NewTimeScheduler(
176✔
221
                db, &g.cacheMu, opts.BatchCommitInterval,
176✔
222
        )
176✔
223
        g.nodeScheduler = batch.NewTimeScheduler(
176✔
224
                db, nil, opts.BatchCommitInterval,
176✔
225
        )
176✔
226

176✔
227
        return g, nil
176✔
228
}
229

230
// setGraphCache sets the KVStore's graphCache.
231
//
232
// NOTE: this is temporary and will only be called from the ChannelGraph's
233
// constructor before the KVStore methods are available to be called. This will
234
// be removed once the graph cache is fully owned by the ChannelGraph.
235
func (c *KVStore) setGraphCache(cache *GraphCache) {
143✔
236
        c.graphCache = cache
143✔
237
}
143✔
238

239
// channelMapKey is the key structure used for storing channel edge policies.
240
type channelMapKey struct {
241
        nodeKey route.Vertex
242
        chanID  [8]byte
243
}
244

245
// getChannelMap loads all channel edge policies from the database and stores
246
// them in a map.
247
func (c *KVStore) getChannelMap(edges kvdb.RBucket) (
248
        map[channelMapKey]*models.ChannelEdgePolicy, error) {
147✔
249

147✔
250
        // Create a map to store all channel edge policies.
147✔
251
        channelMap := make(map[channelMapKey]*models.ChannelEdgePolicy)
147✔
252

147✔
253
        err := kvdb.ForAll(edges, func(k, edgeBytes []byte) error {
1,721✔
254
                // Skip embedded buckets.
1,574✔
255
                if bytes.Equal(k, edgeIndexBucket) ||
1,574✔
256
                        bytes.Equal(k, edgeUpdateIndexBucket) ||
1,574✔
257
                        bytes.Equal(k, zombieBucket) ||
1,574✔
258
                        bytes.Equal(k, disabledEdgePolicyBucket) ||
1,574✔
259
                        bytes.Equal(k, channelPointBucket) {
2,158✔
260

584✔
261
                        return nil
584✔
262
                }
584✔
263

264
                // Validate key length.
265
                if len(k) != 33+8 {
993✔
266
                        return fmt.Errorf("invalid edge key %x encountered", k)
×
267
                }
×
268

269
                var key channelMapKey
993✔
270
                copy(key.nodeKey[:], k[:33])
993✔
271
                copy(key.chanID[:], k[33:])
993✔
272

993✔
273
                // No need to deserialize unknown policy.
993✔
274
                if bytes.Equal(edgeBytes, unknownPolicy) {
993✔
275
                        return nil
×
276
                }
×
277

278
                edgeReader := bytes.NewReader(edgeBytes)
993✔
279
                edge, err := deserializeChanEdgePolicyRaw(
993✔
280
                        edgeReader,
993✔
281
                )
993✔
282

993✔
283
                switch {
993✔
284
                // If the db policy was missing an expected optional field, we
285
                // return nil as if the policy was unknown.
286
                case errors.Is(err, ErrEdgePolicyOptionalFieldNotFound):
×
287
                        return nil
×
288

289
                case err != nil:
×
290
                        return err
×
291
                }
292

293
                channelMap[key] = edge
993✔
294

993✔
295
                return nil
993✔
296
        })
297
        if err != nil {
147✔
298
                return nil, err
×
299
        }
×
300

301
        return channelMap, nil
147✔
302
}
303

304
var graphTopLevelBuckets = [][]byte{
305
        nodeBucket,
306
        edgeBucket,
307
        graphMetaBucket,
308
        closedScidBucket,
309
}
310

311
// Wipe completely deletes all saved state within all used buckets within the
312
// database. The deletion is done in a single transaction, therefore this
313
// operation is fully atomic.
314
func (c *KVStore) Wipe() error {
×
315
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
×
316
                for _, tlb := range graphTopLevelBuckets {
×
317
                        err := tx.DeleteTopLevelBucket(tlb)
×
318
                        if err != nil &&
×
319
                                !errors.Is(err, kvdb.ErrBucketNotFound) {
×
320

×
321
                                return err
×
322
                        }
×
323
                }
324

325
                return nil
×
326
        }, func() {})
×
327
        if err != nil {
×
328
                return err
×
329
        }
×
330

331
        return initKVStore(c.db)
×
332
}
333

334
// initKVStore creates and initializes a fresh version of the graph KVStore. In
335
// the case that the target path has not yet been created or doesn't yet exist,
336
// then the path is created. Additionally, all required top-level buckets used
337
// within the database are created.
338
func initKVStore(db kvdb.Backend) error {
176✔
339
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
352✔
340
                for _, tlb := range graphTopLevelBuckets {
871✔
341
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
695✔
342
                                return err
×
343
                        }
×
344
                }
345

346
                nodes := tx.ReadWriteBucket(nodeBucket)
176✔
347
                _, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
176✔
348
                if err != nil {
176✔
349
                        return err
×
350
                }
×
351
                _, err = nodes.CreateBucketIfNotExists(nodeUpdateIndexBucket)
176✔
352
                if err != nil {
176✔
353
                        return err
×
354
                }
×
355

356
                edges := tx.ReadWriteBucket(edgeBucket)
176✔
357
                _, err = edges.CreateBucketIfNotExists(edgeIndexBucket)
176✔
358
                if err != nil {
176✔
359
                        return err
×
360
                }
×
361
                _, err = edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
176✔
362
                if err != nil {
176✔
363
                        return err
×
364
                }
×
365
                _, err = edges.CreateBucketIfNotExists(channelPointBucket)
176✔
366
                if err != nil {
176✔
367
                        return err
×
368
                }
×
369
                _, err = edges.CreateBucketIfNotExists(zombieBucket)
176✔
370
                if err != nil {
176✔
371
                        return err
×
372
                }
×
373

374
                graphMeta := tx.ReadWriteBucket(graphMetaBucket)
176✔
375
                _, err = graphMeta.CreateBucketIfNotExists(pruneLogBucket)
176✔
376

176✔
377
                return err
176✔
378
        }, func() {})
176✔
379
        if err != nil {
176✔
380
                return fmt.Errorf("unable to create new channel graph: %w", err)
×
381
        }
×
382

383
        return nil
176✔
384
}
385

386
// AddrsForNode returns all known addresses for the target node public key that
387
// the graph DB is aware of. The returned boolean indicates whether the given
388
// node is known to the graph DB.
389
//
390
// NOTE: this is part of the channeldb.AddrSource interface.
391
func (c *KVStore) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr,
392
        error) {
4✔
393

4✔
394
        pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed())
4✔
395
        if err != nil {
4✔
396
                return false, nil, err
×
397
        }
×
398

399
        node, err := c.FetchLightningNode(pubKey)
4✔
400
        // We don't consider it an error if the graph is unaware of the node.
4✔
401
        switch {
4✔
402
        case err != nil && !errors.Is(err, ErrGraphNodeNotFound):
×
403
                return false, nil, err
×
404

405
        case errors.Is(err, ErrGraphNodeNotFound):
3✔
406
                return false, nil, nil
3✔
407
        }
408

409
        return true, node.Addresses, nil
4✔
410
}
411

412
// ForEachChannel iterates through all the channel edges stored within the
413
// graph and invokes the passed callback for each edge. The callback takes two
414
// edges since this is a directed graph: both the in/out edges are visited.
415
// If the callback returns an error, then the transaction is aborted and the
416
// iteration stops early.
417
//
418
// NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer
419
// for that particular channel edge routing policy will be passed into the
420
// callback.
421
func (c *KVStore) ForEachChannel(cb func(*models.ChannelEdgeInfo,
422
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
147✔
423

147✔
424
        return c.db.View(func(tx kvdb.RTx) error {
294✔
425
                edges := tx.ReadBucket(edgeBucket)
147✔
426
                if edges == nil {
147✔
427
                        return ErrGraphNoEdgesFound
×
428
                }
×
429

430
                // First, load all edges in memory indexed by node and channel
431
                // id.
432
                channelMap, err := c.getChannelMap(edges)
147✔
433
                if err != nil {
147✔
434
                        return err
×
435
                }
×
436

437
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
147✔
438
                if edgeIndex == nil {
147✔
439
                        return ErrGraphNoEdgesFound
×
440
                }
×
441

442
                // Load edge index, recombine each channel with the policies
443
                // loaded above and invoke the callback.
444
                return kvdb.ForAll(
147✔
445
                        edgeIndex, func(k, edgeInfoBytes []byte) error {
645✔
446
                                var chanID [8]byte
498✔
447
                                copy(chanID[:], k)
498✔
448

498✔
449
                                edgeInfoReader := bytes.NewReader(edgeInfoBytes)
498✔
450
                                info, err := deserializeChanEdgeInfo(
498✔
451
                                        edgeInfoReader,
498✔
452
                                )
498✔
453
                                if err != nil {
498✔
454
                                        return err
×
455
                                }
×
456

457
                                policy1 := channelMap[channelMapKey{
498✔
458
                                        nodeKey: info.NodeKey1Bytes,
498✔
459
                                        chanID:  chanID,
498✔
460
                                }]
498✔
461

498✔
462
                                policy2 := channelMap[channelMapKey{
498✔
463
                                        nodeKey: info.NodeKey2Bytes,
498✔
464
                                        chanID:  chanID,
498✔
465
                                }]
498✔
466

498✔
467
                                return cb(&info, policy1, policy2)
498✔
468
                        },
469
                )
470
        }, func() {})
147✔
471
}
472

473
// forEachNodeDirectedChannel iterates through all channels of a given node,
474
// executing the passed callback on the directed edge representing the channel
475
// and its incoming policy. If the callback returns an error, then the iteration
476
// is halted with the error propagated back up to the caller. An optional read
477
// transaction may be provided. If none is provided, a new one will be created.
478
//
479
// Unknown policies are passed into the callback as nil values.
480
func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx,
481
        node route.Vertex, cb func(channel *DirectedChannel) error) error {
245✔
482

245✔
483
        // Fallback that uses the database.
245✔
484
        toNodeCallback := func() route.Vertex {
380✔
485
                return node
135✔
486
        }
135✔
487
        toNodeFeatures, err := c.fetchNodeFeatures(tx, node)
245✔
488
        if err != nil {
245✔
489
                return err
×
490
        }
×
491

492
        dbCallback := func(tx kvdb.RTx, e *models.ChannelEdgeInfo, p1,
245✔
493
                p2 *models.ChannelEdgePolicy) error {
744✔
494

499✔
495
                var cachedInPolicy *models.CachedEdgePolicy
499✔
496
                if p2 != nil {
995✔
497
                        cachedInPolicy = models.NewCachedPolicy(p2)
496✔
498
                        cachedInPolicy.ToNodePubKey = toNodeCallback
496✔
499
                        cachedInPolicy.ToNodeFeatures = toNodeFeatures
496✔
500
                }
496✔
501

502
                var inboundFee lnwire.Fee
499✔
503
                if p1 != nil {
997✔
504
                        // Extract inbound fee. If there is a decoding error,
498✔
505
                        // skip this edge.
498✔
506
                        _, err := p1.ExtraOpaqueData.ExtractRecords(&inboundFee)
498✔
507
                        if err != nil {
499✔
508
                                return nil
1✔
509
                        }
1✔
510
                }
511

512
                directedChannel := &DirectedChannel{
498✔
513
                        ChannelID:    e.ChannelID,
498✔
514
                        IsNode1:      node == e.NodeKey1Bytes,
498✔
515
                        OtherNode:    e.NodeKey2Bytes,
498✔
516
                        Capacity:     e.Capacity,
498✔
517
                        OutPolicySet: p1 != nil,
498✔
518
                        InPolicy:     cachedInPolicy,
498✔
519
                        InboundFee:   inboundFee,
498✔
520
                }
498✔
521

498✔
522
                if node == e.NodeKey2Bytes {
751✔
523
                        directedChannel.OtherNode = e.NodeKey1Bytes
253✔
524
                }
253✔
525

526
                return cb(directedChannel)
498✔
527
        }
528

529
        return nodeTraversal(tx, node[:], c.db, dbCallback)
245✔
530
}
531

532
// fetchNodeFeatures returns the features of a given node. If no features are
533
// known for the node, an empty feature vector is returned. An optional read
534
// transaction may be provided. If none is provided, a new one will be created.
535
func (c *KVStore) fetchNodeFeatures(tx kvdb.RTx,
536
        node route.Vertex) (*lnwire.FeatureVector, error) {
689✔
537

689✔
538
        // Fallback that uses the database.
689✔
539
        targetNode, err := c.FetchLightningNodeTx(tx, node)
689✔
540
        switch {
689✔
541
        // If the node exists and has features, return them directly.
542
        case err == nil:
678✔
543
                return targetNode.Features, nil
678✔
544

545
        // If we couldn't find a node announcement, populate a blank feature
546
        // vector.
547
        case errors.Is(err, ErrGraphNodeNotFound):
11✔
548
                return lnwire.EmptyFeatureVector(), nil
11✔
549

550
        // Otherwise, bubble the error up.
551
        default:
×
552
                return nil, err
×
553
        }
554
}
555

556
// ForEachNodeDirectedChannel iterates through all channels of a given node,
557
// executing the passed callback on the directed edge representing the channel
558
// and its incoming policy. If the callback returns an error, then the iteration
559
// is halted with the error propagated back up to the caller.
560
//
561
// Unknown policies are passed into the callback as nil values.
562
//
563
// NOTE: this is part of the graphdb.NodeTraverser interface.
564
func (c *KVStore) ForEachNodeDirectedChannel(nodePub route.Vertex,
565
        cb func(channel *DirectedChannel) error) error {
6✔
566

6✔
567
        return c.forEachNodeDirectedChannel(nil, nodePub, cb)
6✔
568
}
6✔
569

570
// FetchNodeFeatures returns the features of the given node. If no features are
571
// known for the node, an empty feature vector is returned.
572
//
573
// NOTE: this is part of the graphdb.NodeTraverser interface.
574
func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) (
575
        *lnwire.FeatureVector, error) {
3✔
576

3✔
577
        return c.fetchNodeFeatures(nil, nodePub)
3✔
578
}
3✔
579

580
// ForEachNodeCached is similar to forEachNode, but it returns DirectedChannel
581
// data to the call-back.
582
//
583
// NOTE: The callback contents MUST not be modified.
584
func (c *KVStore) ForEachNodeCached(cb func(node route.Vertex,
585
        chans map[uint64]*DirectedChannel) error) error {
1✔
586

1✔
587
        // Fall back to a version that uses the database directly.
1✔
588
        // We'll iterate over each node, then the set of channels for each
1✔
589
        // node, and construct a similar callback function signature as the
1✔
590
        // main function expects.
1✔
591
        return c.forEachNode(func(tx kvdb.RTx,
1✔
592
                node *models.LightningNode) error {
21✔
593

20✔
594
                channels := make(map[uint64]*DirectedChannel)
20✔
595

20✔
596
                err := c.ForEachNodeChannelTx(tx, node.PubKeyBytes,
20✔
597
                        func(tx kvdb.RTx, e *models.ChannelEdgeInfo,
20✔
598
                                p1 *models.ChannelEdgePolicy,
20✔
599
                                p2 *models.ChannelEdgePolicy) error {
210✔
600

190✔
601
                                toNodeCallback := func() route.Vertex {
190✔
602
                                        return node.PubKeyBytes
×
603
                                }
×
604
                                toNodeFeatures, err := c.fetchNodeFeatures(
190✔
605
                                        tx, node.PubKeyBytes,
190✔
606
                                )
190✔
607
                                if err != nil {
190✔
608
                                        return err
×
609
                                }
×
610

611
                                var cachedInPolicy *models.CachedEdgePolicy
190✔
612
                                if p2 != nil {
380✔
613
                                        cachedInPolicy =
190✔
614
                                                models.NewCachedPolicy(p2)
190✔
615
                                        cachedInPolicy.ToNodePubKey =
190✔
616
                                                toNodeCallback
190✔
617
                                        cachedInPolicy.ToNodeFeatures =
190✔
618
                                                toNodeFeatures
190✔
619
                                }
190✔
620

621
                                directedChannel := &DirectedChannel{
190✔
622
                                        ChannelID: e.ChannelID,
190✔
623
                                        IsNode1: node.PubKeyBytes ==
190✔
624
                                                e.NodeKey1Bytes,
190✔
625
                                        OtherNode:    e.NodeKey2Bytes,
190✔
626
                                        Capacity:     e.Capacity,
190✔
627
                                        OutPolicySet: p1 != nil,
190✔
628
                                        InPolicy:     cachedInPolicy,
190✔
629
                                }
190✔
630

190✔
631
                                if node.PubKeyBytes == e.NodeKey2Bytes {
285✔
632
                                        directedChannel.OtherNode =
95✔
633
                                                e.NodeKey1Bytes
95✔
634
                                }
95✔
635

636
                                channels[e.ChannelID] = directedChannel
190✔
637

190✔
638
                                return nil
190✔
639
                        })
640
                if err != nil {
20✔
641
                        return err
×
642
                }
×
643

644
                return cb(node.PubKeyBytes, channels)
20✔
645
        })
646
}
647

648
// DisabledChannelIDs returns the channel ids of disabled channels.
649
// A channel is disabled when both of the associated ChannelEdgePolicies
650
// have their disabled bit on.
651
func (c *KVStore) DisabledChannelIDs() ([]uint64, error) {
6✔
652
        var disabledChanIDs []uint64
6✔
653
        var chanEdgeFound map[uint64]struct{}
6✔
654

6✔
655
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
12✔
656
                edges := tx.ReadBucket(edgeBucket)
6✔
657
                if edges == nil {
6✔
658
                        return ErrGraphNoEdgesFound
×
659
                }
×
660

661
                disabledEdgePolicyIndex := edges.NestedReadBucket(
6✔
662
                        disabledEdgePolicyBucket,
6✔
663
                )
6✔
664
                if disabledEdgePolicyIndex == nil {
7✔
665
                        return nil
1✔
666
                }
1✔
667

668
                // We iterate over all disabled policies and we add each channel
669
                // that has more than one disabled policy to the disabledChanIDs
670
                // array.
671
                return disabledEdgePolicyIndex.ForEach(
5✔
672
                        func(k, v []byte) error {
16✔
673
                                chanID := byteOrder.Uint64(k[:8])
11✔
674
                                _, edgeFound := chanEdgeFound[chanID]
11✔
675
                                if edgeFound {
15✔
676
                                        delete(chanEdgeFound, chanID)
4✔
677
                                        disabledChanIDs = append(
4✔
678
                                                disabledChanIDs, chanID,
4✔
679
                                        )
4✔
680

4✔
681
                                        return nil
4✔
682
                                }
4✔
683

684
                                chanEdgeFound[chanID] = struct{}{}
7✔
685

7✔
686
                                return nil
7✔
687
                        },
688
                )
689
        }, func() {
6✔
690
                disabledChanIDs = nil
6✔
691
                chanEdgeFound = make(map[uint64]struct{})
6✔
692
        })
6✔
693
        if err != nil {
6✔
694
                return nil, err
×
695
        }
×
696

697
        return disabledChanIDs, nil
6✔
698
}
699

700
// ForEachNode iterates through all the stored vertices/nodes in the graph,
701
// executing the passed callback with each node encountered. If the callback
702
// returns an error, then the transaction is aborted and the iteration stops
703
// early. Any operations performed on the NodeTx passed to the call-back are
704
// executed under the same read transaction and so, methods on the NodeTx object
705
// _MUST_ only be called from within the call-back.
706
func (c *KVStore) ForEachNode(cb func(tx NodeRTx) error) error {
123✔
707
        return c.forEachNode(func(tx kvdb.RTx,
123✔
708
                node *models.LightningNode) error {
1,096✔
709

973✔
710
                return cb(newChanGraphNodeTx(tx, c, node))
973✔
711
        })
973✔
712
}
713

714
// forEachNode iterates through all the stored vertices/nodes in the graph,
715
// executing the passed callback with each node encountered. If the callback
716
// returns an error, then the transaction is aborted and the iteration stops
717
// early.
718
//
719
// TODO(roasbeef): add iterator interface to allow for memory efficient graph
720
// traversal when graph gets mega.
721
func (c *KVStore) forEachNode(
722
        cb func(kvdb.RTx, *models.LightningNode) error) error {
132✔
723

132✔
724
        traversal := func(tx kvdb.RTx) error {
264✔
725
                // First grab the nodes bucket which stores the mapping from
132✔
726
                // pubKey to node information.
132✔
727
                nodes := tx.ReadBucket(nodeBucket)
132✔
728
                if nodes == nil {
132✔
729
                        return ErrGraphNotFound
×
730
                }
×
731

732
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
1,574✔
733
                        // If this is the source key, then we skip this
1,442✔
734
                        // iteration as the value for this key is a pubKey
1,442✔
735
                        // rather than raw node information.
1,442✔
736
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
1,706✔
737
                                return nil
264✔
738
                        }
264✔
739

740
                        nodeReader := bytes.NewReader(nodeBytes)
1,181✔
741
                        node, err := deserializeLightningNode(nodeReader)
1,181✔
742
                        if err != nil {
1,181✔
743
                                return err
×
744
                        }
×
745

746
                        // Execute the callback, the transaction will abort if
747
                        // this returns an error.
748
                        return cb(tx, &node)
1,181✔
749
                })
750
        }
751

752
        return kvdb.View(c.db, traversal, func() {})
264✔
753
}
754

755
// ForEachNodeCacheable iterates through all the stored vertices/nodes in the
756
// graph, executing the passed callback with each node encountered. If the
757
// callback returns an error, then the transaction is aborted and the iteration
758
// stops early.
759
func (c *KVStore) ForEachNodeCacheable(cb func(route.Vertex,
760
        *lnwire.FeatureVector) error) error {
144✔
761

144✔
762
        traversal := func(tx kvdb.RTx) error {
288✔
763
                // First grab the nodes bucket which stores the mapping from
144✔
764
                // pubKey to node information.
144✔
765
                nodes := tx.ReadBucket(nodeBucket)
144✔
766
                if nodes == nil {
144✔
767
                        return ErrGraphNotFound
×
768
                }
×
769

770
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
549✔
771
                        // If this is the source key, then we skip this
405✔
772
                        // iteration as the value for this key is a pubKey
405✔
773
                        // rather than raw node information.
405✔
774
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
690✔
775
                                return nil
285✔
776
                        }
285✔
777

778
                        nodeReader := bytes.NewReader(nodeBytes)
123✔
779
                        node, features, err := deserializeLightningNodeCacheable( //nolint:ll
123✔
780
                                nodeReader,
123✔
781
                        )
123✔
782
                        if err != nil {
123✔
783
                                return err
×
784
                        }
×
785

786
                        // Execute the callback, the transaction will abort if
787
                        // this returns an error.
788
                        return cb(node, features)
123✔
789
                })
790
        }
791

792
        return kvdb.View(c.db, traversal, func() {})
288✔
793
}
794

795
// SourceNode returns the source node of the graph. The source node is treated
796
// as the center node within a star-graph. This method may be used to kick off
797
// a path finding algorithm in order to explore the reachability of another
798
// node based off the source node.
799
func (c *KVStore) SourceNode() (*models.LightningNode, error) {
234✔
800
        var source *models.LightningNode
234✔
801
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
468✔
802
                // First grab the nodes bucket which stores the mapping from
234✔
803
                // pubKey to node information.
234✔
804
                nodes := tx.ReadBucket(nodeBucket)
234✔
805
                if nodes == nil {
234✔
806
                        return ErrGraphNotFound
×
807
                }
×
808

809
                node, err := c.sourceNode(nodes)
234✔
810
                if err != nil {
235✔
811
                        return err
1✔
812
                }
1✔
813
                source = node
233✔
814

233✔
815
                return nil
233✔
816
        }, func() {
234✔
817
                source = nil
234✔
818
        })
234✔
819
        if err != nil {
235✔
820
                return nil, err
1✔
821
        }
1✔
822

823
        return source, nil
233✔
824
}
825

826
// sourceNode uses an existing database transaction and returns the source node
827
// of the graph. The source node is treated as the center node within a
828
// star-graph. This method may be used to kick off a path finding algorithm in
829
// order to explore the reachability of another node based off the source node.
830
func (c *KVStore) sourceNode(nodes kvdb.RBucket) (*models.LightningNode,
831
        error) {
494✔
832

494✔
833
        selfPub := nodes.Get(sourceKey)
494✔
834
        if selfPub == nil {
495✔
835
                return nil, ErrSourceNodeNotSet
1✔
836
        }
1✔
837

838
        // With the pubKey of the source node retrieved, we're able to
839
        // fetch the full node information.
840
        node, err := fetchLightningNode(nodes, selfPub)
493✔
841
        if err != nil {
493✔
842
                return nil, err
×
843
        }
×
844

845
        return &node, nil
493✔
846
}
847

848
// SetSourceNode sets the source node within the graph database. The source
849
// node is to be used as the center of a star-graph within path finding
850
// algorithms.
851
func (c *KVStore) SetSourceNode(node *models.LightningNode) error {
120✔
852
        nodePubBytes := node.PubKeyBytes[:]
120✔
853

120✔
854
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
240✔
855
                // First grab the nodes bucket which stores the mapping from
120✔
856
                // pubKey to node information.
120✔
857
                nodes, err := tx.CreateTopLevelBucket(nodeBucket)
120✔
858
                if err != nil {
120✔
859
                        return err
×
860
                }
×
861

862
                // Next we create the mapping from source to the targeted
863
                // public key.
864
                if err := nodes.Put(sourceKey, nodePubBytes); err != nil {
120✔
865
                        return err
×
866
                }
×
867

868
                // Finally, we commit the information of the lightning node
869
                // itself.
870
                return addLightningNode(tx, node)
120✔
871
        }, func() {})
120✔
872
}
873

874
// AddLightningNode adds a vertex/node to the graph database. If the node is not
875
// in the database from before, this will add a new, unconnected one to the
876
// graph. If it is present from before, this will update that node's
877
// information. Note that this method is expected to only be called to update an
878
// already present node from a node announcement, or to insert a node found in a
879
// channel update.
880
//
881
// TODO(roasbeef): also need sig of announcement.
882
func (c *KVStore) AddLightningNode(node *models.LightningNode,
883
        op ...batch.SchedulerOption) error {
803✔
884

803✔
885
        r := &batch.Request{
803✔
886
                Update: func(tx kvdb.RwTx) error {
1,606✔
887
                        return addLightningNode(tx, node)
803✔
888
                },
803✔
889
        }
890

891
        for _, f := range op {
806✔
892
                f(r)
3✔
893
        }
3✔
894

895
        return c.nodeScheduler.Execute(r)
803✔
896
}
897

898
func addLightningNode(tx kvdb.RwTx, node *models.LightningNode) error {
1,005✔
899
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
1,005✔
900
        if err != nil {
1,005✔
901
                return err
×
902
        }
×
903

904
        aliases, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
1,005✔
905
        if err != nil {
1,005✔
906
                return err
×
907
        }
×
908

909
        updateIndex, err := nodes.CreateBucketIfNotExists(
1,005✔
910
                nodeUpdateIndexBucket,
1,005✔
911
        )
1,005✔
912
        if err != nil {
1,005✔
913
                return err
×
914
        }
×
915

916
        return putLightningNode(nodes, aliases, updateIndex, node)
1,005✔
917
}
918

919
// LookupAlias attempts to return the alias as advertised by the target node.
920
// TODO(roasbeef): currently assumes that aliases are unique...
921
func (c *KVStore) LookupAlias(pub *btcec.PublicKey) (string, error) {
5✔
922
        var alias string
5✔
923

5✔
924
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
10✔
925
                nodes := tx.ReadBucket(nodeBucket)
5✔
926
                if nodes == nil {
5✔
927
                        return ErrGraphNodesNotFound
×
928
                }
×
929

930
                aliases := nodes.NestedReadBucket(aliasIndexBucket)
5✔
931
                if aliases == nil {
5✔
932
                        return ErrGraphNodesNotFound
×
933
                }
×
934

935
                nodePub := pub.SerializeCompressed()
5✔
936
                a := aliases.Get(nodePub)
5✔
937
                if a == nil {
6✔
938
                        return ErrNodeAliasNotFound
1✔
939
                }
1✔
940

941
                // TODO(roasbeef): should actually be using the utf-8
942
                // package...
943
                alias = string(a)
4✔
944

4✔
945
                return nil
4✔
946
        }, func() {
5✔
947
                alias = ""
5✔
948
        })
5✔
949
        if err != nil {
6✔
950
                return "", err
1✔
951
        }
1✔
952

953
        return alias, nil
4✔
954
}
955

956
// DeleteLightningNode starts a new database transaction to remove a vertex/node
957
// from the database according to the node's public key.
958
func (c *KVStore) DeleteLightningNode(nodePub route.Vertex) error {
3✔
959
        // TODO(roasbeef): ensure dangling edges are removed...
3✔
960
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
6✔
961
                nodes := tx.ReadWriteBucket(nodeBucket)
3✔
962
                if nodes == nil {
3✔
963
                        return ErrGraphNodeNotFound
×
964
                }
×
965

966
                return c.deleteLightningNode(nodes, nodePub[:])
3✔
967
        }, func() {})
3✔
968
}
969

970
// deleteLightningNode uses an existing database transaction to remove a
971
// vertex/node from the database according to the node's public key.
972
func (c *KVStore) deleteLightningNode(nodes kvdb.RwBucket,
973
        compressedPubKey []byte) error {
80✔
974

80✔
975
        aliases := nodes.NestedReadWriteBucket(aliasIndexBucket)
80✔
976
        if aliases == nil {
80✔
977
                return ErrGraphNodesNotFound
×
978
        }
×
979

980
        if err := aliases.Delete(compressedPubKey); err != nil {
80✔
981
                return err
×
982
        }
×
983

984
        // Before we delete the node, we'll fetch its current state so we can
985
        // determine when its last update was to clear out the node update
986
        // index.
987
        node, err := fetchLightningNode(nodes, compressedPubKey)
80✔
988
        if err != nil {
80✔
989
                return err
×
990
        }
×
991

992
        if err := nodes.Delete(compressedPubKey); err != nil {
80✔
993
                return err
×
994
        }
×
995

996
        // Finally, we'll delete the index entry for the node within the
997
        // nodeUpdateIndexBucket as this node is no longer active, so we don't
998
        // need to track its last update.
999
        nodeUpdateIndex := nodes.NestedReadWriteBucket(nodeUpdateIndexBucket)
80✔
1000
        if nodeUpdateIndex == nil {
80✔
1001
                return ErrGraphNodesNotFound
×
1002
        }
×
1003

1004
        // In order to delete the entry, we'll need to reconstruct the key for
1005
        // its last update.
1006
        updateUnix := uint64(node.LastUpdate.Unix())
80✔
1007
        var indexKey [8 + 33]byte
80✔
1008
        byteOrder.PutUint64(indexKey[:8], updateUnix)
80✔
1009
        copy(indexKey[8:], compressedPubKey)
80✔
1010

80✔
1011
        return nodeUpdateIndex.Delete(indexKey[:])
80✔
1012
}
1013

1014
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
1015
// undirected edge from the two target nodes is created. The information stored
1016
// denotes the static attributes of the channel, such as the channelID, the keys
1017
// involved in creation of the channel, and the set of features that the channel
1018
// supports. The chanPoint and chanID are used to uniquely identify the edge
1019
// globally within the database.
1020
func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
1021
        op ...batch.SchedulerOption) error {
1,722✔
1022

1,722✔
1023
        var alreadyExists bool
1,722✔
1024
        r := &batch.Request{
1,722✔
1025
                Reset: func() {
3,444✔
1026
                        alreadyExists = false
1,722✔
1027
                },
1,722✔
1028
                Update: func(tx kvdb.RwTx) error {
1,722✔
1029
                        err := c.addChannelEdge(tx, edge)
1,722✔
1030

1,722✔
1031
                        // Silence ErrEdgeAlreadyExist so that the batch can
1,722✔
1032
                        // succeed, but propagate the error via local state.
1,722✔
1033
                        if errors.Is(err, ErrEdgeAlreadyExist) {
1,956✔
1034
                                alreadyExists = true
234✔
1035
                                return nil
234✔
1036
                        }
234✔
1037

1038
                        return err
1,488✔
1039
                },
1040
                OnCommit: func(err error) error {
1,722✔
1041
                        switch {
1,722✔
1042
                        case err != nil:
×
1043
                                return err
×
1044
                        case alreadyExists:
234✔
1045
                                return ErrEdgeAlreadyExist
234✔
1046
                        default:
1,488✔
1047
                                c.rejectCache.remove(edge.ChannelID)
1,488✔
1048
                                c.chanCache.remove(edge.ChannelID)
1,488✔
1049
                                return nil
1,488✔
1050
                        }
1051
                },
1052
        }
1053

1054
        for _, f := range op {
1,725✔
1055
                if f == nil {
3✔
1056
                        return fmt.Errorf("nil scheduler option was used")
×
1057
                }
×
1058

1059
                f(r)
3✔
1060
        }
1061

1062
        return c.chanScheduler.Execute(r)
1,722✔
1063
}
1064

1065
// addChannelEdge is the private form of AddChannelEdge that allows callers to
1066
// utilize an existing db transaction.
1067
func (c *KVStore) addChannelEdge(tx kvdb.RwTx,
1068
        edge *models.ChannelEdgeInfo) error {
1,722✔
1069

1,722✔
1070
        // Construct the channel's primary key which is the 8-byte channel ID.
1,722✔
1071
        var chanKey [8]byte
1,722✔
1072
        binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
1,722✔
1073

1,722✔
1074
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
1,722✔
1075
        if err != nil {
1,722✔
1076
                return err
×
1077
        }
×
1078
        edges, err := tx.CreateTopLevelBucket(edgeBucket)
1,722✔
1079
        if err != nil {
1,722✔
1080
                return err
×
1081
        }
×
1082
        edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
1,722✔
1083
        if err != nil {
1,722✔
1084
                return err
×
1085
        }
×
1086
        chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket)
1,722✔
1087
        if err != nil {
1,722✔
1088
                return err
×
1089
        }
×
1090

1091
        // First, attempt to check if this edge has already been created. If
1092
        // so, then we can exit early as this method is meant to be idempotent.
1093
        if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil {
1,956✔
1094
                return ErrEdgeAlreadyExist
234✔
1095
        }
234✔
1096

1097
        // Before we insert the channel into the database, we'll ensure that
1098
        // both nodes already exist in the channel graph. If either node
1099
        // doesn't, then we'll insert a "shell" node that just includes its
1100
        // public key, so subsequent validation and queries can work properly.
1101
        _, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:])
1,488✔
1102
        switch {
1,488✔
1103
        case errors.Is(node1Err, ErrGraphNodeNotFound):
21✔
1104
                node1Shell := models.LightningNode{
21✔
1105
                        PubKeyBytes:          edge.NodeKey1Bytes,
21✔
1106
                        HaveNodeAnnouncement: false,
21✔
1107
                }
21✔
1108
                err := addLightningNode(tx, &node1Shell)
21✔
1109
                if err != nil {
21✔
1110
                        return fmt.Errorf("unable to create shell node "+
×
1111
                                "for: %x: %w", edge.NodeKey1Bytes, err)
×
1112
                }
×
1113
        case node1Err != nil:
×
1114
                return node1Err
×
1115
        }
1116

1117
        _, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:])
1,488✔
1118
        switch {
1,488✔
1119
        case errors.Is(node2Err, ErrGraphNodeNotFound):
70✔
1120
                node2Shell := models.LightningNode{
70✔
1121
                        PubKeyBytes:          edge.NodeKey2Bytes,
70✔
1122
                        HaveNodeAnnouncement: false,
70✔
1123
                }
70✔
1124
                err := addLightningNode(tx, &node2Shell)
70✔
1125
                if err != nil {
70✔
1126
                        return fmt.Errorf("unable to create shell node "+
×
1127
                                "for: %x: %w", edge.NodeKey2Bytes, err)
×
1128
                }
×
1129
        case node2Err != nil:
×
1130
                return node2Err
×
1131
        }
1132

1133
        // If the edge hasn't been created yet, then we'll first add it to the
1134
        // edge index in order to associate the edge between two nodes and also
1135
        // store the static components of the channel.
1136
        if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil {
1,488✔
1137
                return err
×
1138
        }
×
1139

1140
        // Mark edge policies for both sides as unknown. This is to enable
1141
        // efficient incoming channel lookup for a node.
1142
        keys := []*[33]byte{
1,488✔
1143
                &edge.NodeKey1Bytes,
1,488✔
1144
                &edge.NodeKey2Bytes,
1,488✔
1145
        }
1,488✔
1146
        for _, key := range keys {
4,461✔
1147
                err := putChanEdgePolicyUnknown(edges, edge.ChannelID, key[:])
2,973✔
1148
                if err != nil {
2,973✔
1149
                        return err
×
1150
                }
×
1151
        }
1152

1153
        // Finally we add it to the channel index which maps channel points
1154
        // (outpoints) to the shorter channel ID's.
1155
        var b bytes.Buffer
1,488✔
1156
        if err := WriteOutpoint(&b, &edge.ChannelPoint); err != nil {
1,488✔
1157
                return err
×
1158
        }
×
1159

1160
        return chanIndex.Put(b.Bytes(), chanKey[:])
1,488✔
1161
}
1162

1163
// HasChannelEdge returns true if the database knows of a channel edge with the
1164
// passed channel ID, and false otherwise. If an edge with that ID is found
1165
// within the graph, then two time stamps representing the last time the edge
1166
// was updated for both directed edges are returned along with the boolean. If
1167
// it is not found, then the zombie index is checked and its result is returned
1168
// as the second boolean.
1169
func (c *KVStore) HasChannelEdge(
1170
        chanID uint64) (time.Time, time.Time, bool, bool, error) {
223✔
1171

223✔
1172
        var (
223✔
1173
                upd1Time time.Time
223✔
1174
                upd2Time time.Time
223✔
1175
                exists   bool
223✔
1176
                isZombie bool
223✔
1177
        )
223✔
1178

223✔
1179
        // We'll query the cache with the shared lock held to allow multiple
223✔
1180
        // readers to access values in the cache concurrently if they exist.
223✔
1181
        c.cacheMu.RLock()
223✔
1182
        if entry, ok := c.rejectCache.get(chanID); ok {
296✔
1183
                c.cacheMu.RUnlock()
73✔
1184
                upd1Time = time.Unix(entry.upd1Time, 0)
73✔
1185
                upd2Time = time.Unix(entry.upd2Time, 0)
73✔
1186
                exists, isZombie = entry.flags.unpack()
73✔
1187

73✔
1188
                return upd1Time, upd2Time, exists, isZombie, nil
73✔
1189
        }
73✔
1190
        c.cacheMu.RUnlock()
153✔
1191

153✔
1192
        c.cacheMu.Lock()
153✔
1193
        defer c.cacheMu.Unlock()
153✔
1194

153✔
1195
        // The item was not found with the shared lock, so we'll acquire the
153✔
1196
        // exclusive lock and check the cache again in case another method added
153✔
1197
        // the entry to the cache while no lock was held.
153✔
1198
        if entry, ok := c.rejectCache.get(chanID); ok {
160✔
1199
                upd1Time = time.Unix(entry.upd1Time, 0)
7✔
1200
                upd2Time = time.Unix(entry.upd2Time, 0)
7✔
1201
                exists, isZombie = entry.flags.unpack()
7✔
1202

7✔
1203
                return upd1Time, upd2Time, exists, isZombie, nil
7✔
1204
        }
7✔
1205

1206
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
298✔
1207
                edges := tx.ReadBucket(edgeBucket)
149✔
1208
                if edges == nil {
149✔
1209
                        return ErrGraphNoEdgesFound
×
1210
                }
×
1211
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
149✔
1212
                if edgeIndex == nil {
149✔
1213
                        return ErrGraphNoEdgesFound
×
1214
                }
×
1215

1216
                var channelID [8]byte
149✔
1217
                byteOrder.PutUint64(channelID[:], chanID)
149✔
1218

149✔
1219
                // If the edge doesn't exist, then we'll also check our zombie
149✔
1220
                // index.
149✔
1221
                if edgeIndex.Get(channelID[:]) == nil {
250✔
1222
                        exists = false
101✔
1223
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
101✔
1224
                        if zombieIndex != nil {
202✔
1225
                                isZombie, _, _ = isZombieEdge(
101✔
1226
                                        zombieIndex, chanID,
101✔
1227
                                )
101✔
1228
                        }
101✔
1229

1230
                        return nil
101✔
1231
                }
1232

1233
                exists = true
51✔
1234
                isZombie = false
51✔
1235

51✔
1236
                // If the channel has been found in the graph, then retrieve
51✔
1237
                // the edges themselves so we can return the last updated
51✔
1238
                // timestamps.
51✔
1239
                nodes := tx.ReadBucket(nodeBucket)
51✔
1240
                if nodes == nil {
51✔
1241
                        return ErrGraphNodeNotFound
×
1242
                }
×
1243

1244
                e1, e2, err := fetchChanEdgePolicies(
51✔
1245
                        edgeIndex, edges, channelID[:],
51✔
1246
                )
51✔
1247
                if err != nil {
51✔
1248
                        return err
×
1249
                }
×
1250

1251
                // As we may have only one of the edges populated, only set the
1252
                // update time if the edge was found in the database.
1253
                if e1 != nil {
72✔
1254
                        upd1Time = e1.LastUpdate
21✔
1255
                }
21✔
1256
                if e2 != nil {
70✔
1257
                        upd2Time = e2.LastUpdate
19✔
1258
                }
19✔
1259

1260
                return nil
51✔
1261
        }, func() {}); err != nil {
149✔
1262
                return time.Time{}, time.Time{}, exists, isZombie, err
×
1263
        }
×
1264

1265
        c.rejectCache.insert(chanID, rejectCacheEntry{
149✔
1266
                upd1Time: upd1Time.Unix(),
149✔
1267
                upd2Time: upd2Time.Unix(),
149✔
1268
                flags:    packRejectFlags(exists, isZombie),
149✔
1269
        })
149✔
1270

149✔
1271
        return upd1Time, upd2Time, exists, isZombie, nil
149✔
1272
}
1273

1274
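// Illustrative sketch, not part of the original file: a gossip-level caller
// might use HasChannelEdge to decide whether an incoming announcement for
// chanID still needs processing. The store parameter is assumed to be an
// already opened *KVStore.
func needsProcessingExample(store *KVStore, chanID uint64) (bool, error) {
        // HasChannelEdge consults the reject cache first and only falls back
        // to a read transaction on a cache miss.
        upd1, upd2, exists, isZombie, err := store.HasChannelEdge(chanID)
        if err != nil {
                return false, err
        }

        log.Debugf("chan_id=%v exists=%v zombie=%v upd1=%v upd2=%v",
                chanID, exists, isZombie, upd1, upd2)

        // Only channels that are neither known nor zombies need to be
        // processed further by the caller.
        return !exists && !isZombie, nil
}
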
// AddEdgeProof sets the proof of an existing edge in the graph database.
1275
func (c *KVStore) AddEdgeProof(chanID lnwire.ShortChannelID,
1276
        proof *models.ChannelAuthProof) error {
4✔
1277

4✔
1278
        // Construct the channel's primary key which is the 8-byte channel ID.
4✔
1279
        var chanKey [8]byte
4✔
1280
        binary.BigEndian.PutUint64(chanKey[:], chanID.ToUint64())
4✔
1281

4✔
1282
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
8✔
1283
                edges := tx.ReadWriteBucket(edgeBucket)
4✔
1284
                if edges == nil {
4✔
1285
                        return ErrEdgeNotFound
×
1286
                }
×
1287

1288
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
4✔
1289
                if edgeIndex == nil {
4✔
1290
                        return ErrEdgeNotFound
×
1291
                }
×
1292

1293
                edge, err := fetchChanEdgeInfo(edgeIndex, chanKey[:])
4✔
1294
                if err != nil {
4✔
1295
                        return err
×
1296
                }
×
1297

1298
                edge.AuthProof = proof
4✔
1299

4✔
1300
                return putChanEdgeInfo(edgeIndex, &edge, chanKey)
4✔
1301
        }, func() {})
4✔
1302
}
1303

1304
const (
1305
        // pruneTipBytes is the total size of the value which stores a prune
1306
        // entry of the graph in the prune log. The "prune tip" is the last
1307
        // entry in the prune log, and indicates if the channel graph is in
1308
        // sync with the current UTXO state. The structure of the value
1309
        // is: blockHash, taking 32 bytes total.
1310
        pruneTipBytes = 32
1311
)
1312

1313
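// Illustrative sketch, not part of the original file: each prune log entry is
// keyed by the 4-byte big-endian block height and stores the block hash as
// its pruneTipBytes-sized value, mirroring what PruneGraph writes below.
func encodePruneLogEntry(blockHash *chainhash.Hash,
        blockHeight uint32) ([4]byte, [pruneTipBytes]byte) {

        var key [4]byte
        byteOrder.PutUint32(key[:], blockHeight)

        var value [pruneTipBytes]byte
        copy(value[:], blockHash[:])

        return key, value
}
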
// PruneGraph prunes newly closed channels from the channel graph in response
1314
// to a new block being solved on the network. Any transactions which spend the
1315
// funding output of any known channels within the graph will be deleted.
1316
// Additionally, the "prune tip", or the last block which has been used to
1317
// prune the graph is stored so callers can ensure the graph is fully in sync
1318
// with the current UTXO state. A slice of channels that have been closed by
1319
// the target block are returned if the function succeeds without error.
1320
func (c *KVStore) PruneGraph(spentOutputs []*wire.OutPoint,
1321
        blockHash *chainhash.Hash, blockHeight uint32) (
1322
        []*models.ChannelEdgeInfo, error) {
240✔
1323

240✔
1324
        c.cacheMu.Lock()
240✔
1325
        defer c.cacheMu.Unlock()
240✔
1326

240✔
1327
        var chansClosed []*models.ChannelEdgeInfo
240✔
1328

240✔
1329
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
480✔
1330
                // First grab the edges bucket which houses the information
240✔
1331
                // we'd like to delete
240✔
1332
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
240✔
1333
                if err != nil {
240✔
1334
                        return err
×
1335
                }
×
1336

1337
                // Next grab the two edge indexes which will also need to be
1338
                // updated.
1339
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
240✔
1340
                if err != nil {
240✔
1341
                        return err
×
1342
                }
×
1343
                chanIndex, err := edges.CreateBucketIfNotExists(
240✔
1344
                        channelPointBucket,
240✔
1345
                )
240✔
1346
                if err != nil {
240✔
1347
                        return err
×
1348
                }
×
1349
                nodes := tx.ReadWriteBucket(nodeBucket)
240✔
1350
                if nodes == nil {
240✔
1351
                        return ErrSourceNodeNotSet
×
1352
                }
×
1353
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
240✔
1354
                if err != nil {
240✔
1355
                        return err
×
1356
                }
×
1357

1358
                // For each of the outpoints that have been spent within the
1359
                // block, we attempt to delete them from the graph since, if
1360
                // that outpoint funded a channel, it has now been closed.
1361
                for _, chanPoint := range spentOutputs {
377✔
1362
                        // TODO(roasbeef): load channel bloom filter, continue
137✔
1363
                        // if NOT in filter
137✔
1364

137✔
1365
                        var opBytes bytes.Buffer
137✔
1366
                        err := WriteOutpoint(&opBytes, chanPoint)
137✔
1367
                        if err != nil {
137✔
1368
                                return err
×
1369
                        }
×
1370

1371
                        // First attempt to see if the channel exists within
1372
                        // the database, if not, then we can exit early.
1373
                        chanID := chanIndex.Get(opBytes.Bytes())
137✔
1374
                        if chanID == nil {
247✔
1375
                                continue
110✔
1376
                        }
1377

1378
                        // However, if it does, then we'll read out the full
1379
                        // version so we can add it to the set of deleted
1380
                        // channels.
1381
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
27✔
1382
                        if err != nil {
27✔
1383
                                return err
×
1384
                        }
×
1385

1386
                        // Attempt to delete the channel, an ErrEdgeNotFound
1387
                        // will be returned if that outpoint isn't known to be
1388
                        // a channel. If no error is returned, then a channel
1389
                        // was successfully pruned.
1390
                        err = c.delChannelEdgeUnsafe(
27✔
1391
                                edges, edgeIndex, chanIndex, zombieIndex,
27✔
1392
                                chanID, false, false,
27✔
1393
                        )
27✔
1394
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
27✔
1395
                                return err
×
1396
                        }
×
1397

1398
                        chansClosed = append(chansClosed, &edgeInfo)
27✔
1399
                }
1400

1401
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
240✔
1402
                if err != nil {
240✔
1403
                        return err
×
1404
                }
×
1405

1406
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
240✔
1407
                        pruneLogBucket,
240✔
1408
                )
240✔
1409
                if err != nil {
240✔
1410
                        return err
×
1411
                }
×
1412

1413
                // With the graph pruned, add a new entry to the prune log,
1414
                // which can be used to check if the graph is fully synced with
1415
                // the current UTXO state.
1416
                var blockHeightBytes [4]byte
240✔
1417
                byteOrder.PutUint32(blockHeightBytes[:], blockHeight)
240✔
1418

240✔
1419
                var newTip [pruneTipBytes]byte
240✔
1420
                copy(newTip[:], blockHash[:])
240✔
1421

240✔
1422
                err = pruneBucket.Put(blockHeightBytes[:], newTip[:])
240✔
1423
                if err != nil {
240✔
1424
                        return err
×
1425
                }
×
1426

1427
                // Now that the graph has been pruned, we'll also attempt to
1428
                // prune any nodes that have had a channel closed within the
1429
                // latest block.
1430
                return c.pruneGraphNodes(nodes, edgeIndex)
240✔
1431
        }, func() {
240✔
1432
                chansClosed = nil
240✔
1433
        })
240✔
1434
        if err != nil {
240✔
1435
                return nil, err
×
1436
        }
×
1437

1438
        for _, channel := range chansClosed {
267✔
1439
                c.rejectCache.remove(channel.ChannelID)
27✔
1440
                c.chanCache.remove(channel.ChannelID)
27✔
1441
        }
27✔
1442

1443
        if c.graphCache != nil {
480✔
1444
                log.Debugf("Pruned graph, cache now has %s",
240✔
1445
                        c.graphCache.Stats())
240✔
1446
        }
240✔
1447

1448
        return chansClosed, nil
240✔
1449
}
1450

1451
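// Illustrative sketch, not part of the original file: a caller reacting to a
// newly connected block might prune the graph as follows, where spentOutputs
// is assumed to hold every outpoint spent by the block's transactions.
func pruneOnBlockExample(store *KVStore, spentOutputs []*wire.OutPoint,
        blockHash *chainhash.Hash, height uint32) error {

        closed, err := store.PruneGraph(spentOutputs, blockHash, height)
        if err != nil {
                return err
        }

        log.Infof("Block %v (height=%v) closed %v channels", blockHash,
                height, len(closed))

        return nil
}
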
// PruneGraphNodes is a garbage collection method which attempts to prune out
1452
// any nodes from the channel graph that are currently unconnected. This ensures
1453
// that we only maintain a graph of reachable nodes. In the event that a pruned
1454
// node gains more channels, it will be re-added back to the graph.
1455
func (c *KVStore) PruneGraphNodes() error {
26✔
1456
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
52✔
1457
                nodes := tx.ReadWriteBucket(nodeBucket)
26✔
1458
                if nodes == nil {
26✔
1459
                        return ErrGraphNodesNotFound
×
1460
                }
×
1461
                edges := tx.ReadWriteBucket(edgeBucket)
26✔
1462
                if edges == nil {
26✔
1463
                        return ErrGraphNotFound
×
1464
                }
×
1465
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
26✔
1466
                if edgeIndex == nil {
26✔
1467
                        return ErrGraphNoEdgesFound
×
1468
                }
×
1469

1470
                return c.pruneGraphNodes(nodes, edgeIndex)
26✔
1471
        }, func() {})
26✔
1472
}
1473

1474
// pruneGraphNodes attempts to remove any nodes from the graph who have had a
1475
// channel closed within the current block. If the node still has existing
1476
// channels in the graph, this will act as a no-op.
1477
func (c *KVStore) pruneGraphNodes(nodes kvdb.RwBucket,
1478
        edgeIndex kvdb.RwBucket) error {
263✔
1479

263✔
1480
        log.Trace("Pruning nodes from graph with no open channels")
263✔
1481

263✔
1482
        // We'll retrieve the graph's source node to ensure we don't remove it
263✔
1483
        // even if it no longer has any open channels.
263✔
1484
        sourceNode, err := c.sourceNode(nodes)
263✔
1485
        if err != nil {
263✔
1486
                return err
×
1487
        }
×
1488

1489
        // We'll use this map to count the number of references to a node
1490
        // in the graph. A node should only be removed once it has no more
1491
        // references in the graph.
1492
        nodeRefCounts := make(map[[33]byte]int)
263✔
1493
        err = nodes.ForEach(func(pubKey, nodeBytes []byte) error {
1,553✔
1494
                // If this is the source key, then we skip this
1,290✔
1495
                // iteration as the value for this key is a pubKey
1,290✔
1496
                // rather than raw node information.
1,290✔
1497
                if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
2,073✔
1498
                        return nil
783✔
1499
                }
783✔
1500

1501
                var nodePub [33]byte
510✔
1502
                copy(nodePub[:], pubKey)
510✔
1503
                nodeRefCounts[nodePub] = 0
510✔
1504

510✔
1505
                return nil
510✔
1506
        })
1507
        if err != nil {
263✔
1508
                return err
×
1509
        }
×
1510

1511
        // To ensure we never delete the source node, we'll start off by
1512
        // bumping its ref count to 1.
1513
        nodeRefCounts[sourceNode.PubKeyBytes] = 1
263✔
1514

263✔
1515
        // Next, we'll run through the edgeIndex which maps a channel ID to the
263✔
1516
        // edge info. We'll use this scan to populate our reference count map
263✔
1517
        // above.
263✔
1518
        err = edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
446✔
1519
                // The first 66 bytes of the edge info contain the pubkeys of
183✔
1520
                // the nodes that this edge attaches. We'll extract them, and
183✔
1521
                // add them to the ref count map.
183✔
1522
                var node1, node2 [33]byte
183✔
1523
                copy(node1[:], edgeInfoBytes[:33])
183✔
1524
                copy(node2[:], edgeInfoBytes[33:])
183✔
1525

183✔
1526
                // With the nodes extracted, we'll increase the ref count of
183✔
1527
                // each of the nodes.
183✔
1528
                nodeRefCounts[node1]++
183✔
1529
                nodeRefCounts[node2]++
183✔
1530

183✔
1531
                return nil
183✔
1532
        })
183✔
1533
        if err != nil {
263✔
1534
                return err
×
1535
        }
×
1536

1537
        // Finally, we'll make a second pass over the set of nodes, and delete
1538
        // any nodes that have a ref count of zero.
1539
        var numNodesPruned int
263✔
1540
        for nodePubKey, refCount := range nodeRefCounts {
773✔
1541
                // If the ref count of the node isn't zero, then we can safely
510✔
1542
                // skip it as it still has edges to or from it within the
510✔
1543
                // graph.
510✔
1544
                if refCount != 0 {
946✔
1545
                        continue
436✔
1546
                }
1547

1548
                if c.graphCache != nil {
154✔
1549
                        c.graphCache.RemoveNode(nodePubKey)
77✔
1550
                }
77✔
1551

1552
                // If we reach this point, then there are no longer any edges
1553
                // that connect this node, so we can delete it.
1554
                err := c.deleteLightningNode(nodes, nodePubKey[:])
77✔
1555
                if err != nil {
77✔
1556
                        if errors.Is(err, ErrGraphNodeNotFound) ||
×
1557
                                errors.Is(err, ErrGraphNodesNotFound) {
×
1558

×
1559
                                log.Warnf("Unable to prune node %x from the "+
×
1560
                                        "graph: %v", nodePubKey, err)
×
1561
                                continue
×
1562
                        }
1563

1564
                        return err
×
1565
                }
1566

1567
                log.Infof("Pruned unconnected node %x from channel graph",
77✔
1568
                        nodePubKey[:])
77✔
1569

77✔
1570
                numNodesPruned++
77✔
1571
        }
1572

1573
        if numNodesPruned > 0 {
324✔
1574
                log.Infof("Pruned %v unconnected nodes from the channel graph",
61✔
1575
                        numNodesPruned)
61✔
1576
        }
61✔
1577

1578
        return nil
263✔
1579
}
1580

1581
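// Illustrative sketch, not part of the original file: the pruning pass above
// boils down to a reference count over node pubkeys. countNodeRefs mirrors
// that bookkeeping: any node whose count remains zero has no channels left
// and is eligible for deletion, while the source node is pinned with an
// initial reference.
func countNodeRefs(channels []*models.ChannelEdgeInfo,
        sourceNode [33]byte) map[[33]byte]int {

        refs := make(map[[33]byte]int)

        // The source node must never be pruned, so it starts at one.
        refs[sourceNode] = 1

        for _, channel := range channels {
                refs[channel.NodeKey1Bytes]++
                refs[channel.NodeKey2Bytes]++
        }

        return refs
}
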
// DisconnectBlockAtHeight is used to indicate that the block specified
1582
// by the passed height has been disconnected from the main chain. This
1583
// will "rewind" the graph back to the height below, deleting channels
1584
// that are no longer confirmed from the graph. The prune log will be
1585
// set to the last prune height valid for the remaining chain.
1586
// Channels that were removed from the graph resulting from the
1587
// disconnected block are returned.
1588
func (c *KVStore) DisconnectBlockAtHeight(height uint32) (
1589
        []*models.ChannelEdgeInfo, error) {
164✔
1590

164✔
1591
        // Every channel having a ShortChannelID starting at 'height'
164✔
1592
        // will no longer be confirmed.
164✔
1593
        startShortChanID := lnwire.ShortChannelID{
164✔
1594
                BlockHeight: height,
164✔
1595
        }
164✔
1596

164✔
1597
        // Delete everything after this height from the db up until the
164✔
1598
        // SCID alias range.
164✔
1599
        endShortChanID := aliasmgr.StartingAlias
164✔
1600

164✔
1601
        // The block height will be the 3 first bytes of the channel IDs.
164✔
1602
        var chanIDStart [8]byte
164✔
1603
        byteOrder.PutUint64(chanIDStart[:], startShortChanID.ToUint64())
164✔
1604
        var chanIDEnd [8]byte
164✔
1605
        byteOrder.PutUint64(chanIDEnd[:], endShortChanID.ToUint64())
164✔
1606

164✔
1607
        c.cacheMu.Lock()
164✔
1608
        defer c.cacheMu.Unlock()
164✔
1609

164✔
1610
        // Keep track of the channels that are removed from the graph.
164✔
1611
        var removedChans []*models.ChannelEdgeInfo
164✔
1612

164✔
1613
        if err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
328✔
1614
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
164✔
1615
                if err != nil {
164✔
1616
                        return err
×
1617
                }
×
1618
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
164✔
1619
                if err != nil {
164✔
1620
                        return err
×
1621
                }
×
1622
                chanIndex, err := edges.CreateBucketIfNotExists(
164✔
1623
                        channelPointBucket,
164✔
1624
                )
164✔
1625
                if err != nil {
164✔
1626
                        return err
×
1627
                }
×
1628
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
164✔
1629
                if err != nil {
164✔
1630
                        return err
×
1631
                }
×
1632

1633
                // Scan from chanIDStart to chanIDEnd, deleting every
1634
                // found edge.
1635
                // NOTE: we must delete the edges after the cursor loop, since
1636
                // modifying the bucket while traversing is not safe.
1637
                // NOTE: We use a < comparison in bytes.Compare instead of <=
1638
                // so that the StartingAlias itself isn't deleted.
1639
                var keys [][]byte
164✔
1640
                cursor := edgeIndex.ReadWriteCursor()
164✔
1641

164✔
1642
                //nolint:ll
164✔
1643
                for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
164✔
1644
                        bytes.Compare(k, chanIDEnd[:]) < 0; k, v = cursor.Next() {
263✔
1645
                        edgeInfoReader := bytes.NewReader(v)
99✔
1646
                        edgeInfo, err := deserializeChanEdgeInfo(edgeInfoReader)
99✔
1647
                        if err != nil {
99✔
1648
                                return err
×
1649
                        }
×
1650

1651
                        keys = append(keys, k)
99✔
1652
                        removedChans = append(removedChans, &edgeInfo)
99✔
1653
                }
1654

1655
                for _, k := range keys {
263✔
1656
                        err = c.delChannelEdgeUnsafe(
99✔
1657
                                edges, edgeIndex, chanIndex, zombieIndex,
99✔
1658
                                k, false, false,
99✔
1659
                        )
99✔
1660
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
99✔
1661
                                return err
×
1662
                        }
×
1663
                }
1664

1665
                // Delete all the entries in the prune log having a height
1666
                // greater or equal to the block disconnected.
1667
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
164✔
1668
                if err != nil {
164✔
1669
                        return err
×
1670
                }
×
1671

1672
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
164✔
1673
                        pruneLogBucket,
164✔
1674
                )
164✔
1675
                if err != nil {
164✔
1676
                        return err
×
1677
                }
×
1678

1679
                var pruneKeyStart [4]byte
164✔
1680
                byteOrder.PutUint32(pruneKeyStart[:], height)
164✔
1681

164✔
1682
                var pruneKeyEnd [4]byte
164✔
1683
                byteOrder.PutUint32(pruneKeyEnd[:], math.MaxUint32)
164✔
1684

164✔
1685
                // To avoid modifying the bucket while traversing, we delete
164✔
1686
                // the keys in a second loop.
164✔
1687
                var pruneKeys [][]byte
164✔
1688
                pruneCursor := pruneBucket.ReadWriteCursor()
164✔
1689
                //nolint:ll
164✔
1690
                for k, _ := pruneCursor.Seek(pruneKeyStart[:]); k != nil &&
164✔
1691
                        bytes.Compare(k, pruneKeyEnd[:]) <= 0; k, _ = pruneCursor.Next() {
263✔
1692
                        pruneKeys = append(pruneKeys, k)
99✔
1693
                }
99✔
1694

1695
                for _, k := range pruneKeys {
263✔
1696
                        if err := pruneBucket.Delete(k); err != nil {
99✔
1697
                                return err
×
1698
                        }
×
1699
                }
1700

1701
                return nil
164✔
1702
        }, func() {
164✔
1703
                removedChans = nil
164✔
1704
        }); err != nil {
164✔
1705
                return nil, err
×
1706
        }
×
1707

1708
        for _, channel := range removedChans {
263✔
1709
                c.rejectCache.remove(channel.ChannelID)
99✔
1710
                c.chanCache.remove(channel.ChannelID)
99✔
1711
        }
99✔
1712

1713
        return removedChans, nil
164✔
1714
}
1715

1716
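// Illustrative sketch, not part of the original file: the rewind above scans
// the SCID keyspace from the disconnected height up to (but excluding) the
// SCID alias range. scidRangeForHeight mirrors how those two bounds are
// encoded.
func scidRangeForHeight(height uint32) (start, end [8]byte) {
        startScid := lnwire.ShortChannelID{BlockHeight: height}
        byteOrder.PutUint64(start[:], startScid.ToUint64())
        byteOrder.PutUint64(end[:], aliasmgr.StartingAlias.ToUint64())

        return start, end
}
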
// PruneTip returns the block height and hash of the latest block that has been
1717
// used to prune channels in the graph. Knowing the "prune tip" allows callers
1718
// to tell if the graph is currently in sync with the current best known UTXO
1719
// state.
1720
func (c *KVStore) PruneTip() (*chainhash.Hash, uint32, error) {
56✔
1721
        var (
56✔
1722
                tipHash   chainhash.Hash
56✔
1723
                tipHeight uint32
56✔
1724
        )
56✔
1725

56✔
1726
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
112✔
1727
                graphMeta := tx.ReadBucket(graphMetaBucket)
56✔
1728
                if graphMeta == nil {
56✔
1729
                        return ErrGraphNotFound
×
1730
                }
×
1731
                pruneBucket := graphMeta.NestedReadBucket(pruneLogBucket)
56✔
1732
                if pruneBucket == nil {
56✔
1733
                        return ErrGraphNeverPruned
×
1734
                }
×
1735

1736
                pruneCursor := pruneBucket.ReadCursor()
56✔
1737

56✔
1738
                // The prune key with the largest block height will be our
56✔
1739
                // prune tip.
56✔
1740
                k, v := pruneCursor.Last()
56✔
1741
                if k == nil {
77✔
1742
                        return ErrGraphNeverPruned
21✔
1743
                }
21✔
1744

1745
                // Once we have the prune tip, the value will be the block hash,
1746
                // and the key the block height.
1747
                copy(tipHash[:], v)
38✔
1748
                tipHeight = byteOrder.Uint32(k)
38✔
1749

38✔
1750
                return nil
38✔
1751
        }, func() {})
56✔
1752
        if err != nil {
77✔
1753
                return nil, 0, err
21✔
1754
        }
21✔
1755

1756
        return &tipHash, tipHeight, nil
38✔
1757
}
1758

1759
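// Illustrative sketch, not part of the original file: callers typically use
// PruneTip to check whether the graph is caught up with the chain, treating
// ErrGraphNeverPruned as "pruning has not started yet".
func isGraphSyncedExample(store *KVStore,
        bestHash *chainhash.Hash) (bool, error) {

        tipHash, tipHeight, err := store.PruneTip()
        switch {
        case errors.Is(err, ErrGraphNeverPruned):
                return false, nil

        case err != nil:
                return false, err
        }

        log.Debugf("Graph prune tip: height=%v hash=%v", tipHeight, tipHash)

        return tipHash.IsEqual(bestHash), nil
}
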
// DeleteChannelEdges removes edges with the given channel IDs from the
1760
// database and marks them as zombies. This ensures that we're unable to re-add
1761
// it to our database once again. If an edge does not exist within the
1762
// database, then ErrEdgeNotFound will be returned. If strictZombiePruning is
1763
// true, then when we mark these edges as zombies, we'll set up the keys such
1764
// that we require the node that failed to send the fresh update to be the one
1765
// that resurrects the channel from its zombie state. The markZombie bool
1766
// denotes whether or not to mark the channel as a zombie.
1767
func (c *KVStore) DeleteChannelEdges(strictZombiePruning, markZombie bool,
1768
        chanIDs ...uint64) error {
141✔
1769

141✔
1770
        // TODO(roasbeef): possibly delete from node bucket if node has no more
141✔
1771
        // channels
141✔
1772
        // TODO(roasbeef): don't delete both edges?
141✔
1773

141✔
1774
        c.cacheMu.Lock()
141✔
1775
        defer c.cacheMu.Unlock()
141✔
1776

141✔
1777
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
282✔
1778
                edges := tx.ReadWriteBucket(edgeBucket)
141✔
1779
                if edges == nil {
141✔
1780
                        return ErrEdgeNotFound
×
1781
                }
×
1782
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
141✔
1783
                if edgeIndex == nil {
141✔
1784
                        return ErrEdgeNotFound
×
1785
                }
×
1786
                chanIndex := edges.NestedReadWriteBucket(channelPointBucket)
141✔
1787
                if chanIndex == nil {
141✔
1788
                        return ErrEdgeNotFound
×
1789
                }
×
1790
                nodes := tx.ReadWriteBucket(nodeBucket)
141✔
1791
                if nodes == nil {
141✔
1792
                        return ErrGraphNodeNotFound
×
1793
                }
×
1794
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
141✔
1795
                if err != nil {
141✔
1796
                        return err
×
1797
                }
×
1798

1799
                var rawChanID [8]byte
141✔
1800
                for _, chanID := range chanIDs {
230✔
1801
                        byteOrder.PutUint64(rawChanID[:], chanID)
89✔
1802
                        err := c.delChannelEdgeUnsafe(
89✔
1803
                                edges, edgeIndex, chanIndex, zombieIndex,
89✔
1804
                                rawChanID[:], markZombie, strictZombiePruning,
89✔
1805
                        )
89✔
1806
                        if err != nil {
152✔
1807
                                return err
63✔
1808
                        }
63✔
1809
                }
1810

1811
                return nil
78✔
1812
        }, func() {})
141✔
1813
        if err != nil {
204✔
1814
                return err
63✔
1815
        }
63✔
1816

1817
        for _, chanID := range chanIDs {
104✔
1818
                c.rejectCache.remove(chanID)
26✔
1819
                c.chanCache.remove(chanID)
26✔
1820
        }
26✔
1821

1822
        return nil
78✔
1823
}
1824

1825
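// Illustrative sketch, not part of the original file: marking a batch of
// channels as zombies, for example after failing to validate them against
// the chain, might look as follows. With markZombie=true the edges cannot be
// silently re-added, and strictZombiePruning=true requires a fresh update
// from the offending node before the channel can be resurrected.
func zombifyChannelsExample(store *KVStore, chanIDs ...uint64) error {
        err := store.DeleteChannelEdges(true, true, chanIDs...)
        if errors.Is(err, ErrEdgeNotFound) {
                // At least one edge was already unknown, which callers may
                // treat as a no-op.
                return nil
        }

        return err
}
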
// ChannelID attempts to lookup the 8-byte compact channel ID which maps to the
1826
// passed channel point (outpoint). If the passed channel doesn't exist within
1827
// the database, then ErrEdgeNotFound is returned.
1828
func (c *KVStore) ChannelID(chanPoint *wire.OutPoint) (uint64, error) {
4✔
1829
        var chanID uint64
4✔
1830
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
1831
                var err error
4✔
1832
                chanID, err = getChanID(tx, chanPoint)
4✔
1833
                return err
4✔
1834
        }, func() {
8✔
1835
                chanID = 0
4✔
1836
        }); err != nil {
7✔
1837
                return 0, err
3✔
1838
        }
3✔
1839

1840
        return chanID, nil
4✔
1841
}
1842

1843
// getChanID returns the assigned channel ID for a given channel point.
1844
func getChanID(tx kvdb.RTx, chanPoint *wire.OutPoint) (uint64, error) {
4✔
1845
        var b bytes.Buffer
4✔
1846
        if err := WriteOutpoint(&b, chanPoint); err != nil {
4✔
1847
                return 0, err
×
1848
        }
×
1849

1850
        edges := tx.ReadBucket(edgeBucket)
4✔
1851
        if edges == nil {
4✔
1852
                return 0, ErrGraphNoEdgesFound
×
1853
        }
×
1854
        chanIndex := edges.NestedReadBucket(channelPointBucket)
4✔
1855
        if chanIndex == nil {
4✔
1856
                return 0, ErrGraphNoEdgesFound
×
1857
        }
×
1858

1859
        chanIDBytes := chanIndex.Get(b.Bytes())
4✔
1860
        if chanIDBytes == nil {
7✔
1861
                return 0, ErrEdgeNotFound
3✔
1862
        }
3✔
1863

1864
        chanID := byteOrder.Uint64(chanIDBytes)
4✔
1865

4✔
1866
        return chanID, nil
4✔
1867
}
1868

1869
// TODO(roasbeef): allow updates to use Batch?
1870

1871
// HighestChanID returns the "highest" known channel ID in the channel graph.
1872
// This represents the "newest" channel from the PoV of the chain. This method
1873
// can be used by peers to quickly determine if their graphs are in sync.
1874
func (c *KVStore) HighestChanID() (uint64, error) {
6✔
1875
        var cid uint64
6✔
1876

6✔
1877
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
12✔
1878
                edges := tx.ReadBucket(edgeBucket)
6✔
1879
                if edges == nil {
6✔
1880
                        return ErrGraphNoEdgesFound
×
1881
                }
×
1882
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
6✔
1883
                if edgeIndex == nil {
6✔
1884
                        return ErrGraphNoEdgesFound
×
1885
                }
×
1886

1887
                // In order to find the highest chan ID, we'll fetch a cursor
1888
                // and use that to seek to the "end" of our known range.
1889
                cidCursor := edgeIndex.ReadCursor()
6✔
1890

6✔
1891
                lastChanID, _ := cidCursor.Last()
6✔
1892

6✔
1893
                // If there's no key, then this means that we don't actually
6✔
1894
                // know of any channels, so we'll return a predictable error.
6✔
1895
                if lastChanID == nil {
10✔
1896
                        return ErrGraphNoEdgesFound
4✔
1897
                }
4✔
1898

1899
                // Otherwise, we'll deserialize the channel ID and return it
1900
                // to the caller.
1901
                cid = byteOrder.Uint64(lastChanID)
5✔
1902

5✔
1903
                return nil
5✔
1904
        }, func() {
6✔
1905
                cid = 0
6✔
1906
        })
6✔
1907
        if err != nil && !errors.Is(err, ErrGraphNoEdgesFound) {
6✔
1908
                return 0, err
×
1909
        }
×
1910

1911
        return cid, nil
6✔
1912
}
1913

1914
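// Illustrative sketch, not part of the original file: since channel IDs are
// stored big-endian, the last key in the edge index belongs to the most
// recently confirmed channel, and its block height falls straight out of the
// decoded SCID.
func highestChanHeightExample(store *KVStore) (uint32, error) {
        cid, err := store.HighestChanID()
        if err != nil {
                return 0, err
        }

        scid := lnwire.NewShortChanIDFromInt(cid)

        return scid.BlockHeight, nil
}
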
// ChannelEdge represents the complete set of information for a channel edge in
1915
// the known channel graph. This struct couples the core information of the
1916
// edge as well as each of the known advertised edge policies.
1917
type ChannelEdge struct {
1918
        // Info contains all the static information describing the channel.
1919
        Info *models.ChannelEdgeInfo
1920

1921
        // Policy1 points to the "first" edge policy of the channel containing
1922
        // the dynamic information required to properly route through the edge.
1923
        Policy1 *models.ChannelEdgePolicy
1924

1925
        // Policy2 points to the "second" edge policy of the channel containing
1926
        // the dynamic information required to properly route through the edge.
1927
        Policy2 *models.ChannelEdgePolicy
1928

1929
        // Node1 is "node 1" in the channel. This is the node that would have
1930
        // produced Policy1 if it exists.
1931
        Node1 *models.LightningNode
1932

1933
        // Node2 is "node 2" in the channel. This is the node that would have
1934
        // produced Policy2 if it exists.
1935
        Node2 *models.LightningNode
1936
}
1937

1938
// ChanUpdatesInHorizon returns all the known channels which have at least
1939
// one edge that has an update timestamp within the specified horizon.
1940
func (c *KVStore) ChanUpdatesInHorizon(startTime,
1941
        endTime time.Time) ([]ChannelEdge, error) {
146✔
1942

146✔
1943
        // To ensure we don't return duplicate ChannelEdges, we'll use an
146✔
1944
        // additional map to keep track of the edges already seen to prevent
146✔
1945
        // re-adding them.
146✔
1946
        var edgesSeen map[uint64]struct{}
146✔
1947
        var edgesToCache map[uint64]ChannelEdge
146✔
1948
        var edgesInHorizon []ChannelEdge
146✔
1949

146✔
1950
        c.cacheMu.Lock()
146✔
1951
        defer c.cacheMu.Unlock()
146✔
1952

146✔
1953
        var hits int
146✔
1954
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
292✔
1955
                edges := tx.ReadBucket(edgeBucket)
146✔
1956
                if edges == nil {
146✔
1957
                        return ErrGraphNoEdgesFound
×
1958
                }
×
1959
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
146✔
1960
                if edgeIndex == nil {
146✔
1961
                        return ErrGraphNoEdgesFound
×
1962
                }
×
1963
                edgeUpdateIndex := edges.NestedReadBucket(edgeUpdateIndexBucket)
146✔
1964
                if edgeUpdateIndex == nil {
146✔
1965
                        return ErrGraphNoEdgesFound
×
1966
                }
×
1967

1968
                nodes := tx.ReadBucket(nodeBucket)
146✔
1969
                if nodes == nil {
146✔
1970
                        return ErrGraphNodesNotFound
×
1971
                }
×
1972

1973
                // We'll now obtain a cursor to perform a range query within
1974
                // the index to find all channels within the horizon.
1975
                updateCursor := edgeUpdateIndex.ReadCursor()
146✔
1976

146✔
1977
                var startTimeBytes, endTimeBytes [8 + 8]byte
146✔
1978
                byteOrder.PutUint64(
146✔
1979
                        startTimeBytes[:8], uint64(startTime.Unix()),
146✔
1980
                )
146✔
1981
                byteOrder.PutUint64(
146✔
1982
                        endTimeBytes[:8], uint64(endTime.Unix()),
146✔
1983
                )
146✔
1984

146✔
1985
                // With our start and end times constructed, we'll step through
146✔
1986
                // the index collecting the info and policy of each update of
146✔
1987
                // each channel that has a last update within the time range.
146✔
1988
                //
146✔
1989
                //nolint:ll
146✔
1990
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
146✔
1991
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
195✔
1992
                        // We have a new eligible entry, so we'll slice off the
49✔
1993
                        // chan ID so we can query it in the DB.
49✔
1994
                        chanID := indexKey[8:]
49✔
1995

49✔
1996
                        // If we've already retrieved the info and policies for
49✔
1997
                        // this edge, then we can skip it as we don't need to do
49✔
1998
                        // so again.
49✔
1999
                        chanIDInt := byteOrder.Uint64(chanID)
49✔
2000
                        if _, ok := edgesSeen[chanIDInt]; ok {
68✔
2001
                                continue
19✔
2002
                        }
2003

2004
                        if channel, ok := c.chanCache.get(chanIDInt); ok {
41✔
2005
                                hits++
11✔
2006
                                edgesSeen[chanIDInt] = struct{}{}
11✔
2007
                                edgesInHorizon = append(edgesInHorizon, channel)
11✔
2008

11✔
2009
                                continue
11✔
2010
                        }
2011

2012
                        // First, we'll fetch the static edge information.
2013
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
21✔
2014
                        if err != nil {
21✔
2015
                                chanID := byteOrder.Uint64(chanID)
×
2016
                                return fmt.Errorf("unable to fetch info for "+
×
2017
                                        "edge with chan_id=%v: %v", chanID, err)
×
2018
                        }
×
2019

2020
                        // With the static information obtained, we'll now
2021
                        // fetch the dynamic policy info.
2022
                        edge1, edge2, err := fetchChanEdgePolicies(
21✔
2023
                                edgeIndex, edges, chanID,
21✔
2024
                        )
21✔
2025
                        if err != nil {
21✔
2026
                                chanID := byteOrder.Uint64(chanID)
×
2027
                                return fmt.Errorf("unable to fetch policies "+
×
2028
                                        "for edge with chan_id=%v: %v", chanID,
×
2029
                                        err)
×
2030
                        }
×
2031

2032
                        node1, err := fetchLightningNode(
21✔
2033
                                nodes, edgeInfo.NodeKey1Bytes[:],
21✔
2034
                        )
21✔
2035
                        if err != nil {
21✔
2036
                                return err
×
2037
                        }
×
2038

2039
                        node2, err := fetchLightningNode(
21✔
2040
                                nodes, edgeInfo.NodeKey2Bytes[:],
21✔
2041
                        )
21✔
2042
                        if err != nil {
21✔
2043
                                return err
×
2044
                        }
×
2045

2046
                        // Finally, we'll collate this edge with the rest of
2047
                        // edges to be returned.
2048
                        edgesSeen[chanIDInt] = struct{}{}
21✔
2049
                        channel := ChannelEdge{
21✔
2050
                                Info:    &edgeInfo,
21✔
2051
                                Policy1: edge1,
21✔
2052
                                Policy2: edge2,
21✔
2053
                                Node1:   &node1,
21✔
2054
                                Node2:   &node2,
21✔
2055
                        }
21✔
2056
                        edgesInHorizon = append(edgesInHorizon, channel)
21✔
2057
                        edgesToCache[chanIDInt] = channel
21✔
2058
                }
2059

2060
                return nil
146✔
2061
        }, func() {
146✔
2062
                edgesSeen = make(map[uint64]struct{})
146✔
2063
                edgesToCache = make(map[uint64]ChannelEdge)
146✔
2064
                edgesInHorizon = nil
146✔
2065
        })
146✔
2066
        switch {
146✔
2067
        case errors.Is(err, ErrGraphNoEdgesFound):
×
2068
                fallthrough
×
2069
        case errors.Is(err, ErrGraphNodesNotFound):
×
2070
                break
×
2071

2072
        case err != nil:
×
2073
                return nil, err
×
2074
        }
2075

2076
        // Insert any edges loaded from disk into the cache.
2077
        for chanid, channel := range edgesToCache {
167✔
2078
                c.chanCache.insert(chanid, channel)
21✔
2079
        }
21✔
2080

2081
        log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
146✔
2082
                float64(hits)/float64(len(edgesInHorizon)), hits,
146✔
2083
                len(edgesInHorizon))
146✔
2084

146✔
2085
        return edgesInHorizon, nil
146✔
2086
}
2087

2088
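// Illustrative sketch, not part of the original file: the horizon query above
// is a range scan over keys of the form updateTime || chanID, so each bound
// is built by packing the unix timestamp into the first 8 bytes and leaving
// the channel ID portion zeroed.
func chanUpdateIndexBound(t time.Time) [8 + 8]byte {
        var key [8 + 8]byte
        byteOrder.PutUint64(key[:8], uint64(t.Unix()))

        return key
}
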
// NodeUpdatesInHorizon returns all the known lightning nodes which have an
2089
// update timestamp within the passed range. This method can be used by two
2090
// nodes to quickly determine if they have the same set of up-to-date node
2091
// announcements.
2092
func (c *KVStore) NodeUpdatesInHorizon(startTime,
2093
        endTime time.Time) ([]models.LightningNode, error) {
11✔
2094

11✔
2095
        var nodesInHorizon []models.LightningNode
11✔
2096

11✔
2097
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
22✔
2098
                nodes := tx.ReadBucket(nodeBucket)
11✔
2099
                if nodes == nil {
11✔
2100
                        return ErrGraphNodesNotFound
×
2101
                }
×
2102

2103
                nodeUpdateIndex := nodes.NestedReadBucket(nodeUpdateIndexBucket)
11✔
2104
                if nodeUpdateIndex == nil {
11✔
2105
                        return ErrGraphNodesNotFound
×
2106
                }
×
2107

2108
                // We'll now obtain a cursor to perform a range query within
2109
                // the index to find all node announcements within the horizon.
2110
                updateCursor := nodeUpdateIndex.ReadCursor()
11✔
2111

11✔
2112
                var startTimeBytes, endTimeBytes [8 + 33]byte
11✔
2113
                byteOrder.PutUint64(
11✔
2114
                        startTimeBytes[:8], uint64(startTime.Unix()),
11✔
2115
                )
11✔
2116
                byteOrder.PutUint64(
11✔
2117
                        endTimeBytes[:8], uint64(endTime.Unix()),
11✔
2118
                )
11✔
2119

11✔
2120
                // With our start and end times constructed, we'll step through
11✔
2121
                // the index collecting info for each node within the time
11✔
2122
                // range.
11✔
2123
                //
11✔
2124
                //nolint:ll
11✔
2125
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
11✔
2126
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
43✔
2127
                        nodePub := indexKey[8:]
32✔
2128
                        node, err := fetchLightningNode(nodes, nodePub)
32✔
2129
                        if err != nil {
32✔
2130
                                return err
×
2131
                        }
×
2132

2133
                        nodesInHorizon = append(nodesInHorizon, node)
32✔
2134
                }
2135

2136
                return nil
11✔
2137
        }, func() {
11✔
2138
                nodesInHorizon = nil
11✔
2139
        })
11✔
2140
        switch {
11✔
2141
        case errors.Is(err, ErrGraphNoEdgesFound):
×
2142
                fallthrough
×
2143
        case errors.Is(err, ErrGraphNodesNotFound):
×
2144
                break
×
2145

2146
        case err != nil:
×
2147
                return nil, err
×
2148
        }
2149

2150
        return nodesInHorizon, nil
11✔
2151
}
2152

2153
// FilterKnownChanIDs takes a set of channel IDs and returns the subset of chan
2154
// ID's that we don't know and are not known zombies of the passed set. In other
2155
// words, we perform a set difference of our set of chan ID's and the ones
2156
// passed in. This method can be used by callers to determine the set of
2157
// channels another peer knows of that we don't.
2158
func (c *KVStore) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
2159
        isZombieChan func(time.Time, time.Time) bool) ([]uint64, error) {
120✔
2160

120✔
2161
        var newChanIDs []uint64
120✔
2162

120✔
2163
        c.cacheMu.Lock()
120✔
2164
        defer c.cacheMu.Unlock()
120✔
2165

120✔
2166
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
240✔
2167
                edges := tx.ReadBucket(edgeBucket)
120✔
2168
                if edges == nil {
120✔
2169
                        return ErrGraphNoEdgesFound
×
2170
                }
×
2171
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
120✔
2172
                if edgeIndex == nil {
120✔
2173
                        return ErrGraphNoEdgesFound
×
2174
                }
×
2175

2176
                // Fetch the zombie index, it may not exist if no edges have
2177
                // ever been marked as zombies. If the index has been
2178
                // initialized, we will use it later to skip known zombie edges.
2179
                zombieIndex := edges.NestedReadBucket(zombieBucket)
120✔
2180

120✔
2181
                // We'll run through the set of chanIDs and collate only the
120✔
2182
                // set of channels that cannot be found within our db.
120✔
2183
                var cidBytes [8]byte
120✔
2184
                for _, info := range chansInfo {
238✔
2185
                        scid := info.ShortChannelID.ToUint64()
118✔
2186
                        byteOrder.PutUint64(cidBytes[:], scid)
118✔
2187

118✔
2188
                        // If the edge is already known, skip it.
118✔
2189
                        if v := edgeIndex.Get(cidBytes[:]); v != nil {
142✔
2190
                                continue
24✔
2191
                        }
2192

2193
                        // If the edge is a known zombie, skip it.
2194
                        if zombieIndex != nil {
194✔
2195
                                isZombie, _, _ := isZombieEdge(
97✔
2196
                                        zombieIndex, scid,
97✔
2197
                                )
97✔
2198

97✔
2199
                                // TODO(ziggie): Make sure that for the strict
97✔
2200
                                // pruning case we compare the pubkeys and
97✔
2201
                                // whether the right timestamp is not older than
97✔
2202
                                // the `ChannelPruneExpiry`.
97✔
2203
                                //
97✔
2204
                                // NOTE: The timestamp data has no verification
97✔
2205
                                // attached to it in the `ReplyChannelRange` msg
97✔
2206
                                // so we are trusting this data at this point.
97✔
2207
                                // However it is not critical because we are
97✔
2208
                                // just removing the channel from the db when
97✔
2209
                                // the timestamps are more recent. During the
97✔
2210
                                // querying of the gossip msg verification
97✔
2211
                                // happens as usual.
97✔
2212
                                // However we should start punishing peers when
97✔
2213
                                // they don't provide us honest data ?
97✔
2214
                                isStillZombie := isZombieChan(
97✔
2215
                                        info.Node1UpdateTimestamp,
97✔
2216
                                        info.Node2UpdateTimestamp,
97✔
2217
                                )
97✔
2218

97✔
2219
                                switch {
97✔
2220
                                // If the edge is a known zombie and if we
2221
                                // would still consider it a zombie given the
2222
                                // latest update timestamps, then we skip this
2223
                                // channel.
2224
                                case isZombie && isStillZombie:
27✔
2225
                                        continue
27✔
2226

2227
                                // Otherwise, if we have marked it as a zombie
2228
                                // but the latest update timestamps could bring
2229
                                // it back from the dead, then we mark it alive,
2230
                                // and we let it be added to the set of IDs to
2231
                                // query our peer for.
2232
                                case isZombie && !isStillZombie:
21✔
2233
                                        err := c.markEdgeLiveUnsafe(tx, scid)
21✔
2234
                                        if err != nil {
21✔
2235
                                                return err
×
2236
                                        }
×
2237
                                }
2238
                        }
2239

2240
                        newChanIDs = append(newChanIDs, scid)
70✔
2241
                }
2242

2243
                return nil
120✔
2244
        }, func() {
120✔
2245
                newChanIDs = nil
120✔
2246
        })
120✔
2247
        switch {
120✔
2248
        // If we don't know of any edges yet, then we'll return the entire set
2249
        // of chan IDs specified.
2250
        case errors.Is(err, ErrGraphNoEdgesFound):
×
2251
                ogChanIDs := make([]uint64, len(chansInfo))
×
2252
                for i, info := range chansInfo {
×
2253
                        ogChanIDs[i] = info.ShortChannelID.ToUint64()
×
2254
                }
×
2255

2256
                return ogChanIDs, nil
×
2257

2258
        case err != nil:
×
2259
                return nil, err
×
2260
        }
2261

2262
        return newChanIDs, nil
120✔
2263
}
2264

2265
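// Illustrative sketch, not part of the original file: a typical caller
// supplies an isZombieChan callback that still treats a channel as a zombie
// when neither peer has produced a recent enough update. The two-week expiry
// below is only an example value, not a constant defined by this package.
func filterNewChansExample(store *KVStore,
        chansInfo []ChannelUpdateInfo) ([]uint64, error) {

        const exampleExpiry = 14 * 24 * time.Hour

        isZombieChan := func(upd1, upd2 time.Time) bool {
                return time.Since(upd1) > exampleExpiry &&
                        time.Since(upd2) > exampleExpiry
        }

        return store.FilterKnownChanIDs(chansInfo, isZombieChan)
}
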
// ChannelUpdateInfo couples the SCID of a channel with the timestamps of the
2266
// latest received channel updates for the channel.
2267
type ChannelUpdateInfo struct {
2268
        // ShortChannelID is the SCID identifier of the channel.
2269
        ShortChannelID lnwire.ShortChannelID
2270

2271
        // Node1UpdateTimestamp is the timestamp of the latest received update
2272
        // from the node 1 channel peer. This will be set to zero time if no
2273
        // update has yet been received from this node.
2274
        Node1UpdateTimestamp time.Time
2275

2276
        // Node2UpdateTimestamp is the timestamp of the latest received update
2277
        // from the node 2 channel peer. This will be set to zero time if no
2278
        // update has yet been received from this node.
2279
        Node2UpdateTimestamp time.Time
2280
}
2281

2282
// NewChannelUpdateInfo is a constructor which makes sure we initialize the
2283
// timestamps with the zero-seconds unix timestamp, which equals
2284
// `January 1, 1970, 00:00:00 UTC` in case the value is `time.Time{}`.
2285
func NewChannelUpdateInfo(scid lnwire.ShortChannelID, node1Timestamp,
2286
        node2Timestamp time.Time) ChannelUpdateInfo {
221✔
2287

221✔
2288
        chanInfo := ChannelUpdateInfo{
221✔
2289
                ShortChannelID:       scid,
221✔
2290
                Node1UpdateTimestamp: node1Timestamp,
221✔
2291
                Node2UpdateTimestamp: node2Timestamp,
221✔
2292
        }
221✔
2293

221✔
2294
        if node1Timestamp.IsZero() {
432✔
2295
                chanInfo.Node1UpdateTimestamp = time.Unix(0, 0)
211✔
2296
        }
211✔
2297

2298
        if node2Timestamp.IsZero() {
432✔
2299
                chanInfo.Node2UpdateTimestamp = time.Unix(0, 0)
211✔
2300
        }
211✔
2301

2302
        return chanInfo
221✔
2303
}
2304

2305
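// Illustrative sketch, not part of the original file: building an entry for a
// channel that has not received any updates yet. Both timestamps end up as
// the unix epoch rather than the zero time.Time value, whose Unix()
// representation would be a large negative number.
func unknownUpdateInfoExample(scid lnwire.ShortChannelID) ChannelUpdateInfo {
        return NewChannelUpdateInfo(scid, time.Time{}, time.Time{})
}
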
// BlockChannelRange represents a range of channels for a given block height.
2306
type BlockChannelRange struct {
2307
        // Height is the height of the block all of the channels below were
2308
        // included in.
2309
        Height uint32
2310

2311
        // Channels is the list of channels identified by their short ID
2312
        // representation known to us that were included in the block height
2313
        // above. The list may include channel update timestamp information if
2314
        // requested.
2315
        Channels []ChannelUpdateInfo
2316
}
2317

2318
// FilterChannelRange returns the channel ID's of all known channels which were
2319
// mined in a block height within the passed range. The channel IDs are grouped
2320
// by their common block height. This method can be used to quickly share with a
2321
// peer the set of channels we know of within a particular range to catch them
2322
// up after a period of time offline. If withTimestamps is true then the
2323
// timestamp info of the latest received channel update messages of the channel
2324
// will be included in the response.
2325
func (c *KVStore) FilterChannelRange(startHeight,
2326
        endHeight uint32, withTimestamps bool) ([]BlockChannelRange, error) {
14✔
2327

14✔
2328
        startChanID := &lnwire.ShortChannelID{
14✔
2329
                BlockHeight: startHeight,
14✔
2330
        }
14✔
2331

14✔
2332
        endChanID := lnwire.ShortChannelID{
14✔
2333
                BlockHeight: endHeight,
14✔
2334
                TxIndex:     math.MaxUint32 & 0x00ffffff,
14✔
2335
                TxPosition:  math.MaxUint16,
14✔
2336
        }
14✔
2337

14✔
2338
        // As we need to perform a range scan, we'll convert the starting and
14✔
2339
        // ending height to their corresponding values when encoded using short
14✔
2340
        // channel ID's.
14✔
2341
        var chanIDStart, chanIDEnd [8]byte
14✔
2342
        byteOrder.PutUint64(chanIDStart[:], startChanID.ToUint64())
14✔
2343
        byteOrder.PutUint64(chanIDEnd[:], endChanID.ToUint64())
14✔
2344

14✔
2345
        var channelsPerBlock map[uint32][]ChannelUpdateInfo
14✔
2346
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
28✔
2347
                edges := tx.ReadBucket(edgeBucket)
14✔
2348
                if edges == nil {
14✔
2349
                        return ErrGraphNoEdgesFound
×
2350
                }
×
2351
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
14✔
2352
                if edgeIndex == nil {
14✔
2353
                        return ErrGraphNoEdgesFound
×
2354
                }
×
2355

2356
                cursor := edgeIndex.ReadCursor()
14✔
2357

14✔
2358
                // We'll now iterate through the database, and find each
14✔
2359
                // channel ID that resides within the specified range.
14✔
2360
                //
14✔
2361
                //nolint:ll
14✔
2362
                for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
14✔
2363
                        bytes.Compare(k, chanIDEnd[:]) <= 0; k, v = cursor.Next() {
61✔
2364
                        // Don't send alias SCIDs during gossip sync.
47✔
2365
                        edgeReader := bytes.NewReader(v)
47✔
2366
                        edgeInfo, err := deserializeChanEdgeInfo(edgeReader)
47✔
2367
                        if err != nil {
47✔
2368
                                return err
×
2369
                        }
×
2370

2371
                        if edgeInfo.AuthProof == nil {
50✔
2372
                                continue
3✔
2373
                        }
2374

2375
                        // This channel ID rests within the target range, so
2376
                        // we'll add it to our returned set.
2377
                        rawCid := byteOrder.Uint64(k)
47✔
2378
                        cid := lnwire.NewShortChanIDFromInt(rawCid)
47✔
2379

47✔
2380
                        chanInfo := NewChannelUpdateInfo(
47✔
2381
                                cid, time.Time{}, time.Time{},
47✔
2382
                        )
47✔
2383

47✔
2384
                        if !withTimestamps {
69✔
2385
                                channelsPerBlock[cid.BlockHeight] = append(
22✔
2386
                                        channelsPerBlock[cid.BlockHeight],
22✔
2387
                                        chanInfo,
22✔
2388
                                )
22✔
2389

22✔
2390
                                continue
22✔
2391
                        }
2392

2393
                        node1Key, node2Key := computeEdgePolicyKeys(&edgeInfo)
25✔
2394

25✔
2395
                        rawPolicy := edges.Get(node1Key)
25✔
2396
                        if len(rawPolicy) != 0 {
34✔
2397
                                r := bytes.NewReader(rawPolicy)
9✔
2398

9✔
2399
                                edge, err := deserializeChanEdgePolicyRaw(r)
9✔
2400
                                if err != nil && !errors.Is(
9✔
2401
                                        err, ErrEdgePolicyOptionalFieldNotFound,
9✔
2402
                                ) {
9✔
2403

×
2404
                                        return err
×
2405
                                }
×
2406

2407
                                chanInfo.Node1UpdateTimestamp = edge.LastUpdate
9✔
2408
                        }
2409

2410
                        rawPolicy = edges.Get(node2Key)
25✔
2411
                        if len(rawPolicy) != 0 {
39✔
2412
                                r := bytes.NewReader(rawPolicy)
14✔
2413

14✔
2414
                                edge, err := deserializeChanEdgePolicyRaw(r)
14✔
2415
                                if err != nil && !errors.Is(
14✔
2416
                                        err, ErrEdgePolicyOptionalFieldNotFound,
14✔
2417
                                ) {
14✔
2418

×
2419
                                        return err
×
2420
                                }
×
2421

2422
                                chanInfo.Node2UpdateTimestamp = edge.LastUpdate
14✔
2423
                        }
2424

2425
                        channelsPerBlock[cid.BlockHeight] = append(
25✔
2426
                                channelsPerBlock[cid.BlockHeight], chanInfo,
25✔
2427
                        )
25✔
2428
                }
2429

2430
                return nil
14✔
2431
        }, func() {
14✔
2432
                channelsPerBlock = make(map[uint32][]ChannelUpdateInfo)
14✔
2433
        })
14✔
2434

2435
        switch {
14✔
2436
        // If we don't know of any channels yet, then there's nothing to
2437
        // filter, so we'll return an empty slice.
2438
        case errors.Is(err, ErrGraphNoEdgesFound) || len(channelsPerBlock) == 0:
6✔
2439
                return nil, nil
6✔
2440

2441
        case err != nil:
×
2442
                return nil, err
×
2443
        }
2444

2445
        // Return the channel ranges in ascending block height order.
2446
        blocks := make([]uint32, 0, len(channelsPerBlock))
11✔
2447
        for block := range channelsPerBlock {
36✔
2448
                blocks = append(blocks, block)
25✔
2449
        }
25✔
2450
        sort.Slice(blocks, func(i, j int) bool {
33✔
2451
                return blocks[i] < blocks[j]
22✔
2452
        })
22✔
2453

2454
        channelRanges := make([]BlockChannelRange, 0, len(channelsPerBlock))
11✔
2455
        for _, block := range blocks {
36✔
2456
                channelRanges = append(channelRanges, BlockChannelRange{
25✔
2457
                        Height:   block,
25✔
2458
                        Channels: channelsPerBlock[block],
25✔
2459
                })
25✔
2460
        }
25✔
2461

2462
        return channelRanges, nil
11✔
2463
}
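
// exampleFilterChannelRange is an illustrative usage sketch (not part of the
// original file): it shows how a caller holding a *KVStore might request the
// known channel ranges between two hypothetical block heights, with update
// timestamps included, and print one summary line per block.
func exampleFilterChannelRange(store *KVStore) error {
        ranges, err := store.FilterChannelRange(700_000, 700_100, true)
        if err != nil {
                return err
        }

        for _, blockRange := range ranges {
                fmt.Printf("height=%d channels=%d\n", blockRange.Height,
                        len(blockRange.Channels))
        }

        return nil
}
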
2464

2465
// FetchChanInfos returns the set of channel edges that correspond to the passed
2466
// channel ID's. If an edge in the query is unknown to the database, it will be
2467
// skipped and the result will contain only those edges that exist at the time
2468
// of the query. This can be used to respond to peer queries that are seeking to
2469
// fill in gaps in their view of the channel graph.
2470
func (c *KVStore) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) {
7✔
2471
        return c.fetchChanInfos(nil, chanIDs)
7✔
2472
}
7✔
2473

2474
// fetchChanInfos returns the set of channel edges that correspond to the passed
2475
// channel ID's. If an edge in the query is unknown to the database, it will be
2476
// skipped and the result will contain only those edges that exist at the time
2477
// of the query. This can be used to respond to peer queries that are seeking to
2478
// fill in gaps in their view of the channel graph.
2479
//
2480
// NOTE: An optional transaction may be provided. If none is provided, then a
2481
// new one will be created.
2482
func (c *KVStore) fetchChanInfos(tx kvdb.RTx, chanIDs []uint64) (
2483
        []ChannelEdge, error) {
7✔
2484
        // TODO(roasbeef): sort cids?
7✔
2485

7✔
2486
        var (
7✔
2487
                chanEdges []ChannelEdge
7✔
2488
                cidBytes  [8]byte
7✔
2489
        )
7✔
2490

7✔
2491
        fetchChanInfos := func(tx kvdb.RTx) error {
14✔
2492
                edges := tx.ReadBucket(edgeBucket)
7✔
2493
                if edges == nil {
7✔
2494
                        return ErrGraphNoEdgesFound
×
2495
                }
×
2496
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
7✔
2497
                if edgeIndex == nil {
7✔
2498
                        return ErrGraphNoEdgesFound
×
2499
                }
×
2500
                nodes := tx.ReadBucket(nodeBucket)
7✔
2501
                if nodes == nil {
7✔
2502
                        return ErrGraphNotFound
×
2503
                }
×
2504

2505
                for _, cid := range chanIDs {
21✔
2506
                        byteOrder.PutUint64(cidBytes[:], cid)
14✔
2507

14✔
2508
                        // First, we'll fetch the static edge information. If
14✔
2509
                        // the edge is unknown, we will skip the edge and
14✔
2510
                        // continue gathering all known edges.
14✔
2511
                        edgeInfo, err := fetchChanEdgeInfo(
14✔
2512
                                edgeIndex, cidBytes[:],
14✔
2513
                        )
14✔
2514
                        switch {
14✔
2515
                        case errors.Is(err, ErrEdgeNotFound):
3✔
2516
                                continue
3✔
2517
                        case err != nil:
×
2518
                                return err
×
2519
                        }
2520

2521
                        // With the static information obtained, we'll now
2522
                        // fetch the dynamic policy info.
2523
                        edge1, edge2, err := fetchChanEdgePolicies(
11✔
2524
                                edgeIndex, edges, cidBytes[:],
11✔
2525
                        )
11✔
2526
                        if err != nil {
11✔
2527
                                return err
×
2528
                        }
×
2529

2530
                        node1, err := fetchLightningNode(
11✔
2531
                                nodes, edgeInfo.NodeKey1Bytes[:],
11✔
2532
                        )
11✔
2533
                        if err != nil {
11✔
2534
                                return err
×
2535
                        }
×
2536

2537
                        node2, err := fetchLightningNode(
11✔
2538
                                nodes, edgeInfo.NodeKey2Bytes[:],
11✔
2539
                        )
11✔
2540
                        if err != nil {
11✔
2541
                                return err
×
2542
                        }
×
2543

2544
                        chanEdges = append(chanEdges, ChannelEdge{
11✔
2545
                                Info:    &edgeInfo,
11✔
2546
                                Policy1: edge1,
11✔
2547
                                Policy2: edge2,
11✔
2548
                                Node1:   &node1,
11✔
2549
                                Node2:   &node2,
11✔
2550
                        })
11✔
2551
                }
2552

2553
                return nil
7✔
2554
        }
2555

2556
        if tx == nil {
14✔
2557
                err := kvdb.View(c.db, fetchChanInfos, func() {
14✔
2558
                        chanEdges = nil
7✔
2559
                })
7✔
2560
                if err != nil {
7✔
2561
                        return nil, err
×
2562
                }
×
2563

2564
                return chanEdges, nil
7✔
2565
        }
2566

UNCOV
2567
        err := fetchChanInfos(tx)
×
UNCOV
2568
        if err != nil {
×
2569
                return nil, err
×
2570
        }
×
2571

UNCOV
2572
        return chanEdges, nil
×
2573
}
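
// exampleFetchChanInfos is an illustrative usage sketch (not part of the
// original file): given a batch of short channel IDs received from a peer,
// it fetches whatever edges are known locally. Unknown IDs are simply absent
// from the result, so the returned slice may be shorter than the input.
func exampleFetchChanInfos(store *KVStore, chanIDs []uint64) error {
        edges, err := store.FetchChanInfos(chanIDs)
        if err != nil {
                return err
        }

        for _, edge := range edges {
                fmt.Printf("chan_id=%d node1=%x node2=%x\n",
                        edge.Info.ChannelID, edge.Node1.PubKeyBytes,
                        edge.Node2.PubKeyBytes)
        }

        return nil
}
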
2574

2575
func delEdgeUpdateIndexEntry(edgesBucket kvdb.RwBucket, chanID uint64,
2576
        edge1, edge2 *models.ChannelEdgePolicy) error {
147✔
2577

147✔
2578
        // First, we'll fetch the edge update index bucket which currently
147✔
2579
        // stores an entry for the channel we're about to delete.
147✔
2580
        updateIndex := edgesBucket.NestedReadWriteBucket(edgeUpdateIndexBucket)
147✔
2581
        if updateIndex == nil {
147✔
2582
                // No edges in bucket, return early.
×
2583
                return nil
×
2584
        }
×
2585

2586
        // Now that we have the bucket, we'll attempt to construct a template
2587
        // for the index key: updateTime || chanid.
2588
        var indexKey [8 + 8]byte
147✔
2589
        byteOrder.PutUint64(indexKey[8:], chanID)
147✔
2590

147✔
2591
        // With the template constructed, we'll attempt to delete an entry that
147✔
2592
        // would have been created by both edges: we'll alternate the update
147✔
2593
        // times, as one may have overridden the other.
147✔
2594
        if edge1 != nil {
160✔
2595
                byteOrder.PutUint64(
13✔
2596
                        indexKey[:8], uint64(edge1.LastUpdate.Unix()),
13✔
2597
                )
13✔
2598
                if err := updateIndex.Delete(indexKey[:]); err != nil {
13✔
2599
                        return err
×
2600
                }
×
2601
        }
2602

2603
        // We'll also attempt to delete the entry that may have been created by
2604
        // the second edge.
2605
        if edge2 != nil {
162✔
2606
                byteOrder.PutUint64(
15✔
2607
                        indexKey[:8], uint64(edge2.LastUpdate.Unix()),
15✔
2608
                )
15✔
2609
                if err := updateIndex.Delete(indexKey[:]); err != nil {
15✔
2610
                        return err
×
2611
                }
×
2612
        }
2613

2614
        return nil
147✔
2615
}
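
// exampleEdgeUpdateIndexKey is an illustrative sketch (not part of the
// original file): it shows the 16-byte key layout used by the edge update
// index, i.e. the encoded last-update time followed by the channel ID, which
// is the same template delEdgeUpdateIndexEntry deletes for both edges.
func exampleEdgeUpdateIndexKey(lastUpdate time.Time, chanID uint64) [16]byte {
        var indexKey [8 + 8]byte
        byteOrder.PutUint64(indexKey[:8], uint64(lastUpdate.Unix()))
        byteOrder.PutUint64(indexKey[8:], chanID)

        return indexKey
}
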
2616

2617
// delChannelEdgeUnsafe deletes the edge with the given chanID from the graph
2618
// cache. It then goes on to delete any policy info and edge info for this
2619
// channel from the DB and finally, if isZombie is true, it will add an entry
2620
// for this channel in the zombie index.
2621
//
2622
// NOTE: this method MUST only be called if the cacheMu has already been
2623
// acquired.
2624
func (c *KVStore) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex,
2625
        zombieIndex kvdb.RwBucket, chanID []byte, isZombie,
2626
        strictZombie bool) error {
210✔
2627

210✔
2628
        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
210✔
2629
        if err != nil {
273✔
2630
                return err
63✔
2631
        }
63✔
2632

2633
        if c.graphCache != nil {
294✔
2634
                c.graphCache.RemoveChannel(
147✔
2635
                        edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes,
147✔
2636
                        edgeInfo.ChannelID,
147✔
2637
                )
147✔
2638
        }
147✔
2639

2640
        // We'll also remove the entry in the edge update index bucket before
2641
        // we delete the edges themselves so we can access their last update
2642
        // times.
2643
        cid := byteOrder.Uint64(chanID)
147✔
2644
        edge1, edge2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
147✔
2645
        if err != nil {
147✔
2646
                return err
×
2647
        }
×
2648
        err = delEdgeUpdateIndexEntry(edges, cid, edge1, edge2)
147✔
2649
        if err != nil {
147✔
2650
                return err
×
2651
        }
×
2652

2653
        // The edge key is of the format pubKey || chanID. First we construct
2654
        // the latter half, populating the channel ID.
2655
        var edgeKey [33 + 8]byte
147✔
2656
        copy(edgeKey[33:], chanID)
147✔
2657

147✔
2658
        // With the latter half constructed, copy over the first public key to
147✔
2659
        // delete the edge in this direction, then the second to delete the
147✔
2660
        // edge in the opposite direction.
147✔
2661
        copy(edgeKey[:33], edgeInfo.NodeKey1Bytes[:])
147✔
2662
        if edges.Get(edgeKey[:]) != nil {
294✔
2663
                if err := edges.Delete(edgeKey[:]); err != nil {
147✔
2664
                        return err
×
2665
                }
×
2666
        }
2667
        copy(edgeKey[:33], edgeInfo.NodeKey2Bytes[:])
147✔
2668
        if edges.Get(edgeKey[:]) != nil {
294✔
2669
                if err := edges.Delete(edgeKey[:]); err != nil {
147✔
2670
                        return err
×
2671
                }
×
2672
        }
2673

2674
        // As part of deleting the edge we also remove all disabled entries
2675
        // from the edgePolicyDisabledIndex bucket. We do that for both
2676
        // directions.
2677
        err = updateEdgePolicyDisabledIndex(edges, cid, false, false)
147✔
2678
        if err != nil {
147✔
2679
                return err
×
2680
        }
×
2681
        err = updateEdgePolicyDisabledIndex(edges, cid, true, false)
147✔
2682
        if err != nil {
147✔
2683
                return err
×
2684
        }
×
2685

2686
        // With the edge data deleted, we can purge the information from the two
2687
        // edge indexes.
2688
        if err := edgeIndex.Delete(chanID); err != nil {
147✔
2689
                return err
×
2690
        }
×
2691
        var b bytes.Buffer
147✔
2692
        if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
147✔
2693
                return err
×
2694
        }
×
2695
        if err := chanIndex.Delete(b.Bytes()); err != nil {
147✔
2696
                return err
×
2697
        }
×
2698

2699
        // Finally, we'll mark the edge as a zombie within our index if it's
2700
        // being removed due to the channel becoming a zombie. We do this to
2701
        // ensure we don't store unnecessary data for spent channels.
2702
        if !isZombie {
272✔
2703
                return nil
125✔
2704
        }
125✔
2705

2706
        nodeKey1, nodeKey2 := edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes
25✔
2707
        if strictZombie {
28✔
2708
                nodeKey1, nodeKey2 = makeZombiePubkeys(&edgeInfo, edge1, edge2)
3✔
2709
        }
3✔
2710

2711
        return markEdgeZombie(
25✔
2712
                zombieIndex, byteOrder.Uint64(chanID), nodeKey1, nodeKey2,
25✔
2713
        )
25✔
2714
}
2715

2716
// makeZombiePubkeys derives the node pubkeys to store in the zombie index for a
2717
// particular pair of channel policies. The return values are one of:
2718
//  1. (pubkey1, pubkey2)
2719
//  2. (pubkey1, blank)
2720
//  3. (blank, pubkey2)
2721
//
2722
// A blank pubkey means that corresponding node will be unable to resurrect a
2723
// channel on its own. For example, node1 may continue to publish recent
2724
// updates, but node2 has fallen way behind. After marking an edge as a zombie,
2725
// we don't want another fresh update from node1 to resurrect it, as the edge can
2726
// only become live once node2 finally sends something recent.
2727
//
2728
// In the case where we have neither update, we allow either party to resurrect
2729
// the channel. If the channel were to be marked zombie again, it would be
2730
// marked with the correct lagging channel since we received an update from only
2731
// one side.
2732
func makeZombiePubkeys(info *models.ChannelEdgeInfo,
2733
        e1, e2 *models.ChannelEdgePolicy) ([33]byte, [33]byte) {
3✔
2734

3✔
2735
        switch {
3✔
2736
        // If we don't have either edge policy, we'll return both pubkeys so
2737
        // that the channel can be resurrected by either party.
2738
        case e1 == nil && e2 == nil:
×
2739
                return info.NodeKey1Bytes, info.NodeKey2Bytes
×
2740

2741
        // If we're missing edge1, or if both edges are present but edge1 is
2742
        // older, we'll return edge1's pubkey and a blank pubkey for edge2. This
2743
        // means that only an update from edge1 will be able to resurrect the
2744
        // channel.
2745
        case e1 == nil || (e2 != nil && e1.LastUpdate.Before(e2.LastUpdate)):
1✔
2746
                return info.NodeKey1Bytes, [33]byte{}
1✔
2747

2748
        // Otherwise, we're missing edge2 or edge2 is the older side, so we
2749
        // return a blank pubkey for edge1. In this case, only an update from
2750
        // edge2 can resurrect the channel.
2751
        default:
2✔
2752
                return [33]byte{}, info.NodeKey2Bytes
2✔
2753
        }
2754
}
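
// exampleZombieResurrectionKeys is an illustrative sketch (not part of the
// original file): for a hypothetical channel where edge1 is the stale side,
// makeZombiePubkeys returns node1's key and a blank key for node2, meaning
// only a fresh update from node1 can resurrect the channel.
func exampleZombieResurrectionKeys(info *models.ChannelEdgeInfo,
        e1, e2 *models.ChannelEdgePolicy) {

        key1, key2 := makeZombiePubkeys(info, e1, e2)
        fmt.Printf("resurrection keys: node1=%x node2=%x\n", key1, key2)
}
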
2755

2756
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
2757
// within the database for the referenced channel. The `flags` attribute within
2758
// the ChannelEdgePolicy determines which of the directed edges are being
2759
// updated. If the flag is 1, then the first node's information is being
2760
// updated, otherwise it's the second node's information. The node ordering is
2761
// determined by the lexicographical ordering of the identity public keys of the
2762
// nodes on either side of the channel.
2763
func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
2764
        op ...batch.SchedulerOption) error {
2,666✔
2765

2,666✔
2766
        var (
2,666✔
2767
                isUpdate1    bool
2,666✔
2768
                edgeNotFound bool
2,666✔
2769
        )
2,666✔
2770

2,666✔
2771
        r := &batch.Request{
2,666✔
2772
                Reset: func() {
5,332✔
2773
                        isUpdate1 = false
2,666✔
2774
                        edgeNotFound = false
2,666✔
2775
                },
2,666✔
2776
                Update: func(tx kvdb.RwTx) error {
2,666✔
2777
                        var err error
2,666✔
2778
                        isUpdate1, err = updateEdgePolicy(
2,666✔
2779
                                tx, edge, c.graphCache,
2,666✔
2780
                        )
2,666✔
2781

2,666✔
2782
                        if err != nil {
2,669✔
2783
                                log.Errorf("UpdateEdgePolicy failed: %v", err)
3✔
2784
                        }
3✔
2785

2786
                        // Silence ErrEdgeNotFound so that the batch can
2787
                        // succeed, but propagate the error via local state.
2788
                        if errors.Is(err, ErrEdgeNotFound) {
2,669✔
2789
                                edgeNotFound = true
3✔
2790
                                return nil
3✔
2791
                        }
3✔
2792

2793
                        return err
2,663✔
2794
                },
2795
                OnCommit: func(err error) error {
2,666✔
2796
                        switch {
2,666✔
2797
                        case err != nil:
×
2798
                                return err
×
2799
                        case edgeNotFound:
3✔
2800
                                return ErrEdgeNotFound
3✔
2801
                        default:
2,663✔
2802
                                c.updateEdgeCache(edge, isUpdate1)
2,663✔
2803
                                return nil
2,663✔
2804
                        }
2805
                },
2806
        }
2807

2808
        for _, f := range op {
2,669✔
2809
                f(r)
3✔
2810
        }
3✔
2811

2812
        return c.chanScheduler.Execute(r)
2,666✔
2813
}
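
// exampleApplyPolicyUpdate is an illustrative usage sketch (not part of the
// original file): it applies a single policy update and treats
// ErrEdgeNotFound as a non-fatal signal that the channel isn't in the graph
// yet, which a caller might handle by stashing the update for later.
func exampleApplyPolicyUpdate(store *KVStore,
        policy *models.ChannelEdgePolicy) (bool, error) {

        err := store.UpdateEdgePolicy(policy)
        switch {
        // The referenced channel is unknown to the graph; not fatal here.
        case errors.Is(err, ErrEdgeNotFound):
                return false, nil

        case err != nil:
                return false, err
        }

        return true, nil
}
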
2814

2815
func (c *KVStore) updateEdgeCache(e *models.ChannelEdgePolicy,
2816
        isUpdate1 bool) {
2,663✔
2817

2,663✔
2818
        // If an entry for this channel is found in reject cache, we'll modify
2,663✔
2819
        // the entry with the updated timestamp for the direction that was just
2,663✔
2820
        // written. If the edge doesn't exist, we'll load the cache entry lazily
2,663✔
2821
        // during the next query for this edge.
2,663✔
2822
        if entry, ok := c.rejectCache.get(e.ChannelID); ok {
2,671✔
2823
                if isUpdate1 {
14✔
2824
                        entry.upd1Time = e.LastUpdate.Unix()
6✔
2825
                } else {
11✔
2826
                        entry.upd2Time = e.LastUpdate.Unix()
5✔
2827
                }
5✔
2828
                c.rejectCache.insert(e.ChannelID, entry)
8✔
2829
        }
2830

2831
        // If an entry for this channel is found in channel cache, we'll modify
2832
        // the entry with the updated policy for the direction that was just
2833
        // written. If the edge doesn't exist, we'll defer loading the info and
2834
        // policies and lazily read from disk during the next query.
2835
        if channel, ok := c.chanCache.get(e.ChannelID); ok {
2,666✔
2836
                if isUpdate1 {
6✔
2837
                        channel.Policy1 = e
3✔
2838
                } else {
6✔
2839
                        channel.Policy2 = e
3✔
2840
                }
3✔
2841
                c.chanCache.insert(e.ChannelID, channel)
3✔
2842
        }
2843
}
2844

2845
// updateEdgePolicy attempts to update an edge's policy within the relevant
2846
// buckets using an existing database transaction. The returned boolean will be
2847
// true if the updated policy belongs to node1, and false if the policy belongs
2848
// to node2.
2849
func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy,
2850
        graphCache *GraphCache) (bool, error) {
2,666✔
2851

2,666✔
2852
        edges := tx.ReadWriteBucket(edgeBucket)
2,666✔
2853
        if edges == nil {
2,666✔
2854
                return false, ErrEdgeNotFound
×
2855
        }
×
2856
        edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
2,666✔
2857
        if edgeIndex == nil {
2,666✔
2858
                return false, ErrEdgeNotFound
×
2859
        }
×
2860

2861
        // Create the channelID key by converting the channel ID
2862
        // integer into a byte slice.
2863
        var chanID [8]byte
2,666✔
2864
        byteOrder.PutUint64(chanID[:], edge.ChannelID)
2,666✔
2865

2,666✔
2866
        // With the channel ID, we then fetch the value storing the two
2,666✔
2867
        // nodes which connect this channel edge.
2,666✔
2868
        nodeInfo := edgeIndex.Get(chanID[:])
2,666✔
2869
        if nodeInfo == nil {
2,669✔
2870
                return false, ErrEdgeNotFound
3✔
2871
        }
3✔
2872

2873
        // Depending on the flags value passed above, either the first
2874
        // or second edge policy is being updated.
2875
        var fromNode, toNode []byte
2,663✔
2876
        var isUpdate1 bool
2,663✔
2877
        if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
3,998✔
2878
                fromNode = nodeInfo[:33]
1,335✔
2879
                toNode = nodeInfo[33:66]
1,335✔
2880
                isUpdate1 = true
1,335✔
2881
        } else {
2,666✔
2882
                fromNode = nodeInfo[33:66]
1,331✔
2883
                toNode = nodeInfo[:33]
1,331✔
2884
                isUpdate1 = false
1,331✔
2885
        }
1,331✔
2886

2887
        // Finally, with the direction of the edge being updated
2888
        // identified, we update the on-disk edge representation.
2889
        err := putChanEdgePolicy(edges, edge, fromNode, toNode)
2,663✔
2890
        if err != nil {
2,663✔
2891
                return false, err
×
2892
        }
×
2893

2894
        var (
2,663✔
2895
                fromNodePubKey route.Vertex
2,663✔
2896
                toNodePubKey   route.Vertex
2,663✔
2897
        )
2,663✔
2898
        copy(fromNodePubKey[:], fromNode)
2,663✔
2899
        copy(toNodePubKey[:], toNode)
2,663✔
2900

2,663✔
2901
        if graphCache != nil {
4,940✔
2902
                graphCache.UpdatePolicy(
2,277✔
2903
                        edge, fromNodePubKey, toNodePubKey, isUpdate1,
2,277✔
2904
                )
2,277✔
2905
        }
2,277✔
2906

2907
        return isUpdate1, nil
2,663✔
2908
}
2909

2910
// isPublic determines whether the node is seen as public within the graph from
2911
// the source node's point of view. An existing database transaction can also be
2912
// specified.
2913
func (c *KVStore) isPublic(tx kvdb.RTx, nodePub route.Vertex,
2914
        sourcePubKey []byte) (bool, error) {
16✔
2915

16✔
2916
        // In order to determine whether this node is publicly advertised within
16✔
2917
        // the graph, we'll need to look at all of its edges and check whether
16✔
2918
        // they extend to any other node than the source node. errDone will be
16✔
2919
        // used to terminate the check early.
16✔
2920
        nodeIsPublic := false
16✔
2921
        errDone := errors.New("done")
16✔
2922
        err := c.ForEachNodeChannelTx(tx, nodePub, func(tx kvdb.RTx,
16✔
2923
                info *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy,
16✔
2924
                _ *models.ChannelEdgePolicy) error {
29✔
2925

13✔
2926
                // If this edge doesn't extend to the source node, we'll
13✔
2927
                // terminate our search as we can now conclude that the node is
13✔
2928
                // publicly advertised within the graph due to the local node
13✔
2929
                // knowing of the current edge.
13✔
2930
                if !bytes.Equal(info.NodeKey1Bytes[:], sourcePubKey) &&
13✔
2931
                        !bytes.Equal(info.NodeKey2Bytes[:], sourcePubKey) {
19✔
2932

6✔
2933
                        nodeIsPublic = true
6✔
2934
                        return errDone
6✔
2935
                }
6✔
2936

2937
                // Since the edge _does_ extend to the source node, we'll also
2938
                // need to ensure that this is a public edge.
2939
                if info.AuthProof != nil {
19✔
2940
                        nodeIsPublic = true
9✔
2941
                        return errDone
9✔
2942
                }
9✔
2943

2944
                // Otherwise, we'll continue our search.
2945
                return nil
4✔
2946
        })
2947
        if err != nil && !errors.Is(err, errDone) {
16✔
2948
                return false, err
×
2949
        }
×
2950

2951
        return nodeIsPublic, nil
16✔
2952
}
2953

2954
// FetchLightningNodeTx attempts to look up a target node by its identity
2955
// public key. If the node isn't found in the database, then
2956
// ErrGraphNodeNotFound is returned. An optional transaction may be provided.
2957
// If none is provided, then a new one will be created.
2958
func (c *KVStore) FetchLightningNodeTx(tx kvdb.RTx, nodePub route.Vertex) (
2959
        *models.LightningNode, error) {
3,633✔
2960

3,633✔
2961
        return c.fetchLightningNode(tx, nodePub)
3,633✔
2962
}
3,633✔
2963

2964
// FetchLightningNode attempts to look up a target node by its identity public
2965
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
2966
// returned.
2967
func (c *KVStore) FetchLightningNode(nodePub route.Vertex) (
2968
        *models.LightningNode, error) {
155✔
2969

155✔
2970
        return c.fetchLightningNode(nil, nodePub)
155✔
2971
}
155✔
2972

2973
// fetchLightningNode attempts to look up a target node by its identity public
2974
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
2975
// returned. An optional transaction may be provided. If none is provided, then
2976
// a new one will be created.
2977
func (c *KVStore) fetchLightningNode(tx kvdb.RTx,
2978
        nodePub route.Vertex) (*models.LightningNode, error) {
3,785✔
2979

3,785✔
2980
        var node *models.LightningNode
3,785✔
2981
        fetch := func(tx kvdb.RTx) error {
7,570✔
2982
                // First grab the nodes bucket which stores the mapping from
3,785✔
2983
                // pubKey to node information.
3,785✔
2984
                nodes := tx.ReadBucket(nodeBucket)
3,785✔
2985
                if nodes == nil {
3,785✔
2986
                        return ErrGraphNotFound
×
2987
                }
×
2988

2989
                // If a key for this serialized public key isn't found, then
2990
                // the target node doesn't exist within the database.
2991
                nodeBytes := nodes.Get(nodePub[:])
3,785✔
2992
                if nodeBytes == nil {
3,802✔
2993
                        return ErrGraphNodeNotFound
17✔
2994
                }
17✔
2995

2996
                // If the node is found, then we can deserialize the node
2997
                // information to return to the user.
2998
                nodeReader := bytes.NewReader(nodeBytes)
3,771✔
2999
                n, err := deserializeLightningNode(nodeReader)
3,771✔
3000
                if err != nil {
3,771✔
3001
                        return err
×
3002
                }
×
3003

3004
                node = &n
3,771✔
3005

3,771✔
3006
                return nil
3,771✔
3007
        }
3008

3009
        if tx == nil {
3,943✔
3010
                err := kvdb.View(
158✔
3011
                        c.db, fetch, func() {
316✔
3012
                                node = nil
158✔
3013
                        },
158✔
3014
                )
3015
                if err != nil {
164✔
3016
                        return nil, err
6✔
3017
                }
6✔
3018

3019
                return node, nil
155✔
3020
        }
3021

3022
        err := fetch(tx)
3,627✔
3023
        if err != nil {
3,638✔
3024
                return nil, err
11✔
3025
        }
11✔
3026

3027
        return node, nil
3,616✔
3028
}
3029

3030
// HasLightningNode determines if the graph has a vertex identified by the
3031
// target node identity public key. If the node exists in the database, a
3032
// timestamp of when the data for the node was last updated is returned along
3033
// with a true boolean. Otherwise, an empty time.Time is returned with a false
3034
// boolean.
3035
func (c *KVStore) HasLightningNode(nodePub [33]byte) (time.Time, bool,
3036
        error) {
19✔
3037

19✔
3038
        var (
19✔
3039
                updateTime time.Time
19✔
3040
                exists     bool
19✔
3041
        )
19✔
3042

19✔
3043
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
38✔
3044
                // First grab the nodes bucket which stores the mapping from
19✔
3045
                // pubKey to node information.
19✔
3046
                nodes := tx.ReadBucket(nodeBucket)
19✔
3047
                if nodes == nil {
19✔
3048
                        return ErrGraphNotFound
×
3049
                }
×
3050

3051
                // If a key for this serialized public key isn't found, we can
3052
                // exit early.
3053
                nodeBytes := nodes.Get(nodePub[:])
19✔
3054
                if nodeBytes == nil {
25✔
3055
                        exists = false
6✔
3056
                        return nil
6✔
3057
                }
6✔
3058

3059
                // Otherwise we continue on to obtain the time stamp
3060
                // representing the last time the data for this node was
3061
                // updated.
3062
                nodeReader := bytes.NewReader(nodeBytes)
16✔
3063
                node, err := deserializeLightningNode(nodeReader)
16✔
3064
                if err != nil {
16✔
3065
                        return err
×
3066
                }
×
3067

3068
                exists = true
16✔
3069
                updateTime = node.LastUpdate
16✔
3070

16✔
3071
                return nil
16✔
3072
        }, func() {
19✔
3073
                updateTime = time.Time{}
19✔
3074
                exists = false
19✔
3075
        })
19✔
3076
        if err != nil {
19✔
3077
                return time.Time{}, exists, err
×
3078
        }
×
3079

3080
        return updateTime, exists, nil
19✔
3081
}
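
// exampleLookupNode is an illustrative usage sketch (not part of the
// original file): it first checks whether the node exists (and how fresh its
// data is) via HasLightningNode before paying for the full fetch, which a
// caller might prefer when the complete record isn't always needed.
func exampleLookupNode(store *KVStore, pub route.Vertex) error {
        lastUpdate, exists, err := store.HasLightningNode(pub)
        if err != nil {
                return err
        }
        if !exists {
                return ErrGraphNodeNotFound
        }

        node, err := store.FetchLightningNode(pub)
        if err != nil {
                return err
        }

        fmt.Printf("node %x last updated at %v (index: %v)\n",
                node.PubKeyBytes, node.LastUpdate, lastUpdate)

        return nil
}
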
3082

3083
// nodeTraversal is used to traverse all channels of a node given by its
3084
// public key and passes channel information into the specified callback.
3085
func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
3086
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3087
                *models.ChannelEdgePolicy) error) error {
1,269✔
3088

1,269✔
3089
        traversal := func(tx kvdb.RTx) error {
2,538✔
3090
                edges := tx.ReadBucket(edgeBucket)
1,269✔
3091
                if edges == nil {
1,269✔
3092
                        return ErrGraphNotFound
×
3093
                }
×
3094
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
1,269✔
3095
                if edgeIndex == nil {
1,269✔
3096
                        return ErrGraphNoEdgesFound
×
3097
                }
×
3098

3099
                // In order to reach all the edges for this node, we take
3100
                // advantage of the construction of the key-space within the
3101
                // edge bucket. The keys are stored in the form: pubKey ||
3102
                // chanID. Therefore, starting from a chanID of zero, we can
3103
                // scan forward in the bucket, grabbing all the edges for the
3104
                // node. Once the prefix no longer matches, then we know we're
3105
                // done.
3106
                var nodeStart [33 + 8]byte
1,269✔
3107
                copy(nodeStart[:], nodePub)
1,269✔
3108
                copy(nodeStart[33:], chanStart[:])
1,269✔
3109

1,269✔
3110
                // Starting from the key pubKey || 0, we seek forward in the
1,269✔
3111
                // bucket until the retrieved key no longer has the public key
1,269✔
3112
                // as its prefix. This indicates that we've stepped over into
1,269✔
3113
                // another node's edges, so we can terminate our scan.
1,269✔
3114
                edgeCursor := edges.ReadCursor()
1,269✔
3115
                for nodeEdge, _ := edgeCursor.Seek(nodeStart[:]); bytes.HasPrefix(nodeEdge, nodePub); nodeEdge, _ = edgeCursor.Next() { //nolint:ll
5,112✔
3116
                        // If the prefix still matches, the channel id is
3,843✔
3117
                        // returned in nodeEdge. Channel id is used to lookup
3,843✔
3118
                        // the node at the other end of the channel and both
3,843✔
3119
                        // edge policies.
3,843✔
3120
                        chanID := nodeEdge[33:]
3,843✔
3121
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
3,843✔
3122
                        if err != nil {
3,843✔
3123
                                return err
×
3124
                        }
×
3125

3126
                        outgoingPolicy, err := fetchChanEdgePolicy(
3,843✔
3127
                                edges, chanID, nodePub,
3,843✔
3128
                        )
3,843✔
3129
                        if err != nil {
3,843✔
3130
                                return err
×
3131
                        }
×
3132

3133
                        otherNode, err := edgeInfo.OtherNodeKeyBytes(nodePub)
3,843✔
3134
                        if err != nil {
3,843✔
3135
                                return err
×
3136
                        }
×
3137

3138
                        incomingPolicy, err := fetchChanEdgePolicy(
3,843✔
3139
                                edges, chanID, otherNode[:],
3,843✔
3140
                        )
3,843✔
3141
                        if err != nil {
3,843✔
3142
                                return err
×
3143
                        }
×
3144

3145
                        // Finally, we execute the callback.
3146
                        err = cb(tx, &edgeInfo, outgoingPolicy, incomingPolicy)
3,843✔
3147
                        if err != nil {
3,855✔
3148
                                return err
12✔
3149
                        }
12✔
3150
                }
3151

3152
                return nil
1,260✔
3153
        }
3154

3155
        // If no transaction was provided, then we'll create a new transaction
3156
        // to execute the transaction within.
3157
        if tx == nil {
1,281✔
3158
                return kvdb.View(db, traversal, func() {})
24✔
3159
        }
3160

3161
        // Otherwise, we re-use the existing transaction to execute the graph
3162
        // traversal.
3163
        return traversal(tx)
1,260✔
3164
}
3165

3166
// ForEachNodeChannel iterates through all channels of the given node,
3167
// executing the passed callback with an edge info structure and the policies
3168
// of each end of the channel. The first edge policy is the outgoing edge *to*
3169
// the connecting node, while the second is the incoming edge *from* the
3170
// connecting node. If the callback returns an error, then the iteration is
3171
// halted with the error propagated back up to the caller.
3172
//
3173
// Unknown policies are passed into the callback as nil values.
3174
func (c *KVStore) ForEachNodeChannel(nodePub route.Vertex,
3175
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3176
                *models.ChannelEdgePolicy) error) error {
9✔
3177

9✔
3178
        return nodeTraversal(nil, nodePub[:], c.db, cb)
9✔
3179
}
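
// exampleCountNodeChannels is an illustrative usage sketch (not part of the
// original file): it counts a node's channels with ForEachNodeChannel and
// uses the same sentinel-error pattern as isPublic above to stop the
// traversal early once a hypothetical limit is hit.
func exampleCountNodeChannels(store *KVStore, nodePub route.Vertex,
        limit int) (int, error) {

        var (
                count   int
                errDone = errors.New("done")
        )

        err := store.ForEachNodeChannel(nodePub, func(_ kvdb.RTx,
                _ *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy,
                _ *models.ChannelEdgePolicy) error {

                count++
                if count >= limit {
                        return errDone
                }

                return nil
        })
        if err != nil && !errors.Is(err, errDone) {
                return 0, err
        }

        return count, nil
}
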
9✔
3180

3181
// ForEachNodeChannelTx iterates through all channels of the given node,
3182
// executing the passed callback with an edge info structure and the policies
3183
// of each end of the channel. The first edge policy is the outgoing edge *to*
3184
// the connecting node, while the second is the incoming edge *from* the
3185
// connecting node. If the callback returns an error, then the iteration is
3186
// halted with the error propagated back up to the caller.
3187
//
3188
// Unknown policies are passed into the callback as nil values.
3189
//
3190
// If the caller wishes to re-use an existing boltdb transaction, then it
3191
// should be passed as the first argument.  Otherwise, the first argument should
3192
// be nil and a fresh transaction will be created to execute the graph
3193
// traversal.
3194
func (c *KVStore) ForEachNodeChannelTx(tx kvdb.RTx,
3195
        nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo,
3196
                *models.ChannelEdgePolicy,
3197
                *models.ChannelEdgePolicy) error) error {
1,021✔
3198

1,021✔
3199
        return nodeTraversal(tx, nodePub[:], c.db, cb)
1,021✔
3200
}
1,021✔
3201

3202
// FetchOtherNode attempts to fetch the full LightningNode that's opposite of
3203
// the target node in the channel. This is useful when one knows the pubkey of
3204
// one of the nodes, and wishes to obtain the full LightningNode for the other
3205
// end of the channel.
3206
func (c *KVStore) FetchOtherNode(tx kvdb.RTx,
3207
        channel *models.ChannelEdgeInfo, thisNodeKey []byte) (
3208
        *models.LightningNode, error) {
3✔
3209

3✔
3210
        // Ensure that the node passed in is actually a member of the channel.
3✔
3211
        var targetNodeBytes [33]byte
3✔
3212
        switch {
3✔
3213
        case bytes.Equal(channel.NodeKey1Bytes[:], thisNodeKey):
3✔
3214
                targetNodeBytes = channel.NodeKey2Bytes
3✔
3215
        case bytes.Equal(channel.NodeKey2Bytes[:], thisNodeKey):
3✔
3216
                targetNodeBytes = channel.NodeKey1Bytes
3✔
3217
        default:
×
3218
                return nil, fmt.Errorf("node not participating in this channel")
×
3219
        }
3220

3221
        var targetNode *models.LightningNode
3✔
3222
        fetchNodeFunc := func(tx kvdb.RTx) error {
6✔
3223
                // First grab the nodes bucket which stores the mapping from
3✔
3224
                // pubKey to node information.
3✔
3225
                nodes := tx.ReadBucket(nodeBucket)
3✔
3226
                if nodes == nil {
3✔
3227
                        return ErrGraphNotFound
×
3228
                }
×
3229

3230
                node, err := fetchLightningNode(nodes, targetNodeBytes[:])
3✔
3231
                if err != nil {
3✔
3232
                        return err
×
3233
                }
×
3234

3235
                targetNode = &node
3✔
3236

3✔
3237
                return nil
3✔
3238
        }
3239

3240
        // If the transaction is nil, then we'll need to create a new one,
3241
        // otherwise we can use the existing db transaction.
3242
        var err error
3✔
3243
        if tx == nil {
3✔
3244
                err = kvdb.View(c.db, fetchNodeFunc, func() {
×
3245
                        targetNode = nil
×
3246
                })
×
3247
        } else {
3✔
3248
                err = fetchNodeFunc(tx)
3✔
3249
        }
3✔
3250

3251
        return targetNode, err
3✔
3252
}
3253

3254
// computeEdgePolicyKeys is a helper function that can be used to compute the
3255
// keys used to index the channel edge policy info for the two nodes of the
3256
// edge. The keys for node 1 and node 2 are returned respectively.
3257
func computeEdgePolicyKeys(info *models.ChannelEdgeInfo) ([]byte, []byte) {
25✔
3258
        var (
25✔
3259
                node1Key [33 + 8]byte
25✔
3260
                node2Key [33 + 8]byte
25✔
3261
        )
25✔
3262

25✔
3263
        copy(node1Key[:], info.NodeKey1Bytes[:])
25✔
3264
        copy(node2Key[:], info.NodeKey2Bytes[:])
25✔
3265

25✔
3266
        byteOrder.PutUint64(node1Key[33:], info.ChannelID)
25✔
3267
        byteOrder.PutUint64(node2Key[33:], info.ChannelID)
25✔
3268

25✔
3269
        return node1Key[:], node2Key[:]
25✔
3270
}
25✔
3271

3272
// FetchChannelEdgesByOutpoint attempts to lookup the two directed edges for
3273
// the channel identified by the funding outpoint. If the channel can't be
3274
// found, then ErrEdgeNotFound is returned. A struct which houses the general
3275
// information for the channel itself is returned as well as two structs that
3276
// contain the routing policies for the channel in either direction.
3277
func (c *KVStore) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (
3278
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3279
        *models.ChannelEdgePolicy, error) {
14✔
3280

14✔
3281
        var (
14✔
3282
                edgeInfo *models.ChannelEdgeInfo
14✔
3283
                policy1  *models.ChannelEdgePolicy
14✔
3284
                policy2  *models.ChannelEdgePolicy
14✔
3285
        )
14✔
3286

14✔
3287
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
28✔
3288
                // First, grab the node bucket. This will be used to populate
14✔
3289
                // the Node pointers in each edge read from disk.
14✔
3290
                nodes := tx.ReadBucket(nodeBucket)
14✔
3291
                if nodes == nil {
14✔
3292
                        return ErrGraphNotFound
×
3293
                }
×
3294

3295
                // Next, grab the edge bucket which stores the edges, and also
3296
                // the index itself so we can group the directed edges together
3297
                // logically.
3298
                edges := tx.ReadBucket(edgeBucket)
14✔
3299
                if edges == nil {
14✔
3300
                        return ErrGraphNoEdgesFound
×
3301
                }
×
3302
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
14✔
3303
                if edgeIndex == nil {
14✔
3304
                        return ErrGraphNoEdgesFound
×
3305
                }
×
3306

3307
                // If the channel's outpoint doesn't exist within the outpoint
3308
                // index, then the edge does not exist.
3309
                chanIndex := edges.NestedReadBucket(channelPointBucket)
14✔
3310
                if chanIndex == nil {
14✔
3311
                        return ErrGraphNoEdgesFound
×
3312
                }
×
3313
                var b bytes.Buffer
14✔
3314
                if err := WriteOutpoint(&b, op); err != nil {
14✔
3315
                        return err
×
3316
                }
×
3317
                chanID := chanIndex.Get(b.Bytes())
14✔
3318
                if chanID == nil {
27✔
3319
                        return fmt.Errorf("%w: op=%v", ErrEdgeNotFound, op)
13✔
3320
                }
13✔
3321

3322
                // If the channel is found to exist, then we'll first retrieve
3323
                // the general information for the channel.
3324
                edge, err := fetchChanEdgeInfo(edgeIndex, chanID)
4✔
3325
                if err != nil {
4✔
3326
                        return fmt.Errorf("%w: chanID=%x", err, chanID)
×
3327
                }
×
3328
                edgeInfo = &edge
4✔
3329

4✔
3330
                // Once we have the information about the channels' parameters,
4✔
3331
                // we'll fetch the routing policies for each for the directed
4✔
3332
                // edges.
4✔
3333
                e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
4✔
3334
                if err != nil {
4✔
3335
                        return fmt.Errorf("failed to find policy: %w", err)
×
3336
                }
×
3337

3338
                policy1 = e1
4✔
3339
                policy2 = e2
4✔
3340

4✔
3341
                return nil
4✔
3342
        }, func() {
14✔
3343
                edgeInfo = nil
14✔
3344
                policy1 = nil
14✔
3345
                policy2 = nil
14✔
3346
        })
14✔
3347
        if err != nil {
27✔
3348
                return nil, nil, nil, err
13✔
3349
        }
13✔
3350

3351
        return edgeInfo, policy1, policy2, nil
4✔
3352
}
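
// exampleEdgesByOutpoint is an illustrative usage sketch (not part of the
// original file): it looks up a channel by its funding outpoint and reports
// whether each directional policy is known yet (unknown policies are nil).
func exampleEdgesByOutpoint(store *KVStore, op *wire.OutPoint) error {
        info, pol1, pol2, err := store.FetchChannelEdgesByOutpoint(op)
        if err != nil {
                return err
        }

        fmt.Printf("chan_id=%d have_policy1=%t have_policy2=%t\n",
                info.ChannelID, pol1 != nil, pol2 != nil)

        return nil
}
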
3353

3354
// FetchChannelEdgesByID attempts to lookup the two directed edges for the
3355
// channel identified by the channel ID. If the channel can't be found, then
3356
// ErrEdgeNotFound is returned. A struct which houses the general information
3357
// for the channel itself is returned as well as two structs that contain the
3358
// routing policies for the channel in either direction.
3359
//
3360
// ErrZombieEdge can be returned if the edge is currently marked as a zombie
3361
// within the database. In this case, the ChannelEdgePolicy's will be nil, and
3362
// the ChannelEdgeInfo will only include the public keys of each node.
3363
func (c *KVStore) FetchChannelEdgesByID(chanID uint64) (
3364
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3365
        *models.ChannelEdgePolicy, error) {
27✔
3366

27✔
3367
        var (
27✔
3368
                edgeInfo  *models.ChannelEdgeInfo
27✔
3369
                policy1   *models.ChannelEdgePolicy
27✔
3370
                policy2   *models.ChannelEdgePolicy
27✔
3371
                channelID [8]byte
27✔
3372
        )
27✔
3373

27✔
3374
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
54✔
3375
                // First, grab the node bucket. This will be used to populate
27✔
3376
                // the Node pointers in each edge read from disk.
27✔
3377
                nodes := tx.ReadBucket(nodeBucket)
27✔
3378
                if nodes == nil {
27✔
3379
                        return ErrGraphNotFound
×
3380
                }
×
3381

3382
                // Next, grab the edge bucket which stores the edges, and also
3383
                // the index itself so we can group the directed edges together
3384
                // logically.
3385
                edges := tx.ReadBucket(edgeBucket)
27✔
3386
                if edges == nil {
27✔
3387
                        return ErrGraphNoEdgesFound
×
3388
                }
×
3389
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
27✔
3390
                if edgeIndex == nil {
27✔
3391
                        return ErrGraphNoEdgesFound
×
3392
                }
×
3393

3394
                byteOrder.PutUint64(channelID[:], chanID)
27✔
3395

27✔
3396
                // Now, attempt to fetch edge.
27✔
3397
                edge, err := fetchChanEdgeInfo(edgeIndex, channelID[:])
27✔
3398

27✔
3399
                // If it doesn't exist, we'll quickly check our zombie index to
27✔
3400
                // see if we've previously marked it as so.
27✔
3401
                if errors.Is(err, ErrEdgeNotFound) {
31✔
3402
                        // If the zombie index doesn't exist, or the edge is not
4✔
3403
                        // marked as a zombie within it, then we'll return the
4✔
3404
                        // original ErrEdgeNotFound error.
4✔
3405
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
4✔
3406
                        if zombieIndex == nil {
4✔
3407
                                return ErrEdgeNotFound
×
3408
                        }
×
3409

3410
                        isZombie, pubKey1, pubKey2 := isZombieEdge(
4✔
3411
                                zombieIndex, chanID,
4✔
3412
                        )
4✔
3413
                        if !isZombie {
7✔
3414
                                return ErrEdgeNotFound
3✔
3415
                        }
3✔
3416

3417
                        // Otherwise, the edge is marked as a zombie, so we'll
3418
                        // populate the edge info with the public keys of each
3419
                        // party as this is the only information we have about
3420
                        // it and return an error signaling so.
3421
                        edgeInfo = &models.ChannelEdgeInfo{
4✔
3422
                                NodeKey1Bytes: pubKey1,
4✔
3423
                                NodeKey2Bytes: pubKey2,
4✔
3424
                        }
4✔
3425

4✔
3426
                        return ErrZombieEdge
4✔
3427
                }
3428

3429
                // Otherwise, we'll just return the error if any.
3430
                if err != nil {
26✔
3431
                        return err
×
3432
                }
×
3433

3434
                edgeInfo = &edge
26✔
3435

26✔
3436
                // Then we'll attempt to fetch the accompanying policies of this
26✔
3437
                // edge.
26✔
3438
                e1, e2, err := fetchChanEdgePolicies(
26✔
3439
                        edgeIndex, edges, channelID[:],
26✔
3440
                )
26✔
3441
                if err != nil {
26✔
3442
                        return err
×
3443
                }
×
3444

3445
                policy1 = e1
26✔
3446
                policy2 = e2
26✔
3447

26✔
3448
                return nil
26✔
3449
        }, func() {
27✔
3450
                edgeInfo = nil
27✔
3451
                policy1 = nil
27✔
3452
                policy2 = nil
27✔
3453
        })
27✔
3454
        if errors.Is(err, ErrZombieEdge) {
31✔
3455
                return edgeInfo, nil, nil, err
4✔
3456
        }
4✔
3457
        if err != nil {
29✔
3458
                return nil, nil, nil, err
3✔
3459
        }
3✔
3460

3461
        return edgeInfo, policy1, policy2, nil
26✔
3462
}
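
// exampleEdgesByID is an illustrative usage sketch (not part of the original
// file): it looks up a channel by its short channel ID and handles the
// ErrZombieEdge case, where only the two node keys are populated on the
// returned edge info.
func exampleEdgesByID(store *KVStore, chanID uint64) error {
        info, pol1, pol2, err := store.FetchChannelEdgesByID(chanID)
        switch {
        case errors.Is(err, ErrZombieEdge):
                fmt.Printf("chan %d is a zombie between %x and %x\n",
                        chanID, info.NodeKey1Bytes, info.NodeKey2Bytes)

                return nil

        case err != nil:
                return err
        }

        fmt.Printf("chan %d is live, policy1=%t policy2=%t\n",
                chanID, pol1 != nil, pol2 != nil)

        return nil
}
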
3463

3464
// IsPublicNode is a helper method that determines whether the node with the
3465
// given public key is seen as a public node in the graph from the graph's
3466
// source node's point of view.
3467
func (c *KVStore) IsPublicNode(pubKey [33]byte) (bool, error) {
16✔
3468
        var nodeIsPublic bool
16✔
3469
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
32✔
3470
                nodes := tx.ReadBucket(nodeBucket)
16✔
3471
                if nodes == nil {
16✔
3472
                        return ErrGraphNodesNotFound
×
3473
                }
×
3474
                ourPubKey := nodes.Get(sourceKey)
16✔
3475
                if ourPubKey == nil {
16✔
3476
                        return ErrSourceNodeNotSet
×
3477
                }
×
3478
                node, err := fetchLightningNode(nodes, pubKey[:])
16✔
3479
                if err != nil {
16✔
3480
                        return err
×
3481
                }
×
3482

3483
                nodeIsPublic, err = c.isPublic(tx, node.PubKeyBytes, ourPubKey)
16✔
3484

16✔
3485
                return err
16✔
3486
        }, func() {
16✔
3487
                nodeIsPublic = false
16✔
3488
        })
16✔
3489
        if err != nil {
16✔
3490
                return false, err
×
3491
        }
×
3492

3493
        return nodeIsPublic, nil
16✔
3494
}
3495

3496
// genMultiSigP2WSH generates the p2wsh'd multisig script for 2 of 2 pubkeys.
3497
func genMultiSigP2WSH(aPub, bPub []byte) ([]byte, error) {
49✔
3498
        witnessScript, err := input.GenMultiSigScript(aPub, bPub)
49✔
3499
        if err != nil {
49✔
3500
                return nil, err
×
3501
        }
×
3502

3503
        // With the witness script generated, we'll now turn it into a p2wsh
3504
        // script:
3505
        //  * OP_0 <sha256(script)>
3506
        bldr := txscript.NewScriptBuilder(
49✔
3507
                txscript.WithScriptAllocSize(input.P2WSHSize),
49✔
3508
        )
49✔
3509
        bldr.AddOp(txscript.OP_0)
49✔
3510
        scriptHash := sha256.Sum256(witnessScript)
49✔
3511
        bldr.AddData(scriptHash[:])
49✔
3512

49✔
3513
        return bldr.Script()
49✔
3514
}
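
// exampleFundingScript is an illustrative sketch (not part of the original
// file): it derives the 2-of-2 P2WSH funding script for a channel edge from
// its bitcoin keys, the same script that ChannelView below places into each
// EdgePoint.
func exampleFundingScript(info *models.ChannelEdgeInfo) ([]byte, error) {
        return genMultiSigP2WSH(
                info.BitcoinKey1Bytes[:], info.BitcoinKey2Bytes[:],
        )
}
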
3515

3516
// EdgePoint couples the outpoint of a channel with the funding script that it
3517
// creates. The FilteredChainView will use this to watch for spends of this
3518
// edge point on chain. We require both of these values as depending on the
3519
// concrete implementation, either the pkScript, or the out point will be used.
3520
type EdgePoint struct {
3521
        // FundingPkScript is the p2wsh multi-sig script of the target channel.
3522
        FundingPkScript []byte
3523

3524
        // OutPoint is the outpoint of the target channel.
3525
        OutPoint wire.OutPoint
3526
}
3527

3528
// String returns a human readable version of the target EdgePoint. We return
3529
// the outpoint directly as it is enough to uniquely identify the edge point.
3530
func (e *EdgePoint) String() string {
×
3531
        return e.OutPoint.String()
×
3532
}
×
3533

3534
// ChannelView returns the verifiable edge information for each active channel
3535
// within the known channel graph. The set of UTXO's (along with their scripts)
3536
// returned are the ones that need to be watched on chain to detect channel
3537
// closes on the resident blockchain.
3538
func (c *KVStore) ChannelView() ([]EdgePoint, error) {
25✔
3539
        var edgePoints []EdgePoint
25✔
3540
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
50✔
3541
                // We're going to iterate over the entire channel index, so
25✔
3542
                // we'll need to fetch the edgeBucket to get to the index as
25✔
3543
                // it's a sub-bucket.
25✔
3544
                edges := tx.ReadBucket(edgeBucket)
25✔
3545
                if edges == nil {
25✔
3546
                        return ErrGraphNoEdgesFound
×
3547
                }
×
3548
                chanIndex := edges.NestedReadBucket(channelPointBucket)
25✔
3549
                if chanIndex == nil {
25✔
3550
                        return ErrGraphNoEdgesFound
×
3551
                }
×
3552
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
25✔
3553
                if edgeIndex == nil {
25✔
3554
                        return ErrGraphNoEdgesFound
×
3555
                }
×
3556

3557
                // Once we have the proper bucket, we'll range over each key
3558
                // (which is the channel point for the channel) and decode it,
3559
                // accumulating each entry.
3560
                return chanIndex.ForEach(
25✔
3561
                        func(chanPointBytes, chanID []byte) error {
70✔
3562
                                chanPointReader := bytes.NewReader(
45✔
3563
                                        chanPointBytes,
45✔
3564
                                )
45✔
3565

45✔
3566
                                var chanPoint wire.OutPoint
45✔
3567
                                err := ReadOutpoint(chanPointReader, &chanPoint)
45✔
3568
                                if err != nil {
45✔
3569
                                        return err
×
3570
                                }
×
3571

3572
                                edgeInfo, err := fetchChanEdgeInfo(
45✔
3573
                                        edgeIndex, chanID,
45✔
3574
                                )
45✔
3575
                                if err != nil {
45✔
3576
                                        return err
×
3577
                                }
×
3578

3579
                                pkScript, err := genMultiSigP2WSH(
45✔
3580
                                        edgeInfo.BitcoinKey1Bytes[:],
45✔
3581
                                        edgeInfo.BitcoinKey2Bytes[:],
45✔
3582
                                )
45✔
3583
                                if err != nil {
45✔
3584
                                        return err
×
3585
                                }
×
3586

3587
                                edgePoints = append(edgePoints, EdgePoint{
45✔
3588
                                        FundingPkScript: pkScript,
45✔
3589
                                        OutPoint:        chanPoint,
45✔
3590
                                })
45✔
3591

45✔
3592
                                return nil
45✔
3593
                        },
3594
                )
3595
        }, func() {
25✔
3596
                edgePoints = nil
25✔
3597
        }); err != nil {
25✔
3598
                return nil, err
×
3599
        }
×
3600

3601
        return edgePoints, nil
25✔
3602
}
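
// exampleWatchEdgePoints is an illustrative usage sketch (not part of the
// original file): it pulls the current channel view and hands each funding
// outpoint and script to a hypothetical watcher callback, e.g. a chain-view
// filter that detects channel closes.
func exampleWatchEdgePoints(store *KVStore,
        watch func(wire.OutPoint, []byte) error) error {

        edgePoints, err := store.ChannelView()
        if err != nil {
                return err
        }

        for _, edgePoint := range edgePoints {
                err := watch(edgePoint.OutPoint, edgePoint.FundingPkScript)
                if err != nil {
                        return err
                }
        }

        return nil
}
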
3603

3604
// MarkEdgeZombie attempts to mark a channel identified by its channel ID as a
3605
// zombie. This method is used on an ad-hoc basis, when channels need to be
3606
// marked as zombies outside the normal pruning cycle.
3607
func (c *KVStore) MarkEdgeZombie(chanID uint64,
3608
        pubKey1, pubKey2 [33]byte) error {
115✔
3609

115✔
3610
        c.cacheMu.Lock()
115✔
3611
        defer c.cacheMu.Unlock()
115✔
3612

115✔
3613
        err := kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
230✔
3614
                edges := tx.ReadWriteBucket(edgeBucket)
115✔
3615
                if edges == nil {
115✔
3616
                        return ErrGraphNoEdgesFound
×
3617
                }
×
3618
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
115✔
3619
                if err != nil {
115✔
3620
                        return fmt.Errorf("unable to create zombie "+
×
3621
                                "bucket: %w", err)
×
3622
                }
×
3623

3624
                if c.graphCache != nil {
230✔
3625
                        c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
115✔
3626
                }
115✔
3627

3628
                return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
115✔
3629
        })
3630
        if err != nil {
115✔
3631
                return err
×
3632
        }
×
3633

3634
        c.rejectCache.remove(chanID)
115✔
3635
        c.chanCache.remove(chanID)
115✔
3636

115✔
3637
        return nil
115✔
3638
}
3639

3640
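// A minimal usage sketch for MarkEdgeZombie (assuming an initialized *KVStore
// named store and the two channel party pub keys already at hand):
//
//	// Flag the channel so that future announcements for it are ignored
//	// until it is explicitly marked live again.
//	if err := store.MarkEdgeZombie(chanID, pubKey1, pubKey2); err != nil {
//		return fmt.Errorf("unable to mark channel %v as zombie: %w",
//			chanID, err)
//	}
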
// markEdgeZombie marks an edge as a zombie within our zombie index. The public
// keys should represent the node public keys of the two parties involved in the
// edge.
func markEdgeZombie(zombieIndex kvdb.RwBucket, chanID uint64, pubKey1,
	pubKey2 [33]byte) error {

	var k [8]byte
	byteOrder.PutUint64(k[:], chanID)

	var v [66]byte
	copy(v[:33], pubKey1[:])
	copy(v[33:], pubKey2[:])

	return zombieIndex.Put(k[:], v[:])
}

// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
func (c *KVStore) MarkEdgeLive(chanID uint64) error {
	c.cacheMu.Lock()
	defer c.cacheMu.Unlock()

	return c.markEdgeLiveUnsafe(nil, chanID)
}

// markEdgeLiveUnsafe clears an edge from the zombie index. This method can be
// called with an existing kvdb.RwTx or the argument can be set to nil in which
// case a new transaction will be created.
//
// NOTE: this method MUST only be called if the cacheMu has already been
// acquired.
func (c *KVStore) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
	dbFn := func(tx kvdb.RwTx) error {
		edges := tx.ReadWriteBucket(edgeBucket)
		if edges == nil {
			return ErrGraphNoEdgesFound
		}
		zombieIndex := edges.NestedReadWriteBucket(zombieBucket)
		if zombieIndex == nil {
			return nil
		}

		var k [8]byte
		byteOrder.PutUint64(k[:], chanID)

		if len(zombieIndex.Get(k[:])) == 0 {
			return ErrZombieEdgeNotFound
		}

		return zombieIndex.Delete(k[:])
	}

	// If the transaction is nil, we'll create a new one. Otherwise, we use
	// the existing transaction.
	var err error
	if tx == nil {
		err = kvdb.Update(c.db, dbFn, func() {})
	} else {
		err = dbFn(tx)
	}
	if err != nil {
		return err
	}

	c.rejectCache.remove(chanID)
	c.chanCache.remove(chanID)

	return nil
}

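// A minimal usage sketch for MarkEdgeLive (assuming an initialized *KVStore
// named store). A channel that was never marked as a zombie yields
// ErrZombieEdgeNotFound, which callers may choose to treat as a no-op:
//
//	err := store.MarkEdgeLive(chanID)
//	switch {
//	case errors.Is(err, ErrZombieEdgeNotFound):
//		// The channel was not in the zombie index; nothing to do.
//	case err != nil:
//		return err
//	}
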
// IsZombieEdge returns whether the edge is considered zombie. If it is a
// zombie, then the two node public keys corresponding to this edge are also
// returned.
func (c *KVStore) IsZombieEdge(chanID uint64) (bool, [33]byte, [33]byte) {
	var (
		isZombie         bool
		pubKey1, pubKey2 [33]byte
	)

	err := kvdb.View(c.db, func(tx kvdb.RTx) error {
		edges := tx.ReadBucket(edgeBucket)
		if edges == nil {
			return ErrGraphNoEdgesFound
		}
		zombieIndex := edges.NestedReadBucket(zombieBucket)
		if zombieIndex == nil {
			return nil
		}

		isZombie, pubKey1, pubKey2 = isZombieEdge(zombieIndex, chanID)

		return nil
	}, func() {
		isZombie = false
		pubKey1 = [33]byte{}
		pubKey2 = [33]byte{}
	})
	if err != nil {
		return false, [33]byte{}, [33]byte{}
	}

	return isZombie, pubKey1, pubKey2
}

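// A minimal usage sketch for IsZombieEdge (assuming an initialized *KVStore
// named store). Note that database errors are swallowed and reported as
// "not a zombie":
//
//	if isZombie, pub1, pub2 := store.IsZombieEdge(chanID); isZombie {
//		// The channel is known-bad; pub1/pub2 identify its endpoints.
//		_ = pub1
//		_ = pub2
//	}
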
// isZombieEdge returns whether an entry exists for the given channel in the
// zombie index. If an entry exists, then the two node public keys corresponding
// to this edge are also returned.
func isZombieEdge(zombieIndex kvdb.RBucket,
	chanID uint64) (bool, [33]byte, [33]byte) {

	var k [8]byte
	byteOrder.PutUint64(k[:], chanID)

	v := zombieIndex.Get(k[:])
	if v == nil {
		return false, [33]byte{}, [33]byte{}
	}

	var pubKey1, pubKey2 [33]byte
	copy(pubKey1[:], v[:33])
	copy(pubKey2[:], v[33:])

	return true, pubKey1, pubKey2
}

// NumZombies returns the current number of zombie channels in the graph.
func (c *KVStore) NumZombies() (uint64, error) {
	var numZombies uint64
	err := kvdb.View(c.db, func(tx kvdb.RTx) error {
		edges := tx.ReadBucket(edgeBucket)
		if edges == nil {
			return nil
		}
		zombieIndex := edges.NestedReadBucket(zombieBucket)
		if zombieIndex == nil {
			return nil
		}

		return zombieIndex.ForEach(func(_, _ []byte) error {
			numZombies++
			return nil
		})
	}, func() {
		numZombies = 0
	})
	if err != nil {
		return 0, err
	}

	return numZombies, nil
}

// PutClosedScid stores a SCID for a closed channel in the database. This is so
// that we can ignore channel announcements that we know to be closed without
// having to validate them and fetch a block.
func (c *KVStore) PutClosedScid(scid lnwire.ShortChannelID) error {
	return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
		closedScids, err := tx.CreateTopLevelBucket(closedScidBucket)
		if err != nil {
			return err
		}

		var k [8]byte
		byteOrder.PutUint64(k[:], scid.ToUint64())

		return closedScids.Put(k[:], []byte{})
	}, func() {})
}

// IsClosedScid checks whether a channel identified by the passed in scid is
// closed. This helps avoid having to perform expensive validation checks.
// TODO: Add an LRU cache to cut down on disc reads.
func (c *KVStore) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
	var isClosed bool
	err := kvdb.View(c.db, func(tx kvdb.RTx) error {
		closedScids := tx.ReadBucket(closedScidBucket)
		if closedScids == nil {
			return ErrClosedScidsNotFound
		}

		var k [8]byte
		byteOrder.PutUint64(k[:], scid.ToUint64())

		if closedScids.Get(k[:]) != nil {
			isClosed = true
			return nil
		}

		return nil
	}, func() {
		isClosed = false
	})
	if err != nil {
		return false, err
	}

	return isClosed, nil
}

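// A minimal usage sketch for the closed-SCID index above (assuming an
// initialized *KVStore named store and an announcement's scid at hand):
//
//	// Record the SCID once the channel is known to be closed on-chain.
//	if err := store.PutClosedScid(scid); err != nil {
//		return err
//	}
//
//	// Later, cheaply skip announcements for channels we know are closed.
//	closed, err := store.IsClosedScid(scid)
//	if err != nil {
//		return err
//	}
//	if closed {
//		// Ignore the announcement without fetching the funding block.
//	}
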
// GraphSession will provide the call-back with access to a NodeTraverser
// instance which can be used to perform queries against the channel graph.
func (c *KVStore) GraphSession(cb func(graph NodeTraverser) error) error {
	return c.db.View(func(tx walletdb.ReadTx) error {
		return cb(&nodeTraverserSession{
			db: c,
			tx: tx,
		})
	}, func() {})
}

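// A minimal usage sketch for GraphSession (assuming an initialized *KVStore
// named store and a known node pub key nodePub). All queries made through the
// provided NodeTraverser run against a single read transaction, giving a
// consistent view of the graph:
//
//	err := store.GraphSession(func(graph NodeTraverser) error {
//		features, err := graph.FetchNodeFeatures(nodePub)
//		if err != nil {
//			return err
//		}
//		_ = features
//
//		return graph.ForEachNodeDirectedChannel(nodePub,
//			func(ch *DirectedChannel) error {
//				// Inspect each channel of nodePub here.
//				return nil
//			},
//		)
//	})
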
// nodeTraverserSession implements the NodeTraverser interface but with a
// backing read only transaction for a consistent view of the graph.
type nodeTraverserSession struct {
	tx kvdb.RTx
	db *KVStore
}

// ForEachNodeDirectedChannel calls the callback for every channel of the given
// node.
//
// NOTE: Part of the NodeTraverser interface.
func (c *nodeTraverserSession) ForEachNodeDirectedChannel(nodePub route.Vertex,
	cb func(channel *DirectedChannel) error) error {

	return c.db.forEachNodeDirectedChannel(c.tx, nodePub, cb)
}

// FetchNodeFeatures returns the features of the given node. If the node is
// unknown, assume no additional features are supported.
//
// NOTE: Part of the NodeTraverser interface.
func (c *nodeTraverserSession) FetchNodeFeatures(nodePub route.Vertex) (
	*lnwire.FeatureVector, error) {

	return c.db.fetchNodeFeatures(c.tx, nodePub)
}

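// putLightningNode serializes the given node and writes it to the node
// bucket, updating the alias index and the node-update time series index in
// the process. The record layout written below is:
//
//	8 bytes:   last update time (unix seconds, 0 if unset)
//	33 bytes:  compressed node public key
//	2 bytes:   HaveNodeAnnouncement flag (record ends here when 0)
//	3 bytes:   RGB color
//	varstring: alias
//	variable:  feature vector encoding
//	2 bytes:   address count, followed by each serialized address
//	varbytes:  authentication signature (max 80 bytes)
//	varbytes:  extra opaque data (bounded by MaxAllowedExtraOpaqueBytes)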
func putLightningNode(nodeBucket, aliasBucket, updateIndex kvdb.RwBucket,
	node *models.LightningNode) error {

	var (
		scratch [16]byte
		b       bytes.Buffer
	)

	pub, err := node.PubKey()
	if err != nil {
		return err
	}
	nodePub := pub.SerializeCompressed()

	// If the node has the update time set, write it, else write 0.
	updateUnix := uint64(0)
	if node.LastUpdate.Unix() > 0 {
		updateUnix = uint64(node.LastUpdate.Unix())
	}

	byteOrder.PutUint64(scratch[:8], updateUnix)
	if _, err := b.Write(scratch[:8]); err != nil {
		return err
	}

	if _, err := b.Write(nodePub); err != nil {
		return err
	}

	// If we got a node announcement for this node, we will have the rest
	// of the data available. If not, we don't have more data to write.
	if !node.HaveNodeAnnouncement {
		// Write HaveNodeAnnouncement=0.
		byteOrder.PutUint16(scratch[:2], 0)
		if _, err := b.Write(scratch[:2]); err != nil {
			return err
		}

		return nodeBucket.Put(nodePub, b.Bytes())
	}

	// Write HaveNodeAnnouncement=1.
	byteOrder.PutUint16(scratch[:2], 1)
	if _, err := b.Write(scratch[:2]); err != nil {
		return err
	}

	if err := binary.Write(&b, byteOrder, node.Color.R); err != nil {
		return err
	}
	if err := binary.Write(&b, byteOrder, node.Color.G); err != nil {
		return err
	}
	if err := binary.Write(&b, byteOrder, node.Color.B); err != nil {
		return err
	}

	if err := wire.WriteVarString(&b, 0, node.Alias); err != nil {
		return err
	}

	if err := node.Features.Encode(&b); err != nil {
		return err
	}

	numAddresses := uint16(len(node.Addresses))
	byteOrder.PutUint16(scratch[:2], numAddresses)
	if _, err := b.Write(scratch[:2]); err != nil {
		return err
	}

	for _, address := range node.Addresses {
		if err := SerializeAddr(&b, address); err != nil {
			return err
		}
	}

	sigLen := len(node.AuthSigBytes)
	if sigLen > 80 {
		return fmt.Errorf("max sig len allowed is 80, had %v",
			sigLen)
	}

	err = wire.WriteVarBytes(&b, 0, node.AuthSigBytes)
	if err != nil {
		return err
	}

	if len(node.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
		return ErrTooManyExtraOpaqueBytes(len(node.ExtraOpaqueData))
	}
	err = wire.WriteVarBytes(&b, 0, node.ExtraOpaqueData)
	if err != nil {
		return err
	}

	if err := aliasBucket.Put(nodePub, []byte(node.Alias)); err != nil {
		return err
	}

	// With the alias bucket updated, we'll now update the index that
	// tracks the time series of node updates.
	var indexKey [8 + 33]byte
	byteOrder.PutUint64(indexKey[:8], updateUnix)
	copy(indexKey[8:], nodePub)

	// If there was already an old index entry for this node, then we'll
	// delete the old one before we write the new entry.
	if nodeBytes := nodeBucket.Get(nodePub); nodeBytes != nil {
		// Extract out the old update time so we can reconstruct the
		// prior index key to delete it from the index.
		oldUpdateTime := nodeBytes[:8]

		var oldIndexKey [8 + 33]byte
		copy(oldIndexKey[:8], oldUpdateTime)
		copy(oldIndexKey[8:], nodePub)

		if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
			return err
		}
	}

	if err := updateIndex.Put(indexKey[:], nil); err != nil {
		return err
	}

	return nodeBucket.Put(nodePub, b.Bytes())
}

func fetchLightningNode(nodeBucket kvdb.RBucket,
	nodePub []byte) (models.LightningNode, error) {

	nodeBytes := nodeBucket.Get(nodePub)
	if nodeBytes == nil {
		return models.LightningNode{}, ErrGraphNodeNotFound
	}

	nodeReader := bytes.NewReader(nodeBytes)

	return deserializeLightningNode(nodeReader)
}

func deserializeLightningNodeCacheable(r io.Reader) (route.Vertex,
	*lnwire.FeatureVector, error) {

	var (
		pubKey      route.Vertex
		features    = lnwire.EmptyFeatureVector()
		nodeScratch [8]byte
	)

	// Skip ahead:
	// - LastUpdate (8 bytes)
	if _, err := r.Read(nodeScratch[:]); err != nil {
		return pubKey, nil, err
	}

	if _, err := io.ReadFull(r, pubKey[:]); err != nil {
		return pubKey, nil, err
	}

	// Read the node announcement flag.
	if _, err := r.Read(nodeScratch[:2]); err != nil {
		return pubKey, nil, err
	}
	hasNodeAnn := byteOrder.Uint16(nodeScratch[:2])

	// The rest of the data is optional, and will only be there if we got a
	// node announcement for this node.
	if hasNodeAnn == 0 {
		return pubKey, features, nil
	}

	// We did get a node announcement for this node, so we'll have the rest
	// of the data available.
	var rgb uint8
	if err := binary.Read(r, byteOrder, &rgb); err != nil {
		return pubKey, nil, err
	}
	if err := binary.Read(r, byteOrder, &rgb); err != nil {
		return pubKey, nil, err
	}
	if err := binary.Read(r, byteOrder, &rgb); err != nil {
		return pubKey, nil, err
	}

	if _, err := wire.ReadVarString(r, 0); err != nil {
		return pubKey, nil, err
	}

	if err := features.Decode(r); err != nil {
		return pubKey, nil, err
	}

	return pubKey, features, nil
}

func deserializeLightningNode(r io.Reader) (models.LightningNode, error) {
	var (
		node    models.LightningNode
		scratch [8]byte
		err     error
	)

	// Always populate a feature vector, even if we don't have a node
	// announcement and short circuit below.
	node.Features = lnwire.EmptyFeatureVector()

	if _, err := r.Read(scratch[:]); err != nil {
		return models.LightningNode{}, err
	}

	unix := int64(byteOrder.Uint64(scratch[:]))
	node.LastUpdate = time.Unix(unix, 0)

	if _, err := io.ReadFull(r, node.PubKeyBytes[:]); err != nil {
		return models.LightningNode{}, err
	}

	if _, err := r.Read(scratch[:2]); err != nil {
		return models.LightningNode{}, err
	}

	hasNodeAnn := byteOrder.Uint16(scratch[:2])
	if hasNodeAnn == 1 {
		node.HaveNodeAnnouncement = true
	} else {
		node.HaveNodeAnnouncement = false
	}

	// The rest of the data is optional, and will only be there if we got a
	// node announcement for this node.
	if !node.HaveNodeAnnouncement {
		return node, nil
	}

	// We did get a node announcement for this node, so we'll have the rest
	// of the data available.
	if err := binary.Read(r, byteOrder, &node.Color.R); err != nil {
		return models.LightningNode{}, err
	}
	if err := binary.Read(r, byteOrder, &node.Color.G); err != nil {
		return models.LightningNode{}, err
	}
	if err := binary.Read(r, byteOrder, &node.Color.B); err != nil {
		return models.LightningNode{}, err
	}

	node.Alias, err = wire.ReadVarString(r, 0)
	if err != nil {
		return models.LightningNode{}, err
	}

	err = node.Features.Decode(r)
	if err != nil {
		return models.LightningNode{}, err
	}

	if _, err := r.Read(scratch[:2]); err != nil {
		return models.LightningNode{}, err
	}
	numAddresses := int(byteOrder.Uint16(scratch[:2]))

	var addresses []net.Addr
	for i := 0; i < numAddresses; i++ {
		address, err := DeserializeAddr(r)
		if err != nil {
			return models.LightningNode{}, err
		}
		addresses = append(addresses, address)
	}
	node.Addresses = addresses

	node.AuthSigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
	if err != nil {
		return models.LightningNode{}, err
	}

	// We'll try and see if there are any opaque bytes left, if not, then
	// we'll ignore the EOF error and return the node as is.
	node.ExtraOpaqueData, err = wire.ReadVarBytes(
		r, 0, MaxAllowedExtraOpaqueBytes, "blob",
	)
	switch {
	case errors.Is(err, io.ErrUnexpectedEOF):
	case errors.Is(err, io.EOF):
	case err != nil:
		return models.LightningNode{}, err
	}

	return node, nil
}

func putChanEdgeInfo(edgeIndex kvdb.RwBucket,
	edgeInfo *models.ChannelEdgeInfo, chanID [8]byte) error {

	var b bytes.Buffer

	if _, err := b.Write(edgeInfo.NodeKey1Bytes[:]); err != nil {
		return err
	}
	if _, err := b.Write(edgeInfo.NodeKey2Bytes[:]); err != nil {
		return err
	}
	if _, err := b.Write(edgeInfo.BitcoinKey1Bytes[:]); err != nil {
		return err
	}
	if _, err := b.Write(edgeInfo.BitcoinKey2Bytes[:]); err != nil {
		return err
	}

	if err := wire.WriteVarBytes(&b, 0, edgeInfo.Features); err != nil {
		return err
	}

	authProof := edgeInfo.AuthProof
	var nodeSig1, nodeSig2, bitcoinSig1, bitcoinSig2 []byte
	if authProof != nil {
		nodeSig1 = authProof.NodeSig1Bytes
		nodeSig2 = authProof.NodeSig2Bytes
		bitcoinSig1 = authProof.BitcoinSig1Bytes
		bitcoinSig2 = authProof.BitcoinSig2Bytes
	}

	if err := wire.WriteVarBytes(&b, 0, nodeSig1); err != nil {
		return err
	}
	if err := wire.WriteVarBytes(&b, 0, nodeSig2); err != nil {
		return err
	}
	if err := wire.WriteVarBytes(&b, 0, bitcoinSig1); err != nil {
		return err
	}
	if err := wire.WriteVarBytes(&b, 0, bitcoinSig2); err != nil {
		return err
	}

	if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
		return err
	}
	err := binary.Write(&b, byteOrder, uint64(edgeInfo.Capacity))
	if err != nil {
		return err
	}
	if _, err := b.Write(chanID[:]); err != nil {
		return err
	}
	if _, err := b.Write(edgeInfo.ChainHash[:]); err != nil {
		return err
	}

	if len(edgeInfo.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
		return ErrTooManyExtraOpaqueBytes(len(edgeInfo.ExtraOpaqueData))
	}
	err = wire.WriteVarBytes(&b, 0, edgeInfo.ExtraOpaqueData)
	if err != nil {
		return err
	}

	return edgeIndex.Put(chanID[:], b.Bytes())
}

func fetchChanEdgeInfo(edgeIndex kvdb.RBucket,
	chanID []byte) (models.ChannelEdgeInfo, error) {

	edgeInfoBytes := edgeIndex.Get(chanID)
	if edgeInfoBytes == nil {
		return models.ChannelEdgeInfo{}, ErrEdgeNotFound
	}

	edgeInfoReader := bytes.NewReader(edgeInfoBytes)

	return deserializeChanEdgeInfo(edgeInfoReader)
}

func deserializeChanEdgeInfo(r io.Reader) (models.ChannelEdgeInfo, error) {
	var (
		err      error
		edgeInfo models.ChannelEdgeInfo
	)

	if _, err := io.ReadFull(r, edgeInfo.NodeKey1Bytes[:]); err != nil {
		return models.ChannelEdgeInfo{}, err
	}
	if _, err := io.ReadFull(r, edgeInfo.NodeKey2Bytes[:]); err != nil {
		return models.ChannelEdgeInfo{}, err
	}
	if _, err := io.ReadFull(r, edgeInfo.BitcoinKey1Bytes[:]); err != nil {
		return models.ChannelEdgeInfo{}, err
	}
	if _, err := io.ReadFull(r, edgeInfo.BitcoinKey2Bytes[:]); err != nil {
		return models.ChannelEdgeInfo{}, err
	}

	edgeInfo.Features, err = wire.ReadVarBytes(r, 0, 900, "features")
	if err != nil {
		return models.ChannelEdgeInfo{}, err
	}

	proof := &models.ChannelAuthProof{}

	proof.NodeSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
	if err != nil {
		return models.ChannelEdgeInfo{}, err
	}
	proof.NodeSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
	if err != nil {
		return models.ChannelEdgeInfo{}, err
	}
	proof.BitcoinSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
	if err != nil {
		return models.ChannelEdgeInfo{}, err
	}
	proof.BitcoinSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
	if err != nil {
		return models.ChannelEdgeInfo{}, err
	}

	if !proof.IsEmpty() {
		edgeInfo.AuthProof = proof
	}

	edgeInfo.ChannelPoint = wire.OutPoint{}
	if err := ReadOutpoint(r, &edgeInfo.ChannelPoint); err != nil {
		return models.ChannelEdgeInfo{}, err
	}
	if err := binary.Read(r, byteOrder, &edgeInfo.Capacity); err != nil {
		return models.ChannelEdgeInfo{}, err
	}
	if err := binary.Read(r, byteOrder, &edgeInfo.ChannelID); err != nil {
		return models.ChannelEdgeInfo{}, err
	}

	if _, err := io.ReadFull(r, edgeInfo.ChainHash[:]); err != nil {
		return models.ChannelEdgeInfo{}, err
	}

	// We'll try and see if there are any opaque bytes left, if not, then
	// we'll ignore the EOF error and return the edge as is.
	edgeInfo.ExtraOpaqueData, err = wire.ReadVarBytes(
		r, 0, MaxAllowedExtraOpaqueBytes, "blob",
	)
	switch {
	case errors.Is(err, io.ErrUnexpectedEOF):
	case errors.Is(err, io.EOF):
	case err != nil:
		return models.ChannelEdgeInfo{}, err
	}

	return edgeInfo, nil
}

func putChanEdgePolicy(edges kvdb.RwBucket, edge *models.ChannelEdgePolicy,
	from, to []byte) error {

	var edgeKey [33 + 8]byte
	copy(edgeKey[:], from)
	byteOrder.PutUint64(edgeKey[33:], edge.ChannelID)

	var b bytes.Buffer
	if err := serializeChanEdgePolicy(&b, edge, to); err != nil {
		return err
	}

	// Before we write out the new edge, we'll create a new entry in the
	// update index in order to keep it fresh.
	updateUnix := uint64(edge.LastUpdate.Unix())
	var indexKey [8 + 8]byte
	byteOrder.PutUint64(indexKey[:8], updateUnix)
	byteOrder.PutUint64(indexKey[8:], edge.ChannelID)

	updateIndex, err := edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
	if err != nil {
		return err
	}

	// If there was already an entry for this edge, then we'll need to
	// delete the old one to ensure we don't leave around any after-images.
	// An unknown policy value does not have an update time recorded, so
	// it also does not need to be removed.
	if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil &&
		!bytes.Equal(edgeBytes, unknownPolicy) {

		// In order to delete the old entry, we'll need to obtain the
		// *prior* update time in order to delete it. To do this, we'll
		// need to deserialize the existing policy within the database
		// (now outdated by the new one), and delete its corresponding
		// entry within the update index. We'll ignore any
		// ErrEdgePolicyOptionalFieldNotFound error, as we only need
		// the channel ID and update time to delete the entry.
		// TODO(halseth): get rid of these invalid policies in a
		// migration.
		oldEdgePolicy, err := deserializeChanEdgePolicy(
			bytes.NewReader(edgeBytes),
		)
		if err != nil &&
			!errors.Is(err, ErrEdgePolicyOptionalFieldNotFound) {

			return err
		}

		oldUpdateTime := uint64(oldEdgePolicy.LastUpdate.Unix())

		var oldIndexKey [8 + 8]byte
		byteOrder.PutUint64(oldIndexKey[:8], oldUpdateTime)
		byteOrder.PutUint64(oldIndexKey[8:], edge.ChannelID)

		if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
			return err
		}
	}

	if err := updateIndex.Put(indexKey[:], nil); err != nil {
		return err
	}

	err = updateEdgePolicyDisabledIndex(
		edges, edge.ChannelID,
		edge.ChannelFlags&lnwire.ChanUpdateDirection > 0,
		edge.IsDisabled(),
	)
	if err != nil {
		return err
	}

	return edges.Put(edgeKey[:], b.Bytes())
}

// updateEdgePolicyDisabledIndex is used to update the disabledEdgePolicyIndex
// bucket by either adding a new disabled ChannelEdgePolicy or removing an
// existing one.
// The direction represents the direction of the edge and disabled is used for
// deciding whether to remove or add an entry to the bucket.
// In general a channel is disabled if two entries for the same chanID exist
// in this bucket.
// Maintaining the bucket this way allows fast retrieval of disabled channels,
// for example when pruning is needed.
func updateEdgePolicyDisabledIndex(edges kvdb.RwBucket, chanID uint64,
	direction bool, disabled bool) error {

	var disabledEdgeKey [8 + 1]byte
	byteOrder.PutUint64(disabledEdgeKey[0:], chanID)
	if direction {
		disabledEdgeKey[8] = 1
	}

	disabledEdgePolicyIndex, err := edges.CreateBucketIfNotExists(
		disabledEdgePolicyBucket,
	)
	if err != nil {
		return err
	}

	if disabled {
		return disabledEdgePolicyIndex.Put(disabledEdgeKey[:], []byte{})
	}

	return disabledEdgePolicyIndex.Delete(disabledEdgeKey[:])
}

// putChanEdgePolicyUnknown marks the edge policy as unknown
// in the edges bucket.
func putChanEdgePolicyUnknown(edges kvdb.RwBucket, channelID uint64,
	from []byte) error {

	var edgeKey [33 + 8]byte
	copy(edgeKey[:], from)
	byteOrder.PutUint64(edgeKey[33:], channelID)

	if edges.Get(edgeKey[:]) != nil {
		return fmt.Errorf("cannot write unknown policy for channel %v "+
			"when there is already a policy present", channelID)
	}

	return edges.Put(edgeKey[:], unknownPolicy)
}

func fetchChanEdgePolicy(edges kvdb.RBucket, chanID []byte,
	nodePub []byte) (*models.ChannelEdgePolicy, error) {

	var edgeKey [33 + 8]byte
	copy(edgeKey[:], nodePub)
	copy(edgeKey[33:], chanID)

	edgeBytes := edges.Get(edgeKey[:])
	if edgeBytes == nil {
		return nil, ErrEdgeNotFound
	}

	// No need to deserialize unknown policy.
	if bytes.Equal(edgeBytes, unknownPolicy) {
		return nil, nil
	}

	edgeReader := bytes.NewReader(edgeBytes)

	ep, err := deserializeChanEdgePolicy(edgeReader)
	switch {
	// If the db policy was missing an expected optional field, we return
	// nil as if the policy was unknown.
	case errors.Is(err, ErrEdgePolicyOptionalFieldNotFound):
		return nil, nil

	case err != nil:
		return nil, err
	}

	return ep, nil
}

func fetchChanEdgePolicies(edgeIndex kvdb.RBucket, edges kvdb.RBucket,
	chanID []byte) (*models.ChannelEdgePolicy, *models.ChannelEdgePolicy,
	error) {

	edgeInfo := edgeIndex.Get(chanID)
	if edgeInfo == nil {
		return nil, nil, fmt.Errorf("%w: chanID=%x", ErrEdgeNotFound,
			chanID)
	}

	// The first node is contained within the first half of the edge
	// information. We only propagate the error here and below if it's
	// something other than edge non-existence.
	node1Pub := edgeInfo[:33]
	edge1, err := fetchChanEdgePolicy(edges, chanID, node1Pub)
	if err != nil {
		return nil, nil, fmt.Errorf("%w: node1Pub=%x", ErrEdgeNotFound,
			node1Pub)
	}

	// Similarly, the second node is contained within the latter
	// half of the edge information.
	node2Pub := edgeInfo[33:66]
	edge2, err := fetchChanEdgePolicy(edges, chanID, node2Pub)
	if err != nil {
		return nil, nil, fmt.Errorf("%w: node2Pub=%x", ErrEdgeNotFound,
			node2Pub)
	}

	return edge1, edge2, nil
}

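// serializeChanEdgePolicy writes the given channel edge policy to w using the
// legacy encoding that deserializeChanEdgePolicyRaw reads back:
//
//	varbytes: signature
//	8 bytes:  channel ID
//	8 bytes:  last update time (unix seconds)
//	1 byte:   message flags
//	1 byte:   channel flags
//	2 bytes:  time lock delta
//	8 bytes:  min HTLC (msat)
//	8 bytes:  fee base (msat)
//	8 bytes:  fee rate (proportional millionths)
//	33 bytes: "to" node public key
//	varbytes: opaque blob, prefixed with an 8-byte max HTLC value when the
//	          message flags indicate one is present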
func serializeChanEdgePolicy(w io.Writer, edge *models.ChannelEdgePolicy,
	to []byte) error {

	err := wire.WriteVarBytes(w, 0, edge.SigBytes)
	if err != nil {
		return err
	}

	if err := binary.Write(w, byteOrder, edge.ChannelID); err != nil {
		return err
	}

	var scratch [8]byte
	updateUnix := uint64(edge.LastUpdate.Unix())
	byteOrder.PutUint64(scratch[:], updateUnix)
	if _, err := w.Write(scratch[:]); err != nil {
		return err
	}

	if err := binary.Write(w, byteOrder, edge.MessageFlags); err != nil {
		return err
	}
	if err := binary.Write(w, byteOrder, edge.ChannelFlags); err != nil {
		return err
	}
	if err := binary.Write(w, byteOrder, edge.TimeLockDelta); err != nil {
		return err
	}
	if err := binary.Write(w, byteOrder, uint64(edge.MinHTLC)); err != nil {
		return err
	}
	err = binary.Write(w, byteOrder, uint64(edge.FeeBaseMSat))
	if err != nil {
		return err
	}
	err = binary.Write(
		w, byteOrder, uint64(edge.FeeProportionalMillionths),
	)
	if err != nil {
		return err
	}

	if _, err := w.Write(to); err != nil {
		return err
	}

	// If the max_htlc field is present, we write it. To be compatible with
	// older versions that weren't aware of this field, we write it as part
	// of the opaque data.
	// TODO(halseth): clean up when moving to TLV.
	var opaqueBuf bytes.Buffer
	if edge.MessageFlags.HasMaxHtlc() {
		err := binary.Write(&opaqueBuf, byteOrder, uint64(edge.MaxHTLC))
		if err != nil {
			return err
		}
	}

	if len(edge.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
		return ErrTooManyExtraOpaqueBytes(len(edge.ExtraOpaqueData))
	}
	if _, err := opaqueBuf.Write(edge.ExtraOpaqueData); err != nil {
		return err
	}

	if err := wire.WriteVarBytes(w, 0, opaqueBuf.Bytes()); err != nil {
		return err
	}

	return nil
}

func deserializeChanEdgePolicy(r io.Reader) (*models.ChannelEdgePolicy, error) {
	// Deserialize the policy. Note that in case an optional field is not
	// found, both an error and a populated policy object are returned.
	edge, deserializeErr := deserializeChanEdgePolicyRaw(r)
	if deserializeErr != nil &&
		!errors.Is(deserializeErr, ErrEdgePolicyOptionalFieldNotFound) {

		return nil, deserializeErr
	}

	return edge, deserializeErr
}

func deserializeChanEdgePolicyRaw(r io.Reader) (*models.ChannelEdgePolicy,
	error) {

	edge := &models.ChannelEdgePolicy{}

	var err error
	edge.SigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
	if err != nil {
		return nil, err
	}

	if err := binary.Read(r, byteOrder, &edge.ChannelID); err != nil {
		return nil, err
	}

	var scratch [8]byte
	if _, err := r.Read(scratch[:]); err != nil {
		return nil, err
	}
	unix := int64(byteOrder.Uint64(scratch[:]))
	edge.LastUpdate = time.Unix(unix, 0)

	if err := binary.Read(r, byteOrder, &edge.MessageFlags); err != nil {
		return nil, err
	}
	if err := binary.Read(r, byteOrder, &edge.ChannelFlags); err != nil {
		return nil, err
	}
	if err := binary.Read(r, byteOrder, &edge.TimeLockDelta); err != nil {
		return nil, err
	}

	var n uint64
	if err := binary.Read(r, byteOrder, &n); err != nil {
		return nil, err
	}
	edge.MinHTLC = lnwire.MilliSatoshi(n)

	if err := binary.Read(r, byteOrder, &n); err != nil {
		return nil, err
	}
	edge.FeeBaseMSat = lnwire.MilliSatoshi(n)

	if err := binary.Read(r, byteOrder, &n); err != nil {
		return nil, err
	}
	edge.FeeProportionalMillionths = lnwire.MilliSatoshi(n)

	if _, err := r.Read(edge.ToNode[:]); err != nil {
		return nil, err
	}

	// We'll try and see if there are any opaque bytes left, if not, then
	// we'll ignore the EOF error and return the edge as is.
	edge.ExtraOpaqueData, err = wire.ReadVarBytes(
		r, 0, MaxAllowedExtraOpaqueBytes, "blob",
	)
	switch {
	case errors.Is(err, io.ErrUnexpectedEOF):
	case errors.Is(err, io.EOF):
	case err != nil:
		return nil, err
	}

	// See if optional fields are present.
	if edge.MessageFlags.HasMaxHtlc() {
		// The max_htlc field should be at the beginning of the opaque
		// bytes.
		opq := edge.ExtraOpaqueData

		// If the max_htlc field is not present, it might be old data
		// stored before this field was validated. We'll return the
		// edge along with an error.
		if len(opq) < 8 {
			return edge, ErrEdgePolicyOptionalFieldNotFound
		}

		maxHtlc := byteOrder.Uint64(opq[:8])
		edge.MaxHTLC = lnwire.MilliSatoshi(maxHtlc)

		// Exclude the parsed field from the rest of the opaque data.
		edge.ExtraOpaqueData = opq[8:]
	}

	return edge, nil
}

// chanGraphNodeTx is an implementation of the NodeRTx interface backed by the
// KVStore and a kvdb.RTx.
type chanGraphNodeTx struct {
	tx   kvdb.RTx
	db   *KVStore
	node *models.LightningNode
}

// A compile-time constraint to ensure chanGraphNodeTx implements the NodeRTx
// interface.
var _ NodeRTx = (*chanGraphNodeTx)(nil)

func newChanGraphNodeTx(tx kvdb.RTx, db *KVStore,
	node *models.LightningNode) *chanGraphNodeTx {

	return &chanGraphNodeTx{
		tx:   tx,
		db:   db,
		node: node,
	}
}

// Node returns the raw information of the node.
//
// NOTE: This is a part of the NodeRTx interface.
func (c *chanGraphNodeTx) Node() *models.LightningNode {
	return c.node
}

// FetchNode fetches the node with the given pub key under the same transaction
// used to fetch the current node. The returned node is also a NodeRTx and any
// operations on that NodeRTx will also be done under the same transaction.
//
// NOTE: This is a part of the NodeRTx interface.
func (c *chanGraphNodeTx) FetchNode(nodePub route.Vertex) (NodeRTx, error) {
	node, err := c.db.FetchLightningNodeTx(c.tx, nodePub)
	if err != nil {
		return nil, err
	}

	return newChanGraphNodeTx(c.tx, c.db, node), nil
}

// ForEachChannel can be used to iterate over the node's channels under
// the same transaction used to fetch the node.
//
// NOTE: This is a part of the NodeRTx interface.
func (c *chanGraphNodeTx) ForEachChannel(f func(*models.ChannelEdgeInfo,
	*models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {

	return c.db.ForEachNodeChannelTx(c.tx, c.node.PubKeyBytes,
		func(_ kvdb.RTx, info *models.ChannelEdgeInfo, policy1,
			policy2 *models.ChannelEdgePolicy) error {

			return f(info, policy1, policy2)
		},
	)
}

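// A minimal usage sketch for the NodeRTx implementation above (assuming a
// NodeRTx named nodeTx obtained from one of the KVStore node-iteration APIs).
// Every call below runs against the same read transaction:
//
//	self := nodeTx.Node()
//	err := nodeTx.ForEachChannel(func(info *models.ChannelEdgeInfo,
//		p1, p2 *models.ChannelEdgePolicy) error {
//
//		// Jump to the channel counterparty while staying in the same
//		// transaction.
//		peer, err := nodeTx.FetchNode(info.NodeKey2Bytes)
//		if err != nil {
//			return err
//		}
//		_ = peer
//		_ = self
//
//		return nil
//	})
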
// MakeTestGraph creates a new instance of the ChannelGraph, backed by a
// KVStore, for testing purposes.
func MakeTestGraph(t testing.TB, modifiers ...KVStoreOptionModifier) (
	*ChannelGraph, error) {

	opts := DefaultOptions()
	for _, modifier := range modifiers {
		modifier(opts)
	}

	// Next, create KVStore for the first time.
	backend, backendCleanup, err := kvdb.GetTestBackend(t.TempDir(), "cgr")
	if err != nil {
		backendCleanup()

		return nil, err
	}

	graph, err := NewChannelGraph(&Config{
		KVDB:        backend,
		KVStoreOpts: modifiers,
	})
	if err != nil {
		backendCleanup()

		return nil, err
	}

	t.Cleanup(func() {
		_ = backend.Close()
		backendCleanup()
	})

	return graph, nil
}
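
// A minimal usage sketch inside a test (assuming the testify/require helpers
// commonly used alongside this package):
//
//	func TestMyGraphBehaviour(t *testing.T) {
//		graph, err := MakeTestGraph(t)
//		require.NoError(t, err)
//
//		// The backing test database is closed and cleaned up
//		// automatically via t.Cleanup, so the test can use graph
//		// directly.
//		_ = graph
//	}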