• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13566028875

27 Feb 2025 12:09PM UTC coverage: 49.396% (-9.4%) from 58.748%
13566028875

Pull #9555

github

ellemouton
graph/db: populate the graph cache in Start instead of during construction

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.
Pull Request #9555: graph: extract cache from CRUD [6]

34 of 54 new or added lines in 4 files covered. (62.96%)

27464 existing lines in 436 files now uncovered.

101095 of 204664 relevant lines covered (49.4%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.58
/graph/db/kv_store.go
1
package graphdb
2

3
import (
4
        "bytes"
5
        "crypto/sha256"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "math"
11
        "net"
12
        "sort"
13
        "sync"
14
        "testing"
15
        "time"
16

17
        "github.com/btcsuite/btcd/btcec/v2"
18
        "github.com/btcsuite/btcd/chaincfg/chainhash"
19
        "github.com/btcsuite/btcd/txscript"
20
        "github.com/btcsuite/btcd/wire"
21
        "github.com/btcsuite/btcwallet/walletdb"
22
        "github.com/lightningnetwork/lnd/aliasmgr"
23
        "github.com/lightningnetwork/lnd/batch"
24
        "github.com/lightningnetwork/lnd/graph/db/models"
25
        "github.com/lightningnetwork/lnd/input"
26
        "github.com/lightningnetwork/lnd/kvdb"
27
        "github.com/lightningnetwork/lnd/lnwire"
28
        "github.com/lightningnetwork/lnd/routing/route"
29
        "github.com/stretchr/testify/require"
30
)
31

32
var (
	// nodeBucket holds every vertex (node) of the channel graph. At the
	// top level of the bucket, a node's compressed public key maps to the
	// serialized information of that node. The special key "source" maps
	// to the public key of the source node, which is the starting point
	// for all graph queries and traversals; the graph is formed as a
	// star-graph with the source node at its center. The bucket also has
	// a single sub-bucket that adds an extra index from pubkey -> alias.
	//
	// maps: pubKey -> nodeInfo
	// maps: source -> selfPubKey
	nodeBucket = []byte("graph-node")

	// nodeUpdateIndexBucket is nested inside nodeBucket and lets us
	// quickly look up how "fresh" a node's last network update is. It
	// stores keys only, with no values:
	//
	// maps: updateTime || nodeID -> nil
	nodeUpdateIndexBucket = []byte("graph-node-update-index")

	// sourceKey is the special key within nodeBucket whose value is the
	// public key of the "self node".
	sourceKey = []byte("source")

	// aliasIndexBucket is nested inside nodeBucket and maps a node's
	// public key to its current alias. It exists so that a future UI
	// layer can offer an additional degree of confirmation.
	aliasIndexBucket = []byte("alias")

	// edgeBucket holds all of the edge (channel) information of the
	// channel graph. It essentially acts as an adjacency list which,
	// combined with a range scan, can be used to iterate over all the
	// incoming and outgoing edges of a particular node. Keys use a prefix
	// scheme that leads with the node's public key and ends with the
	// compact edge ID. Because the graph is directed, each chanID has two
	// entries in the bucket: nodes may have different fee policies for
	// their respective directions.
	//
	// maps: pubKey || chanID -> channel edge policy for node
	edgeBucket = []byte("graph-edge")

	// unknownPolicy is the empty slice stored in edgeBucket as the value
	// of a channel edge policy that is not known. Unknown policies are
	// still written to the database so that incoming channel edges can be
	// looked up efficiently.
	unknownPolicy = []byte{}

	// chanStart is an all-zero byte array used to perform range scans
	// within edgeBucket in order to obtain all of the outgoing edges of a
	// particular node.
	chanStart [8]byte

	// edgeIndexBucket is nested inside edgeBucket. It is an index that
	// can be used to iterate over all edges, grouping them according to
	// their in/out nodes, and its items also carry the complete edge
	// information of a channel (capacity, the nodes that made the
	// channel, etc.). An edge is created in two phases: it is first added
	// to the edge index, and afterwards edgeBucket can be updated with
	// the latest details of the edge as they are announced on the
	// network.
	//
	// maps: chanID -> pubKey1 || pubKey2 || restofEdgeInfo
	edgeIndexBucket = []byte("edge-index")

	// edgeUpdateIndexBucket is nested inside edgeBucket and holds an
	// index that lets us gauge the "freshness" of a channel's last
	// updates.
	//
	// maps: updateTime || chanID -> nil
	edgeUpdateIndexBucket = []byte("edge-update-index")

	// channelPointBucket is nested inside edgeBucket and maps a channel's
	// full outpoint (txid:index) to its short 8-byte channel ID. It can
	// be used to quickly remove an edge whose outpoint was spent, or to
	// query whether a channel exists.
	//
	// maps: outPoint -> chanID
	channelPointBucket = []byte("chan-index")

	// zombieBucket is nested inside edgeBucket and maintains an index of
	// zombie channels. The key is the channel ID of the edge marked as a
	// zombie and the value is the public keys of the edge's participants:
	//
	// maps: chanID -> pubKey1 || pubKey2
	zombieBucket = []byte("zombie-index")

	// disabledEdgePolicyBucket is nested inside edgeBucket and maintains
	// an index of disabled edge policies. The key is the channel ID of
	// the edge followed by one byte representing the direction of the
	// edge:
	//
	// maps: <chanID><direction> -> []byte{}
	//
	// The main purpose of this index is to allow disabled channels to be
	// pruned quickly, without iterating over the whole graph.
	disabledEdgePolicyBucket = []byte("disabled-edge-policy-index")

	// graphMetaBucket is a top-level bucket storing various metadata
	// about the on-disk channel graph, such as the block the graph has
	// been synced to, the total number of channels, etc.
	graphMetaBucket = []byte("graph-meta")

	// pruneLogBucket is nested inside graphMetaBucket and maps a block
	// height to the hash of the block that was used to prune the graph.
	// Once a new block is discovered, any channels that have been closed
	// (by spending the outpoint) can safely be removed from the graph,
	// and the block is added to the prune log. The log is needed for the
	// case where a reorg happens and we must "rewind" the state of the
	// graph by removing channels that were previously confirmed. In that
	// case all prune log entries whose block height no longer exists are
	// removed.
	pruneLogBucket = []byte("prune-log")

	// closedScidBucket is a top-level bucket storing the scids of
	// channels that we know to be closed. It lets us avoid expensive
	// validation checks if a channel announcement for such a channel is
	// received again.
	//
	// maps: scid -> []byte{}
	closedScidBucket = []byte("closed-scid")
)
166

167
// MaxAllowedExtraOpaqueBytes caps the number of opaque bytes that we allow to
// be written to disk. Without such a limit a node could create a ton of
// updates and slowly fill our disk, while also wasting bandwidth through
// relaying.
const MaxAllowedExtraOpaqueBytes = 10000
174

175
// KVStore is a persistent, on-disk graph representation of the Lightning
176
// Network. This struct can be used to implement path finding algorithms on top
177
// of, and also to update a node's view based on information received from the
178
// p2p network. Internally, the graph is stored using a modified adjacency list
179
// representation with some added object interaction possible with each
180
// serialized edge/node. The graph is stored is directed, meaning that are two
181
// edges stored for each channel: an inbound/outbound edge for each node pair.
182
// Nodes, edges, and edge information can all be added to the graph
183
// independently. Edge removal results in the deletion of all edge information
184
// for that edge.
185
type KVStore struct {
186
        db kvdb.Backend
187

188
        // cacheMu guards all caches (rejectCache and chanCache). If
189
        // this mutex will be acquired at the same time as the DB mutex then
190
        // the cacheMu MUST be acquired first to prevent deadlock.
191
        cacheMu     sync.RWMutex
192
        rejectCache *rejectCache
193
        chanCache   *channelCache
194

195
        chanScheduler batch.Scheduler
196
        nodeScheduler batch.Scheduler
197
}
198

199
// NewKVStore allocates a new KVStore backed by a DB instance. The
200
// returned instance has its own unique reject cache and channel cache.
201
func NewKVStore(db kvdb.Backend, options ...KVStoreOptionModifier) (*KVStore,
202
        error) {
3✔
203

3✔
204
        opts := DefaultOptions()
3✔
205
        for _, o := range options {
6✔
206
                o(opts)
3✔
207
        }
3✔
208

209
        if !opts.NoMigration {
6✔
210
                if err := initKVStore(db); err != nil {
3✔
211
                        return nil, err
×
212
                }
×
213
        }
214

215
        g := &KVStore{
3✔
216
                db:          db,
3✔
217
                rejectCache: newRejectCache(opts.RejectCacheSize),
3✔
218
                chanCache:   newChannelCache(opts.ChannelCacheSize),
3✔
219
        }
3✔
220
        g.chanScheduler = batch.NewTimeScheduler(
3✔
221
                db, &g.cacheMu, opts.BatchCommitInterval,
3✔
222
        )
3✔
223
        g.nodeScheduler = batch.NewTimeScheduler(
3✔
224
                db, nil, opts.BatchCommitInterval,
3✔
225
        )
3✔
226

3✔
227
        return g, nil
3✔
228
}
229

230
// channelMapKey is the key structure used for storing channel edge policies.
231
type channelMapKey struct {
232
        nodeKey route.Vertex
233
        chanID  [8]byte
234
}
235

236
// getChannelMap loads all channel edge policies from the database and stores
237
// them in a map.
238
func (c *KVStore) getChannelMap(edges kvdb.RBucket) (
239
        map[channelMapKey]*models.ChannelEdgePolicy, error) {
3✔
240

3✔
241
        // Create a map to store all channel edge policies.
3✔
242
        channelMap := make(map[channelMapKey]*models.ChannelEdgePolicy)
3✔
243

3✔
244
        err := kvdb.ForAll(edges, func(k, edgeBytes []byte) error {
6✔
245
                // Skip embedded buckets.
3✔
246
                if bytes.Equal(k, edgeIndexBucket) ||
3✔
247
                        bytes.Equal(k, edgeUpdateIndexBucket) ||
3✔
248
                        bytes.Equal(k, zombieBucket) ||
3✔
249
                        bytes.Equal(k, disabledEdgePolicyBucket) ||
3✔
250
                        bytes.Equal(k, channelPointBucket) {
6✔
251

3✔
252
                        return nil
3✔
253
                }
3✔
254

255
                // Validate key length.
256
                if len(k) != 33+8 {
3✔
257
                        return fmt.Errorf("invalid edge key %x encountered", k)
×
258
                }
×
259

260
                var key channelMapKey
3✔
261
                copy(key.nodeKey[:], k[:33])
3✔
262
                copy(key.chanID[:], k[33:])
3✔
263

3✔
264
                // No need to deserialize unknown policy.
3✔
265
                if bytes.Equal(edgeBytes, unknownPolicy) {
3✔
266
                        return nil
×
267
                }
×
268

269
                edgeReader := bytes.NewReader(edgeBytes)
3✔
270
                edge, err := deserializeChanEdgePolicyRaw(
3✔
271
                        edgeReader,
3✔
272
                )
3✔
273

3✔
274
                switch {
3✔
275
                // If the db policy was missing an expected optional field, we
276
                // return nil as if the policy was unknown.
277
                case errors.Is(err, ErrEdgePolicyOptionalFieldNotFound):
×
278
                        return nil
×
279

280
                case err != nil:
×
281
                        return err
×
282
                }
283

284
                channelMap[key] = edge
3✔
285

3✔
286
                return nil
3✔
287
        })
288
        if err != nil {
3✔
289
                return nil, err
×
290
        }
×
291

292
        return channelMap, nil
3✔
293
}
294

295
var graphTopLevelBuckets = [][]byte{
296
        nodeBucket,
297
        edgeBucket,
298
        graphMetaBucket,
299
        closedScidBucket,
300
}
301

302
// Wipe completely deletes all saved state within all used buckets within the
303
// database. The deletion is done in a single transaction, therefore this
304
// operation is fully atomic.
305
func (c *KVStore) Wipe() error {
×
306
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
×
307
                for _, tlb := range graphTopLevelBuckets {
×
308
                        err := tx.DeleteTopLevelBucket(tlb)
×
309
                        if err != nil &&
×
310
                                !errors.Is(err, kvdb.ErrBucketNotFound) {
×
311

×
312
                                return err
×
313
                        }
×
314
                }
315

316
                return nil
×
317
        }, func() {})
×
318
        if err != nil {
×
319
                return err
×
320
        }
×
321

322
        return initKVStore(c.db)
×
323
}
324

325
// createChannelDB creates and initializes a fresh version of  In
326
// the case that the target path has not yet been created or doesn't yet exist,
327
// then the path is created. Additionally, all required top-level buckets used
328
// within the database are created.
329
func initKVStore(db kvdb.Backend) error {
3✔
330
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
6✔
331
                for _, tlb := range graphTopLevelBuckets {
6✔
332
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
3✔
333
                                return err
×
334
                        }
×
335
                }
336

337
                nodes := tx.ReadWriteBucket(nodeBucket)
3✔
338
                _, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
3✔
339
                if err != nil {
3✔
340
                        return err
×
341
                }
×
342
                _, err = nodes.CreateBucketIfNotExists(nodeUpdateIndexBucket)
3✔
343
                if err != nil {
3✔
344
                        return err
×
345
                }
×
346

347
                edges := tx.ReadWriteBucket(edgeBucket)
3✔
348
                _, err = edges.CreateBucketIfNotExists(edgeIndexBucket)
3✔
349
                if err != nil {
3✔
350
                        return err
×
351
                }
×
352
                _, err = edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
3✔
353
                if err != nil {
3✔
354
                        return err
×
355
                }
×
356
                _, err = edges.CreateBucketIfNotExists(channelPointBucket)
3✔
357
                if err != nil {
3✔
358
                        return err
×
359
                }
×
360
                _, err = edges.CreateBucketIfNotExists(zombieBucket)
3✔
361
                if err != nil {
3✔
362
                        return err
×
363
                }
×
364

365
                graphMeta := tx.ReadWriteBucket(graphMetaBucket)
3✔
366
                _, err = graphMeta.CreateBucketIfNotExists(pruneLogBucket)
3✔
367

3✔
368
                return err
3✔
369
        }, func() {})
3✔
370
        if err != nil {
3✔
371
                return fmt.Errorf("unable to create new channel graph: %w", err)
×
372
        }
×
373

374
        return nil
3✔
375
}
376

377
// AddrsForNode returns all known addresses for the target node public key that
378
// the graph DB is aware of. The returned boolean indicates if the given node is
379
// unknown to the graph DB or not.
380
//
381
// NOTE: this is part of the channeldb.AddrSource interface.
382
func (c *KVStore) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr,
383
        error) {
3✔
384

3✔
385
        pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed())
3✔
386
        if err != nil {
3✔
387
                return false, nil, err
×
388
        }
×
389

390
        node, err := c.FetchLightningNode(pubKey)
3✔
391
        // We don't consider it an error if the graph is unaware of the node.
3✔
392
        switch {
3✔
393
        case err != nil && !errors.Is(err, ErrGraphNodeNotFound):
×
394
                return false, nil, err
×
395

396
        case errors.Is(err, ErrGraphNodeNotFound):
3✔
397
                return false, nil, nil
3✔
398
        }
399

400
        return true, node.Addresses, nil
3✔
401
}
402

403
// ForEachChannel iterates through all the channel edges stored within the
404
// graph and invokes the passed callback for each edge. The callback takes two
405
// edges as since this is a directed graph, both the in/out edges are visited.
406
// If the callback returns an error, then the transaction is aborted and the
407
// iteration stops early.
408
//
409
// NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer
410
// for that particular channel edge routing policy will be passed into the
411
// callback.
412
func (c *KVStore) ForEachChannel(cb func(*models.ChannelEdgeInfo,
413
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
3✔
414

3✔
415
        return c.db.View(func(tx kvdb.RTx) error {
6✔
416
                edges := tx.ReadBucket(edgeBucket)
3✔
417
                if edges == nil {
3✔
418
                        return ErrGraphNoEdgesFound
×
419
                }
×
420

421
                // First, load all edges in memory indexed by node and channel
422
                // id.
423
                channelMap, err := c.getChannelMap(edges)
3✔
424
                if err != nil {
3✔
425
                        return err
×
426
                }
×
427

428
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
429
                if edgeIndex == nil {
3✔
430
                        return ErrGraphNoEdgesFound
×
431
                }
×
432

433
                // Load edge index, recombine each channel with the policies
434
                // loaded above and invoke the callback.
435
                return kvdb.ForAll(
3✔
436
                        edgeIndex, func(k, edgeInfoBytes []byte) error {
6✔
437
                                var chanID [8]byte
3✔
438
                                copy(chanID[:], k)
3✔
439

3✔
440
                                edgeInfoReader := bytes.NewReader(edgeInfoBytes)
3✔
441
                                info, err := deserializeChanEdgeInfo(
3✔
442
                                        edgeInfoReader,
3✔
443
                                )
3✔
444
                                if err != nil {
3✔
445
                                        return err
×
446
                                }
×
447

448
                                policy1 := channelMap[channelMapKey{
3✔
449
                                        nodeKey: info.NodeKey1Bytes,
3✔
450
                                        chanID:  chanID,
3✔
451
                                }]
3✔
452

3✔
453
                                policy2 := channelMap[channelMapKey{
3✔
454
                                        nodeKey: info.NodeKey2Bytes,
3✔
455
                                        chanID:  chanID,
3✔
456
                                }]
3✔
457

3✔
458
                                return cb(&info, policy1, policy2)
3✔
459
                        },
460
                )
461
        }, func() {})
3✔
462
}
463

464
// forEachNodeDirectedChannel iterates through all channels of a given node,
465
// executing the passed callback on the directed edge representing the channel
466
// and its incoming policy. If the callback returns an error, then the iteration
467
// is halted with the error propagated back up to the caller. An optional read
468
// transaction may be provided. If none is provided, a new one will be created.
469
//
470
// Unknown policies are passed into the callback as nil values.
471
func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx,
472
        node route.Vertex, cb func(channel *DirectedChannel) error) error {
3✔
473

3✔
474
        // Fallback that uses the database.
3✔
475
        toNodeCallback := func() route.Vertex {
6✔
476
                return node
3✔
477
        }
3✔
478
        toNodeFeatures, err := c.fetchNodeFeatures(tx, node)
3✔
479
        if err != nil {
3✔
480
                return err
×
481
        }
×
482

483
        dbCallback := func(tx kvdb.RTx, e *models.ChannelEdgeInfo, p1,
3✔
484
                p2 *models.ChannelEdgePolicy) error {
6✔
485

3✔
486
                var cachedInPolicy *models.CachedEdgePolicy
3✔
487
                if p2 != nil {
6✔
488
                        cachedInPolicy = models.NewCachedPolicy(p2)
3✔
489
                        cachedInPolicy.ToNodePubKey = toNodeCallback
3✔
490
                        cachedInPolicy.ToNodeFeatures = toNodeFeatures
3✔
491
                }
3✔
492

493
                var inboundFee lnwire.Fee
3✔
494
                if p1 != nil {
6✔
495
                        // Extract inbound fee. If there is a decoding error,
3✔
496
                        // skip this edge.
3✔
497
                        _, err := p1.ExtraOpaqueData.ExtractRecords(&inboundFee)
3✔
498
                        if err != nil {
3✔
UNCOV
499
                                return nil
×
UNCOV
500
                        }
×
501
                }
502

503
                directedChannel := &DirectedChannel{
3✔
504
                        ChannelID:    e.ChannelID,
3✔
505
                        IsNode1:      node == e.NodeKey1Bytes,
3✔
506
                        OtherNode:    e.NodeKey2Bytes,
3✔
507
                        Capacity:     e.Capacity,
3✔
508
                        OutPolicySet: p1 != nil,
3✔
509
                        InPolicy:     cachedInPolicy,
3✔
510
                        InboundFee:   inboundFee,
3✔
511
                }
3✔
512

3✔
513
                if node == e.NodeKey2Bytes {
6✔
514
                        directedChannel.OtherNode = e.NodeKey1Bytes
3✔
515
                }
3✔
516

517
                return cb(directedChannel)
3✔
518
        }
519

520
        return nodeTraversal(tx, node[:], c.db, dbCallback)
3✔
521
}
522

523
// fetchNodeFeatures returns the features of a given node. If no features are
524
// known for the node, an empty feature vector is returned. An optional read
525
// transaction may be provided. If none is provided, a new one will be created.
526
func (c *KVStore) fetchNodeFeatures(tx kvdb.RTx,
527
        node route.Vertex) (*lnwire.FeatureVector, error) {
3✔
528

3✔
529
        // Fallback that uses the database.
3✔
530
        targetNode, err := c.FetchLightningNodeTx(tx, node)
3✔
531
        switch {
3✔
532
        // If the node exists and has features, return them directly.
533
        case err == nil:
3✔
534
                return targetNode.Features, nil
3✔
535

536
        // If we couldn't find a node announcement, populate a blank feature
537
        // vector.
UNCOV
538
        case errors.Is(err, ErrGraphNodeNotFound):
×
UNCOV
539
                return lnwire.EmptyFeatureVector(), nil
×
540

541
        // Otherwise, bubble the error up.
542
        default:
×
543
                return nil, err
×
544
        }
545
}
546

547
// ForEachNodeDirectedChannel iterates through all channels of a given node,
548
// executing the passed callback on the directed edge representing the channel
549
// and its incoming policy. If the callback returns an error, then the iteration
550
// is halted with the error propagated back up to the caller.
551
//
552
// Unknown policies are passed into the callback as nil values.
553
//
554
// NOTE: this is part of the graphdb.NodeTraverser interface.
555
func (c *KVStore) ForEachNodeDirectedChannel(nodePub route.Vertex,
556
        cb func(channel *DirectedChannel) error) error {
3✔
557

3✔
558
        return c.forEachNodeDirectedChannel(nil, nodePub, cb)
3✔
559
}
3✔
560

561
// FetchNodeFeatures returns the features of the given node. If no features are
562
// known for the node, an empty feature vector is returned.
563
//
564
// NOTE: this is part of the graphdb.NodeTraverser interface.
565
func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) (
566
        *lnwire.FeatureVector, error) {
3✔
567

3✔
568
        return c.fetchNodeFeatures(nil, nodePub)
3✔
569
}
3✔
570

571
// ForEachNodeCached is similar to forEachNode, but it returns DirectedChannel
572
// data to the call-back.
573
//
574
// NOTE: The callback contents MUST not be modified.
575
func (c *KVStore) ForEachNodeCached(cb func(node route.Vertex,
UNCOV
576
        chans map[uint64]*DirectedChannel) error) error {
×
UNCOV
577

×
UNCOV
578
        // Otherwise call back to a version that uses the database directly.
×
UNCOV
579
        // We'll iterate over each node, then the set of channels for each
×
UNCOV
580
        // node, and construct a similar callback functiopn signature as the
×
UNCOV
581
        // main funcotin expects.
×
UNCOV
582
        return c.forEachNode(func(tx kvdb.RTx,
×
UNCOV
583
                node *models.LightningNode) error {
×
UNCOV
584

×
UNCOV
585
                channels := make(map[uint64]*DirectedChannel)
×
UNCOV
586

×
UNCOV
587
                err := c.ForEachNodeChannelTx(tx, node.PubKeyBytes,
×
UNCOV
588
                        func(tx kvdb.RTx, e *models.ChannelEdgeInfo,
×
UNCOV
589
                                p1 *models.ChannelEdgePolicy,
×
UNCOV
590
                                p2 *models.ChannelEdgePolicy) error {
×
UNCOV
591

×
UNCOV
592
                                toNodeCallback := func() route.Vertex {
×
593
                                        return node.PubKeyBytes
×
594
                                }
×
UNCOV
595
                                toNodeFeatures, err := c.fetchNodeFeatures(
×
UNCOV
596
                                        tx, node.PubKeyBytes,
×
UNCOV
597
                                )
×
UNCOV
598
                                if err != nil {
×
599
                                        return err
×
600
                                }
×
601

UNCOV
602
                                var cachedInPolicy *models.CachedEdgePolicy
×
UNCOV
603
                                if p2 != nil {
×
UNCOV
604
                                        cachedInPolicy =
×
UNCOV
605
                                                models.NewCachedPolicy(p2)
×
UNCOV
606
                                        cachedInPolicy.ToNodePubKey =
×
UNCOV
607
                                                toNodeCallback
×
UNCOV
608
                                        cachedInPolicy.ToNodeFeatures =
×
UNCOV
609
                                                toNodeFeatures
×
UNCOV
610
                                }
×
611

UNCOV
612
                                directedChannel := &DirectedChannel{
×
UNCOV
613
                                        ChannelID: e.ChannelID,
×
UNCOV
614
                                        IsNode1: node.PubKeyBytes ==
×
UNCOV
615
                                                e.NodeKey1Bytes,
×
UNCOV
616
                                        OtherNode:    e.NodeKey2Bytes,
×
UNCOV
617
                                        Capacity:     e.Capacity,
×
UNCOV
618
                                        OutPolicySet: p1 != nil,
×
UNCOV
619
                                        InPolicy:     cachedInPolicy,
×
UNCOV
620
                                }
×
UNCOV
621

×
UNCOV
622
                                if node.PubKeyBytes == e.NodeKey2Bytes {
×
UNCOV
623
                                        directedChannel.OtherNode =
×
UNCOV
624
                                                e.NodeKey1Bytes
×
UNCOV
625
                                }
×
626

UNCOV
627
                                channels[e.ChannelID] = directedChannel
×
UNCOV
628

×
UNCOV
629
                                return nil
×
630
                        })
UNCOV
631
                if err != nil {
×
632
                        return err
×
633
                }
×
634

UNCOV
635
                return cb(node.PubKeyBytes, channels)
×
636
        })
637
}
638

639
// DisabledChannelIDs returns the channel ids of disabled channels.
640
// A channel is disabled when two of the associated ChannelEdgePolicies
641
// have their disabled bit on.
UNCOV
642
func (c *KVStore) DisabledChannelIDs() ([]uint64, error) {
×
UNCOV
643
        var disabledChanIDs []uint64
×
UNCOV
644
        var chanEdgeFound map[uint64]struct{}
×
UNCOV
645

×
UNCOV
646
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
×
UNCOV
647
                edges := tx.ReadBucket(edgeBucket)
×
UNCOV
648
                if edges == nil {
×
649
                        return ErrGraphNoEdgesFound
×
650
                }
×
651

UNCOV
652
                disabledEdgePolicyIndex := edges.NestedReadBucket(
×
UNCOV
653
                        disabledEdgePolicyBucket,
×
UNCOV
654
                )
×
UNCOV
655
                if disabledEdgePolicyIndex == nil {
×
UNCOV
656
                        return nil
×
UNCOV
657
                }
×
658

659
                // We iterate over all disabled policies and we add each channel
660
                // that has more than one disabled policy to disabledChanIDs
661
                // array.
UNCOV
662
                return disabledEdgePolicyIndex.ForEach(
×
UNCOV
663
                        func(k, v []byte) error {
×
UNCOV
664
                                chanID := byteOrder.Uint64(k[:8])
×
UNCOV
665
                                _, edgeFound := chanEdgeFound[chanID]
×
UNCOV
666
                                if edgeFound {
×
UNCOV
667
                                        delete(chanEdgeFound, chanID)
×
UNCOV
668
                                        disabledChanIDs = append(
×
UNCOV
669
                                                disabledChanIDs, chanID,
×
UNCOV
670
                                        )
×
UNCOV
671

×
UNCOV
672
                                        return nil
×
UNCOV
673
                                }
×
674

UNCOV
675
                                chanEdgeFound[chanID] = struct{}{}
×
UNCOV
676

×
UNCOV
677
                                return nil
×
678
                        },
679
                )
UNCOV
680
        }, func() {
×
UNCOV
681
                disabledChanIDs = nil
×
UNCOV
682
                chanEdgeFound = make(map[uint64]struct{})
×
UNCOV
683
        })
×
UNCOV
684
        if err != nil {
×
685
                return nil, err
×
686
        }
×
687

UNCOV
688
        return disabledChanIDs, nil
×
689
}
690

691
// ForEachNode iterates through all the stored vertices/nodes in the graph,
692
// executing the passed callback with each node encountered. If the callback
693
// returns an error, then the transaction is aborted and the iteration stops
694
// early. Any operations performed on the NodeTx passed to the call-back are
695
// executed under the same read transaction and so, methods on the NodeTx object
696
// _MUST_ only be called from within the call-back.
697
func (c *KVStore) ForEachNode(cb func(tx NodeRTx) error) error {
3✔
698
        return c.forEachNode(func(tx kvdb.RTx,
3✔
699
                node *models.LightningNode) error {
6✔
700

3✔
701
                return cb(newChanGraphNodeTx(tx, c, node))
3✔
702
        })
3✔
703
}
704

705
// forEachNode iterates through all the stored vertices/nodes in the graph,
706
// executing the passed callback with each node encountered. If the callback
707
// returns an error, then the transaction is aborted and the iteration stops
708
// early.
709
//
710
// TODO(roasbeef): add iterator interface to allow for memory efficient graph
711
// traversal when graph gets mega.
712
func (c *KVStore) forEachNode(
713
        cb func(kvdb.RTx, *models.LightningNode) error) error {
3✔
714

3✔
715
        traversal := func(tx kvdb.RTx) error {
6✔
716
                // First grab the nodes bucket which stores the mapping from
3✔
717
                // pubKey to node information.
3✔
718
                nodes := tx.ReadBucket(nodeBucket)
3✔
719
                if nodes == nil {
3✔
720
                        return ErrGraphNotFound
×
721
                }
×
722

723
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
6✔
724
                        // If this is the source key, then we skip this
3✔
725
                        // iteration as the value for this key is a pubKey
3✔
726
                        // rather than raw node information.
3✔
727
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
6✔
728
                                return nil
3✔
729
                        }
3✔
730

731
                        nodeReader := bytes.NewReader(nodeBytes)
3✔
732
                        node, err := deserializeLightningNode(nodeReader)
3✔
733
                        if err != nil {
3✔
734
                                return err
×
735
                        }
×
736

737
                        // Execute the callback, the transaction will abort if
738
                        // this returns an error.
739
                        return cb(tx, &node)
3✔
740
                })
741
        }
742

743
        return kvdb.View(c.db, traversal, func() {})
6✔
744
}
745

746
// ForEachNodeCacheable iterates through all the stored vertices/nodes in the
747
// graph, executing the passed callback with each node encountered. If the
748
// callback returns an error, then the transaction is aborted and the iteration
749
// stops early.
750
func (c *KVStore) ForEachNodeCacheable(cb func(route.Vertex,
751
        *lnwire.FeatureVector) error) error {
3✔
752

3✔
753
        traversal := func(tx kvdb.RTx) error {
6✔
754
                // First grab the nodes bucket which stores the mapping from
3✔
755
                // pubKey to node information.
3✔
756
                nodes := tx.ReadBucket(nodeBucket)
3✔
757
                if nodes == nil {
3✔
758
                        return ErrGraphNotFound
×
759
                }
×
760

761
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
6✔
762
                        // If this is the source key, then we skip this
3✔
763
                        // iteration as the value for this key is a pubKey
3✔
764
                        // rather than raw node information.
3✔
765
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
6✔
766
                                return nil
3✔
767
                        }
3✔
768

769
                        nodeReader := bytes.NewReader(nodeBytes)
3✔
770
                        node, features, err := deserializeLightningNodeCacheable( //nolint:ll
3✔
771
                                nodeReader,
3✔
772
                        )
3✔
773
                        if err != nil {
3✔
774
                                return err
×
775
                        }
×
776

777
                        // Execute the callback, the transaction will abort if
778
                        // this returns an error.
779
                        return cb(node, features)
3✔
780
                })
781
        }
782

783
        return kvdb.View(c.db, traversal, func() {})
6✔
784
}
785

786
// SourceNode returns the source node of the graph. The source node is treated
787
// as the center node within a star-graph. This method may be used to kick off
788
// a path finding algorithm in order to explore the reachability of another
789
// node based off the source node.
790
func (c *KVStore) SourceNode() (*models.LightningNode, error) {
3✔
791
        var source *models.LightningNode
3✔
792
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
793
                // First grab the nodes bucket which stores the mapping from
3✔
794
                // pubKey to node information.
3✔
795
                nodes := tx.ReadBucket(nodeBucket)
3✔
796
                if nodes == nil {
3✔
797
                        return ErrGraphNotFound
×
798
                }
×
799

800
                node, err := c.sourceNode(nodes)
3✔
801
                if err != nil {
3✔
UNCOV
802
                        return err
×
UNCOV
803
                }
×
804
                source = node
3✔
805

3✔
806
                return nil
3✔
807
        }, func() {
3✔
808
                source = nil
3✔
809
        })
3✔
810
        if err != nil {
3✔
UNCOV
811
                return nil, err
×
UNCOV
812
        }
×
813

814
        return source, nil
3✔
815
}
816

817
// sourceNode uses an existing database transaction and returns the source node
818
// of the graph. The source node is treated as the center node within a
819
// star-graph. This method may be used to kick off a path finding algorithm in
820
// order to explore the reachability of another node based off the source node.
821
func (c *KVStore) sourceNode(nodes kvdb.RBucket) (*models.LightningNode,
822
        error) {
3✔
823

3✔
824
        selfPub := nodes.Get(sourceKey)
3✔
825
        if selfPub == nil {
3✔
UNCOV
826
                return nil, ErrSourceNodeNotSet
×
UNCOV
827
        }
×
828

829
        // With the pubKey of the source node retrieved, we're able to
830
        // fetch the full node information.
831
        node, err := fetchLightningNode(nodes, selfPub)
3✔
832
        if err != nil {
3✔
833
                return nil, err
×
834
        }
×
835

836
        return &node, nil
3✔
837
}
838

839
// SetSourceNode sets the source node within the graph database. The source
840
// node is to be used as the center of a star-graph within path finding
841
// algorithms.
842
func (c *KVStore) SetSourceNode(node *models.LightningNode) error {
3✔
843
        nodePubBytes := node.PubKeyBytes[:]
3✔
844

3✔
845
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
6✔
846
                // First grab the nodes bucket which stores the mapping from
3✔
847
                // pubKey to node information.
3✔
848
                nodes, err := tx.CreateTopLevelBucket(nodeBucket)
3✔
849
                if err != nil {
3✔
850
                        return err
×
851
                }
×
852

853
                // Next we create the mapping from source to the targeted
854
                // public key.
855
                if err := nodes.Put(sourceKey, nodePubBytes); err != nil {
3✔
856
                        return err
×
857
                }
×
858

859
                // Finally, we commit the information of the lightning node
860
                // itself.
861
                return addLightningNode(tx, node)
3✔
862
        }, func() {})
3✔
863
}
864

865
// AddLightningNode adds a vertex/node to the graph database. If the node is not
866
// in the database from before, this will add a new, unconnected one to the
867
// graph. If it is present from before, this will update that node's
868
// information. Note that this method is expected to only be called to update an
869
// already present node from a node announcement, or to insert a node found in a
870
// channel update.
871
//
872
// TODO(roasbeef): also need sig of announcement.
873
func (c *KVStore) AddLightningNode(node *models.LightningNode,
874
        op ...batch.SchedulerOption) error {
3✔
875

3✔
876
        r := &batch.Request{
3✔
877
                Update: func(tx kvdb.RwTx) error {
6✔
878
                        return addLightningNode(tx, node)
3✔
879
                },
3✔
880
        }
881

882
        for _, f := range op {
6✔
883
                f(r)
3✔
884
        }
3✔
885

886
        return c.nodeScheduler.Execute(r)
3✔
887
}
888

889
func addLightningNode(tx kvdb.RwTx, node *models.LightningNode) error {
3✔
890
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
3✔
891
        if err != nil {
3✔
892
                return err
×
893
        }
×
894

895
        aliases, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
3✔
896
        if err != nil {
3✔
897
                return err
×
898
        }
×
899

900
        updateIndex, err := nodes.CreateBucketIfNotExists(
3✔
901
                nodeUpdateIndexBucket,
3✔
902
        )
3✔
903
        if err != nil {
3✔
904
                return err
×
905
        }
×
906

907
        return putLightningNode(nodes, aliases, updateIndex, node)
3✔
908
}
909

910
// LookupAlias attempts to return the alias as advertised by the target node.
911
// TODO(roasbeef): currently assumes that aliases are unique...
912
func (c *KVStore) LookupAlias(pub *btcec.PublicKey) (string, error) {
3✔
913
        var alias string
3✔
914

3✔
915
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
916
                nodes := tx.ReadBucket(nodeBucket)
3✔
917
                if nodes == nil {
3✔
918
                        return ErrGraphNodesNotFound
×
919
                }
×
920

921
                aliases := nodes.NestedReadBucket(aliasIndexBucket)
3✔
922
                if aliases == nil {
3✔
923
                        return ErrGraphNodesNotFound
×
924
                }
×
925

926
                nodePub := pub.SerializeCompressed()
3✔
927
                a := aliases.Get(nodePub)
3✔
928
                if a == nil {
3✔
UNCOV
929
                        return ErrNodeAliasNotFound
×
UNCOV
930
                }
×
931

932
                // TODO(roasbeef): should actually be using the utf-8
933
                // package...
934
                alias = string(a)
3✔
935

3✔
936
                return nil
3✔
937
        }, func() {
3✔
938
                alias = ""
3✔
939
        })
3✔
940
        if err != nil {
3✔
UNCOV
941
                return "", err
×
UNCOV
942
        }
×
943

944
        return alias, nil
3✔
945
}
946

947
// DeleteLightningNode starts a new database transaction to remove a vertex/node
948
// from the database according to the node's public key.
UNCOV
949
func (c *KVStore) DeleteLightningNode(nodePub route.Vertex) error {
×
UNCOV
950
        // TODO(roasbeef): ensure dangling edges are removed...
×
UNCOV
951
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
×
UNCOV
952
                nodes := tx.ReadWriteBucket(nodeBucket)
×
UNCOV
953
                if nodes == nil {
×
954
                        return ErrGraphNodeNotFound
×
955
                }
×
956

UNCOV
957
                return c.deleteLightningNode(nodes, nodePub[:])
×
UNCOV
958
        }, func() {})
×
959
}
960

961
// deleteLightningNode uses an existing database transaction to remove a
962
// vertex/node from the database according to the node's public key.
963
func (c *KVStore) deleteLightningNode(nodes kvdb.RwBucket,
964
        compressedPubKey []byte) error {
3✔
965

3✔
966
        aliases := nodes.NestedReadWriteBucket(aliasIndexBucket)
3✔
967
        if aliases == nil {
3✔
968
                return ErrGraphNodesNotFound
×
969
        }
×
970

971
        if err := aliases.Delete(compressedPubKey); err != nil {
3✔
972
                return err
×
973
        }
×
974

975
        // Before we delete the node, we'll fetch its current state so we can
976
        // determine when its last update was to clear out the node update
977
        // index.
978
        node, err := fetchLightningNode(nodes, compressedPubKey)
3✔
979
        if err != nil {
3✔
980
                return err
×
981
        }
×
982

983
        if err := nodes.Delete(compressedPubKey); err != nil {
3✔
984
                return err
×
985
        }
×
986

987
        // Finally, we'll delete the index entry for the node within the
988
        // nodeUpdateIndexBucket as this node is no longer active, so we don't
989
        // need to track its last update.
990
        nodeUpdateIndex := nodes.NestedReadWriteBucket(nodeUpdateIndexBucket)
3✔
991
        if nodeUpdateIndex == nil {
3✔
992
                return ErrGraphNodesNotFound
×
993
        }
×
994

995
        // In order to delete the entry, we'll need to reconstruct the key for
996
        // its last update.
997
        updateUnix := uint64(node.LastUpdate.Unix())
3✔
998
        var indexKey [8 + 33]byte
3✔
999
        byteOrder.PutUint64(indexKey[:8], updateUnix)
3✔
1000
        copy(indexKey[8:], compressedPubKey)
3✔
1001

3✔
1002
        return nodeUpdateIndex.Delete(indexKey[:])
3✔
1003
}
1004

1005
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
1006
// undirected edge from the two target nodes are created. The information stored
1007
// denotes the static attributes of the channel, such as the channelID, the keys
1008
// involved in creation of the channel, and the set of features that the channel
1009
// supports. The chanPoint and chanID are used to uniquely identify the edge
1010
// globally within the database.
1011
func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
1012
        op ...batch.SchedulerOption) error {
3✔
1013

3✔
1014
        var alreadyExists bool
3✔
1015
        r := &batch.Request{
3✔
1016
                Reset: func() {
6✔
1017
                        alreadyExists = false
3✔
1018
                },
3✔
1019
                Update: func(tx kvdb.RwTx) error {
3✔
1020
                        err := c.addChannelEdge(tx, edge)
3✔
1021

3✔
1022
                        // Silence ErrEdgeAlreadyExist so that the batch can
3✔
1023
                        // succeed, but propagate the error via local state.
3✔
1024
                        if errors.Is(err, ErrEdgeAlreadyExist) {
3✔
UNCOV
1025
                                alreadyExists = true
×
UNCOV
1026
                                return nil
×
UNCOV
1027
                        }
×
1028

1029
                        return err
3✔
1030
                },
1031
                OnCommit: func(err error) error {
3✔
1032
                        switch {
3✔
1033
                        case err != nil:
×
1034
                                return err
×
UNCOV
1035
                        case alreadyExists:
×
UNCOV
1036
                                return ErrEdgeAlreadyExist
×
1037
                        default:
3✔
1038
                                c.rejectCache.remove(edge.ChannelID)
3✔
1039
                                c.chanCache.remove(edge.ChannelID)
3✔
1040
                                return nil
3✔
1041
                        }
1042
                },
1043
        }
1044

1045
        for _, f := range op {
6✔
1046
                if f == nil {
3✔
1047
                        return fmt.Errorf("nil scheduler option was used")
×
1048
                }
×
1049

1050
                f(r)
3✔
1051
        }
1052

1053
        return c.chanScheduler.Execute(r)
3✔
1054
}
1055

1056
// addChannelEdge is the private form of AddChannelEdge that allows callers to
1057
// utilize an existing db transaction.
1058
func (c *KVStore) addChannelEdge(tx kvdb.RwTx,
1059
        edge *models.ChannelEdgeInfo) error {
3✔
1060

3✔
1061
        // Construct the channel's primary key which is the 8-byte channel ID.
3✔
1062
        var chanKey [8]byte
3✔
1063
        binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
3✔
1064

3✔
1065
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
3✔
1066
        if err != nil {
3✔
1067
                return err
×
1068
        }
×
1069
        edges, err := tx.CreateTopLevelBucket(edgeBucket)
3✔
1070
        if err != nil {
3✔
1071
                return err
×
1072
        }
×
1073
        edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
3✔
1074
        if err != nil {
3✔
1075
                return err
×
1076
        }
×
1077
        chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket)
3✔
1078
        if err != nil {
3✔
1079
                return err
×
1080
        }
×
1081

1082
        // First, attempt to check if this edge has already been created. If
1083
        // so, then we can exit early as this method is meant to be idempotent.
1084
        if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil {
3✔
UNCOV
1085
                return ErrEdgeAlreadyExist
×
UNCOV
1086
        }
×
1087

1088
        // Before we insert the channel into the database, we'll ensure that
1089
        // both nodes already exist in the channel graph. If either node
1090
        // doesn't, then we'll insert a "shell" node that just includes its
1091
        // public key, so subsequent validation and queries can work properly.
1092
        _, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:])
3✔
1093
        switch {
3✔
1094
        case errors.Is(node1Err, ErrGraphNodeNotFound):
3✔
1095
                node1Shell := models.LightningNode{
3✔
1096
                        PubKeyBytes:          edge.NodeKey1Bytes,
3✔
1097
                        HaveNodeAnnouncement: false,
3✔
1098
                }
3✔
1099
                err := addLightningNode(tx, &node1Shell)
3✔
1100
                if err != nil {
3✔
1101
                        return fmt.Errorf("unable to create shell node "+
×
1102
                                "for: %x: %w", edge.NodeKey1Bytes, err)
×
1103
                }
×
1104
        case node1Err != nil:
×
1105
                return node1Err
×
1106
        }
1107

1108
        _, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:])
3✔
1109
        switch {
3✔
1110
        case errors.Is(node2Err, ErrGraphNodeNotFound):
3✔
1111
                node2Shell := models.LightningNode{
3✔
1112
                        PubKeyBytes:          edge.NodeKey2Bytes,
3✔
1113
                        HaveNodeAnnouncement: false,
3✔
1114
                }
3✔
1115
                err := addLightningNode(tx, &node2Shell)
3✔
1116
                if err != nil {
3✔
1117
                        return fmt.Errorf("unable to create shell node "+
×
1118
                                "for: %x: %w", edge.NodeKey2Bytes, err)
×
1119
                }
×
1120
        case node2Err != nil:
×
1121
                return node2Err
×
1122
        }
1123

1124
        // If the edge hasn't been created yet, then we'll first add it to the
1125
        // edge index in order to associate the edge between two nodes and also
1126
        // store the static components of the channel.
1127
        if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil {
3✔
1128
                return err
×
1129
        }
×
1130

1131
        // Mark edge policies for both sides as unknown. This is to enable
1132
        // efficient incoming channel lookup for a node.
1133
        keys := []*[33]byte{
3✔
1134
                &edge.NodeKey1Bytes,
3✔
1135
                &edge.NodeKey2Bytes,
3✔
1136
        }
3✔
1137
        for _, key := range keys {
6✔
1138
                err := putChanEdgePolicyUnknown(edges, edge.ChannelID, key[:])
3✔
1139
                if err != nil {
3✔
1140
                        return err
×
1141
                }
×
1142
        }
1143

1144
        // Finally we add it to the channel index which maps channel points
1145
        // (outpoints) to the shorter channel ID's.
1146
        var b bytes.Buffer
3✔
1147
        if err := WriteOutpoint(&b, &edge.ChannelPoint); err != nil {
3✔
1148
                return err
×
1149
        }
×
1150

1151
        return chanIndex.Put(b.Bytes(), chanKey[:])
3✔
1152
}
1153

1154
// HasChannelEdge returns true if the database knows of a channel edge with the
1155
// passed channel ID, and false otherwise. If an edge with that ID is found
1156
// within the graph, then two time stamps representing the last time the edge
1157
// was updated for both directed edges are returned along with the boolean. If
1158
// it is not found, then the zombie index is checked and its result is returned
1159
// as the second boolean.
1160
func (c *KVStore) HasChannelEdge(
1161
        chanID uint64) (time.Time, time.Time, bool, bool, error) {
3✔
1162

3✔
1163
        var (
3✔
1164
                upd1Time time.Time
3✔
1165
                upd2Time time.Time
3✔
1166
                exists   bool
3✔
1167
                isZombie bool
3✔
1168
        )
3✔
1169

3✔
1170
        // We'll query the cache with the shared lock held to allow multiple
3✔
1171
        // readers to access values in the cache concurrently if they exist.
3✔
1172
        c.cacheMu.RLock()
3✔
1173
        if entry, ok := c.rejectCache.get(chanID); ok {
6✔
1174
                c.cacheMu.RUnlock()
3✔
1175
                upd1Time = time.Unix(entry.upd1Time, 0)
3✔
1176
                upd2Time = time.Unix(entry.upd2Time, 0)
3✔
1177
                exists, isZombie = entry.flags.unpack()
3✔
1178

3✔
1179
                return upd1Time, upd2Time, exists, isZombie, nil
3✔
1180
        }
3✔
1181
        c.cacheMu.RUnlock()
3✔
1182

3✔
1183
        c.cacheMu.Lock()
3✔
1184
        defer c.cacheMu.Unlock()
3✔
1185

3✔
1186
        // The item was not found with the shared lock, so we'll acquire the
3✔
1187
        // exclusive lock and check the cache again in case another method added
3✔
1188
        // the entry to the cache while no lock was held.
3✔
1189
        if entry, ok := c.rejectCache.get(chanID); ok {
6✔
1190
                upd1Time = time.Unix(entry.upd1Time, 0)
3✔
1191
                upd2Time = time.Unix(entry.upd2Time, 0)
3✔
1192
                exists, isZombie = entry.flags.unpack()
3✔
1193

3✔
1194
                return upd1Time, upd2Time, exists, isZombie, nil
3✔
1195
        }
3✔
1196

1197
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
1198
                edges := tx.ReadBucket(edgeBucket)
3✔
1199
                if edges == nil {
3✔
1200
                        return ErrGraphNoEdgesFound
×
1201
                }
×
1202
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
1203
                if edgeIndex == nil {
3✔
1204
                        return ErrGraphNoEdgesFound
×
1205
                }
×
1206

1207
                var channelID [8]byte
3✔
1208
                byteOrder.PutUint64(channelID[:], chanID)
3✔
1209

3✔
1210
                // If the edge doesn't exist, then we'll also check our zombie
3✔
1211
                // index.
3✔
1212
                if edgeIndex.Get(channelID[:]) == nil {
6✔
1213
                        exists = false
3✔
1214
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
3✔
1215
                        if zombieIndex != nil {
6✔
1216
                                isZombie, _, _ = isZombieEdge(
3✔
1217
                                        zombieIndex, chanID,
3✔
1218
                                )
3✔
1219
                        }
3✔
1220

1221
                        return nil
3✔
1222
                }
1223

1224
                exists = true
3✔
1225
                isZombie = false
3✔
1226

3✔
1227
                // If the channel has been found in the graph, then retrieve
3✔
1228
                // the edges itself so we can return the last updated
3✔
1229
                // timestamps.
3✔
1230
                nodes := tx.ReadBucket(nodeBucket)
3✔
1231
                if nodes == nil {
3✔
1232
                        return ErrGraphNodeNotFound
×
1233
                }
×
1234

1235
                e1, e2, err := fetchChanEdgePolicies(
3✔
1236
                        edgeIndex, edges, channelID[:],
3✔
1237
                )
3✔
1238
                if err != nil {
3✔
1239
                        return err
×
1240
                }
×
1241

1242
                // As we may have only one of the edges populated, only set the
1243
                // update time if the edge was found in the database.
1244
                if e1 != nil {
6✔
1245
                        upd1Time = e1.LastUpdate
3✔
1246
                }
3✔
1247
                if e2 != nil {
6✔
1248
                        upd2Time = e2.LastUpdate
3✔
1249
                }
3✔
1250

1251
                return nil
3✔
1252
        }, func() {}); err != nil {
3✔
1253
                return time.Time{}, time.Time{}, exists, isZombie, err
×
1254
        }
×
1255

1256
        c.rejectCache.insert(chanID, rejectCacheEntry{
3✔
1257
                upd1Time: upd1Time.Unix(),
3✔
1258
                upd2Time: upd2Time.Unix(),
3✔
1259
                flags:    packRejectFlags(exists, isZombie),
3✔
1260
        })
3✔
1261

3✔
1262
        return upd1Time, upd2Time, exists, isZombie, nil
3✔
1263
}
1264

1265
// AddEdgeProof sets the proof of an existing edge in the graph database.
1266
func (c *KVStore) AddEdgeProof(chanID lnwire.ShortChannelID,
1267
        proof *models.ChannelAuthProof) error {
3✔
1268

3✔
1269
        // Construct the channel's primary key which is the 8-byte channel ID.
3✔
1270
        var chanKey [8]byte
3✔
1271
        binary.BigEndian.PutUint64(chanKey[:], chanID.ToUint64())
3✔
1272

3✔
1273
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
6✔
1274
                edges := tx.ReadWriteBucket(edgeBucket)
3✔
1275
                if edges == nil {
3✔
1276
                        return ErrEdgeNotFound
×
1277
                }
×
1278

1279
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
3✔
1280
                if edgeIndex == nil {
3✔
1281
                        return ErrEdgeNotFound
×
1282
                }
×
1283

1284
                edge, err := fetchChanEdgeInfo(edgeIndex, chanKey[:])
3✔
1285
                if err != nil {
3✔
1286
                        return err
×
1287
                }
×
1288

1289
                edge.AuthProof = proof
3✔
1290

3✔
1291
                return putChanEdgeInfo(edgeIndex, &edge, chanKey)
3✔
1292
        }, func() {})
3✔
1293
}
1294

1295
// pruneTipBytes is the total size of the value which stores a prune entry of
// the graph in the prune log. The "prune tip" is the last entry in the prune
// log, and indicates if the channel graph is in sync with the current UTXO
// state. The structure of the value is: blockHash, taking 32 bytes total.
const pruneTipBytes = 32
1303

1304
// PruneGraph prunes newly closed channels from the channel graph in response
1305
// to a new block being solved on the network. Any transactions which spend the
1306
// funding output of any known channels within the graph will be deleted.
1307
// Additionally, the "prune tip", or the last block which has been used to
1308
// prune the graph is stored so callers can ensure the graph is fully in sync
1309
// with the current UTXO state. A slice of channels that have been closed by
1310
// the target block along with any pruned nodes are returned if the function
1311
// succeeds without error.
1312
func (c *KVStore) PruneGraph(spentOutputs []*wire.OutPoint,
1313
        blockHash *chainhash.Hash, blockHeight uint32) (
1314
        []*models.ChannelEdgeInfo, []route.Vertex, error) {
3✔
1315

3✔
1316
        c.cacheMu.Lock()
3✔
1317
        defer c.cacheMu.Unlock()
3✔
1318

3✔
1319
        var (
3✔
1320
                chansClosed []*models.ChannelEdgeInfo
3✔
1321
                prunedNodes []route.Vertex
3✔
1322
        )
3✔
1323

3✔
1324
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
6✔
1325
                // First grab the edges bucket which houses the information
3✔
1326
                // we'd like to delete
3✔
1327
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
3✔
1328
                if err != nil {
3✔
1329
                        return err
×
1330
                }
×
1331

1332
                // Next grab the two edge indexes which will also need to be
1333
                // updated.
1334
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
3✔
1335
                if err != nil {
3✔
1336
                        return err
×
1337
                }
×
1338
                chanIndex, err := edges.CreateBucketIfNotExists(
3✔
1339
                        channelPointBucket,
3✔
1340
                )
3✔
1341
                if err != nil {
3✔
1342
                        return err
×
1343
                }
×
1344
                nodes := tx.ReadWriteBucket(nodeBucket)
3✔
1345
                if nodes == nil {
3✔
1346
                        return ErrSourceNodeNotSet
×
1347
                }
×
1348
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
3✔
1349
                if err != nil {
3✔
1350
                        return err
×
1351
                }
×
1352

1353
                // For each of the outpoints that have been spent within the
1354
                // block, we attempt to delete them from the graph as if that
1355
                // outpoint was a channel, then it has now been closed.
1356
                for _, chanPoint := range spentOutputs {
6✔
1357
                        // TODO(roasbeef): load channel bloom filter, continue
3✔
1358
                        // if NOT if filter
3✔
1359

3✔
1360
                        var opBytes bytes.Buffer
3✔
1361
                        err := WriteOutpoint(&opBytes, chanPoint)
3✔
1362
                        if err != nil {
3✔
1363
                                return err
×
1364
                        }
×
1365

1366
                        // First attempt to see if the channel exists within
1367
                        // the database, if not, then we can exit early.
1368
                        chanID := chanIndex.Get(opBytes.Bytes())
3✔
1369
                        if chanID == nil {
3✔
UNCOV
1370
                                continue
×
1371
                        }
1372

1373
                        // Attempt to delete the channel, an ErrEdgeNotFound
1374
                        // will be returned if that outpoint isn't known to be
1375
                        // a channel. If no error is returned, then a channel
1376
                        // was successfully pruned.
1377
                        edgeInfo, err := c.delChannelEdgeUnsafe(
3✔
1378
                                edges, edgeIndex, chanIndex, zombieIndex,
3✔
1379
                                chanID, false, false,
3✔
1380
                        )
3✔
1381
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
3✔
1382
                                return err
×
1383
                        }
×
1384

1385
                        chansClosed = append(chansClosed, edgeInfo)
3✔
1386
                }
1387

1388
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
3✔
1389
                if err != nil {
3✔
1390
                        return err
×
1391
                }
×
1392

1393
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
3✔
1394
                        pruneLogBucket,
3✔
1395
                )
3✔
1396
                if err != nil {
3✔
1397
                        return err
×
1398
                }
×
1399

1400
                // With the graph pruned, add a new entry to the prune log,
1401
                // which can be used to check if the graph is fully synced with
1402
                // the current UTXO state.
1403
                var blockHeightBytes [4]byte
3✔
1404
                byteOrder.PutUint32(blockHeightBytes[:], blockHeight)
3✔
1405

3✔
1406
                var newTip [pruneTipBytes]byte
3✔
1407
                copy(newTip[:], blockHash[:])
3✔
1408

3✔
1409
                err = pruneBucket.Put(blockHeightBytes[:], newTip[:])
3✔
1410
                if err != nil {
3✔
1411
                        return err
×
1412
                }
×
1413

1414
                // Now that the graph has been pruned, we'll also attempt to
1415
                // prune any nodes that have had a channel closed within the
1416
                // latest block.
1417
                prunedNodes, err = c.pruneGraphNodes(nodes, edgeIndex)
3✔
1418

3✔
1419
                return err
3✔
1420
        }, func() {
3✔
1421
                chansClosed = nil
3✔
1422
                prunedNodes = nil
3✔
1423
        })
3✔
1424
        if err != nil {
3✔
1425
                return nil, nil, err
×
1426
        }
×
1427

1428
        for _, channel := range chansClosed {
6✔
1429
                c.rejectCache.remove(channel.ChannelID)
3✔
1430
                c.chanCache.remove(channel.ChannelID)
3✔
1431
        }
3✔
1432

1433
        return chansClosed, prunedNodes, nil
3✔
1434
}
1435

1436
// PruneGraphNodes is a garbage collection method which attempts to prune out
1437
// any nodes from the channel graph that are currently unconnected. This ensure
1438
// that we only maintain a graph of reachable nodes. In the event that a pruned
1439
// node gains more channels, it will be re-added back to the graph.
1440
func (c *KVStore) PruneGraphNodes() ([]route.Vertex, error) {
3✔
1441
        var prunedNodes []route.Vertex
3✔
1442
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
6✔
1443
                nodes := tx.ReadWriteBucket(nodeBucket)
3✔
1444
                if nodes == nil {
3✔
1445
                        return ErrGraphNodesNotFound
×
1446
                }
×
1447
                edges := tx.ReadWriteBucket(edgeBucket)
3✔
1448
                if edges == nil {
3✔
1449
                        return ErrGraphNotFound
×
1450
                }
×
1451
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
3✔
1452
                if edgeIndex == nil {
3✔
1453
                        return ErrGraphNoEdgesFound
×
1454
                }
×
1455

1456
                var err error
3✔
1457
                prunedNodes, err = c.pruneGraphNodes(nodes, edgeIndex)
3✔
1458
                if err != nil {
3✔
1459
                        return err
×
1460
                }
×
1461

1462
                return nil
3✔
1463
        }, func() {
3✔
1464
                prunedNodes = nil
3✔
1465
        })
3✔
1466

1467
        return prunedNodes, err
3✔
1468
}
1469

1470
// pruneGraphNodes attempts to remove any nodes from the graph who have had a
1471
// channel closed within the current block. If the node still has existing
1472
// channels in the graph, this will act as a no-op.
1473
func (c *KVStore) pruneGraphNodes(nodes kvdb.RwBucket,
1474
        edgeIndex kvdb.RwBucket) ([]route.Vertex, error) {
3✔
1475

3✔
1476
        log.Trace("Pruning nodes from graph with no open channels")
3✔
1477

3✔
1478
        // We'll retrieve the graph's source node to ensure we don't remove it
3✔
1479
        // even if it no longer has any open channels.
3✔
1480
        sourceNode, err := c.sourceNode(nodes)
3✔
1481
        if err != nil {
3✔
1482
                return nil, err
×
1483
        }
×
1484

1485
        // We'll use this map to keep count the number of references to a node
1486
        // in the graph. A node should only be removed once it has no more
1487
        // references in the graph.
1488
        nodeRefCounts := make(map[[33]byte]int)
3✔
1489
        err = nodes.ForEach(func(pubKey, nodeBytes []byte) error {
6✔
1490
                // If this is the source key, then we skip this
3✔
1491
                // iteration as the value for this key is a pubKey
3✔
1492
                // rather than raw node information.
3✔
1493
                if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
6✔
1494
                        return nil
3✔
1495
                }
3✔
1496

1497
                var nodePub [33]byte
3✔
1498
                copy(nodePub[:], pubKey)
3✔
1499
                nodeRefCounts[nodePub] = 0
3✔
1500

3✔
1501
                return nil
3✔
1502
        })
1503
        if err != nil {
3✔
1504
                return nil, err
×
1505
        }
×
1506

1507
        // To ensure we never delete the source node, we'll start off by
1508
        // bumping its ref count to 1.
1509
        nodeRefCounts[sourceNode.PubKeyBytes] = 1
3✔
1510

3✔
1511
        // Next, we'll run through the edgeIndex which maps a channel ID to the
3✔
1512
        // edge info. We'll use this scan to populate our reference count map
3✔
1513
        // above.
3✔
1514
        err = edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
6✔
1515
                // The first 66 bytes of the edge info contain the pubkeys of
3✔
1516
                // the nodes that this edge attaches. We'll extract them, and
3✔
1517
                // add them to the ref count map.
3✔
1518
                var node1, node2 [33]byte
3✔
1519
                copy(node1[:], edgeInfoBytes[:33])
3✔
1520
                copy(node2[:], edgeInfoBytes[33:])
3✔
1521

3✔
1522
                // With the nodes extracted, we'll increase the ref count of
3✔
1523
                // each of the nodes.
3✔
1524
                nodeRefCounts[node1]++
3✔
1525
                nodeRefCounts[node2]++
3✔
1526

3✔
1527
                return nil
3✔
1528
        })
3✔
1529
        if err != nil {
3✔
1530
                return nil, err
×
1531
        }
×
1532

1533
        // Finally, we'll make a second pass over the set of nodes, and delete
1534
        // any nodes that have a ref count of zero.
1535
        var pruned []route.Vertex
3✔
1536
        for nodePubKey, refCount := range nodeRefCounts {
6✔
1537
                // If the ref count of the node isn't zero, then we can safely
3✔
1538
                // skip it as it still has edges to or from it within the
3✔
1539
                // graph.
3✔
1540
                if refCount != 0 {
6✔
1541
                        continue
3✔
1542
                }
1543

1544
                // If we reach this point, then there are no longer any edges
1545
                // that connect this node, so we can delete it.
1546
                err := c.deleteLightningNode(nodes, nodePubKey[:])
3✔
1547
                if err != nil {
3✔
1548
                        if errors.Is(err, ErrGraphNodeNotFound) ||
×
1549
                                errors.Is(err, ErrGraphNodesNotFound) {
×
1550

×
1551
                                log.Warnf("Unable to prune node %x from the "+
×
1552
                                        "graph: %v", nodePubKey, err)
×
1553
                                continue
×
1554
                        }
1555

1556
                        return nil, err
×
1557
                }
1558

1559
                log.Infof("Pruned unconnected node %x from channel graph",
3✔
1560
                        nodePubKey[:])
3✔
1561

3✔
1562
                pruned = append(pruned, nodePubKey)
3✔
1563
        }
1564

1565
        if len(pruned) > 0 {
6✔
1566
                log.Infof("Pruned %v unconnected nodes from the channel graph",
3✔
1567
                        len(pruned))
3✔
1568
        }
3✔
1569

1570
        return pruned, err
3✔
1571
}
1572

1573
// DisconnectBlockAtHeight is used to indicate that the block specified
1574
// by the passed height has been disconnected from the main chain. This
1575
// will "rewind" the graph back to the height below, deleting channels
1576
// that are no longer confirmed from the graph. The prune log will be
1577
// set to the last prune height valid for the remaining chain.
1578
// Channels that were removed from the graph resulting from the
1579
// disconnected block are returned.
1580
func (c *KVStore) DisconnectBlockAtHeight(height uint32) (
1581
        []*models.ChannelEdgeInfo, error) {
2✔
1582

2✔
1583
        // Every channel having a ShortChannelID starting at 'height'
2✔
1584
        // will no longer be confirmed.
2✔
1585
        startShortChanID := lnwire.ShortChannelID{
2✔
1586
                BlockHeight: height,
2✔
1587
        }
2✔
1588

2✔
1589
        // Delete everything after this height from the db up until the
2✔
1590
        // SCID alias range.
2✔
1591
        endShortChanID := aliasmgr.StartingAlias
2✔
1592

2✔
1593
        // The block height will be the 3 first bytes of the channel IDs.
2✔
1594
        var chanIDStart [8]byte
2✔
1595
        byteOrder.PutUint64(chanIDStart[:], startShortChanID.ToUint64())
2✔
1596
        var chanIDEnd [8]byte
2✔
1597
        byteOrder.PutUint64(chanIDEnd[:], endShortChanID.ToUint64())
2✔
1598

2✔
1599
        c.cacheMu.Lock()
2✔
1600
        defer c.cacheMu.Unlock()
2✔
1601

2✔
1602
        // Keep track of the channels that are removed from the graph.
2✔
1603
        var removedChans []*models.ChannelEdgeInfo
2✔
1604

2✔
1605
        if err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
4✔
1606
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
2✔
1607
                if err != nil {
2✔
1608
                        return err
×
1609
                }
×
1610
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
2✔
1611
                if err != nil {
2✔
1612
                        return err
×
1613
                }
×
1614
                chanIndex, err := edges.CreateBucketIfNotExists(
2✔
1615
                        channelPointBucket,
2✔
1616
                )
2✔
1617
                if err != nil {
2✔
1618
                        return err
×
1619
                }
×
1620
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
2✔
1621
                if err != nil {
2✔
1622
                        return err
×
1623
                }
×
1624

1625
                // Scan from chanIDStart to chanIDEnd, deleting every
1626
                // found edge.
1627
                // NOTE: we must delete the edges after the cursor loop, since
1628
                // modifying the bucket while traversing is not safe.
1629
                // NOTE: We use a < comparison in bytes.Compare instead of <=
1630
                // so that the StartingAlias itself isn't deleted.
1631
                var keys [][]byte
2✔
1632
                cursor := edgeIndex.ReadWriteCursor()
2✔
1633

2✔
1634
                //nolint:ll
2✔
1635
                for k, _ := cursor.Seek(chanIDStart[:]); k != nil &&
2✔
1636
                        bytes.Compare(k, chanIDEnd[:]) < 0; k, _ = cursor.Next() {
4✔
1637
                        keys = append(keys, k)
2✔
1638
                }
2✔
1639

1640
                for _, k := range keys {
4✔
1641
                        edgeInfo, err := c.delChannelEdgeUnsafe(
2✔
1642
                                edges, edgeIndex, chanIndex, zombieIndex,
2✔
1643
                                k, false, false,
2✔
1644
                        )
2✔
1645
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
2✔
1646
                                return err
×
1647
                        }
×
1648

1649
                        removedChans = append(removedChans, edgeInfo)
2✔
1650
                }
1651

1652
                // Delete all the entries in the prune log having a height
1653
                // greater or equal to the block disconnected.
1654
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
2✔
1655
                if err != nil {
2✔
1656
                        return err
×
1657
                }
×
1658

1659
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
2✔
1660
                        pruneLogBucket,
2✔
1661
                )
2✔
1662
                if err != nil {
2✔
1663
                        return err
×
1664
                }
×
1665

1666
                var pruneKeyStart [4]byte
2✔
1667
                byteOrder.PutUint32(pruneKeyStart[:], height)
2✔
1668

2✔
1669
                var pruneKeyEnd [4]byte
2✔
1670
                byteOrder.PutUint32(pruneKeyEnd[:], math.MaxUint32)
2✔
1671

2✔
1672
                // To avoid modifying the bucket while traversing, we delete
2✔
1673
                // the keys in a second loop.
2✔
1674
                var pruneKeys [][]byte
2✔
1675
                pruneCursor := pruneBucket.ReadWriteCursor()
2✔
1676
                //nolint:ll
2✔
1677
                for k, _ := pruneCursor.Seek(pruneKeyStart[:]); k != nil &&
2✔
1678
                        bytes.Compare(k, pruneKeyEnd[:]) <= 0; k, _ = pruneCursor.Next() {
4✔
1679
                        pruneKeys = append(pruneKeys, k)
2✔
1680
                }
2✔
1681

1682
                for _, k := range pruneKeys {
4✔
1683
                        if err := pruneBucket.Delete(k); err != nil {
2✔
1684
                                return err
×
1685
                        }
×
1686
                }
1687

1688
                return nil
2✔
1689
        }, func() {
2✔
1690
                removedChans = nil
2✔
1691
        }); err != nil {
2✔
1692
                return nil, err
×
1693
        }
×
1694

1695
        for _, channel := range removedChans {
4✔
1696
                c.rejectCache.remove(channel.ChannelID)
2✔
1697
                c.chanCache.remove(channel.ChannelID)
2✔
1698
        }
2✔
1699

1700
        return removedChans, nil
2✔
1701
}
1702

1703
// PruneTip returns the block height and hash of the latest block that has been
1704
// used to prune channels in the graph. Knowing the "prune tip" allows callers
1705
// to tell if the graph is currently in sync with the current best known UTXO
1706
// state.
1707
func (c *KVStore) PruneTip() (*chainhash.Hash, uint32, error) {
3✔
1708
        var (
3✔
1709
                tipHash   chainhash.Hash
3✔
1710
                tipHeight uint32
3✔
1711
        )
3✔
1712

3✔
1713
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
1714
                graphMeta := tx.ReadBucket(graphMetaBucket)
3✔
1715
                if graphMeta == nil {
3✔
1716
                        return ErrGraphNotFound
×
1717
                }
×
1718
                pruneBucket := graphMeta.NestedReadBucket(pruneLogBucket)
3✔
1719
                if pruneBucket == nil {
3✔
1720
                        return ErrGraphNeverPruned
×
1721
                }
×
1722

1723
                pruneCursor := pruneBucket.ReadCursor()
3✔
1724

3✔
1725
                // The prune key with the largest block height will be our
3✔
1726
                // prune tip.
3✔
1727
                k, v := pruneCursor.Last()
3✔
1728
                if k == nil {
6✔
1729
                        return ErrGraphNeverPruned
3✔
1730
                }
3✔
1731

1732
                // Once we have the prune tip, the value will be the block hash,
1733
                // and the key the block height.
1734
                copy(tipHash[:], v)
3✔
1735
                tipHeight = byteOrder.Uint32(k)
3✔
1736

3✔
1737
                return nil
3✔
1738
        }, func() {})
3✔
1739
        if err != nil {
6✔
1740
                return nil, 0, err
3✔
1741
        }
3✔
1742

1743
        return &tipHash, tipHeight, nil
3✔
1744
}
1745

1746
// DeleteChannelEdges removes edges with the given channel IDs from the
1747
// database and marks them as zombies. This ensures that we're unable to re-add
1748
// it to our database once again. If an edge does not exist within the
1749
// database, then ErrEdgeNotFound will be returned. If strictZombiePruning is
1750
// true, then when we mark these edges as zombies, we'll set up the keys such
1751
// that we require the node that failed to send the fresh update to be the one
1752
// that resurrects the channel from its zombie state. The markZombie bool
1753
// denotes whether or not to mark the channel as a zombie.
1754
func (c *KVStore) DeleteChannelEdges(strictZombiePruning, markZombie bool,
1755
        chanIDs ...uint64) ([]*models.ChannelEdgeInfo, error) {
3✔
1756

3✔
1757
        // TODO(roasbeef): possibly delete from node bucket if node has no more
3✔
1758
        // channels
3✔
1759
        // TODO(roasbeef): don't delete both edges?
3✔
1760

3✔
1761
        c.cacheMu.Lock()
3✔
1762
        defer c.cacheMu.Unlock()
3✔
1763

3✔
1764
        var infos []*models.ChannelEdgeInfo
3✔
1765
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
6✔
1766
                edges := tx.ReadWriteBucket(edgeBucket)
3✔
1767
                if edges == nil {
3✔
1768
                        return ErrEdgeNotFound
×
1769
                }
×
1770
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
3✔
1771
                if edgeIndex == nil {
3✔
1772
                        return ErrEdgeNotFound
×
1773
                }
×
1774
                chanIndex := edges.NestedReadWriteBucket(channelPointBucket)
3✔
1775
                if chanIndex == nil {
3✔
1776
                        return ErrEdgeNotFound
×
1777
                }
×
1778
                nodes := tx.ReadWriteBucket(nodeBucket)
3✔
1779
                if nodes == nil {
3✔
1780
                        return ErrGraphNodeNotFound
×
1781
                }
×
1782
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
3✔
1783
                if err != nil {
3✔
1784
                        return err
×
1785
                }
×
1786

1787
                var rawChanID [8]byte
3✔
1788
                for _, chanID := range chanIDs {
6✔
1789
                        byteOrder.PutUint64(rawChanID[:], chanID)
3✔
1790
                        edgeInfo, err := c.delChannelEdgeUnsafe(
3✔
1791
                                edges, edgeIndex, chanIndex, zombieIndex,
3✔
1792
                                rawChanID[:], markZombie, strictZombiePruning,
3✔
1793
                        )
3✔
1794
                        if err != nil {
3✔
UNCOV
1795
                                return err
×
UNCOV
1796
                        }
×
1797

1798
                        infos = append(infos, edgeInfo)
3✔
1799
                }
1800

1801
                return nil
3✔
1802
        }, func() {
3✔
1803
                infos = nil
3✔
1804
        })
3✔
1805
        if err != nil {
3✔
UNCOV
1806
                return nil, err
×
UNCOV
1807
        }
×
1808

1809
        for _, chanID := range chanIDs {
6✔
1810
                c.rejectCache.remove(chanID)
3✔
1811
                c.chanCache.remove(chanID)
3✔
1812
        }
3✔
1813

1814
        return infos, nil
3✔
1815
}
1816

1817
// ChannelID attempt to lookup the 8-byte compact channel ID which maps to the
1818
// passed channel point (outpoint). If the passed channel doesn't exist within
1819
// the database, then ErrEdgeNotFound is returned.
1820
func (c *KVStore) ChannelID(chanPoint *wire.OutPoint) (uint64, error) {
3✔
1821
        var chanID uint64
3✔
1822
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
1823
                var err error
3✔
1824
                chanID, err = getChanID(tx, chanPoint)
3✔
1825
                return err
3✔
1826
        }, func() {
6✔
1827
                chanID = 0
3✔
1828
        }); err != nil {
6✔
1829
                return 0, err
3✔
1830
        }
3✔
1831

1832
        return chanID, nil
3✔
1833
}
1834

1835
// getChanID returns the assigned channel ID for a given channel point.
1836
func getChanID(tx kvdb.RTx, chanPoint *wire.OutPoint) (uint64, error) {
3✔
1837
        var b bytes.Buffer
3✔
1838
        if err := WriteOutpoint(&b, chanPoint); err != nil {
3✔
1839
                return 0, err
×
1840
        }
×
1841

1842
        edges := tx.ReadBucket(edgeBucket)
3✔
1843
        if edges == nil {
3✔
1844
                return 0, ErrGraphNoEdgesFound
×
1845
        }
×
1846
        chanIndex := edges.NestedReadBucket(channelPointBucket)
3✔
1847
        if chanIndex == nil {
3✔
1848
                return 0, ErrGraphNoEdgesFound
×
1849
        }
×
1850

1851
        chanIDBytes := chanIndex.Get(b.Bytes())
3✔
1852
        if chanIDBytes == nil {
6✔
1853
                return 0, ErrEdgeNotFound
3✔
1854
        }
3✔
1855

1856
        chanID := byteOrder.Uint64(chanIDBytes)
3✔
1857

3✔
1858
        return chanID, nil
3✔
1859
}
1860

1861
// TODO(roasbeef): allow updates to use Batch?
1862

1863
// HighestChanID returns the "highest" known channel ID in the channel graph.
1864
// This represents the "newest" channel from the PoV of the chain. This method
1865
// can be used by peers to quickly determine if they're graphs are in sync.
1866
func (c *KVStore) HighestChanID() (uint64, error) {
3✔
1867
        var cid uint64
3✔
1868

3✔
1869
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
1870
                edges := tx.ReadBucket(edgeBucket)
3✔
1871
                if edges == nil {
3✔
1872
                        return ErrGraphNoEdgesFound
×
1873
                }
×
1874
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
1875
                if edgeIndex == nil {
3✔
1876
                        return ErrGraphNoEdgesFound
×
1877
                }
×
1878

1879
                // In order to find the highest chan ID, we'll fetch a cursor
1880
                // and use that to seek to the "end" of our known rage.
1881
                cidCursor := edgeIndex.ReadCursor()
3✔
1882

3✔
1883
                lastChanID, _ := cidCursor.Last()
3✔
1884

3✔
1885
                // If there's no key, then this means that we don't actually
3✔
1886
                // know of any channels, so we'll return a predicable error.
3✔
1887
                if lastChanID == nil {
6✔
1888
                        return ErrGraphNoEdgesFound
3✔
1889
                }
3✔
1890

1891
                // Otherwise, we'll de serialize the channel ID and return it
1892
                // to the caller.
1893
                cid = byteOrder.Uint64(lastChanID)
3✔
1894

3✔
1895
                return nil
3✔
1896
        }, func() {
3✔
1897
                cid = 0
3✔
1898
        })
3✔
1899
        if err != nil && !errors.Is(err, ErrGraphNoEdgesFound) {
3✔
1900
                return 0, err
×
1901
        }
×
1902

1903
        return cid, nil
3✔
1904
}
1905

1906
// ChannelEdge represents the complete set of information for a channel edge in
1907
// the known channel graph. This struct couples the core information of the
1908
// edge as well as each of the known advertised edge policies.
1909
type ChannelEdge struct {
1910
        // Info contains all the static information describing the channel.
1911
        Info *models.ChannelEdgeInfo
1912

1913
        // Policy1 points to the "first" edge policy of the channel containing
1914
        // the dynamic information required to properly route through the edge.
1915
        Policy1 *models.ChannelEdgePolicy
1916

1917
        // Policy2 points to the "second" edge policy of the channel containing
1918
        // the dynamic information required to properly route through the edge.
1919
        Policy2 *models.ChannelEdgePolicy
1920

1921
        // Node1 is "node 1" in the channel. This is the node that would have
1922
        // produced Policy1 if it exists.
1923
        Node1 *models.LightningNode
1924

1925
        // Node2 is "node 2" in the channel. This is the node that would have
1926
        // produced Policy2 if it exists.
1927
        Node2 *models.LightningNode
1928
}
1929

1930
// ChanUpdatesInHorizon returns all the known channel edges which have at least
1931
// one edge that has an update timestamp within the specified horizon.
1932
func (c *KVStore) ChanUpdatesInHorizon(startTime,
1933
        endTime time.Time) ([]ChannelEdge, error) {
3✔
1934

3✔
1935
        // To ensure we don't return duplicate ChannelEdges, we'll use an
3✔
1936
        // additional map to keep track of the edges already seen to prevent
3✔
1937
        // re-adding it.
3✔
1938
        var edgesSeen map[uint64]struct{}
3✔
1939
        var edgesToCache map[uint64]ChannelEdge
3✔
1940
        var edgesInHorizon []ChannelEdge
3✔
1941

3✔
1942
        c.cacheMu.Lock()
3✔
1943
        defer c.cacheMu.Unlock()
3✔
1944

3✔
1945
        var hits int
3✔
1946
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
1947
                edges := tx.ReadBucket(edgeBucket)
3✔
1948
                if edges == nil {
3✔
1949
                        return ErrGraphNoEdgesFound
×
1950
                }
×
1951
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
1952
                if edgeIndex == nil {
3✔
1953
                        return ErrGraphNoEdgesFound
×
1954
                }
×
1955
                edgeUpdateIndex := edges.NestedReadBucket(edgeUpdateIndexBucket)
3✔
1956
                if edgeUpdateIndex == nil {
3✔
1957
                        return ErrGraphNoEdgesFound
×
1958
                }
×
1959

1960
                nodes := tx.ReadBucket(nodeBucket)
3✔
1961
                if nodes == nil {
3✔
1962
                        return ErrGraphNodesNotFound
×
1963
                }
×
1964

1965
                // We'll now obtain a cursor to perform a range query within
1966
                // the index to find all channels within the horizon.
1967
                updateCursor := edgeUpdateIndex.ReadCursor()
3✔
1968

3✔
1969
                var startTimeBytes, endTimeBytes [8 + 8]byte
3✔
1970
                byteOrder.PutUint64(
3✔
1971
                        startTimeBytes[:8], uint64(startTime.Unix()),
3✔
1972
                )
3✔
1973
                byteOrder.PutUint64(
3✔
1974
                        endTimeBytes[:8], uint64(endTime.Unix()),
3✔
1975
                )
3✔
1976

3✔
1977
                // With our start and end times constructed, we'll step through
3✔
1978
                // the index collecting the info and policy of each update of
3✔
1979
                // each channel that has a last update within the time range.
3✔
1980
                //
3✔
1981
                //nolint:ll
3✔
1982
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
3✔
1983
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
6✔
1984
                        // We have a new eligible entry, so we'll slice of the
3✔
1985
                        // chan ID so we can query it in the DB.
3✔
1986
                        chanID := indexKey[8:]
3✔
1987

3✔
1988
                        // If we've already retrieved the info and policies for
3✔
1989
                        // this edge, then we can skip it as we don't need to do
3✔
1990
                        // so again.
3✔
1991
                        chanIDInt := byteOrder.Uint64(chanID)
3✔
1992
                        if _, ok := edgesSeen[chanIDInt]; ok {
3✔
UNCOV
1993
                                continue
×
1994
                        }
1995

1996
                        if channel, ok := c.chanCache.get(chanIDInt); ok {
5✔
1997
                                hits++
2✔
1998
                                edgesSeen[chanIDInt] = struct{}{}
2✔
1999
                                edgesInHorizon = append(edgesInHorizon, channel)
2✔
2000

2✔
2001
                                continue
2✔
2002
                        }
2003

2004
                        // First, we'll fetch the static edge information.
2005
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
3✔
2006
                        if err != nil {
3✔
2007
                                chanID := byteOrder.Uint64(chanID)
×
2008
                                return fmt.Errorf("unable to fetch info for "+
×
2009
                                        "edge with chan_id=%v: %v", chanID, err)
×
2010
                        }
×
2011

2012
                        // With the static information obtained, we'll now
2013
                        // fetch the dynamic policy info.
2014
                        edge1, edge2, err := fetchChanEdgePolicies(
3✔
2015
                                edgeIndex, edges, chanID,
3✔
2016
                        )
3✔
2017
                        if err != nil {
3✔
2018
                                chanID := byteOrder.Uint64(chanID)
×
2019
                                return fmt.Errorf("unable to fetch policies "+
×
2020
                                        "for edge with chan_id=%v: %v", chanID,
×
2021
                                        err)
×
2022
                        }
×
2023

2024
                        node1, err := fetchLightningNode(
3✔
2025
                                nodes, edgeInfo.NodeKey1Bytes[:],
3✔
2026
                        )
3✔
2027
                        if err != nil {
3✔
2028
                                return err
×
2029
                        }
×
2030

2031
                        node2, err := fetchLightningNode(
3✔
2032
                                nodes, edgeInfo.NodeKey2Bytes[:],
3✔
2033
                        )
3✔
2034
                        if err != nil {
3✔
2035
                                return err
×
2036
                        }
×
2037

2038
                        // Finally, we'll collate this edge with the rest of
2039
                        // edges to be returned.
2040
                        edgesSeen[chanIDInt] = struct{}{}
3✔
2041
                        channel := ChannelEdge{
3✔
2042
                                Info:    &edgeInfo,
3✔
2043
                                Policy1: edge1,
3✔
2044
                                Policy2: edge2,
3✔
2045
                                Node1:   &node1,
3✔
2046
                                Node2:   &node2,
3✔
2047
                        }
3✔
2048
                        edgesInHorizon = append(edgesInHorizon, channel)
3✔
2049
                        edgesToCache[chanIDInt] = channel
3✔
2050
                }
2051

2052
                return nil
3✔
2053
        }, func() {
3✔
2054
                edgesSeen = make(map[uint64]struct{})
3✔
2055
                edgesToCache = make(map[uint64]ChannelEdge)
3✔
2056
                edgesInHorizon = nil
3✔
2057
        })
3✔
2058
        switch {
3✔
2059
        case errors.Is(err, ErrGraphNoEdgesFound):
×
2060
                fallthrough
×
2061
        case errors.Is(err, ErrGraphNodesNotFound):
×
2062
                break
×
2063

2064
        case err != nil:
×
2065
                return nil, err
×
2066
        }
2067

2068
        // Insert any edges loaded from disk into the cache.
2069
        for chanid, channel := range edgesToCache {
6✔
2070
                c.chanCache.insert(chanid, channel)
3✔
2071
        }
3✔
2072

2073
        log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
3✔
2074
                float64(hits)/float64(len(edgesInHorizon)), hits,
3✔
2075
                len(edgesInHorizon))
3✔
2076

3✔
2077
        return edgesInHorizon, nil
3✔
2078
}
2079

2080
// NodeUpdatesInHorizon returns all the known lightning node which have an
2081
// update timestamp within the passed range. This method can be used by two
2082
// nodes to quickly determine if they have the same set of up to date node
2083
// announcements.
2084
func (c *KVStore) NodeUpdatesInHorizon(startTime,
2085
        endTime time.Time) ([]models.LightningNode, error) {
3✔
2086

3✔
2087
        var nodesInHorizon []models.LightningNode
3✔
2088

3✔
2089
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
2090
                nodes := tx.ReadBucket(nodeBucket)
3✔
2091
                if nodes == nil {
3✔
2092
                        return ErrGraphNodesNotFound
×
2093
                }
×
2094

2095
                nodeUpdateIndex := nodes.NestedReadBucket(nodeUpdateIndexBucket)
3✔
2096
                if nodeUpdateIndex == nil {
3✔
2097
                        return ErrGraphNodesNotFound
×
2098
                }
×
2099

2100
                // We'll now obtain a cursor to perform a range query within
2101
                // the index to find all node announcements within the horizon.
2102
                updateCursor := nodeUpdateIndex.ReadCursor()
3✔
2103

3✔
2104
                var startTimeBytes, endTimeBytes [8 + 33]byte
3✔
2105
                byteOrder.PutUint64(
3✔
2106
                        startTimeBytes[:8], uint64(startTime.Unix()),
3✔
2107
                )
3✔
2108
                byteOrder.PutUint64(
3✔
2109
                        endTimeBytes[:8], uint64(endTime.Unix()),
3✔
2110
                )
3✔
2111

3✔
2112
                // With our start and end times constructed, we'll step through
3✔
2113
                // the index collecting info for each node within the time
3✔
2114
                // range.
3✔
2115
                //
3✔
2116
                //nolint:ll
3✔
2117
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
3✔
2118
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
6✔
2119
                        nodePub := indexKey[8:]
3✔
2120
                        node, err := fetchLightningNode(nodes, nodePub)
3✔
2121
                        if err != nil {
3✔
2122
                                return err
×
2123
                        }
×
2124

2125
                        nodesInHorizon = append(nodesInHorizon, node)
3✔
2126
                }
2127

2128
                return nil
3✔
2129
        }, func() {
3✔
2130
                nodesInHorizon = nil
3✔
2131
        })
3✔
2132
        switch {
3✔
2133
        case errors.Is(err, ErrGraphNoEdgesFound):
×
2134
                fallthrough
×
2135
        case errors.Is(err, ErrGraphNodesNotFound):
×
2136
                break
×
2137

2138
        case err != nil:
×
2139
                return nil, err
×
2140
        }
2141

2142
        return nodesInHorizon, nil
3✔
2143
}
2144

2145
// FilterKnownChanIDs takes a set of channel IDs and return the subset of chan
2146
// ID's that we don't know and are not known zombies of the passed set. In other
2147
// words, we perform a set difference of our set of chan ID's and the ones
2148
// passed in. This method can be used by callers to determine the set of
2149
// channels another peer knows of that we don't. The ChannelUpdateInfos for the
2150
// known zombies is also returned.
2151
func (c *KVStore) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo) ([]uint64,
2152
        []ChannelUpdateInfo, error) {
3✔
2153

3✔
2154
        var (
3✔
2155
                newChanIDs   []uint64
3✔
2156
                knownZombies []ChannelUpdateInfo
3✔
2157
        )
3✔
2158

3✔
2159
        c.cacheMu.Lock()
3✔
2160
        defer c.cacheMu.Unlock()
3✔
2161

3✔
2162
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
2163
                edges := tx.ReadBucket(edgeBucket)
3✔
2164
                if edges == nil {
3✔
2165
                        return ErrGraphNoEdgesFound
×
2166
                }
×
2167
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
2168
                if edgeIndex == nil {
3✔
2169
                        return ErrGraphNoEdgesFound
×
2170
                }
×
2171

2172
                // Fetch the zombie index, it may not exist if no edges have
2173
                // ever been marked as zombies. If the index has been
2174
                // initialized, we will use it later to skip known zombie edges.
2175
                zombieIndex := edges.NestedReadBucket(zombieBucket)
3✔
2176

3✔
2177
                // We'll run through the set of chanIDs and collate only the
3✔
2178
                // set of channel that are unable to be found within our db.
3✔
2179
                var cidBytes [8]byte
3✔
2180
                for _, info := range chansInfo {
6✔
2181
                        scid := info.ShortChannelID.ToUint64()
3✔
2182
                        byteOrder.PutUint64(cidBytes[:], scid)
3✔
2183

3✔
2184
                        // If the edge is already known, skip it.
3✔
2185
                        if v := edgeIndex.Get(cidBytes[:]); v != nil {
6✔
2186
                                continue
3✔
2187
                        }
2188

2189
                        // If the edge is a known zombie, skip it.
2190
                        if zombieIndex != nil {
6✔
2191
                                isZombie, _, _ := isZombieEdge(
3✔
2192
                                        zombieIndex, scid,
3✔
2193
                                )
3✔
2194

3✔
2195
                                if isZombie {
3✔
UNCOV
2196
                                        knownZombies = append(
×
UNCOV
2197
                                                knownZombies, info,
×
UNCOV
2198
                                        )
×
UNCOV
2199

×
UNCOV
2200
                                        continue
×
2201
                                }
2202
                        }
2203

2204
                        newChanIDs = append(newChanIDs, scid)
3✔
2205
                }
2206

2207
                return nil
3✔
2208
        }, func() {
3✔
2209
                newChanIDs = nil
3✔
2210
                knownZombies = nil
3✔
2211
        })
3✔
2212
        switch {
3✔
2213
        // If we don't know of any edges yet, then we'll return the entire set
2214
        // of chan IDs specified.
2215
        case errors.Is(err, ErrGraphNoEdgesFound):
×
2216
                ogChanIDs := make([]uint64, len(chansInfo))
×
2217
                for i, info := range chansInfo {
×
2218
                        ogChanIDs[i] = info.ShortChannelID.ToUint64()
×
2219
                }
×
2220

2221
                return ogChanIDs, nil, nil
×
2222

2223
        case err != nil:
×
2224
                return nil, nil, err
×
2225
        }
2226

2227
        return newChanIDs, knownZombies, nil
3✔
2228
}
2229

2230
// ChannelUpdateInfo couples the SCID of a channel with the timestamps of the
2231
// latest received channel updates for the channel.
2232
type ChannelUpdateInfo struct {
2233
        // ShortChannelID is the SCID identifier of the channel.
2234
        ShortChannelID lnwire.ShortChannelID
2235

2236
        // Node1UpdateTimestamp is the timestamp of the latest received update
2237
        // from the node 1 channel peer. This will be set to zero time if no
2238
        // update has yet been received from this node.
2239
        Node1UpdateTimestamp time.Time
2240

2241
        // Node2UpdateTimestamp is the timestamp of the latest received update
2242
        // from the node 2 channel peer. This will be set to zero time if no
2243
        // update has yet been received from this node.
2244
        Node2UpdateTimestamp time.Time
2245
}
2246

2247
// NewChannelUpdateInfo is a constructor which makes sure we initialize the
2248
// timestamps with zero seconds unix timestamp which equals
2249
// `January 1, 1970, 00:00:00 UTC` in case the value is `time.Time{}`.
2250
func NewChannelUpdateInfo(scid lnwire.ShortChannelID, node1Timestamp,
2251
        node2Timestamp time.Time) ChannelUpdateInfo {
3✔
2252

3✔
2253
        chanInfo := ChannelUpdateInfo{
3✔
2254
                ShortChannelID:       scid,
3✔
2255
                Node1UpdateTimestamp: node1Timestamp,
3✔
2256
                Node2UpdateTimestamp: node2Timestamp,
3✔
2257
        }
3✔
2258

3✔
2259
        if node1Timestamp.IsZero() {
6✔
2260
                chanInfo.Node1UpdateTimestamp = time.Unix(0, 0)
3✔
2261
        }
3✔
2262

2263
        if node2Timestamp.IsZero() {
6✔
2264
                chanInfo.Node2UpdateTimestamp = time.Unix(0, 0)
3✔
2265
        }
3✔
2266

2267
        return chanInfo
3✔
2268
}
2269

2270
// BlockChannelRange represents a range of channels for a given block height.
2271
type BlockChannelRange struct {
2272
        // Height is the height of the block all of the channels below were
2273
        // included in.
2274
        Height uint32
2275

2276
        // Channels is the list of channels identified by their short ID
2277
        // representation known to us that were included in the block height
2278
        // above. The list may include channel update timestamp information if
2279
        // requested.
2280
        Channels []ChannelUpdateInfo
2281
}
2282

2283
// FilterChannelRange returns the channel ID's of all known channels which were
2284
// mined in a block height within the passed range. The channel IDs are grouped
2285
// by their common block height. This method can be used to quickly share with a
2286
// peer the set of channels we know of within a particular range to catch them
2287
// up after a period of time offline. If withTimestamps is true then the
2288
// timestamp info of the latest received channel update messages of the channel
2289
// will be included in the response.
2290
func (c *KVStore) FilterChannelRange(startHeight,
2291
        endHeight uint32, withTimestamps bool) ([]BlockChannelRange, error) {
3✔
2292

3✔
2293
        startChanID := &lnwire.ShortChannelID{
3✔
2294
                BlockHeight: startHeight,
3✔
2295
        }
3✔
2296

3✔
2297
        endChanID := lnwire.ShortChannelID{
3✔
2298
                BlockHeight: endHeight,
3✔
2299
                TxIndex:     math.MaxUint32 & 0x00ffffff,
3✔
2300
                TxPosition:  math.MaxUint16,
3✔
2301
        }
3✔
2302

3✔
2303
        // As we need to perform a range scan, we'll convert the starting and
3✔
2304
        // ending height to their corresponding values when encoded using short
3✔
2305
        // channel ID's.
3✔
2306
        var chanIDStart, chanIDEnd [8]byte
3✔
2307
        byteOrder.PutUint64(chanIDStart[:], startChanID.ToUint64())
3✔
2308
        byteOrder.PutUint64(chanIDEnd[:], endChanID.ToUint64())
3✔
2309

3✔
2310
        var channelsPerBlock map[uint32][]ChannelUpdateInfo
3✔
2311
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
2312
                edges := tx.ReadBucket(edgeBucket)
3✔
2313
                if edges == nil {
3✔
2314
                        return ErrGraphNoEdgesFound
×
2315
                }
×
2316
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
2317
                if edgeIndex == nil {
3✔
2318
                        return ErrGraphNoEdgesFound
×
2319
                }
×
2320

2321
                cursor := edgeIndex.ReadCursor()
3✔
2322

3✔
2323
                // We'll now iterate through the database, and find each
3✔
2324
                // channel ID that resides within the specified range.
3✔
2325
                //
3✔
2326
                //nolint:ll
3✔
2327
                for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
3✔
2328
                        bytes.Compare(k, chanIDEnd[:]) <= 0; k, v = cursor.Next() {
6✔
2329
                        // Don't send alias SCIDs during gossip sync.
3✔
2330
                        edgeReader := bytes.NewReader(v)
3✔
2331
                        edgeInfo, err := deserializeChanEdgeInfo(edgeReader)
3✔
2332
                        if err != nil {
3✔
2333
                                return err
×
2334
                        }
×
2335

2336
                        if edgeInfo.AuthProof == nil {
6✔
2337
                                continue
3✔
2338
                        }
2339

2340
                        // This channel ID rests within the target range, so
2341
                        // we'll add it to our returned set.
2342
                        rawCid := byteOrder.Uint64(k)
3✔
2343
                        cid := lnwire.NewShortChanIDFromInt(rawCid)
3✔
2344

3✔
2345
                        chanInfo := NewChannelUpdateInfo(
3✔
2346
                                cid, time.Time{}, time.Time{},
3✔
2347
                        )
3✔
2348

3✔
2349
                        if !withTimestamps {
3✔
UNCOV
2350
                                channelsPerBlock[cid.BlockHeight] = append(
×
UNCOV
2351
                                        channelsPerBlock[cid.BlockHeight],
×
UNCOV
2352
                                        chanInfo,
×
UNCOV
2353
                                )
×
UNCOV
2354

×
UNCOV
2355
                                continue
×
2356
                        }
2357

2358
                        node1Key, node2Key := computeEdgePolicyKeys(&edgeInfo)
3✔
2359

3✔
2360
                        rawPolicy := edges.Get(node1Key)
3✔
2361
                        if len(rawPolicy) != 0 {
6✔
2362
                                r := bytes.NewReader(rawPolicy)
3✔
2363

3✔
2364
                                edge, err := deserializeChanEdgePolicyRaw(r)
3✔
2365
                                if err != nil && !errors.Is(
3✔
2366
                                        err, ErrEdgePolicyOptionalFieldNotFound,
3✔
2367
                                ) {
3✔
2368

×
2369
                                        return err
×
2370
                                }
×
2371

2372
                                chanInfo.Node1UpdateTimestamp = edge.LastUpdate
3✔
2373
                        }
2374

2375
                        rawPolicy = edges.Get(node2Key)
3✔
2376
                        if len(rawPolicy) != 0 {
6✔
2377
                                r := bytes.NewReader(rawPolicy)
3✔
2378

3✔
2379
                                edge, err := deserializeChanEdgePolicyRaw(r)
3✔
2380
                                if err != nil && !errors.Is(
3✔
2381
                                        err, ErrEdgePolicyOptionalFieldNotFound,
3✔
2382
                                ) {
3✔
2383

×
2384
                                        return err
×
2385
                                }
×
2386

2387
                                chanInfo.Node2UpdateTimestamp = edge.LastUpdate
3✔
2388
                        }
2389

2390
                        channelsPerBlock[cid.BlockHeight] = append(
3✔
2391
                                channelsPerBlock[cid.BlockHeight], chanInfo,
3✔
2392
                        )
3✔
2393
                }
2394

2395
                return nil
3✔
2396
        }, func() {
3✔
2397
                channelsPerBlock = make(map[uint32][]ChannelUpdateInfo)
3✔
2398
        })
3✔
2399

2400
        switch {
3✔
2401
        // If we don't know of any channels yet, then there's nothing to
2402
        // filter, so we'll return an empty slice.
2403
        case errors.Is(err, ErrGraphNoEdgesFound) || len(channelsPerBlock) == 0:
3✔
2404
                return nil, nil
3✔
2405

2406
        case err != nil:
×
2407
                return nil, err
×
2408
        }
2409

2410
        // Return the channel ranges in ascending block height order.
2411
        blocks := make([]uint32, 0, len(channelsPerBlock))
3✔
2412
        for block := range channelsPerBlock {
6✔
2413
                blocks = append(blocks, block)
3✔
2414
        }
3✔
2415
        sort.Slice(blocks, func(i, j int) bool {
6✔
2416
                return blocks[i] < blocks[j]
3✔
2417
        })
3✔
2418

2419
        channelRanges := make([]BlockChannelRange, 0, len(channelsPerBlock))
3✔
2420
        for _, block := range blocks {
6✔
2421
                channelRanges = append(channelRanges, BlockChannelRange{
3✔
2422
                        Height:   block,
3✔
2423
                        Channels: channelsPerBlock[block],
3✔
2424
                })
3✔
2425
        }
3✔
2426

2427
        return channelRanges, nil
3✔
2428
}
2429

2430
// FetchChanInfos returns the set of channel edges that correspond to the passed
2431
// channel ID's. If an edge is the query is unknown to the database, it will
2432
// skipped and the result will contain only those edges that exist at the time
2433
// of the query. This can be used to respond to peer queries that are seeking to
2434
// fill in gaps in their view of the channel graph.
2435
func (c *KVStore) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) {
3✔
2436
        return c.fetchChanInfos(nil, chanIDs)
3✔
2437
}
3✔
2438

2439
// fetchChanInfos returns the set of channel edges that correspond to the passed
2440
// channel ID's. If an edge is the query is unknown to the database, it will
2441
// skipped and the result will contain only those edges that exist at the time
2442
// of the query. This can be used to respond to peer queries that are seeking to
2443
// fill in gaps in their view of the channel graph.
2444
//
2445
// NOTE: An optional transaction may be provided. If none is provided, then a
2446
// new one will be created.
2447
func (c *KVStore) fetchChanInfos(tx kvdb.RTx, chanIDs []uint64) (
2448
        []ChannelEdge, error) {
3✔
2449
        // TODO(roasbeef): sort cids?
3✔
2450

3✔
2451
        var (
3✔
2452
                chanEdges []ChannelEdge
3✔
2453
                cidBytes  [8]byte
3✔
2454
        )
3✔
2455

3✔
2456
        fetchChanInfos := func(tx kvdb.RTx) error {
6✔
2457
                edges := tx.ReadBucket(edgeBucket)
3✔
2458
                if edges == nil {
3✔
2459
                        return ErrGraphNoEdgesFound
×
2460
                }
×
2461
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
2462
                if edgeIndex == nil {
3✔
2463
                        return ErrGraphNoEdgesFound
×
2464
                }
×
2465
                nodes := tx.ReadBucket(nodeBucket)
3✔
2466
                if nodes == nil {
3✔
2467
                        return ErrGraphNotFound
×
2468
                }
×
2469

2470
                for _, cid := range chanIDs {
6✔
2471
                        byteOrder.PutUint64(cidBytes[:], cid)
3✔
2472

3✔
2473
                        // First, we'll fetch the static edge information. If
3✔
2474
                        // the edge is unknown, we will skip the edge and
3✔
2475
                        // continue gathering all known edges.
3✔
2476
                        edgeInfo, err := fetchChanEdgeInfo(
3✔
2477
                                edgeIndex, cidBytes[:],
3✔
2478
                        )
3✔
2479
                        switch {
3✔
UNCOV
2480
                        case errors.Is(err, ErrEdgeNotFound):
×
UNCOV
2481
                                continue
×
2482
                        case err != nil:
×
2483
                                return err
×
2484
                        }
2485

2486
                        // With the static information obtained, we'll now
2487
                        // fetch the dynamic policy info.
2488
                        edge1, edge2, err := fetchChanEdgePolicies(
3✔
2489
                                edgeIndex, edges, cidBytes[:],
3✔
2490
                        )
3✔
2491
                        if err != nil {
3✔
2492
                                return err
×
2493
                        }
×
2494

2495
                        node1, err := fetchLightningNode(
3✔
2496
                                nodes, edgeInfo.NodeKey1Bytes[:],
3✔
2497
                        )
3✔
2498
                        if err != nil {
3✔
2499
                                return err
×
2500
                        }
×
2501

2502
                        node2, err := fetchLightningNode(
3✔
2503
                                nodes, edgeInfo.NodeKey2Bytes[:],
3✔
2504
                        )
3✔
2505
                        if err != nil {
3✔
2506
                                return err
×
2507
                        }
×
2508

2509
                        chanEdges = append(chanEdges, ChannelEdge{
3✔
2510
                                Info:    &edgeInfo,
3✔
2511
                                Policy1: edge1,
3✔
2512
                                Policy2: edge2,
3✔
2513
                                Node1:   &node1,
3✔
2514
                                Node2:   &node2,
3✔
2515
                        })
3✔
2516
                }
2517

2518
                return nil
3✔
2519
        }
2520

2521
        if tx == nil {
6✔
2522
                err := kvdb.View(c.db, fetchChanInfos, func() {
6✔
2523
                        chanEdges = nil
3✔
2524
                })
3✔
2525
                if err != nil {
3✔
2526
                        return nil, err
×
2527
                }
×
2528

2529
                return chanEdges, nil
3✔
2530
        }
2531

2532
        err := fetchChanInfos(tx)
×
2533
        if err != nil {
×
2534
                return nil, err
×
2535
        }
×
2536

2537
        return chanEdges, nil
×
2538
}
2539

2540
func delEdgeUpdateIndexEntry(edgesBucket kvdb.RwBucket, chanID uint64,
2541
        edge1, edge2 *models.ChannelEdgePolicy) error {
3✔
2542

3✔
2543
        // First, we'll fetch the edge update index bucket which currently
3✔
2544
        // stores an entry for the channel we're about to delete.
3✔
2545
        updateIndex := edgesBucket.NestedReadWriteBucket(edgeUpdateIndexBucket)
3✔
2546
        if updateIndex == nil {
3✔
2547
                // No edges in bucket, return early.
×
2548
                return nil
×
2549
        }
×
2550

2551
        // Now that we have the bucket, we'll attempt to construct a template
2552
        // for the index key: updateTime || chanid.
2553
        var indexKey [8 + 8]byte
3✔
2554
        byteOrder.PutUint64(indexKey[8:], chanID)
3✔
2555

3✔
2556
        // With the template constructed, we'll attempt to delete an entry that
3✔
2557
        // would have been created by both edges: we'll alternate the update
3✔
2558
        // times, as one may had overridden the other.
3✔
2559
        if edge1 != nil {
6✔
2560
                byteOrder.PutUint64(
3✔
2561
                        indexKey[:8], uint64(edge1.LastUpdate.Unix()),
3✔
2562
                )
3✔
2563
                if err := updateIndex.Delete(indexKey[:]); err != nil {
3✔
2564
                        return err
×
2565
                }
×
2566
        }
2567

2568
        // We'll also attempt to delete the entry that may have been created by
2569
        // the second edge.
2570
        if edge2 != nil {
6✔
2571
                byteOrder.PutUint64(
3✔
2572
                        indexKey[:8], uint64(edge2.LastUpdate.Unix()),
3✔
2573
                )
3✔
2574
                if err := updateIndex.Delete(indexKey[:]); err != nil {
3✔
2575
                        return err
×
2576
                }
×
2577
        }
2578

2579
        return nil
3✔
2580
}
2581

2582
// delChannelEdgeUnsafe deletes the edge with the given chanID from the graph
2583
// cache. It then goes on to delete any policy info and edge info for this
2584
// channel from the DB and finally, if isZombie is true, it will add an entry
2585
// for this channel in the zombie index.
2586
//
2587
// NOTE: this method MUST only be called if the cacheMu has already been
2588
// acquired.
2589
func (c *KVStore) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex,
2590
        zombieIndex kvdb.RwBucket, chanID []byte, isZombie,
2591
        strictZombie bool) (*models.ChannelEdgeInfo, error) {
3✔
2592

3✔
2593
        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
3✔
2594
        if err != nil {
3✔
UNCOV
2595
                return nil, err
×
UNCOV
2596
        }
×
2597

2598
        // We'll also remove the entry in the edge update index bucket before
2599
        // we delete the edges themselves so we can access their last update
2600
        // times.
2601
        cid := byteOrder.Uint64(chanID)
3✔
2602
        edge1, edge2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
3✔
2603
        if err != nil {
3✔
2604
                return nil, err
×
2605
        }
×
2606
        err = delEdgeUpdateIndexEntry(edges, cid, edge1, edge2)
3✔
2607
        if err != nil {
3✔
2608
                return nil, err
×
2609
        }
×
2610

2611
        // The edge key is of the format pubKey || chanID. First we construct
2612
        // the latter half, populating the channel ID.
2613
        var edgeKey [33 + 8]byte
3✔
2614
        copy(edgeKey[33:], chanID)
3✔
2615

3✔
2616
        // With the latter half constructed, copy over the first public key to
3✔
2617
        // delete the edge in this direction, then the second to delete the
3✔
2618
        // edge in the opposite direction.
3✔
2619
        copy(edgeKey[:33], edgeInfo.NodeKey1Bytes[:])
3✔
2620
        if edges.Get(edgeKey[:]) != nil {
6✔
2621
                if err := edges.Delete(edgeKey[:]); err != nil {
3✔
2622
                        return nil, err
×
2623
                }
×
2624
        }
2625
        copy(edgeKey[:33], edgeInfo.NodeKey2Bytes[:])
3✔
2626
        if edges.Get(edgeKey[:]) != nil {
6✔
2627
                if err := edges.Delete(edgeKey[:]); err != nil {
3✔
2628
                        return nil, err
×
2629
                }
×
2630
        }
2631

2632
        // As part of deleting the edge we also remove all disabled entries
2633
        // from the edgePolicyDisabledIndex bucket. We do that for both
2634
        // directions.
2635
        err = updateEdgePolicyDisabledIndex(edges, cid, false, false)
3✔
2636
        if err != nil {
3✔
2637
                return nil, err
×
2638
        }
×
2639
        err = updateEdgePolicyDisabledIndex(edges, cid, true, false)
3✔
2640
        if err != nil {
3✔
2641
                return nil, err
×
2642
        }
×
2643

2644
        // With the edge data deleted, we can purge the information from the two
2645
        // edge indexes.
2646
        if err := edgeIndex.Delete(chanID); err != nil {
3✔
2647
                return nil, err
×
2648
        }
×
2649
        var b bytes.Buffer
3✔
2650
        if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
3✔
2651
                return nil, err
×
2652
        }
×
2653
        if err := chanIndex.Delete(b.Bytes()); err != nil {
3✔
2654
                return nil, err
×
2655
        }
×
2656

2657
        // Finally, we'll mark the edge as a zombie within our index if it's
2658
        // being removed due to the channel becoming a zombie. We do this to
2659
        // ensure we don't store unnecessary data for spent channels.
2660
        if !isZombie {
6✔
2661
                return &edgeInfo, nil
3✔
2662
        }
3✔
2663

2664
        nodeKey1, nodeKey2 := edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes
3✔
2665
        if strictZombie {
3✔
UNCOV
2666
                nodeKey1, nodeKey2 = makeZombiePubkeys(&edgeInfo, edge1, edge2)
×
UNCOV
2667
        }
×
2668

2669
        return &edgeInfo, markEdgeZombie(
3✔
2670
                zombieIndex, byteOrder.Uint64(chanID), nodeKey1, nodeKey2,
3✔
2671
        )
3✔
2672
}
2673

2674
// makeZombiePubkeys derives the node pubkeys to store in the zombie index for a
2675
// particular pair of channel policies. The return values are one of:
2676
//  1. (pubkey1, pubkey2)
2677
//  2. (pubkey1, blank)
2678
//  3. (blank, pubkey2)
2679
//
2680
// A blank pubkey means that corresponding node will be unable to resurrect a
2681
// channel on its own. For example, node1 may continue to publish recent
2682
// updates, but node2 has fallen way behind. After marking an edge as a zombie,
2683
// we don't want another fresh update from node1 to resurrect, as the edge can
2684
// only become live once node2 finally sends something recent.
2685
//
2686
// In the case where we have neither update, we allow either party to resurrect
2687
// the channel. If the channel were to be marked zombie again, it would be
2688
// marked with the correct lagging channel since we received an update from only
2689
// one side.
2690
func makeZombiePubkeys(info *models.ChannelEdgeInfo,
UNCOV
2691
        e1, e2 *models.ChannelEdgePolicy) ([33]byte, [33]byte) {
×
UNCOV
2692

×
UNCOV
2693
        switch {
×
2694
        // If we don't have either edge policy, we'll return both pubkeys so
2695
        // that the channel can be resurrected by either party.
UNCOV
2696
        case e1 == nil && e2 == nil:
×
UNCOV
2697
                return info.NodeKey1Bytes, info.NodeKey2Bytes
×
2698

2699
        // If we're missing edge1, or if both edges are present but edge1 is
2700
        // older, we'll return edge1's pubkey and a blank pubkey for edge2. This
2701
        // means that only an update from edge1 will be able to resurrect the
2702
        // channel.
UNCOV
2703
        case e1 == nil || (e2 != nil && e1.LastUpdate.Before(e2.LastUpdate)):
×
UNCOV
2704
                return info.NodeKey1Bytes, [33]byte{}
×
2705

2706
        // Otherwise, we're missing edge2 or edge2 is the older side, so we
2707
        // return a blank pubkey for edge1. In this case, only an update from
2708
        // edge2 can resurect the channel.
UNCOV
2709
        default:
×
UNCOV
2710
                return [33]byte{}, info.NodeKey2Bytes
×
2711
        }
2712
}
2713

2714
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
2715
// within the database for the referenced channel. The `flags` attribute within
2716
// the ChannelEdgePolicy determines which of the directed edges are being
2717
// updated. If the flag is 1, then the first node's information is being
2718
// updated, otherwise it's the second node's information. The node ordering is
2719
// determined by the lexicographical ordering of the identity public keys of the
2720
// nodes on either side of the channel.
2721
func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
2722
        op ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) {
3✔
2723

3✔
2724
        var (
3✔
2725
                isUpdate1    bool
3✔
2726
                edgeNotFound bool
3✔
2727
                from, to     route.Vertex
3✔
2728
        )
3✔
2729

3✔
2730
        r := &batch.Request{
3✔
2731
                Reset: func() {
6✔
2732
                        isUpdate1 = false
3✔
2733
                        edgeNotFound = false
3✔
2734
                },
3✔
2735
                Update: func(tx kvdb.RwTx) error {
3✔
2736
                        var err error
3✔
2737
                        from, to, isUpdate1, err = updateEdgePolicy(tx, edge)
3✔
2738
                        if err != nil {
3✔
UNCOV
2739
                                log.Errorf("UpdateEdgePolicy faild: %v", err)
×
UNCOV
2740
                        }
×
2741

2742
                        // Silence ErrEdgeNotFound so that the batch can
2743
                        // succeed, but propagate the error via local state.
2744
                        if errors.Is(err, ErrEdgeNotFound) {
3✔
UNCOV
2745
                                edgeNotFound = true
×
UNCOV
2746
                                return nil
×
UNCOV
2747
                        }
×
2748

2749
                        return err
3✔
2750
                },
2751
                OnCommit: func(err error) error {
3✔
2752
                        switch {
3✔
2753
                        case err != nil:
×
2754
                                return err
×
UNCOV
2755
                        case edgeNotFound:
×
UNCOV
2756
                                return ErrEdgeNotFound
×
2757
                        default:
3✔
2758
                                c.updateEdgeCache(edge, isUpdate1)
3✔
2759
                                return nil
3✔
2760
                        }
2761
                },
2762
        }
2763

2764
        for _, f := range op {
6✔
2765
                f(r)
3✔
2766
        }
3✔
2767

2768
        err := c.chanScheduler.Execute(r)
3✔
2769

3✔
2770
        return from, to, err
3✔
2771
}
2772

2773
func (c *KVStore) updateEdgeCache(e *models.ChannelEdgePolicy,
2774
        isUpdate1 bool) {
3✔
2775

3✔
2776
        // If an entry for this channel is found in reject cache, we'll modify
3✔
2777
        // the entry with the updated timestamp for the direction that was just
3✔
2778
        // written. If the edge doesn't exist, we'll load the cache entry lazily
3✔
2779
        // during the next query for this edge.
3✔
2780
        if entry, ok := c.rejectCache.get(e.ChannelID); ok {
6✔
2781
                if isUpdate1 {
6✔
2782
                        entry.upd1Time = e.LastUpdate.Unix()
3✔
2783
                } else {
6✔
2784
                        entry.upd2Time = e.LastUpdate.Unix()
3✔
2785
                }
3✔
2786
                c.rejectCache.insert(e.ChannelID, entry)
3✔
2787
        }
2788

2789
        // If an entry for this channel is found in channel cache, we'll modify
2790
        // the entry with the updated policy for the direction that was just
2791
        // written. If the edge doesn't exist, we'll defer loading the info and
2792
        // policies and lazily read from disk during the next query.
2793
        if channel, ok := c.chanCache.get(e.ChannelID); ok {
6✔
2794
                if isUpdate1 {
6✔
2795
                        channel.Policy1 = e
3✔
2796
                } else {
6✔
2797
                        channel.Policy2 = e
3✔
2798
                }
3✔
2799
                c.chanCache.insert(e.ChannelID, channel)
3✔
2800
        }
2801
}
2802

2803
// updateEdgePolicy attempts to update an edge's policy within the relevant
2804
// buckets using an existing database transaction. The returned boolean will be
2805
// true if the updated policy belongs to node1, and false if the policy belonged
2806
// to node2.
2807
func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy) (
2808
        route.Vertex, route.Vertex, bool, error) {
3✔
2809

3✔
2810
        var noVertex route.Vertex
3✔
2811

3✔
2812
        edges := tx.ReadWriteBucket(edgeBucket)
3✔
2813
        if edges == nil {
3✔
2814
                return noVertex, noVertex, false, ErrEdgeNotFound
×
2815
        }
×
2816
        edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
3✔
2817
        if edgeIndex == nil {
3✔
2818
                return noVertex, noVertex, false, ErrEdgeNotFound
×
2819
        }
×
2820

2821
        // Create the channelID key be converting the channel ID
2822
        // integer into a byte slice.
2823
        var chanID [8]byte
3✔
2824
        byteOrder.PutUint64(chanID[:], edge.ChannelID)
3✔
2825

3✔
2826
        // With the channel ID, we then fetch the value storing the two
3✔
2827
        // nodes which connect this channel edge.
3✔
2828
        nodeInfo := edgeIndex.Get(chanID[:])
3✔
2829
        if nodeInfo == nil {
3✔
UNCOV
2830
                return noVertex, noVertex, false, ErrEdgeNotFound
×
UNCOV
2831
        }
×
2832

2833
        // Depending on the flags value passed above, either the first
2834
        // or second edge policy is being updated.
2835
        var fromNode, toNode []byte
3✔
2836
        var isUpdate1 bool
3✔
2837
        if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
6✔
2838
                fromNode = nodeInfo[:33]
3✔
2839
                toNode = nodeInfo[33:66]
3✔
2840
                isUpdate1 = true
3✔
2841
        } else {
6✔
2842
                fromNode = nodeInfo[33:66]
3✔
2843
                toNode = nodeInfo[:33]
3✔
2844
                isUpdate1 = false
3✔
2845
        }
3✔
2846

2847
        // Finally, with the direction of the edge being updated
2848
        // identified, we update the on-disk edge representation.
2849
        err := putChanEdgePolicy(edges, edge, fromNode, toNode)
3✔
2850
        if err != nil {
3✔
2851
                return noVertex, noVertex, false, err
×
2852
        }
×
2853

2854
        var (
3✔
2855
                fromNodePubKey route.Vertex
3✔
2856
                toNodePubKey   route.Vertex
3✔
2857
        )
3✔
2858
        copy(fromNodePubKey[:], fromNode)
3✔
2859
        copy(toNodePubKey[:], toNode)
3✔
2860

3✔
2861
        return fromNodePubKey, toNodePubKey, isUpdate1, nil
3✔
2862
}
2863

2864
// isPublic determines whether the node is seen as public within the graph from
2865
// the source node's point of view. An existing database transaction can also be
2866
// specified.
2867
func (c *KVStore) isPublic(tx kvdb.RTx, nodePub route.Vertex,
2868
        sourcePubKey []byte) (bool, error) {
3✔
2869

3✔
2870
        // In order to determine whether this node is publicly advertised within
3✔
2871
        // the graph, we'll need to look at all of its edges and check whether
3✔
2872
        // they extend to any other node than the source node. errDone will be
3✔
2873
        // used to terminate the check early.
3✔
2874
        nodeIsPublic := false
3✔
2875
        errDone := errors.New("done")
3✔
2876
        err := c.ForEachNodeChannelTx(tx, nodePub, func(tx kvdb.RTx,
3✔
2877
                info *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy,
3✔
2878
                _ *models.ChannelEdgePolicy) error {
6✔
2879

3✔
2880
                // If this edge doesn't extend to the source node, we'll
3✔
2881
                // terminate our search as we can now conclude that the node is
3✔
2882
                // publicly advertised within the graph due to the local node
3✔
2883
                // knowing of the current edge.
3✔
2884
                if !bytes.Equal(info.NodeKey1Bytes[:], sourcePubKey) &&
3✔
2885
                        !bytes.Equal(info.NodeKey2Bytes[:], sourcePubKey) {
6✔
2886

3✔
2887
                        nodeIsPublic = true
3✔
2888
                        return errDone
3✔
2889
                }
3✔
2890

2891
                // Since the edge _does_ extend to the source node, we'll also
2892
                // need to ensure that this is a public edge.
2893
                if info.AuthProof != nil {
6✔
2894
                        nodeIsPublic = true
3✔
2895
                        return errDone
3✔
2896
                }
3✔
2897

2898
                // Otherwise, we'll continue our search.
2899
                return nil
3✔
2900
        })
2901
        if err != nil && !errors.Is(err, errDone) {
3✔
2902
                return false, err
×
2903
        }
×
2904

2905
        return nodeIsPublic, nil
3✔
2906
}
2907

2908
// FetchLightningNodeTx attempts to look up a target node by its identity
2909
// public key. If the node isn't found in the database, then
2910
// ErrGraphNodeNotFound is returned. An optional transaction may be provided.
2911
// If none is provided, then a new one will be created.
2912
func (c *KVStore) FetchLightningNodeTx(tx kvdb.RTx, nodePub route.Vertex) (
2913
        *models.LightningNode, error) {
3✔
2914

3✔
2915
        return c.fetchLightningNode(tx, nodePub)
3✔
2916
}
3✔
2917

2918
// FetchLightningNode attempts to look up a target node by its identity public
2919
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
2920
// returned.
2921
func (c *KVStore) FetchLightningNode(nodePub route.Vertex) (
2922
        *models.LightningNode, error) {
3✔
2923

3✔
2924
        return c.fetchLightningNode(nil, nodePub)
3✔
2925
}
3✔
2926

2927
// fetchLightningNode attempts to look up a target node by its identity public
2928
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
2929
// returned. An optional transaction may be provided. If none is provided, then
2930
// a new one will be created.
2931
func (c *KVStore) fetchLightningNode(tx kvdb.RTx,
2932
        nodePub route.Vertex) (*models.LightningNode, error) {
3✔
2933

3✔
2934
        var node *models.LightningNode
3✔
2935
        fetch := func(tx kvdb.RTx) error {
6✔
2936
                // First grab the nodes bucket which stores the mapping from
3✔
2937
                // pubKey to node information.
3✔
2938
                nodes := tx.ReadBucket(nodeBucket)
3✔
2939
                if nodes == nil {
3✔
2940
                        return ErrGraphNotFound
×
2941
                }
×
2942

2943
                // If a key for this serialized public key isn't found, then
2944
                // the target node doesn't exist within the database.
2945
                nodeBytes := nodes.Get(nodePub[:])
3✔
2946
                if nodeBytes == nil {
6✔
2947
                        return ErrGraphNodeNotFound
3✔
2948
                }
3✔
2949

2950
                // If the node is found, then we can de deserialize the node
2951
                // information to return to the user.
2952
                nodeReader := bytes.NewReader(nodeBytes)
3✔
2953
                n, err := deserializeLightningNode(nodeReader)
3✔
2954
                if err != nil {
3✔
2955
                        return err
×
2956
                }
×
2957

2958
                node = &n
3✔
2959

3✔
2960
                return nil
3✔
2961
        }
2962

2963
        if tx == nil {
6✔
2964
                err := kvdb.View(
3✔
2965
                        c.db, fetch, func() {
6✔
2966
                                node = nil
3✔
2967
                        },
3✔
2968
                )
2969
                if err != nil {
6✔
2970
                        return nil, err
3✔
2971
                }
3✔
2972

2973
                return node, nil
3✔
2974
        }
2975

UNCOV
2976
        err := fetch(tx)
×
UNCOV
2977
        if err != nil {
×
UNCOV
2978
                return nil, err
×
UNCOV
2979
        }
×
2980

UNCOV
2981
        return node, nil
×
2982
}
2983

2984
// HasLightningNode determines if the graph has a vertex identified by the
2985
// target node identity public key. If the node exists in the database, a
2986
// timestamp of when the data for the node was lasted updated is returned along
2987
// with a true boolean. Otherwise, an empty time.Time is returned with a false
2988
// boolean.
2989
func (c *KVStore) HasLightningNode(nodePub [33]byte) (time.Time, bool,
2990
        error) {
3✔
2991

3✔
2992
        var (
3✔
2993
                updateTime time.Time
3✔
2994
                exists     bool
3✔
2995
        )
3✔
2996

3✔
2997
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
2998
                // First grab the nodes bucket which stores the mapping from
3✔
2999
                // pubKey to node information.
3✔
3000
                nodes := tx.ReadBucket(nodeBucket)
3✔
3001
                if nodes == nil {
3✔
3002
                        return ErrGraphNotFound
×
3003
                }
×
3004

3005
                // If a key for this serialized public key isn't found, we can
3006
                // exit early.
3007
                nodeBytes := nodes.Get(nodePub[:])
3✔
3008
                if nodeBytes == nil {
6✔
3009
                        exists = false
3✔
3010
                        return nil
3✔
3011
                }
3✔
3012

3013
                // Otherwise we continue on to obtain the time stamp
3014
                // representing the last time the data for this node was
3015
                // updated.
3016
                nodeReader := bytes.NewReader(nodeBytes)
3✔
3017
                node, err := deserializeLightningNode(nodeReader)
3✔
3018
                if err != nil {
3✔
3019
                        return err
×
3020
                }
×
3021

3022
                exists = true
3✔
3023
                updateTime = node.LastUpdate
3✔
3024

3✔
3025
                return nil
3✔
3026
        }, func() {
3✔
3027
                updateTime = time.Time{}
3✔
3028
                exists = false
3✔
3029
        })
3✔
3030
        if err != nil {
3✔
3031
                return time.Time{}, exists, err
×
3032
        }
×
3033

3034
        return updateTime, exists, nil
3✔
3035
}
3036

3037
// nodeTraversal is used to traverse all channels of a node given by its
3038
// public key and passes channel information into the specified callback.
3039
func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
3040
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3041
                *models.ChannelEdgePolicy) error) error {
3✔
3042

3✔
3043
        traversal := func(tx kvdb.RTx) error {
6✔
3044
                edges := tx.ReadBucket(edgeBucket)
3✔
3045
                if edges == nil {
3✔
3046
                        return ErrGraphNotFound
×
3047
                }
×
3048
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
3049
                if edgeIndex == nil {
3✔
3050
                        return ErrGraphNoEdgesFound
×
3051
                }
×
3052

3053
                // In order to reach all the edges for this node, we take
3054
                // advantage of the construction of the key-space within the
3055
                // edge bucket. The keys are stored in the form: pubKey ||
3056
                // chanID. Therefore, starting from a chanID of zero, we can
3057
                // scan forward in the bucket, grabbing all the edges for the
3058
                // node. Once the prefix no longer matches, then we know we're
3059
                // done.
3060
                var nodeStart [33 + 8]byte
3✔
3061
                copy(nodeStart[:], nodePub)
3✔
3062
                copy(nodeStart[33:], chanStart[:])
3✔
3063

3✔
3064
                // Starting from the key pubKey || 0, we seek forward in the
3✔
3065
                // bucket until the retrieved key no longer has the public key
3✔
3066
                // as its prefix. This indicates that we've stepped over into
3✔
3067
                // another node's edges, so we can terminate our scan.
3✔
3068
                edgeCursor := edges.ReadCursor()
3✔
3069
                for nodeEdge, _ := edgeCursor.Seek(nodeStart[:]); bytes.HasPrefix(nodeEdge, nodePub); nodeEdge, _ = edgeCursor.Next() { //nolint:ll
6✔
3070
                        // If the prefix still matches, the channel id is
3✔
3071
                        // returned in nodeEdge. Channel id is used to lookup
3✔
3072
                        // the node at the other end of the channel and both
3✔
3073
                        // edge policies.
3✔
3074
                        chanID := nodeEdge[33:]
3✔
3075
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
3✔
3076
                        if err != nil {
3✔
3077
                                return err
×
3078
                        }
×
3079

3080
                        outgoingPolicy, err := fetchChanEdgePolicy(
3✔
3081
                                edges, chanID, nodePub,
3✔
3082
                        )
3✔
3083
                        if err != nil {
3✔
3084
                                return err
×
3085
                        }
×
3086

3087
                        otherNode, err := edgeInfo.OtherNodeKeyBytes(nodePub)
3✔
3088
                        if err != nil {
3✔
3089
                                return err
×
3090
                        }
×
3091

3092
                        incomingPolicy, err := fetchChanEdgePolicy(
3✔
3093
                                edges, chanID, otherNode[:],
3✔
3094
                        )
3✔
3095
                        if err != nil {
3✔
3096
                                return err
×
3097
                        }
×
3098

3099
                        // Finally, we execute the callback.
3100
                        err = cb(tx, &edgeInfo, outgoingPolicy, incomingPolicy)
3✔
3101
                        if err != nil {
6✔
3102
                                return err
3✔
3103
                        }
3✔
3104
                }
3105

3106
                return nil
3✔
3107
        }
3108

3109
        // If no transaction was provided, then we'll create a new transaction
3110
        // to execute the transaction within.
3111
        if tx == nil {
6✔
3112
                return kvdb.View(db, traversal, func() {})
6✔
3113
        }
3114

3115
        // Otherwise, we re-use the existing transaction to execute the graph
3116
        // traversal.
3117
        return traversal(tx)
3✔
3118
}
3119

3120
// ForEachNodeChannel iterates through all channels of the given node,
3121
// executing the passed callback with an edge info structure and the policies
3122
// of each end of the channel. The first edge policy is the outgoing edge *to*
3123
// the connecting node, while the second is the incoming edge *from* the
3124
// connecting node. If the callback returns an error, then the iteration is
3125
// halted with the error propagated back up to the caller.
3126
//
3127
// Unknown policies are passed into the callback as nil values.
3128
func (c *KVStore) ForEachNodeChannel(nodePub route.Vertex,
3129
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3130
                *models.ChannelEdgePolicy) error) error {
3✔
3131

3✔
3132
        return nodeTraversal(nil, nodePub[:], c.db, cb)
3✔
3133
}
3✔
3134

3135
// ForEachNodeChannelTx iterates through all channels of the given node,
3136
// executing the passed callback with an edge info structure and the policies
3137
// of each end of the channel. The first edge policy is the outgoing edge *to*
3138
// the connecting node, while the second is the incoming edge *from* the
3139
// connecting node. If the callback returns an error, then the iteration is
3140
// halted with the error propagated back up to the caller.
3141
//
3142
// Unknown policies are passed into the callback as nil values.
3143
//
3144
// If the caller wishes to re-use an existing boltdb transaction, then it
3145
// should be passed as the first argument.  Otherwise, the first argument should
3146
// be nil and a fresh transaction will be created to execute the graph
3147
// traversal.
3148
func (c *KVStore) ForEachNodeChannelTx(tx kvdb.RTx,
3149
        nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo,
3150
                *models.ChannelEdgePolicy,
3151
                *models.ChannelEdgePolicy) error) error {
3✔
3152

3✔
3153
        return nodeTraversal(tx, nodePub[:], c.db, cb)
3✔
3154
}
3✔
3155

3156
// FetchOtherNode attempts to fetch the full LightningNode that's opposite of
3157
// the target node in the channel. This is useful when one knows the pubkey of
3158
// one of the nodes, and wishes to obtain the full LightningNode for the other
3159
// end of the channel.
3160
func (c *KVStore) FetchOtherNode(tx kvdb.RTx,
3161
        channel *models.ChannelEdgeInfo, thisNodeKey []byte) (
3162
        *models.LightningNode, error) {
3✔
3163

3✔
3164
        // Ensure that the node passed in is actually a member of the channel.
3✔
3165
        var targetNodeBytes [33]byte
3✔
3166
        switch {
3✔
3167
        case bytes.Equal(channel.NodeKey1Bytes[:], thisNodeKey):
3✔
3168
                targetNodeBytes = channel.NodeKey2Bytes
3✔
3169
        case bytes.Equal(channel.NodeKey2Bytes[:], thisNodeKey):
3✔
3170
                targetNodeBytes = channel.NodeKey1Bytes
3✔
3171
        default:
×
3172
                return nil, fmt.Errorf("node not participating in this channel")
×
3173
        }
3174

3175
        var targetNode *models.LightningNode
3✔
3176
        fetchNodeFunc := func(tx kvdb.RTx) error {
6✔
3177
                // First grab the nodes bucket which stores the mapping from
3✔
3178
                // pubKey to node information.
3✔
3179
                nodes := tx.ReadBucket(nodeBucket)
3✔
3180
                if nodes == nil {
3✔
3181
                        return ErrGraphNotFound
×
3182
                }
×
3183

3184
                node, err := fetchLightningNode(nodes, targetNodeBytes[:])
3✔
3185
                if err != nil {
3✔
3186
                        return err
×
3187
                }
×
3188

3189
                targetNode = &node
3✔
3190

3✔
3191
                return nil
3✔
3192
        }
3193

3194
        // If the transaction is nil, then we'll need to create a new one,
3195
        // otherwise we can use the existing db transaction.
3196
        var err error
3✔
3197
        if tx == nil {
3✔
3198
                err = kvdb.View(c.db, fetchNodeFunc, func() {
×
3199
                        targetNode = nil
×
3200
                })
×
3201
        } else {
3✔
3202
                err = fetchNodeFunc(tx)
3✔
3203
        }
3✔
3204

3205
        return targetNode, err
3✔
3206
}
3207

3208
// computeEdgePolicyKeys is a helper function that can be used to compute the
3209
// keys used to index the channel edge policy info for the two nodes of the
3210
// edge. The keys for node 1 and node 2 are returned respectively.
3211
func computeEdgePolicyKeys(info *models.ChannelEdgeInfo) ([]byte, []byte) {
3✔
3212
        var (
3✔
3213
                node1Key [33 + 8]byte
3✔
3214
                node2Key [33 + 8]byte
3✔
3215
        )
3✔
3216

3✔
3217
        copy(node1Key[:], info.NodeKey1Bytes[:])
3✔
3218
        copy(node2Key[:], info.NodeKey2Bytes[:])
3✔
3219

3✔
3220
        byteOrder.PutUint64(node1Key[33:], info.ChannelID)
3✔
3221
        byteOrder.PutUint64(node2Key[33:], info.ChannelID)
3✔
3222

3✔
3223
        return node1Key[:], node2Key[:]
3✔
3224
}
3✔
3225

3226
// FetchChannelEdgesByOutpoint attempts to lookup the two directed edges for
3227
// the channel identified by the funding outpoint. If the channel can't be
3228
// found, then ErrEdgeNotFound is returned. A struct which houses the general
3229
// information for the channel itself is returned as well as two structs that
3230
// contain the routing policies for the channel in either direction.
3231
func (c *KVStore) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (
3232
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3233
        *models.ChannelEdgePolicy, error) {
3✔
3234

3✔
3235
        var (
3✔
3236
                edgeInfo *models.ChannelEdgeInfo
3✔
3237
                policy1  *models.ChannelEdgePolicy
3✔
3238
                policy2  *models.ChannelEdgePolicy
3✔
3239
        )
3✔
3240

3✔
3241
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
3242
                // First, grab the node bucket. This will be used to populate
3✔
3243
                // the Node pointers in each edge read from disk.
3✔
3244
                nodes := tx.ReadBucket(nodeBucket)
3✔
3245
                if nodes == nil {
3✔
3246
                        return ErrGraphNotFound
×
3247
                }
×
3248

3249
                // Next, grab the edge bucket which stores the edges, and also
3250
                // the index itself so we can group the directed edges together
3251
                // logically.
3252
                edges := tx.ReadBucket(edgeBucket)
3✔
3253
                if edges == nil {
3✔
3254
                        return ErrGraphNoEdgesFound
×
3255
                }
×
3256
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
3257
                if edgeIndex == nil {
3✔
3258
                        return ErrGraphNoEdgesFound
×
3259
                }
×
3260

3261
                // If the channel's outpoint doesn't exist within the outpoint
3262
                // index, then the edge does not exist.
3263
                chanIndex := edges.NestedReadBucket(channelPointBucket)
3✔
3264
                if chanIndex == nil {
3✔
3265
                        return ErrGraphNoEdgesFound
×
3266
                }
×
3267
                var b bytes.Buffer
3✔
3268
                if err := WriteOutpoint(&b, op); err != nil {
3✔
3269
                        return err
×
3270
                }
×
3271
                chanID := chanIndex.Get(b.Bytes())
3✔
3272
                if chanID == nil {
6✔
3273
                        return fmt.Errorf("%w: op=%v", ErrEdgeNotFound, op)
3✔
3274
                }
3✔
3275

3276
                // If the channel is found to exists, then we'll first retrieve
3277
                // the general information for the channel.
3278
                edge, err := fetchChanEdgeInfo(edgeIndex, chanID)
3✔
3279
                if err != nil {
3✔
3280
                        return fmt.Errorf("%w: chanID=%x", err, chanID)
×
3281
                }
×
3282
                edgeInfo = &edge
3✔
3283

3✔
3284
                // Once we have the information about the channels' parameters,
3✔
3285
                // we'll fetch the routing policies for each for the directed
3✔
3286
                // edges.
3✔
3287
                e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
3✔
3288
                if err != nil {
3✔
3289
                        return fmt.Errorf("failed to find policy: %w", err)
×
3290
                }
×
3291

3292
                policy1 = e1
3✔
3293
                policy2 = e2
3✔
3294

3✔
3295
                return nil
3✔
3296
        }, func() {
3✔
3297
                edgeInfo = nil
3✔
3298
                policy1 = nil
3✔
3299
                policy2 = nil
3✔
3300
        })
3✔
3301
        if err != nil {
6✔
3302
                return nil, nil, nil, err
3✔
3303
        }
3✔
3304

3305
        return edgeInfo, policy1, policy2, nil
3✔
3306
}
3307

3308
// FetchChannelEdgesByID attempts to lookup the two directed edges for the
3309
// channel identified by the channel ID. If the channel can't be found, then
3310
// ErrEdgeNotFound is returned. A struct which houses the general information
3311
// for the channel itself is returned as well as two structs that contain the
3312
// routing policies for the channel in either direction.
3313
//
3314
// ErrZombieEdge an be returned if the edge is currently marked as a zombie
3315
// within the database. In this case, the ChannelEdgePolicy's will be nil, and
3316
// the ChannelEdgeInfo will only include the public keys of each node.
3317
func (c *KVStore) FetchChannelEdgesByID(chanID uint64) (
3318
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3319
        *models.ChannelEdgePolicy, error) {
3✔
3320

3✔
3321
        var (
3✔
3322
                edgeInfo  *models.ChannelEdgeInfo
3✔
3323
                policy1   *models.ChannelEdgePolicy
3✔
3324
                policy2   *models.ChannelEdgePolicy
3✔
3325
                channelID [8]byte
3✔
3326
        )
3✔
3327

3✔
3328
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
3329
                // First, grab the node bucket. This will be used to populate
3✔
3330
                // the Node pointers in each edge read from disk.
3✔
3331
                nodes := tx.ReadBucket(nodeBucket)
3✔
3332
                if nodes == nil {
3✔
3333
                        return ErrGraphNotFound
×
3334
                }
×
3335

3336
                // Next, grab the edge bucket which stores the edges, and also
3337
                // the index itself so we can group the directed edges together
3338
                // logically.
3339
                edges := tx.ReadBucket(edgeBucket)
3✔
3340
                if edges == nil {
3✔
3341
                        return ErrGraphNoEdgesFound
×
3342
                }
×
3343
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
3344
                if edgeIndex == nil {
3✔
3345
                        return ErrGraphNoEdgesFound
×
3346
                }
×
3347

3348
                byteOrder.PutUint64(channelID[:], chanID)
3✔
3349

3✔
3350
                // Now, attempt to fetch edge.
3✔
3351
                edge, err := fetchChanEdgeInfo(edgeIndex, channelID[:])
3✔
3352

3✔
3353
                // If it doesn't exist, we'll quickly check our zombie index to
3✔
3354
                // see if we've previously marked it as so.
3✔
3355
                if errors.Is(err, ErrEdgeNotFound) {
6✔
3356
                        // If the zombie index doesn't exist, or the edge is not
3✔
3357
                        // marked as a zombie within it, then we'll return the
3✔
3358
                        // original ErrEdgeNotFound error.
3✔
3359
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
3✔
3360
                        if zombieIndex == nil {
3✔
3361
                                return ErrEdgeNotFound
×
3362
                        }
×
3363

3364
                        isZombie, pubKey1, pubKey2 := isZombieEdge(
3✔
3365
                                zombieIndex, chanID,
3✔
3366
                        )
3✔
3367
                        if !isZombie {
6✔
3368
                                return ErrEdgeNotFound
3✔
3369
                        }
3✔
3370

3371
                        // Otherwise, the edge is marked as a zombie, so we'll
3372
                        // populate the edge info with the public keys of each
3373
                        // party as this is the only information we have about
3374
                        // it and return an error signaling so.
3375
                        edgeInfo = &models.ChannelEdgeInfo{
3✔
3376
                                NodeKey1Bytes: pubKey1,
3✔
3377
                                NodeKey2Bytes: pubKey2,
3✔
3378
                        }
3✔
3379

3✔
3380
                        return ErrZombieEdge
3✔
3381
                }
3382

3383
                // Otherwise, we'll just return the error if any.
3384
                if err != nil {
3✔
3385
                        return err
×
3386
                }
×
3387

3388
                edgeInfo = &edge
3✔
3389

3✔
3390
                // Then we'll attempt to fetch the accompanying policies of this
3✔
3391
                // edge.
3✔
3392
                e1, e2, err := fetchChanEdgePolicies(
3✔
3393
                        edgeIndex, edges, channelID[:],
3✔
3394
                )
3✔
3395
                if err != nil {
3✔
3396
                        return err
×
3397
                }
×
3398

3399
                policy1 = e1
3✔
3400
                policy2 = e2
3✔
3401

3✔
3402
                return nil
3✔
3403
        }, func() {
3✔
3404
                edgeInfo = nil
3✔
3405
                policy1 = nil
3✔
3406
                policy2 = nil
3✔
3407
        })
3✔
3408
        if errors.Is(err, ErrZombieEdge) {
6✔
3409
                return edgeInfo, nil, nil, err
3✔
3410
        }
3✔
3411
        if err != nil {
6✔
3412
                return nil, nil, nil, err
3✔
3413
        }
3✔
3414

3415
        return edgeInfo, policy1, policy2, nil
3✔
3416
}
3417

3418
// IsPublicNode is a helper method that determines whether the node with the
3419
// given public key is seen as a public node in the graph from the graph's
3420
// source node's point of view.
3421
func (c *KVStore) IsPublicNode(pubKey [33]byte) (bool, error) {
3✔
3422
        var nodeIsPublic bool
3✔
3423
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
3424
                nodes := tx.ReadBucket(nodeBucket)
3✔
3425
                if nodes == nil {
3✔
3426
                        return ErrGraphNodesNotFound
×
3427
                }
×
3428
                ourPubKey := nodes.Get(sourceKey)
3✔
3429
                if ourPubKey == nil {
3✔
3430
                        return ErrSourceNodeNotSet
×
3431
                }
×
3432
                node, err := fetchLightningNode(nodes, pubKey[:])
3✔
3433
                if err != nil {
3✔
3434
                        return err
×
3435
                }
×
3436

3437
                nodeIsPublic, err = c.isPublic(tx, node.PubKeyBytes, ourPubKey)
3✔
3438

3✔
3439
                return err
3✔
3440
        }, func() {
3✔
3441
                nodeIsPublic = false
3✔
3442
        })
3✔
3443
        if err != nil {
3✔
3444
                return false, err
×
3445
        }
×
3446

3447
        return nodeIsPublic, nil
3✔
3448
}
3449

3450
// genMultiSigP2WSH generates the p2wsh'd multisig script for 2 of 2 pubkeys.
3451
func genMultiSigP2WSH(aPub, bPub []byte) ([]byte, error) {
3✔
3452
        witnessScript, err := input.GenMultiSigScript(aPub, bPub)
3✔
3453
        if err != nil {
3✔
3454
                return nil, err
×
3455
        }
×
3456

3457
        // With the witness script generated, we'll now turn it into a p2wsh
3458
        // script:
3459
        //  * OP_0 <sha256(script)>
3460
        bldr := txscript.NewScriptBuilder(
3✔
3461
                txscript.WithScriptAllocSize(input.P2WSHSize),
3✔
3462
        )
3✔
3463
        bldr.AddOp(txscript.OP_0)
3✔
3464
        scriptHash := sha256.Sum256(witnessScript)
3✔
3465
        bldr.AddData(scriptHash[:])
3✔
3466

3✔
3467
        return bldr.Script()
3✔
3468
}
3469

3470
// EdgePoint couples the outpoint of a channel with the funding script that it
3471
// creates. The FilteredChainView will use this to watch for spends of this
3472
// edge point on chain. We require both of these values as depending on the
3473
// concrete implementation, either the pkScript, or the out point will be used.
3474
type EdgePoint struct {
3475
        // FundingPkScript is the p2wsh multi-sig script of the target channel.
3476
        FundingPkScript []byte
3477

3478
        // OutPoint is the outpoint of the target channel.
3479
        OutPoint wire.OutPoint
3480
}
3481

3482
// String returns a human readable version of the target EdgePoint. We return
3483
// the outpoint directly as it is enough to uniquely identify the edge point.
3484
func (e *EdgePoint) String() string {
×
3485
        return e.OutPoint.String()
×
3486
}
×
3487

3488
// ChannelView returns the verifiable edge information for each active channel
3489
// within the known channel graph. The set of UTXO's (along with their scripts)
3490
// returned are the ones that need to be watched on chain to detect channel
3491
// closes on the resident blockchain.
3492
func (c *KVStore) ChannelView() ([]EdgePoint, error) {
3✔
3493
        var edgePoints []EdgePoint
3✔
3494
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
3495
                // We're going to iterate over the entire channel index, so
3✔
3496
                // we'll need to fetch the edgeBucket to get to the index as
3✔
3497
                // it's a sub-bucket.
3✔
3498
                edges := tx.ReadBucket(edgeBucket)
3✔
3499
                if edges == nil {
3✔
3500
                        return ErrGraphNoEdgesFound
×
3501
                }
×
3502
                chanIndex := edges.NestedReadBucket(channelPointBucket)
3✔
3503
                if chanIndex == nil {
3✔
3504
                        return ErrGraphNoEdgesFound
×
3505
                }
×
3506
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
3507
                if edgeIndex == nil {
3✔
3508
                        return ErrGraphNoEdgesFound
×
3509
                }
×
3510

3511
                // Once we have the proper bucket, we'll range over each key
3512
                // (which is the channel point for the channel) and decode it,
3513
                // accumulating each entry.
3514
                return chanIndex.ForEach(
3✔
3515
                        func(chanPointBytes, chanID []byte) error {
6✔
3516
                                chanPointReader := bytes.NewReader(
3✔
3517
                                        chanPointBytes,
3✔
3518
                                )
3✔
3519

3✔
3520
                                var chanPoint wire.OutPoint
3✔
3521
                                err := ReadOutpoint(chanPointReader, &chanPoint)
3✔
3522
                                if err != nil {
3✔
3523
                                        return err
×
3524
                                }
×
3525

3526
                                edgeInfo, err := fetchChanEdgeInfo(
3✔
3527
                                        edgeIndex, chanID,
3✔
3528
                                )
3✔
3529
                                if err != nil {
3✔
3530
                                        return err
×
3531
                                }
×
3532

3533
                                pkScript, err := genMultiSigP2WSH(
3✔
3534
                                        edgeInfo.BitcoinKey1Bytes[:],
3✔
3535
                                        edgeInfo.BitcoinKey2Bytes[:],
3✔
3536
                                )
3✔
3537
                                if err != nil {
3✔
3538
                                        return err
×
3539
                                }
×
3540

3541
                                edgePoints = append(edgePoints, EdgePoint{
3✔
3542
                                        FundingPkScript: pkScript,
3✔
3543
                                        OutPoint:        chanPoint,
3✔
3544
                                })
3✔
3545

3✔
3546
                                return nil
3✔
3547
                        },
3548
                )
3549
        }, func() {
3✔
3550
                edgePoints = nil
3✔
3551
        }); err != nil {
3✔
3552
                return nil, err
×
3553
        }
×
3554

3555
        return edgePoints, nil
3✔
3556
}
3557

3558
// MarkEdgeZombie attempts to mark a channel identified by its channel ID as a
3559
// zombie. This method is used on an ad-hoc basis, when channels need to be
3560
// marked as zombies outside the normal pruning cycle.
3561
func (c *KVStore) MarkEdgeZombie(chanID uint64,
UNCOV
3562
        pubKey1, pubKey2 [33]byte) error {
×
UNCOV
3563

×
UNCOV
3564
        c.cacheMu.Lock()
×
UNCOV
3565
        defer c.cacheMu.Unlock()
×
UNCOV
3566

×
UNCOV
3567
        err := kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
×
UNCOV
3568
                edges := tx.ReadWriteBucket(edgeBucket)
×
UNCOV
3569
                if edges == nil {
×
3570
                        return ErrGraphNoEdgesFound
×
3571
                }
×
UNCOV
3572
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
×
UNCOV
3573
                if err != nil {
×
3574
                        return fmt.Errorf("unable to create zombie "+
×
3575
                                "bucket: %w", err)
×
3576
                }
×
3577

UNCOV
3578
                return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
×
3579
        })
UNCOV
3580
        if err != nil {
×
3581
                return err
×
3582
        }
×
3583

UNCOV
3584
        c.rejectCache.remove(chanID)
×
UNCOV
3585
        c.chanCache.remove(chanID)
×
UNCOV
3586

×
UNCOV
3587
        return nil
×
3588
}
3589

3590
// markEdgeZombie marks an edge as a zombie within our zombie index. The public
3591
// keys should represent the node public keys of the two parties involved in the
3592
// edge.
3593
func markEdgeZombie(zombieIndex kvdb.RwBucket, chanID uint64, pubKey1,
3594
        pubKey2 [33]byte) error {
3✔
3595

3✔
3596
        var k [8]byte
3✔
3597
        byteOrder.PutUint64(k[:], chanID)
3✔
3598

3✔
3599
        var v [66]byte
3✔
3600
        copy(v[:33], pubKey1[:])
3✔
3601
        copy(v[33:], pubKey2[:])
3✔
3602

3✔
3603
        return zombieIndex.Put(k[:], v[:])
3✔
3604
}
3✔
3605

3606
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
UNCOV
3607
func (c *KVStore) MarkEdgeLive(chanID uint64) error {
×
UNCOV
3608
        c.cacheMu.Lock()
×
UNCOV
3609
        defer c.cacheMu.Unlock()
×
UNCOV
3610

×
UNCOV
3611
        return c.markEdgeLiveUnsafe(nil, chanID)
×
UNCOV
3612
}
×
3613

3614
// markEdgeLiveUnsafe clears an edge from the zombie index. This method can be
3615
// called with an existing kvdb.RwTx or the argument can be set to nil in which
3616
// case a new transaction will be created.
3617
//
3618
// NOTE: this method MUST only be called if the cacheMu has already been
3619
// acquired.
UNCOV
3620
func (c *KVStore) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
×
UNCOV
3621
        dbFn := func(tx kvdb.RwTx) error {
×
UNCOV
3622
                edges := tx.ReadWriteBucket(edgeBucket)
×
UNCOV
3623
                if edges == nil {
×
3624
                        return ErrGraphNoEdgesFound
×
3625
                }
×
UNCOV
3626
                zombieIndex := edges.NestedReadWriteBucket(zombieBucket)
×
UNCOV
3627
                if zombieIndex == nil {
×
3628
                        return nil
×
3629
                }
×
3630

UNCOV
3631
                var k [8]byte
×
UNCOV
3632
                byteOrder.PutUint64(k[:], chanID)
×
UNCOV
3633

×
UNCOV
3634
                if len(zombieIndex.Get(k[:])) == 0 {
×
UNCOV
3635
                        return ErrZombieEdgeNotFound
×
UNCOV
3636
                }
×
3637

UNCOV
3638
                return zombieIndex.Delete(k[:])
×
3639
        }
3640

3641
        // If the transaction is nil, we'll create a new one. Otherwise, we use
3642
        // the existing transaction
UNCOV
3643
        var err error
×
UNCOV
3644
        if tx == nil {
×
UNCOV
3645
                err = kvdb.Update(c.db, dbFn, func() {})
×
3646
        } else {
×
3647
                err = dbFn(tx)
×
3648
        }
×
UNCOV
3649
        if err != nil {
×
UNCOV
3650
                return err
×
UNCOV
3651
        }
×
3652

UNCOV
3653
        c.rejectCache.remove(chanID)
×
UNCOV
3654
        c.chanCache.remove(chanID)
×
UNCOV
3655

×
UNCOV
3656
        return nil
×
3657
}
3658

3659
// IsZombieEdge returns whether the edge is considered zombie. If it is a
3660
// zombie, then the two node public keys corresponding to this edge are also
3661
// returned.
UNCOV
3662
func (c *KVStore) IsZombieEdge(chanID uint64) (bool, [33]byte, [33]byte) {
×
UNCOV
3663
        var (
×
UNCOV
3664
                isZombie         bool
×
UNCOV
3665
                pubKey1, pubKey2 [33]byte
×
UNCOV
3666
        )
×
UNCOV
3667

×
UNCOV
3668
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
×
UNCOV
3669
                edges := tx.ReadBucket(edgeBucket)
×
UNCOV
3670
                if edges == nil {
×
3671
                        return ErrGraphNoEdgesFound
×
3672
                }
×
UNCOV
3673
                zombieIndex := edges.NestedReadBucket(zombieBucket)
×
UNCOV
3674
                if zombieIndex == nil {
×
3675
                        return nil
×
3676
                }
×
3677

UNCOV
3678
                isZombie, pubKey1, pubKey2 = isZombieEdge(zombieIndex, chanID)
×
UNCOV
3679

×
UNCOV
3680
                return nil
×
UNCOV
3681
        }, func() {
×
UNCOV
3682
                isZombie = false
×
UNCOV
3683
                pubKey1 = [33]byte{}
×
UNCOV
3684
                pubKey2 = [33]byte{}
×
UNCOV
3685
        })
×
UNCOV
3686
        if err != nil {
×
3687
                return false, [33]byte{}, [33]byte{}
×
3688
        }
×
3689

UNCOV
3690
        return isZombie, pubKey1, pubKey2
×
3691
}
3692

3693
// isZombieEdge returns whether an entry exists for the given channel in the
3694
// zombie index. If an entry exists, then the two node public keys corresponding
3695
// to this edge are also returned.
3696
func isZombieEdge(zombieIndex kvdb.RBucket,
3697
        chanID uint64) (bool, [33]byte, [33]byte) {
3✔
3698

3✔
3699
        var k [8]byte
3✔
3700
        byteOrder.PutUint64(k[:], chanID)
3✔
3701

3✔
3702
        v := zombieIndex.Get(k[:])
3✔
3703
        if v == nil {
6✔
3704
                return false, [33]byte{}, [33]byte{}
3✔
3705
        }
3✔
3706

3707
        var pubKey1, pubKey2 [33]byte
3✔
3708
        copy(pubKey1[:], v[:33])
3✔
3709
        copy(pubKey2[:], v[33:])
3✔
3710

3✔
3711
        return true, pubKey1, pubKey2
3✔
3712
}
3713

3714
// NumZombies returns the current number of zombie channels in the graph.
UNCOV
3715
func (c *KVStore) NumZombies() (uint64, error) {
×
UNCOV
3716
        var numZombies uint64
×
UNCOV
3717
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
×
UNCOV
3718
                edges := tx.ReadBucket(edgeBucket)
×
UNCOV
3719
                if edges == nil {
×
3720
                        return nil
×
3721
                }
×
UNCOV
3722
                zombieIndex := edges.NestedReadBucket(zombieBucket)
×
UNCOV
3723
                if zombieIndex == nil {
×
3724
                        return nil
×
3725
                }
×
3726

UNCOV
3727
                return zombieIndex.ForEach(func(_, _ []byte) error {
×
UNCOV
3728
                        numZombies++
×
UNCOV
3729
                        return nil
×
UNCOV
3730
                })
×
UNCOV
3731
        }, func() {
×
UNCOV
3732
                numZombies = 0
×
UNCOV
3733
        })
×
UNCOV
3734
        if err != nil {
×
3735
                return 0, err
×
3736
        }
×
3737

UNCOV
3738
        return numZombies, nil
×
3739
}
3740

3741
// PutClosedScid stores a SCID for a closed channel in the database. This is so
3742
// that we can ignore channel announcements that we know to be closed without
3743
// having to validate them and fetch a block.
UNCOV
3744
func (c *KVStore) PutClosedScid(scid lnwire.ShortChannelID) error {
×
UNCOV
3745
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
×
UNCOV
3746
                closedScids, err := tx.CreateTopLevelBucket(closedScidBucket)
×
UNCOV
3747
                if err != nil {
×
3748
                        return err
×
3749
                }
×
3750

UNCOV
3751
                var k [8]byte
×
UNCOV
3752
                byteOrder.PutUint64(k[:], scid.ToUint64())
×
UNCOV
3753

×
UNCOV
3754
                return closedScids.Put(k[:], []byte{})
×
UNCOV
3755
        }, func() {})
×
3756
}
3757

3758
// IsClosedScid checks whether a channel identified by the passed in scid is
3759
// closed. This helps avoid having to perform expensive validation checks.
3760
// TODO: Add an LRU cache to cut down on disc reads.
3761
func (c *KVStore) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
3✔
3762
        var isClosed bool
3✔
3763
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
3764
                closedScids := tx.ReadBucket(closedScidBucket)
3✔
3765
                if closedScids == nil {
3✔
3766
                        return ErrClosedScidsNotFound
×
3767
                }
×
3768

3769
                var k [8]byte
3✔
3770
                byteOrder.PutUint64(k[:], scid.ToUint64())
3✔
3771

3✔
3772
                if closedScids.Get(k[:]) != nil {
3✔
UNCOV
3773
                        isClosed = true
×
UNCOV
3774
                        return nil
×
UNCOV
3775
                }
×
3776

3777
                return nil
3✔
3778
        }, func() {
3✔
3779
                isClosed = false
3✔
3780
        })
3✔
3781
        if err != nil {
3✔
3782
                return false, err
×
3783
        }
×
3784

3785
        return isClosed, nil
3✔
3786
}
3787

3788
// GraphSession will provide the call-back with access to a NodeTraverser
3789
// instance which can be used to perform queries against the channel graph.
UNCOV
3790
func (c *KVStore) GraphSession(cb func(graph NodeTraverser) error) error {
×
UNCOV
3791
        return c.db.View(func(tx walletdb.ReadTx) error {
×
UNCOV
3792
                return cb(&nodeTraverserSession{
×
UNCOV
3793
                        db: c,
×
UNCOV
3794
                        tx: tx,
×
UNCOV
3795
                })
×
UNCOV
3796
        }, func() {})
×
3797
}
3798

3799
// nodeTraverserSession implements the NodeTraverser interface but with a
3800
// backing read only transaction for a consistent view of the graph.
3801
type nodeTraverserSession struct {
3802
        tx kvdb.RTx
3803
        db *KVStore
3804
}
3805

3806
// ForEachNodeDirectedChannel calls the callback for every channel of the given
3807
// node.
3808
//
3809
// NOTE: Part of the NodeTraverser interface.
3810
func (c *nodeTraverserSession) ForEachNodeDirectedChannel(nodePub route.Vertex,
UNCOV
3811
        cb func(channel *DirectedChannel) error) error {
×
UNCOV
3812

×
UNCOV
3813
        return c.db.forEachNodeDirectedChannel(c.tx, nodePub, cb)
×
UNCOV
3814
}
×
3815

3816
// FetchNodeFeatures returns the features of the given node. If the node is
3817
// unknown, assume no additional features are supported.
3818
//
3819
// NOTE: Part of the NodeTraverser interface.
3820
func (c *nodeTraverserSession) FetchNodeFeatures(nodePub route.Vertex) (
UNCOV
3821
        *lnwire.FeatureVector, error) {
×
UNCOV
3822

×
UNCOV
3823
        return c.db.fetchNodeFeatures(c.tx, nodePub)
×
UNCOV
3824
}
×
3825

3826
func putLightningNode(nodeBucket, aliasBucket, updateIndex kvdb.RwBucket,
3827
        node *models.LightningNode) error {
3✔
3828

3✔
3829
        var (
3✔
3830
                scratch [16]byte
3✔
3831
                b       bytes.Buffer
3✔
3832
        )
3✔
3833

3✔
3834
        pub, err := node.PubKey()
3✔
3835
        if err != nil {
3✔
3836
                return err
×
3837
        }
×
3838
        nodePub := pub.SerializeCompressed()
3✔
3839

3✔
3840
        // If the node has the update time set, write it, else write 0.
3✔
3841
        updateUnix := uint64(0)
3✔
3842
        if node.LastUpdate.Unix() > 0 {
6✔
3843
                updateUnix = uint64(node.LastUpdate.Unix())
3✔
3844
        }
3✔
3845

3846
        byteOrder.PutUint64(scratch[:8], updateUnix)
3✔
3847
        if _, err := b.Write(scratch[:8]); err != nil {
3✔
3848
                return err
×
3849
        }
×
3850

3851
        if _, err := b.Write(nodePub); err != nil {
3✔
3852
                return err
×
3853
        }
×
3854

3855
        // If we got a node announcement for this node, we will have the rest
3856
        // of the data available. If not we don't have more data to write.
3857
        if !node.HaveNodeAnnouncement {
6✔
3858
                // Write HaveNodeAnnouncement=0.
3✔
3859
                byteOrder.PutUint16(scratch[:2], 0)
3✔
3860
                if _, err := b.Write(scratch[:2]); err != nil {
3✔
3861
                        return err
×
3862
                }
×
3863

3864
                return nodeBucket.Put(nodePub, b.Bytes())
3✔
3865
        }
3866

3867
        // Write HaveNodeAnnouncement=1.
3868
        byteOrder.PutUint16(scratch[:2], 1)
3✔
3869
        if _, err := b.Write(scratch[:2]); err != nil {
3✔
3870
                return err
×
3871
        }
×
3872

3873
        if err := binary.Write(&b, byteOrder, node.Color.R); err != nil {
3✔
3874
                return err
×
3875
        }
×
3876
        if err := binary.Write(&b, byteOrder, node.Color.G); err != nil {
3✔
3877
                return err
×
3878
        }
×
3879
        if err := binary.Write(&b, byteOrder, node.Color.B); err != nil {
3✔
3880
                return err
×
3881
        }
×
3882

3883
        if err := wire.WriteVarString(&b, 0, node.Alias); err != nil {
3✔
3884
                return err
×
3885
        }
×
3886

3887
        if err := node.Features.Encode(&b); err != nil {
3✔
3888
                return err
×
3889
        }
×
3890

3891
        numAddresses := uint16(len(node.Addresses))
3✔
3892
        byteOrder.PutUint16(scratch[:2], numAddresses)
3✔
3893
        if _, err := b.Write(scratch[:2]); err != nil {
3✔
3894
                return err
×
3895
        }
×
3896

3897
        for _, address := range node.Addresses {
6✔
3898
                if err := SerializeAddr(&b, address); err != nil {
3✔
3899
                        return err
×
3900
                }
×
3901
        }
3902

3903
        sigLen := len(node.AuthSigBytes)
3✔
3904
        if sigLen > 80 {
3✔
3905
                return fmt.Errorf("max sig len allowed is 80, had %v",
×
3906
                        sigLen)
×
3907
        }
×
3908

3909
        err = wire.WriteVarBytes(&b, 0, node.AuthSigBytes)
3✔
3910
        if err != nil {
3✔
3911
                return err
×
3912
        }
×
3913

3914
        if len(node.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
3✔
3915
                return ErrTooManyExtraOpaqueBytes(len(node.ExtraOpaqueData))
×
3916
        }
×
3917
        err = wire.WriteVarBytes(&b, 0, node.ExtraOpaqueData)
3✔
3918
        if err != nil {
3✔
3919
                return err
×
3920
        }
×
3921

3922
        if err := aliasBucket.Put(nodePub, []byte(node.Alias)); err != nil {
3✔
3923
                return err
×
3924
        }
×
3925

3926
        // With the alias bucket updated, we'll now update the index that
3927
        // tracks the time series of node updates.
3928
        var indexKey [8 + 33]byte
3✔
3929
        byteOrder.PutUint64(indexKey[:8], updateUnix)
3✔
3930
        copy(indexKey[8:], nodePub)
3✔
3931

3✔
3932
        // If there was already an old index entry for this node, then we'll
3✔
3933
        // delete the old one before we write the new entry.
3✔
3934
        if nodeBytes := nodeBucket.Get(nodePub); nodeBytes != nil {
6✔
3935
                // Extract out the old update time to we can reconstruct the
3✔
3936
                // prior index key to delete it from the index.
3✔
3937
                oldUpdateTime := nodeBytes[:8]
3✔
3938

3✔
3939
                var oldIndexKey [8 + 33]byte
3✔
3940
                copy(oldIndexKey[:8], oldUpdateTime)
3✔
3941
                copy(oldIndexKey[8:], nodePub)
3✔
3942

3✔
3943
                if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
3✔
3944
                        return err
×
3945
                }
×
3946
        }
3947

3948
        if err := updateIndex.Put(indexKey[:], nil); err != nil {
3✔
3949
                return err
×
3950
        }
×
3951

3952
        return nodeBucket.Put(nodePub, b.Bytes())
3✔
3953
}
3954

3955
func fetchLightningNode(nodeBucket kvdb.RBucket,
3956
        nodePub []byte) (models.LightningNode, error) {
3✔
3957

3✔
3958
        nodeBytes := nodeBucket.Get(nodePub)
3✔
3959
        if nodeBytes == nil {
6✔
3960
                return models.LightningNode{}, ErrGraphNodeNotFound
3✔
3961
        }
3✔
3962

3963
        nodeReader := bytes.NewReader(nodeBytes)
3✔
3964

3✔
3965
        return deserializeLightningNode(nodeReader)
3✔
3966
}
3967

3968
func deserializeLightningNodeCacheable(r io.Reader) (route.Vertex,
3969
        *lnwire.FeatureVector, error) {
3✔
3970

3✔
3971
        var (
3✔
3972
                pubKey      route.Vertex
3✔
3973
                features    = lnwire.EmptyFeatureVector()
3✔
3974
                nodeScratch [8]byte
3✔
3975
        )
3✔
3976

3✔
3977
        // Skip ahead:
3✔
3978
        // - LastUpdate (8 bytes)
3✔
3979
        if _, err := r.Read(nodeScratch[:]); err != nil {
3✔
3980
                return pubKey, nil, err
×
3981
        }
×
3982

3983
        if _, err := io.ReadFull(r, pubKey[:]); err != nil {
3✔
3984
                return pubKey, nil, err
×
3985
        }
×
3986

3987
        // Read the node announcement flag.
3988
        if _, err := r.Read(nodeScratch[:2]); err != nil {
3✔
3989
                return pubKey, nil, err
×
3990
        }
×
3991
        hasNodeAnn := byteOrder.Uint16(nodeScratch[:2])
3✔
3992

3✔
3993
        // The rest of the data is optional, and will only be there if we got a
3✔
3994
        // node announcement for this node.
3✔
3995
        if hasNodeAnn == 0 {
6✔
3996
                return pubKey, features, nil
3✔
3997
        }
3✔
3998

3999
        // We did get a node announcement for this node, so we'll have the rest
4000
        // of the data available.
4001
        var rgb uint8
3✔
4002
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
3✔
4003
                return pubKey, nil, err
×
4004
        }
×
4005
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
3✔
4006
                return pubKey, nil, err
×
4007
        }
×
4008
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
3✔
4009
                return pubKey, nil, err
×
4010
        }
×
4011

4012
        if _, err := wire.ReadVarString(r, 0); err != nil {
3✔
4013
                return pubKey, nil, err
×
4014
        }
×
4015

4016
        if err := features.Decode(r); err != nil {
3✔
4017
                return pubKey, nil, err
×
4018
        }
×
4019

4020
        return pubKey, features, nil
3✔
4021
}
4022

4023
func deserializeLightningNode(r io.Reader) (models.LightningNode, error) {
3✔
4024
        var (
3✔
4025
                node    models.LightningNode
3✔
4026
                scratch [8]byte
3✔
4027
                err     error
3✔
4028
        )
3✔
4029

3✔
4030
        // Always populate a feature vector, even if we don't have a node
3✔
4031
        // announcement and short circuit below.
3✔
4032
        node.Features = lnwire.EmptyFeatureVector()
3✔
4033

3✔
4034
        if _, err := r.Read(scratch[:]); err != nil {
3✔
4035
                return models.LightningNode{}, err
×
4036
        }
×
4037

4038
        unix := int64(byteOrder.Uint64(scratch[:]))
3✔
4039
        node.LastUpdate = time.Unix(unix, 0)
3✔
4040

3✔
4041
        if _, err := io.ReadFull(r, node.PubKeyBytes[:]); err != nil {
3✔
4042
                return models.LightningNode{}, err
×
4043
        }
×
4044

4045
        if _, err := r.Read(scratch[:2]); err != nil {
3✔
4046
                return models.LightningNode{}, err
×
4047
        }
×
4048

4049
        hasNodeAnn := byteOrder.Uint16(scratch[:2])
3✔
4050
        if hasNodeAnn == 1 {
6✔
4051
                node.HaveNodeAnnouncement = true
3✔
4052
        } else {
6✔
4053
                node.HaveNodeAnnouncement = false
3✔
4054
        }
3✔
4055

4056
        // The rest of the data is optional, and will only be there if we got a
4057
        // node announcement for this node.
4058
        if !node.HaveNodeAnnouncement {
6✔
4059
                return node, nil
3✔
4060
        }
3✔
4061

4062
        // We did get a node announcement for this node, so we'll have the rest
4063
        // of the data available.
4064
        if err := binary.Read(r, byteOrder, &node.Color.R); err != nil {
3✔
4065
                return models.LightningNode{}, err
×
4066
        }
×
4067
        if err := binary.Read(r, byteOrder, &node.Color.G); err != nil {
3✔
4068
                return models.LightningNode{}, err
×
4069
        }
×
4070
        if err := binary.Read(r, byteOrder, &node.Color.B); err != nil {
3✔
4071
                return models.LightningNode{}, err
×
4072
        }
×
4073

4074
        node.Alias, err = wire.ReadVarString(r, 0)
3✔
4075
        if err != nil {
3✔
4076
                return models.LightningNode{}, err
×
4077
        }
×
4078

4079
        err = node.Features.Decode(r)
3✔
4080
        if err != nil {
3✔
4081
                return models.LightningNode{}, err
×
4082
        }
×
4083

4084
        if _, err := r.Read(scratch[:2]); err != nil {
3✔
4085
                return models.LightningNode{}, err
×
4086
        }
×
4087
        numAddresses := int(byteOrder.Uint16(scratch[:2]))
3✔
4088

3✔
4089
        var addresses []net.Addr
3✔
4090
        for i := 0; i < numAddresses; i++ {
6✔
4091
                address, err := DeserializeAddr(r)
3✔
4092
                if err != nil {
3✔
4093
                        return models.LightningNode{}, err
×
4094
                }
×
4095
                addresses = append(addresses, address)
3✔
4096
        }
4097
        node.Addresses = addresses
3✔
4098

3✔
4099
        node.AuthSigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
3✔
4100
        if err != nil {
3✔
4101
                return models.LightningNode{}, err
×
4102
        }
×
4103

4104
        // We'll try and see if there are any opaque bytes left, if not, then
4105
        // we'll ignore the EOF error and return the node as is.
4106
        node.ExtraOpaqueData, err = wire.ReadVarBytes(
3✔
4107
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
3✔
4108
        )
3✔
4109
        switch {
3✔
4110
        case errors.Is(err, io.ErrUnexpectedEOF):
×
4111
        case errors.Is(err, io.EOF):
×
4112
        case err != nil:
×
4113
                return models.LightningNode{}, err
×
4114
        }
4115

4116
        return node, nil
3✔
4117
}
4118

4119
func putChanEdgeInfo(edgeIndex kvdb.RwBucket,
4120
        edgeInfo *models.ChannelEdgeInfo, chanID [8]byte) error {
3✔
4121

3✔
4122
        var b bytes.Buffer
3✔
4123

3✔
4124
        if _, err := b.Write(edgeInfo.NodeKey1Bytes[:]); err != nil {
3✔
4125
                return err
×
4126
        }
×
4127
        if _, err := b.Write(edgeInfo.NodeKey2Bytes[:]); err != nil {
3✔
4128
                return err
×
4129
        }
×
4130
        if _, err := b.Write(edgeInfo.BitcoinKey1Bytes[:]); err != nil {
3✔
4131
                return err
×
4132
        }
×
4133
        if _, err := b.Write(edgeInfo.BitcoinKey2Bytes[:]); err != nil {
3✔
4134
                return err
×
4135
        }
×
4136

4137
        if err := wire.WriteVarBytes(&b, 0, edgeInfo.Features); err != nil {
3✔
4138
                return err
×
4139
        }
×
4140

4141
        authProof := edgeInfo.AuthProof
3✔
4142
        var nodeSig1, nodeSig2, bitcoinSig1, bitcoinSig2 []byte
3✔
4143
        if authProof != nil {
6✔
4144
                nodeSig1 = authProof.NodeSig1Bytes
3✔
4145
                nodeSig2 = authProof.NodeSig2Bytes
3✔
4146
                bitcoinSig1 = authProof.BitcoinSig1Bytes
3✔
4147
                bitcoinSig2 = authProof.BitcoinSig2Bytes
3✔
4148
        }
3✔
4149

4150
        if err := wire.WriteVarBytes(&b, 0, nodeSig1); err != nil {
3✔
4151
                return err
×
4152
        }
×
4153
        if err := wire.WriteVarBytes(&b, 0, nodeSig2); err != nil {
3✔
4154
                return err
×
4155
        }
×
4156
        if err := wire.WriteVarBytes(&b, 0, bitcoinSig1); err != nil {
3✔
4157
                return err
×
4158
        }
×
4159
        if err := wire.WriteVarBytes(&b, 0, bitcoinSig2); err != nil {
3✔
4160
                return err
×
4161
        }
×
4162

4163
        if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
3✔
4164
                return err
×
4165
        }
×
4166
        err := binary.Write(&b, byteOrder, uint64(edgeInfo.Capacity))
3✔
4167
        if err != nil {
3✔
4168
                return err
×
4169
        }
×
4170
        if _, err := b.Write(chanID[:]); err != nil {
3✔
4171
                return err
×
4172
        }
×
4173
        if _, err := b.Write(edgeInfo.ChainHash[:]); err != nil {
3✔
4174
                return err
×
4175
        }
×
4176

4177
        if len(edgeInfo.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
3✔
4178
                return ErrTooManyExtraOpaqueBytes(len(edgeInfo.ExtraOpaqueData))
×
4179
        }
×
4180
        err = wire.WriteVarBytes(&b, 0, edgeInfo.ExtraOpaqueData)
3✔
4181
        if err != nil {
3✔
4182
                return err
×
4183
        }
×
4184

4185
        return edgeIndex.Put(chanID[:], b.Bytes())
3✔
4186
}
4187

4188
func fetchChanEdgeInfo(edgeIndex kvdb.RBucket,
4189
        chanID []byte) (models.ChannelEdgeInfo, error) {
3✔
4190

3✔
4191
        edgeInfoBytes := edgeIndex.Get(chanID)
3✔
4192
        if edgeInfoBytes == nil {
6✔
4193
                return models.ChannelEdgeInfo{}, ErrEdgeNotFound
3✔
4194
        }
3✔
4195

4196
        edgeInfoReader := bytes.NewReader(edgeInfoBytes)
3✔
4197

3✔
4198
        return deserializeChanEdgeInfo(edgeInfoReader)
3✔
4199
}
4200

4201
func deserializeChanEdgeInfo(r io.Reader) (models.ChannelEdgeInfo, error) {
3✔
4202
        var (
3✔
4203
                err      error
3✔
4204
                edgeInfo models.ChannelEdgeInfo
3✔
4205
        )
3✔
4206

3✔
4207
        if _, err := io.ReadFull(r, edgeInfo.NodeKey1Bytes[:]); err != nil {
3✔
4208
                return models.ChannelEdgeInfo{}, err
×
4209
        }
×
4210
        if _, err := io.ReadFull(r, edgeInfo.NodeKey2Bytes[:]); err != nil {
3✔
4211
                return models.ChannelEdgeInfo{}, err
×
4212
        }
×
4213
        if _, err := io.ReadFull(r, edgeInfo.BitcoinKey1Bytes[:]); err != nil {
3✔
4214
                return models.ChannelEdgeInfo{}, err
×
4215
        }
×
4216
        if _, err := io.ReadFull(r, edgeInfo.BitcoinKey2Bytes[:]); err != nil {
3✔
4217
                return models.ChannelEdgeInfo{}, err
×
4218
        }
×
4219

4220
        edgeInfo.Features, err = wire.ReadVarBytes(r, 0, 900, "features")
3✔
4221
        if err != nil {
3✔
4222
                return models.ChannelEdgeInfo{}, err
×
4223
        }
×
4224

4225
        proof := &models.ChannelAuthProof{}
3✔
4226

3✔
4227
        proof.NodeSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
3✔
4228
        if err != nil {
3✔
4229
                return models.ChannelEdgeInfo{}, err
×
4230
        }
×
4231
        proof.NodeSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
3✔
4232
        if err != nil {
3✔
4233
                return models.ChannelEdgeInfo{}, err
×
4234
        }
×
4235
        proof.BitcoinSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
3✔
4236
        if err != nil {
3✔
4237
                return models.ChannelEdgeInfo{}, err
×
4238
        }
×
4239
        proof.BitcoinSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
3✔
4240
        if err != nil {
3✔
4241
                return models.ChannelEdgeInfo{}, err
×
4242
        }
×
4243

4244
        if !proof.IsEmpty() {
6✔
4245
                edgeInfo.AuthProof = proof
3✔
4246
        }
3✔
4247

4248
        edgeInfo.ChannelPoint = wire.OutPoint{}
3✔
4249
        if err := ReadOutpoint(r, &edgeInfo.ChannelPoint); err != nil {
3✔
4250
                return models.ChannelEdgeInfo{}, err
×
4251
        }
×
4252
        if err := binary.Read(r, byteOrder, &edgeInfo.Capacity); err != nil {
3✔
4253
                return models.ChannelEdgeInfo{}, err
×
4254
        }
×
4255
        if err := binary.Read(r, byteOrder, &edgeInfo.ChannelID); err != nil {
3✔
4256
                return models.ChannelEdgeInfo{}, err
×
4257
        }
×
4258

4259
        if _, err := io.ReadFull(r, edgeInfo.ChainHash[:]); err != nil {
3✔
4260
                return models.ChannelEdgeInfo{}, err
×
4261
        }
×
4262

4263
        // We'll try and see if there are any opaque bytes left, if not, then
4264
        // we'll ignore the EOF error and return the edge as is.
4265
        edgeInfo.ExtraOpaqueData, err = wire.ReadVarBytes(
3✔
4266
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
3✔
4267
        )
3✔
4268
        switch {
3✔
4269
        case errors.Is(err, io.ErrUnexpectedEOF):
×
4270
        case errors.Is(err, io.EOF):
×
4271
        case err != nil:
×
4272
                return models.ChannelEdgeInfo{}, err
×
4273
        }
4274

4275
        return edgeInfo, nil
3✔
4276
}
4277

4278
func putChanEdgePolicy(edges kvdb.RwBucket, edge *models.ChannelEdgePolicy,
4279
        from, to []byte) error {
3✔
4280

3✔
4281
        var edgeKey [33 + 8]byte
3✔
4282
        copy(edgeKey[:], from)
3✔
4283
        byteOrder.PutUint64(edgeKey[33:], edge.ChannelID)
3✔
4284

3✔
4285
        var b bytes.Buffer
3✔
4286
        if err := serializeChanEdgePolicy(&b, edge, to); err != nil {
3✔
4287
                return err
×
4288
        }
×
4289

4290
        // Before we write out the new edge, we'll create a new entry in the
4291
        // update index in order to keep it fresh.
4292
        updateUnix := uint64(edge.LastUpdate.Unix())
3✔
4293
        var indexKey [8 + 8]byte
3✔
4294
        byteOrder.PutUint64(indexKey[:8], updateUnix)
3✔
4295
        byteOrder.PutUint64(indexKey[8:], edge.ChannelID)
3✔
4296

3✔
4297
        updateIndex, err := edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
3✔
4298
        if err != nil {
3✔
4299
                return err
×
4300
        }
×
4301

4302
        // If there was already an entry for this edge, then we'll need to
4303
        // delete the old one to ensure we don't leave around any after-images.
4304
        // An unknown policy value does not have a update time recorded, so
4305
        // it also does not need to be removed.
4306
        if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil &&
3✔
4307
                !bytes.Equal(edgeBytes, unknownPolicy) {
6✔
4308

3✔
4309
                // In order to delete the old entry, we'll need to obtain the
3✔
4310
                // *prior* update time in order to delete it. To do this, we'll
3✔
4311
                // need to deserialize the existing policy within the database
3✔
4312
                // (now outdated by the new one), and delete its corresponding
3✔
4313
                // entry within the update index. We'll ignore any
3✔
4314
                // ErrEdgePolicyOptionalFieldNotFound error, as we only need
3✔
4315
                // the channel ID and update time to delete the entry.
3✔
4316
                // TODO(halseth): get rid of these invalid policies in a
3✔
4317
                // migration.
3✔
4318
                oldEdgePolicy, err := deserializeChanEdgePolicy(
3✔
4319
                        bytes.NewReader(edgeBytes),
3✔
4320
                )
3✔
4321
                if err != nil &&
3✔
4322
                        !errors.Is(err, ErrEdgePolicyOptionalFieldNotFound) {
3✔
4323

×
4324
                        return err
×
4325
                }
×
4326

4327
                oldUpdateTime := uint64(oldEdgePolicy.LastUpdate.Unix())
3✔
4328

3✔
4329
                var oldIndexKey [8 + 8]byte
3✔
4330
                byteOrder.PutUint64(oldIndexKey[:8], oldUpdateTime)
3✔
4331
                byteOrder.PutUint64(oldIndexKey[8:], edge.ChannelID)
3✔
4332

3✔
4333
                if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
3✔
4334
                        return err
×
4335
                }
×
4336
        }
4337

4338
        if err := updateIndex.Put(indexKey[:], nil); err != nil {
3✔
4339
                return err
×
4340
        }
×
4341

4342
        err = updateEdgePolicyDisabledIndex(
3✔
4343
                edges, edge.ChannelID,
3✔
4344
                edge.ChannelFlags&lnwire.ChanUpdateDirection > 0,
3✔
4345
                edge.IsDisabled(),
3✔
4346
        )
3✔
4347
        if err != nil {
3✔
4348
                return err
×
4349
        }
×
4350

4351
        return edges.Put(edgeKey[:], b.Bytes())
3✔
4352
}
4353

4354
// updateEdgePolicyDisabledIndex is used to update the disabledEdgePolicyIndex
4355
// bucket by either add a new disabled ChannelEdgePolicy or remove an existing
4356
// one.
4357
// The direction represents the direction of the edge and disabled is used for
4358
// deciding whether to remove or add an entry to the bucket.
4359
// In general a channel is disabled if two entries for the same chanID exist
4360
// in this bucket.
4361
// Maintaining the bucket this way allows a fast retrieval of disabled
4362
// channels, for example when prune is needed.
4363
func updateEdgePolicyDisabledIndex(edges kvdb.RwBucket, chanID uint64,
4364
        direction bool, disabled bool) error {
3✔
4365

3✔
4366
        var disabledEdgeKey [8 + 1]byte
3✔
4367
        byteOrder.PutUint64(disabledEdgeKey[0:], chanID)
3✔
4368
        if direction {
6✔
4369
                disabledEdgeKey[8] = 1
3✔
4370
        }
3✔
4371

4372
        disabledEdgePolicyIndex, err := edges.CreateBucketIfNotExists(
3✔
4373
                disabledEdgePolicyBucket,
3✔
4374
        )
3✔
4375
        if err != nil {
3✔
4376
                return err
×
4377
        }
×
4378

4379
        if disabled {
6✔
4380
                return disabledEdgePolicyIndex.Put(disabledEdgeKey[:], []byte{})
3✔
4381
        }
3✔
4382

4383
        return disabledEdgePolicyIndex.Delete(disabledEdgeKey[:])
3✔
4384
}
4385

4386
// putChanEdgePolicyUnknown marks the edge policy as unknown
4387
// in the edges bucket.
4388
func putChanEdgePolicyUnknown(edges kvdb.RwBucket, channelID uint64,
4389
        from []byte) error {
3✔
4390

3✔
4391
        var edgeKey [33 + 8]byte
3✔
4392
        copy(edgeKey[:], from)
3✔
4393
        byteOrder.PutUint64(edgeKey[33:], channelID)
3✔
4394

3✔
4395
        if edges.Get(edgeKey[:]) != nil {
3✔
4396
                return fmt.Errorf("cannot write unknown policy for channel %v "+
×
4397
                        " when there is already a policy present", channelID)
×
4398
        }
×
4399

4400
        return edges.Put(edgeKey[:], unknownPolicy)
3✔
4401
}
4402

4403
func fetchChanEdgePolicy(edges kvdb.RBucket, chanID []byte,
4404
        nodePub []byte) (*models.ChannelEdgePolicy, error) {
3✔
4405

3✔
4406
        var edgeKey [33 + 8]byte
3✔
4407
        copy(edgeKey[:], nodePub)
3✔
4408
        copy(edgeKey[33:], chanID)
3✔
4409

3✔
4410
        edgeBytes := edges.Get(edgeKey[:])
3✔
4411
        if edgeBytes == nil {
3✔
4412
                return nil, ErrEdgeNotFound
×
4413
        }
×
4414

4415
        // No need to deserialize unknown policy.
4416
        if bytes.Equal(edgeBytes, unknownPolicy) {
6✔
4417
                return nil, nil
3✔
4418
        }
3✔
4419

4420
        edgeReader := bytes.NewReader(edgeBytes)
3✔
4421

3✔
4422
        ep, err := deserializeChanEdgePolicy(edgeReader)
3✔
4423
        switch {
3✔
4424
        // If the db policy was missing an expected optional field, we return
4425
        // nil as if the policy was unknown.
UNCOV
4426
        case errors.Is(err, ErrEdgePolicyOptionalFieldNotFound):
×
UNCOV
4427
                return nil, nil
×
4428

4429
        case err != nil:
×
4430
                return nil, err
×
4431
        }
4432

4433
        return ep, nil
3✔
4434
}
4435

4436
func fetchChanEdgePolicies(edgeIndex kvdb.RBucket, edges kvdb.RBucket,
4437
        chanID []byte) (*models.ChannelEdgePolicy, *models.ChannelEdgePolicy,
4438
        error) {
3✔
4439

3✔
4440
        edgeInfo := edgeIndex.Get(chanID)
3✔
4441
        if edgeInfo == nil {
3✔
4442
                return nil, nil, fmt.Errorf("%w: chanID=%x", ErrEdgeNotFound,
×
4443
                        chanID)
×
4444
        }
×
4445

4446
        // The first node is contained within the first half of the edge
4447
        // information. We only propagate the error here and below if it's
4448
        // something other than edge non-existence.
4449
        node1Pub := edgeInfo[:33]
3✔
4450
        edge1, err := fetchChanEdgePolicy(edges, chanID, node1Pub)
3✔
4451
        if err != nil {
3✔
4452
                return nil, nil, fmt.Errorf("%w: node1Pub=%x", ErrEdgeNotFound,
×
4453
                        node1Pub)
×
4454
        }
×
4455

4456
        // Similarly, the second node is contained within the latter
4457
        // half of the edge information.
4458
        node2Pub := edgeInfo[33:66]
3✔
4459
        edge2, err := fetchChanEdgePolicy(edges, chanID, node2Pub)
3✔
4460
        if err != nil {
3✔
4461
                return nil, nil, fmt.Errorf("%w: node2Pub=%x", ErrEdgeNotFound,
×
4462
                        node2Pub)
×
4463
        }
×
4464

4465
        return edge1, edge2, nil
3✔
4466
}
4467

4468
func serializeChanEdgePolicy(w io.Writer, edge *models.ChannelEdgePolicy,
4469
        to []byte) error {
3✔
4470

3✔
4471
        err := wire.WriteVarBytes(w, 0, edge.SigBytes)
3✔
4472
        if err != nil {
3✔
4473
                return err
×
4474
        }
×
4475

4476
        if err := binary.Write(w, byteOrder, edge.ChannelID); err != nil {
3✔
4477
                return err
×
4478
        }
×
4479

4480
        var scratch [8]byte
3✔
4481
        updateUnix := uint64(edge.LastUpdate.Unix())
3✔
4482
        byteOrder.PutUint64(scratch[:], updateUnix)
3✔
4483
        if _, err := w.Write(scratch[:]); err != nil {
3✔
4484
                return err
×
4485
        }
×
4486

4487
        if err := binary.Write(w, byteOrder, edge.MessageFlags); err != nil {
3✔
4488
                return err
×
4489
        }
×
4490
        if err := binary.Write(w, byteOrder, edge.ChannelFlags); err != nil {
3✔
4491
                return err
×
4492
        }
×
4493
        if err := binary.Write(w, byteOrder, edge.TimeLockDelta); err != nil {
3✔
4494
                return err
×
4495
        }
×
4496
        if err := binary.Write(w, byteOrder, uint64(edge.MinHTLC)); err != nil {
3✔
4497
                return err
×
4498
        }
×
4499
        err = binary.Write(w, byteOrder, uint64(edge.FeeBaseMSat))
3✔
4500
        if err != nil {
3✔
4501
                return err
×
4502
        }
×
4503
        err = binary.Write(
3✔
4504
                w, byteOrder, uint64(edge.FeeProportionalMillionths),
3✔
4505
        )
3✔
4506
        if err != nil {
3✔
4507
                return err
×
4508
        }
×
4509

4510
        if _, err := w.Write(to); err != nil {
3✔
4511
                return err
×
4512
        }
×
4513

4514
        // If the max_htlc field is present, we write it. To be compatible with
4515
        // older versions that wasn't aware of this field, we write it as part
4516
        // of the opaque data.
4517
        // TODO(halseth): clean up when moving to TLV.
4518
        var opaqueBuf bytes.Buffer
3✔
4519
        if edge.MessageFlags.HasMaxHtlc() {
6✔
4520
                err := binary.Write(&opaqueBuf, byteOrder, uint64(edge.MaxHTLC))
3✔
4521
                if err != nil {
3✔
4522
                        return err
×
4523
                }
×
4524
        }
4525

4526
        if len(edge.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
3✔
4527
                return ErrTooManyExtraOpaqueBytes(len(edge.ExtraOpaqueData))
×
4528
        }
×
4529
        if _, err := opaqueBuf.Write(edge.ExtraOpaqueData); err != nil {
3✔
4530
                return err
×
4531
        }
×
4532

4533
        if err := wire.WriteVarBytes(w, 0, opaqueBuf.Bytes()); err != nil {
3✔
4534
                return err
×
4535
        }
×
4536

4537
        return nil
3✔
4538
}
4539

4540
func deserializeChanEdgePolicy(r io.Reader) (*models.ChannelEdgePolicy, error) {
3✔
4541
        // Deserialize the policy. Note that in case an optional field is not
3✔
4542
        // found, both an error and a populated policy object are returned.
3✔
4543
        edge, deserializeErr := deserializeChanEdgePolicyRaw(r)
3✔
4544
        if deserializeErr != nil &&
3✔
4545
                !errors.Is(deserializeErr, ErrEdgePolicyOptionalFieldNotFound) {
3✔
4546

×
4547
                return nil, deserializeErr
×
4548
        }
×
4549

4550
        return edge, deserializeErr
3✔
4551
}
4552

4553
func deserializeChanEdgePolicyRaw(r io.Reader) (*models.ChannelEdgePolicy,
4554
        error) {
3✔
4555

3✔
4556
        edge := &models.ChannelEdgePolicy{}
3✔
4557

3✔
4558
        var err error
3✔
4559
        edge.SigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
3✔
4560
        if err != nil {
3✔
4561
                return nil, err
×
4562
        }
×
4563

4564
        if err := binary.Read(r, byteOrder, &edge.ChannelID); err != nil {
3✔
4565
                return nil, err
×
4566
        }
×
4567

4568
        var scratch [8]byte
3✔
4569
        if _, err := r.Read(scratch[:]); err != nil {
3✔
4570
                return nil, err
×
4571
        }
×
4572
        unix := int64(byteOrder.Uint64(scratch[:]))
3✔
4573
        edge.LastUpdate = time.Unix(unix, 0)
3✔
4574

3✔
4575
        if err := binary.Read(r, byteOrder, &edge.MessageFlags); err != nil {
3✔
4576
                return nil, err
×
4577
        }
×
4578
        if err := binary.Read(r, byteOrder, &edge.ChannelFlags); err != nil {
3✔
4579
                return nil, err
×
4580
        }
×
4581
        if err := binary.Read(r, byteOrder, &edge.TimeLockDelta); err != nil {
3✔
4582
                return nil, err
×
4583
        }
×
4584

4585
        var n uint64
3✔
4586
        if err := binary.Read(r, byteOrder, &n); err != nil {
3✔
4587
                return nil, err
×
4588
        }
×
4589
        edge.MinHTLC = lnwire.MilliSatoshi(n)
3✔
4590

3✔
4591
        if err := binary.Read(r, byteOrder, &n); err != nil {
3✔
4592
                return nil, err
×
4593
        }
×
4594
        edge.FeeBaseMSat = lnwire.MilliSatoshi(n)
3✔
4595

3✔
4596
        if err := binary.Read(r, byteOrder, &n); err != nil {
3✔
4597
                return nil, err
×
4598
        }
×
4599
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(n)
3✔
4600

3✔
4601
        if _, err := r.Read(edge.ToNode[:]); err != nil {
3✔
4602
                return nil, err
×
4603
        }
×
4604

4605
        // We'll try and see if there are any opaque bytes left, if not, then
4606
        // we'll ignore the EOF error and return the edge as is.
4607
        edge.ExtraOpaqueData, err = wire.ReadVarBytes(
3✔
4608
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
3✔
4609
        )
3✔
4610
        switch {
3✔
4611
        case errors.Is(err, io.ErrUnexpectedEOF):
×
UNCOV
4612
        case errors.Is(err, io.EOF):
×
4613
        case err != nil:
×
4614
                return nil, err
×
4615
        }
4616

4617
        // See if optional fields are present.
4618
        if edge.MessageFlags.HasMaxHtlc() {
6✔
4619
                // The max_htlc field should be at the beginning of the opaque
3✔
4620
                // bytes.
3✔
4621
                opq := edge.ExtraOpaqueData
3✔
4622

3✔
4623
                // If the max_htlc field is not present, it might be old data
3✔
4624
                // stored before this field was validated. We'll return the
3✔
4625
                // edge along with an error.
3✔
4626
                if len(opq) < 8 {
3✔
UNCOV
4627
                        return edge, ErrEdgePolicyOptionalFieldNotFound
×
UNCOV
4628
                }
×
4629

4630
                maxHtlc := byteOrder.Uint64(opq[:8])
3✔
4631
                edge.MaxHTLC = lnwire.MilliSatoshi(maxHtlc)
3✔
4632

3✔
4633
                // Exclude the parsed field from the rest of the opaque data.
3✔
4634
                edge.ExtraOpaqueData = opq[8:]
3✔
4635
        }
4636

4637
        return edge, nil
3✔
4638
}
4639

4640
// chanGraphNodeTx is an implementation of the NodeRTx interface backed by the
4641
// KVStore and a kvdb.RTx.
4642
type chanGraphNodeTx struct {
4643
        tx   kvdb.RTx
4644
        db   *KVStore
4645
        node *models.LightningNode
4646
}
4647

4648
// A compile-time constraint to ensure chanGraphNodeTx implements the NodeRTx
4649
// interface.
4650
var _ NodeRTx = (*chanGraphNodeTx)(nil)
4651

4652
func newChanGraphNodeTx(tx kvdb.RTx, db *KVStore,
4653
        node *models.LightningNode) *chanGraphNodeTx {
3✔
4654

3✔
4655
        return &chanGraphNodeTx{
3✔
4656
                tx:   tx,
3✔
4657
                db:   db,
3✔
4658
                node: node,
3✔
4659
        }
3✔
4660
}
3✔
4661

4662
// Node returns the raw information of the node.
4663
//
4664
// NOTE: This is a part of the NodeRTx interface.
4665
func (c *chanGraphNodeTx) Node() *models.LightningNode {
3✔
4666
        return c.node
3✔
4667
}
3✔
4668

4669
// FetchNode fetches the node with the given pub key under the same transaction
4670
// used to fetch the current node. The returned node is also a NodeRTx and any
4671
// operations on that NodeRTx will also be done under the same transaction.
4672
//
4673
// NOTE: This is a part of the NodeRTx interface.
UNCOV
4674
func (c *chanGraphNodeTx) FetchNode(nodePub route.Vertex) (NodeRTx, error) {
×
UNCOV
4675
        node, err := c.db.FetchLightningNodeTx(c.tx, nodePub)
×
UNCOV
4676
        if err != nil {
×
4677
                return nil, err
×
4678
        }
×
4679

UNCOV
4680
        return newChanGraphNodeTx(c.tx, c.db, node), nil
×
4681
}
4682

4683
// ForEachChannel can be used to iterate over the node's channels under
4684
// the same transaction used to fetch the node.
4685
//
4686
// NOTE: This is a part of the NodeRTx interface.
4687
func (c *chanGraphNodeTx) ForEachChannel(f func(*models.ChannelEdgeInfo,
UNCOV
4688
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
×
UNCOV
4689

×
UNCOV
4690
        return c.db.ForEachNodeChannelTx(c.tx, c.node.PubKeyBytes,
×
UNCOV
4691
                func(_ kvdb.RTx, info *models.ChannelEdgeInfo, policy1,
×
UNCOV
4692
                        policy2 *models.ChannelEdgePolicy) error {
×
UNCOV
4693

×
UNCOV
4694
                        return f(info, policy1, policy2)
×
UNCOV
4695
                },
×
4696
        )
4697
}
4698

4699
// MakeTestGraph creates a new instance of the KVStore for testing
4700
// purposes.
4701
func MakeTestGraph(t testing.TB, modifiers ...KVStoreOptionModifier) (
UNCOV
4702
        *ChannelGraph, error) {
×
UNCOV
4703

×
UNCOV
4704
        opts := DefaultOptions()
×
UNCOV
4705
        for _, modifier := range modifiers {
×
4706
                modifier(opts)
×
4707
        }
×
4708

4709
        // Next, create KVStore for the first time.
UNCOV
4710
        backend, backendCleanup, err := kvdb.GetTestBackend(t.TempDir(), "cgr")
×
UNCOV
4711
        if err != nil {
×
4712
                backendCleanup()
×
4713

×
4714
                return nil, err
×
4715
        }
×
4716

UNCOV
4717
        graph, err := NewChannelGraph(&Config{
×
UNCOV
4718
                KVDB:        backend,
×
UNCOV
4719
                KVStoreOpts: modifiers,
×
UNCOV
4720
        })
×
UNCOV
4721
        if err != nil {
×
4722
                backendCleanup()
×
4723

×
4724
                return nil, err
×
4725
        }
×
NEW
4726
        require.NoError(t, graph.Start())
×
UNCOV
4727

×
UNCOV
4728
        t.Cleanup(func() {
×
UNCOV
4729
                _ = backend.Close()
×
UNCOV
4730
                backendCleanup()
×
NEW
4731
                require.NoError(t, graph.Stop())
×
UNCOV
4732
        })
×
4733

UNCOV
4734
        return graph, nil
×
4735
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc