• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 14358372723

09 Apr 2025 01:26PM UTC coverage: 56.696% (-12.3%) from 69.037%
14358372723

Pull #9696

github

web-flow
Merge e2837e400 into 867d27d68
Pull Request #9696: Add `development_guidelines.md` for both human and machine

107055 of 188823 relevant lines covered (56.7%)

22721.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.41
/graph/db/kv_store.go
1
package graphdb
2

3
import (
4
        "bytes"
5
        "crypto/sha256"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "math"
11
        "net"
12
        "sort"
13
        "sync"
14
        "testing"
15
        "time"
16

17
        "github.com/btcsuite/btcd/btcec/v2"
18
        "github.com/btcsuite/btcd/chaincfg/chainhash"
19
        "github.com/btcsuite/btcd/txscript"
20
        "github.com/btcsuite/btcd/wire"
21
        "github.com/btcsuite/btcwallet/walletdb"
22
        "github.com/lightningnetwork/lnd/aliasmgr"
23
        "github.com/lightningnetwork/lnd/batch"
24
        "github.com/lightningnetwork/lnd/graph/db/models"
25
        "github.com/lightningnetwork/lnd/input"
26
        "github.com/lightningnetwork/lnd/kvdb"
27
        "github.com/lightningnetwork/lnd/lnwire"
28
        "github.com/lightningnetwork/lnd/routing/route"
29
        "github.com/stretchr/testify/require"
30
)
31

32
var (
33
        // nodeBucket is a bucket which houses all the vertices or nodes within
34
        // the channel graph. This bucket has a single sub-bucket which adds an
35
        // additional index from pubkey -> alias. Within the top-level of this
36
        // bucket, the key space maps a node's compressed public key to the
37
        // serialized information for that node. Additionally, there's a
38
        // special key "source" which stores the pubkey of the source node. The
39
        // source node is used as the starting point for all graph/queries and
40
        // traversals. The graph is formed as a star-graph with the source node
41
        // at the center.
42
        //
43
        // maps: pubKey -> nodeInfo
44
        // maps: source -> selfPubKey
45
        nodeBucket = []byte("graph-node")
46

47
        // nodeUpdateIndexBucket is a sub-bucket of the nodeBucket. This bucket
48
        // will be used to quickly look up the "freshness" of a node's last
49
        // update to the network. The bucket only contains keys, and no values,
50
        // it's mapping:
51
        //
52
        // maps: updateTime || nodeID -> nil
53
        nodeUpdateIndexBucket = []byte("graph-node-update-index")
54

55
        // sourceKey is a special key that resides within the nodeBucket. The
56
        // sourceKey maps a key to the public key of the "self node".
57
        sourceKey = []byte("source")
58

59
        // aliasIndexBucket is a sub-bucket that's nested within the main
60
        // nodeBucket. This bucket maps the public key of a node to its
61
        // current alias. This bucket is provided as it can be used within a
62
        // future UI layer to add an additional degree of confirmation.
63
        aliasIndexBucket = []byte("alias")
64

65
        // edgeBucket is a bucket which houses all of the edge or channel
66
        // information within the channel graph. This bucket essentially acts
67
        // as an adjacency list, which in conjunction with a range scan, can be
68
        // used to iterate over all the incoming and outgoing edges for a
69
        // particular node. Keys in the bucket use a prefix scheme which leads
70
        // with the node's public key and ends with the compact edge ID.
71
        // For each chanID, there will be two entries within the bucket, as the
72
        // graph is directed: nodes may have different policies w.r.t. fees
73
        // for their respective directions.
74
        //
75
        // maps: pubKey || chanID -> channel edge policy for node
76
        edgeBucket = []byte("graph-edge")
77

78
        // unknownPolicy is represented as an empty slice. It is
79
        // used as the value in edgeBucket for unknown channel edge policies.
80
        // Unknown policies are still stored in the database to enable efficient
81
        // lookup of incoming channel edges.
82
        unknownPolicy = []byte{}
83

84
        // chanStart is an array of all zero bytes which is used to perform
85
        // range scans within the edgeBucket to obtain all of the outgoing
86
        // edges for a particular node.
87
        chanStart [8]byte
88

89
        // edgeIndexBucket is an index which can be used to iterate all edges
90
        // in the bucket, grouping them according to their in/out nodes.
91
        // Additionally, the items in this bucket also contain the complete
92
        // edge information for a channel. The edge information includes the
93
        // capacity of the channel, the nodes that made the channel, etc. This
94
        // bucket resides within the edgeBucket above. Creation of an edge
95
        // proceeds in two phases: first the edge is added to the edge index,
96
        // afterwards the edgeBucket can be updated with the latest details of
97
        // the edge as they are announced on the network.
98
        //
99
        // maps: chanID -> pubKey1 || pubKey2 || restofEdgeInfo
100
        edgeIndexBucket = []byte("edge-index")
101

102
        // edgeUpdateIndexBucket is a sub-bucket of the main edgeBucket. This
103
        // bucket contains an index which allows us to gauge the "freshness" of
104
        // a channel's last updates.
105
        //
106
        // maps: updateTime || chanID -> nil
107
        edgeUpdateIndexBucket = []byte("edge-update-index")
108

109
        // channelPointBucket maps a channel's full outpoint (txid:index) to
110
        // its short 8-byte channel ID. This bucket resides within the
111
        // edgeBucket above, and can be used to quickly remove an edge due to
112
        // the outpoint being spent, or to query for existence of a channel.
113
        //
114
        // maps: outPoint -> chanID
115
        channelPointBucket = []byte("chan-index")
116

117
        // zombieBucket is a sub-bucket of the main edgeBucket bucket
118
        // responsible for maintaining an index of zombie channels. Each entry
119
        // exists within the bucket as follows:
120
        //
121
        // maps: chanID -> pubKey1 || pubKey2
122
        //
123
        // The chanID represents the channel ID of the edge that is marked as a
124
        // zombie and is used as the key, which maps to the public keys of the
125
        // edge's participants.
126
        zombieBucket = []byte("zombie-index")
127

128
        // disabledEdgePolicyBucket is a sub-bucket of the main edgeBucket
129
        // bucket responsible for maintaining an index of disabled edge
130
        // policies. Each entry exists within the bucket as follows:
131
        //
132
        // maps: <chanID><direction> -> []byte{}
133
        //
134
        // The chanID represents the channel ID of the edge and the direction is
135
        // one byte representing the direction of the edge. The main purpose of
136
        // this index is to allow pruning disabled channels in a fast way
137
        // without the need to iterate all over the graph.
138
        disabledEdgePolicyBucket = []byte("disabled-edge-policy-index")
139

140
        // graphMetaBucket is a top-level bucket which stores various metadata
141
        // related to the on-disk channel graph. Data stored in this bucket
142
        // includes the block to which the graph has been synced to, the total
143
        // number of channels, etc.
144
        graphMetaBucket = []byte("graph-meta")
145

146
        // pruneLogBucket is a bucket within the graphMetaBucket that stores
147
        // a mapping from the block height to the hash for the blocks used to
148
        // prune the graph.
149
        // Once a new block is discovered, any channels that have been closed
150
        // (by spending the outpoint) can safely be removed from the graph, and
151
        // the block is added to the prune log. We need to keep such a log for
152
        // the case where a reorg happens, and we must "rewind" the state of the
153
        // graph by removing channels that were previously confirmed. In such a
154
        // case we'll remove all entries from the prune log with a block height
155
        // that no longer exists.
156
        pruneLogBucket = []byte("prune-log")
157

158
        // closedScidBucket is a top-level bucket that stores scids for
159
        // channels that we know to be closed. This is used so that we don't
160
        // need to perform expensive validation checks if we receive a channel
161
        // announcement for the channel again.
162
        //
163
        // maps: scid -> []byte{}
164
        closedScidBucket = []byte("closed-scid")
165
)
166

167
const (
168
        // MaxAllowedExtraOpaqueBytes is the largest amount of opaque bytes that
169
        // we'll permit to be written to disk. We limit this as otherwise, it
170
        // would be possible for a node to create a ton of updates and slowly
171
        // fill our disk, and also waste bandwidth due to relaying.
172
        MaxAllowedExtraOpaqueBytes = 10000
173
)
174

175
// KVStore is a persistent, on-disk graph representation of the Lightning
176
// Network. This struct can be used to implement path finding algorithms on top
177
// of, and also to update a node's view based on information received from the
178
// p2p network. Internally, the graph is stored using a modified adjacency list
179
// representation with some added object interaction possible with each
180
// serialized edge/node. The stored graph is directed, meaning there are two
181
// edges stored for each channel: an inbound/outbound edge for each node pair.
182
// Nodes, edges, and edge information can all be added to the graph
183
// independently. Edge removal results in the deletion of all edge information
184
// for that edge.
185
type KVStore struct {
186
        db kvdb.Backend
187

188
        // cacheMu guards all caches (rejectCache and chanCache). If
189
        // this mutex will be acquired at the same time as the DB mutex then
190
        // the cacheMu MUST be acquired first to prevent deadlock.
191
        cacheMu     sync.RWMutex
192
        rejectCache *rejectCache
193
        chanCache   *channelCache
194

195
        chanScheduler batch.Scheduler
196
        nodeScheduler batch.Scheduler
197
}
198

199
// NewKVStore allocates a new KVStore backed by a DB instance. The
200
// returned instance has its own unique reject cache and channel cache.
201
func NewKVStore(db kvdb.Backend, options ...KVStoreOptionModifier) (*KVStore,
202
        error) {
174✔
203

174✔
204
        opts := DefaultOptions()
174✔
205
        for _, o := range options {
174✔
206
                o(opts)
×
207
        }
×
208

209
        if !opts.NoMigration {
348✔
210
                if err := initKVStore(db); err != nil {
174✔
211
                        return nil, err
×
212
                }
×
213
        }
214

215
        g := &KVStore{
174✔
216
                db:          db,
174✔
217
                rejectCache: newRejectCache(opts.RejectCacheSize),
174✔
218
                chanCache:   newChannelCache(opts.ChannelCacheSize),
174✔
219
        }
174✔
220
        g.chanScheduler = batch.NewTimeScheduler(
174✔
221
                db, &g.cacheMu, opts.BatchCommitInterval,
174✔
222
        )
174✔
223
        g.nodeScheduler = batch.NewTimeScheduler(
174✔
224
                db, nil, opts.BatchCommitInterval,
174✔
225
        )
174✔
226

174✔
227
        return g, nil
174✔
228
}
229

230
// channelMapKey is the key structure used for storing channel edge policies.
231
type channelMapKey struct {
232
        nodeKey route.Vertex
233
        chanID  [8]byte
234
}
235

236
// getChannelMap loads all channel edge policies from the database and stores
237
// them in a map.
238
func (c *KVStore) getChannelMap(edges kvdb.RBucket) (
239
        map[channelMapKey]*models.ChannelEdgePolicy, error) {
145✔
240

145✔
241
        // Create a map to store all channel edge policies.
145✔
242
        channelMap := make(map[channelMapKey]*models.ChannelEdgePolicy)
145✔
243

145✔
244
        err := kvdb.ForAll(edges, func(k, edgeBytes []byte) error {
1,720✔
245
                // Skip embedded buckets.
1,575✔
246
                if bytes.Equal(k, edgeIndexBucket) ||
1,575✔
247
                        bytes.Equal(k, edgeUpdateIndexBucket) ||
1,575✔
248
                        bytes.Equal(k, zombieBucket) ||
1,575✔
249
                        bytes.Equal(k, disabledEdgePolicyBucket) ||
1,575✔
250
                        bytes.Equal(k, channelPointBucket) {
2,160✔
251

585✔
252
                        return nil
585✔
253
                }
585✔
254

255
                // Validate key length.
256
                if len(k) != 33+8 {
990✔
257
                        return fmt.Errorf("invalid edge key %x encountered", k)
×
258
                }
×
259

260
                var key channelMapKey
990✔
261
                copy(key.nodeKey[:], k[:33])
990✔
262
                copy(key.chanID[:], k[33:])
990✔
263

990✔
264
                // No need to deserialize unknown policy.
990✔
265
                if bytes.Equal(edgeBytes, unknownPolicy) {
990✔
266
                        return nil
×
267
                }
×
268

269
                edgeReader := bytes.NewReader(edgeBytes)
990✔
270
                edge, err := deserializeChanEdgePolicyRaw(
990✔
271
                        edgeReader,
990✔
272
                )
990✔
273

990✔
274
                switch {
990✔
275
                // If the db policy was missing an expected optional field, we
276
                // return nil as if the policy was unknown.
277
                case errors.Is(err, ErrEdgePolicyOptionalFieldNotFound):
×
278
                        return nil
×
279

280
                case err != nil:
×
281
                        return err
×
282
                }
283

284
                channelMap[key] = edge
990✔
285

990✔
286
                return nil
990✔
287
        })
288
        if err != nil {
145✔
289
                return nil, err
×
290
        }
×
291

292
        return channelMap, nil
145✔
293
}
294

295
var graphTopLevelBuckets = [][]byte{
296
        nodeBucket,
297
        edgeBucket,
298
        graphMetaBucket,
299
        closedScidBucket,
300
}
301

302
// Wipe completely deletes all saved state within all used buckets within the
303
// database. The deletion is done in a single transaction, therefore this
304
// operation is fully atomic.
305
func (c *KVStore) Wipe() error {
×
306
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
×
307
                for _, tlb := range graphTopLevelBuckets {
×
308
                        err := tx.DeleteTopLevelBucket(tlb)
×
309
                        if err != nil &&
×
310
                                !errors.Is(err, kvdb.ErrBucketNotFound) {
×
311

×
312
                                return err
×
313
                        }
×
314
                }
315

316
                return nil
×
317
        }, func() {})
×
318
        if err != nil {
×
319
                return err
×
320
        }
×
321

322
        return initKVStore(c.db)
×
323
}
324

325
// createChannelDB creates and initializes a fresh version of  In
326
// the case that the target path has not yet been created or doesn't yet exist,
327
// then the path is created. Additionally, all required top-level buckets used
328
// within the database are created.
329
func initKVStore(db kvdb.Backend) error {
174✔
330
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
348✔
331
                for _, tlb := range graphTopLevelBuckets {
870✔
332
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
696✔
333
                                return err
×
334
                        }
×
335
                }
336

337
                nodes := tx.ReadWriteBucket(nodeBucket)
174✔
338
                _, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
174✔
339
                if err != nil {
174✔
340
                        return err
×
341
                }
×
342
                _, err = nodes.CreateBucketIfNotExists(nodeUpdateIndexBucket)
174✔
343
                if err != nil {
174✔
344
                        return err
×
345
                }
×
346

347
                edges := tx.ReadWriteBucket(edgeBucket)
174✔
348
                _, err = edges.CreateBucketIfNotExists(edgeIndexBucket)
174✔
349
                if err != nil {
174✔
350
                        return err
×
351
                }
×
352
                _, err = edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
174✔
353
                if err != nil {
174✔
354
                        return err
×
355
                }
×
356
                _, err = edges.CreateBucketIfNotExists(channelPointBucket)
174✔
357
                if err != nil {
174✔
358
                        return err
×
359
                }
×
360
                _, err = edges.CreateBucketIfNotExists(zombieBucket)
174✔
361
                if err != nil {
174✔
362
                        return err
×
363
                }
×
364

365
                graphMeta := tx.ReadWriteBucket(graphMetaBucket)
174✔
366
                _, err = graphMeta.CreateBucketIfNotExists(pruneLogBucket)
174✔
367

174✔
368
                return err
174✔
369
        }, func() {})
174✔
370
        if err != nil {
174✔
371
                return fmt.Errorf("unable to create new channel graph: %w", err)
×
372
        }
×
373

374
        return nil
174✔
375
}
376

377
// AddrsForNode returns all known addresses for the target node public key that
378
// the graph DB is aware of. The returned boolean indicates if the given node is
379
// unknown to the graph DB or not.
380
//
381
// NOTE: this is part of the channeldb.AddrSource interface.
382
func (c *KVStore) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr,
383
        error) {
1✔
384

1✔
385
        pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed())
1✔
386
        if err != nil {
1✔
387
                return false, nil, err
×
388
        }
×
389

390
        node, err := c.FetchLightningNode(pubKey)
1✔
391
        // We don't consider it an error if the graph is unaware of the node.
1✔
392
        switch {
1✔
393
        case err != nil && !errors.Is(err, ErrGraphNodeNotFound):
×
394
                return false, nil, err
×
395

396
        case errors.Is(err, ErrGraphNodeNotFound):
×
397
                return false, nil, nil
×
398
        }
399

400
        return true, node.Addresses, nil
1✔
401
}
402

403
// ForEachChannel iterates through all the channel edges stored within the
404
// graph and invokes the passed callback for each edge. The callback takes two
405
// edges as since this is a directed graph, both the in/out edges are visited.
406
// If the callback returns an error, then the transaction is aborted and the
407
// iteration stops early.
408
//
409
// NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer
410
// for that particular channel edge routing policy will be passed into the
411
// callback.
412
func (c *KVStore) ForEachChannel(cb func(*models.ChannelEdgeInfo,
413
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
145✔
414

145✔
415
        return c.db.View(func(tx kvdb.RTx) error {
290✔
416
                edges := tx.ReadBucket(edgeBucket)
145✔
417
                if edges == nil {
145✔
418
                        return ErrGraphNoEdgesFound
×
419
                }
×
420

421
                // First, load all edges in memory indexed by node and channel
422
                // id.
423
                channelMap, err := c.getChannelMap(edges)
145✔
424
                if err != nil {
145✔
425
                        return err
×
426
                }
×
427

428
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
145✔
429
                if edgeIndex == nil {
145✔
430
                        return ErrGraphNoEdgesFound
×
431
                }
×
432

433
                // Load edge index, recombine each channel with the policies
434
                // loaded above and invoke the callback.
435
                return kvdb.ForAll(
145✔
436
                        edgeIndex, func(k, edgeInfoBytes []byte) error {
640✔
437
                                var chanID [8]byte
495✔
438
                                copy(chanID[:], k)
495✔
439

495✔
440
                                edgeInfoReader := bytes.NewReader(edgeInfoBytes)
495✔
441
                                info, err := deserializeChanEdgeInfo(
495✔
442
                                        edgeInfoReader,
495✔
443
                                )
495✔
444
                                if err != nil {
495✔
445
                                        return err
×
446
                                }
×
447

448
                                policy1 := channelMap[channelMapKey{
495✔
449
                                        nodeKey: info.NodeKey1Bytes,
495✔
450
                                        chanID:  chanID,
495✔
451
                                }]
495✔
452

495✔
453
                                policy2 := channelMap[channelMapKey{
495✔
454
                                        nodeKey: info.NodeKey2Bytes,
495✔
455
                                        chanID:  chanID,
495✔
456
                                }]
495✔
457

495✔
458
                                return cb(&info, policy1, policy2)
495✔
459
                        },
460
                )
461
        }, func() {})
145✔
462
}
463

464
// forEachNodeDirectedChannel iterates through all channels of a given node,
465
// executing the passed callback on the directed edge representing the channel
466
// and its incoming policy. If the callback returns an error, then the iteration
467
// is halted with the error propagated back up to the caller. An optional read
468
// transaction may be provided. If none is provided, a new one will be created.
469
//
470
// Unknown policies are passed into the callback as nil values.
471
func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx,
472
        node route.Vertex, cb func(channel *DirectedChannel) error) error {
242✔
473

242✔
474
        // Fallback that uses the database.
242✔
475
        toNodeCallback := func() route.Vertex {
374✔
476
                return node
132✔
477
        }
132✔
478
        toNodeFeatures, err := c.fetchNodeFeatures(tx, node)
242✔
479
        if err != nil {
242✔
480
                return err
×
481
        }
×
482

483
        dbCallback := func(tx kvdb.RTx, e *models.ChannelEdgeInfo, p1,
242✔
484
                p2 *models.ChannelEdgePolicy) error {
738✔
485

496✔
486
                var cachedInPolicy *models.CachedEdgePolicy
496✔
487
                if p2 != nil {
989✔
488
                        cachedInPolicy = models.NewCachedPolicy(p2)
493✔
489
                        cachedInPolicy.ToNodePubKey = toNodeCallback
493✔
490
                        cachedInPolicy.ToNodeFeatures = toNodeFeatures
493✔
491
                }
493✔
492

493
                var inboundFee lnwire.Fee
496✔
494
                if p1 != nil {
991✔
495
                        // Extract inbound fee. If there is a decoding error,
495✔
496
                        // skip this edge.
495✔
497
                        _, err := p1.ExtraOpaqueData.ExtractRecords(&inboundFee)
495✔
498
                        if err != nil {
496✔
499
                                return nil
1✔
500
                        }
1✔
501
                }
502

503
                directedChannel := &DirectedChannel{
495✔
504
                        ChannelID:    e.ChannelID,
495✔
505
                        IsNode1:      node == e.NodeKey1Bytes,
495✔
506
                        OtherNode:    e.NodeKey2Bytes,
495✔
507
                        Capacity:     e.Capacity,
495✔
508
                        OutPolicySet: p1 != nil,
495✔
509
                        InPolicy:     cachedInPolicy,
495✔
510
                        InboundFee:   inboundFee,
495✔
511
                }
495✔
512

495✔
513
                if node == e.NodeKey2Bytes {
743✔
514
                        directedChannel.OtherNode = e.NodeKey1Bytes
248✔
515
                }
248✔
516

517
                return cb(directedChannel)
495✔
518
        }
519

520
        return nodeTraversal(tx, node[:], c.db, dbCallback)
242✔
521
}
522

523
// fetchNodeFeatures returns the features of a given node. If no features are
524
// known for the node, an empty feature vector is returned. An optional read
525
// transaction may be provided. If none is provided, a new one will be created.
526
func (c *KVStore) fetchNodeFeatures(tx kvdb.RTx,
527
        node route.Vertex) (*lnwire.FeatureVector, error) {
686✔
528

686✔
529
        // Fallback that uses the database.
686✔
530
        targetNode, err := c.FetchLightningNodeTx(tx, node)
686✔
531
        switch {
686✔
532
        // If the node exists and has features, return them directly.
533
        case err == nil:
675✔
534
                return targetNode.Features, nil
675✔
535

536
        // If we couldn't find a node announcement, populate a blank feature
537
        // vector.
538
        case errors.Is(err, ErrGraphNodeNotFound):
11✔
539
                return lnwire.EmptyFeatureVector(), nil
11✔
540

541
        // Otherwise, bubble the error up.
542
        default:
×
543
                return nil, err
×
544
        }
545
}
546

547
// ForEachNodeDirectedChannel iterates through all channels of a given node,
548
// executing the passed callback on the directed edge representing the channel
549
// and its incoming policy. If the callback returns an error, then the iteration
550
// is halted with the error propagated back up to the caller.
551
//
552
// Unknown policies are passed into the callback as nil values.
553
//
554
// NOTE: this is part of the graphdb.NodeTraverser interface.
555
func (c *KVStore) ForEachNodeDirectedChannel(nodePub route.Vertex,
556
        cb func(channel *DirectedChannel) error) error {
3✔
557

3✔
558
        return c.forEachNodeDirectedChannel(nil, nodePub, cb)
3✔
559
}
3✔
560

561
// FetchNodeFeatures returns the features of the given node. If no features are
562
// known for the node, an empty feature vector is returned.
563
//
564
// NOTE: this is part of the graphdb.NodeTraverser interface.
565
func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) (
566
        *lnwire.FeatureVector, error) {
×
567

×
568
        return c.fetchNodeFeatures(nil, nodePub)
×
569
}
×
570

571
// ForEachNodeCached is similar to forEachNode, but it returns DirectedChannel
572
// data to the call-back.
573
//
574
// NOTE: The callback contents MUST not be modified.
575
func (c *KVStore) ForEachNodeCached(cb func(node route.Vertex,
576
        chans map[uint64]*DirectedChannel) error) error {
1✔
577

1✔
578
        // Otherwise call back to a version that uses the database directly.
1✔
579
        // We'll iterate over each node, then the set of channels for each
1✔
580
        // node, and construct a similar callback functiopn signature as the
1✔
581
        // main funcotin expects.
1✔
582
        return c.forEachNode(func(tx kvdb.RTx,
1✔
583
                node *models.LightningNode) error {
21✔
584

20✔
585
                channels := make(map[uint64]*DirectedChannel)
20✔
586

20✔
587
                err := c.ForEachNodeChannelTx(tx, node.PubKeyBytes,
20✔
588
                        func(tx kvdb.RTx, e *models.ChannelEdgeInfo,
20✔
589
                                p1 *models.ChannelEdgePolicy,
20✔
590
                                p2 *models.ChannelEdgePolicy) error {
210✔
591

190✔
592
                                toNodeCallback := func() route.Vertex {
190✔
593
                                        return node.PubKeyBytes
×
594
                                }
×
595
                                toNodeFeatures, err := c.fetchNodeFeatures(
190✔
596
                                        tx, node.PubKeyBytes,
190✔
597
                                )
190✔
598
                                if err != nil {
190✔
599
                                        return err
×
600
                                }
×
601

602
                                var cachedInPolicy *models.CachedEdgePolicy
190✔
603
                                if p2 != nil {
380✔
604
                                        cachedInPolicy =
190✔
605
                                                models.NewCachedPolicy(p2)
190✔
606
                                        cachedInPolicy.ToNodePubKey =
190✔
607
                                                toNodeCallback
190✔
608
                                        cachedInPolicy.ToNodeFeatures =
190✔
609
                                                toNodeFeatures
190✔
610
                                }
190✔
611

612
                                directedChannel := &DirectedChannel{
190✔
613
                                        ChannelID: e.ChannelID,
190✔
614
                                        IsNode1: node.PubKeyBytes ==
190✔
615
                                                e.NodeKey1Bytes,
190✔
616
                                        OtherNode:    e.NodeKey2Bytes,
190✔
617
                                        Capacity:     e.Capacity,
190✔
618
                                        OutPolicySet: p1 != nil,
190✔
619
                                        InPolicy:     cachedInPolicy,
190✔
620
                                }
190✔
621

190✔
622
                                if node.PubKeyBytes == e.NodeKey2Bytes {
285✔
623
                                        directedChannel.OtherNode =
95✔
624
                                                e.NodeKey1Bytes
95✔
625
                                }
95✔
626

627
                                channels[e.ChannelID] = directedChannel
190✔
628

190✔
629
                                return nil
190✔
630
                        })
631
                if err != nil {
20✔
632
                        return err
×
633
                }
×
634

635
                return cb(node.PubKeyBytes, channels)
20✔
636
        })
637
}
638

639
// DisabledChannelIDs returns the channel ids of disabled channels.
640
// A channel is disabled when two of the associated ChanelEdgePolicies
641
// have their disabled bit on.
642
func (c *KVStore) DisabledChannelIDs() ([]uint64, error) {
6✔
643
        var disabledChanIDs []uint64
6✔
644
        var chanEdgeFound map[uint64]struct{}
6✔
645

6✔
646
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
12✔
647
                edges := tx.ReadBucket(edgeBucket)
6✔
648
                if edges == nil {
6✔
649
                        return ErrGraphNoEdgesFound
×
650
                }
×
651

652
                disabledEdgePolicyIndex := edges.NestedReadBucket(
6✔
653
                        disabledEdgePolicyBucket,
6✔
654
                )
6✔
655
                if disabledEdgePolicyIndex == nil {
7✔
656
                        return nil
1✔
657
                }
1✔
658

659
                // We iterate over all disabled policies and we add each channel
660
                // that has more than one disabled policy to disabledChanIDs
661
                // array.
662
                return disabledEdgePolicyIndex.ForEach(
5✔
663
                        func(k, v []byte) error {
16✔
664
                                chanID := byteOrder.Uint64(k[:8])
11✔
665
                                _, edgeFound := chanEdgeFound[chanID]
11✔
666
                                if edgeFound {
15✔
667
                                        delete(chanEdgeFound, chanID)
4✔
668
                                        disabledChanIDs = append(
4✔
669
                                                disabledChanIDs, chanID,
4✔
670
                                        )
4✔
671

4✔
672
                                        return nil
4✔
673
                                }
4✔
674

675
                                chanEdgeFound[chanID] = struct{}{}
7✔
676

7✔
677
                                return nil
7✔
678
                        },
679
                )
680
        }, func() {
6✔
681
                disabledChanIDs = nil
6✔
682
                chanEdgeFound = make(map[uint64]struct{})
6✔
683
        })
6✔
684
        if err != nil {
6✔
685
                return nil, err
×
686
        }
×
687

688
        return disabledChanIDs, nil
6✔
689
}
690

691
// ForEachNode iterates through all the stored vertices/nodes in the graph,
692
// executing the passed callback with each node encountered. If the callback
693
// returns an error, then the transaction is aborted and the iteration stops
694
// early. Any operations performed on the NodeTx passed to the call-back are
695
// executed under the same read transaction and so, methods on the NodeTx object
696
// _MUST_ only be called from within the call-back.
697
func (c *KVStore) ForEachNode(cb func(tx NodeRTx) error) error {
120✔
698
        return c.forEachNode(func(tx kvdb.RTx,
120✔
699
                node *models.LightningNode) error {
1,090✔
700

970✔
701
                return cb(newChanGraphNodeTx(tx, c, node))
970✔
702
        })
970✔
703
}
704

705
// forEachNode iterates through all the stored vertices/nodes in the graph,
706
// executing the passed callback with each node encountered. If the callback
707
// returns an error, then the transaction is aborted and the iteration stops
708
// early.
709
//
710
// TODO(roasbeef): add iterator interface to allow for memory efficient graph
711
// traversal when graph gets mega.
712
func (c *KVStore) forEachNode(
713
        cb func(kvdb.RTx, *models.LightningNode) error) error {
129✔
714

129✔
715
        traversal := func(tx kvdb.RTx) error {
258✔
716
                // First grab the nodes bucket which stores the mapping from
129✔
717
                // pubKey to node information.
129✔
718
                nodes := tx.ReadBucket(nodeBucket)
129✔
719
                if nodes == nil {
129✔
720
                        return ErrGraphNotFound
×
721
                }
×
722

723
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
1,568✔
724
                        // If this is the source key, then we skip this
1,439✔
725
                        // iteration as the value for this key is a pubKey
1,439✔
726
                        // rather than raw node information.
1,439✔
727
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
1,700✔
728
                                return nil
261✔
729
                        }
261✔
730

731
                        nodeReader := bytes.NewReader(nodeBytes)
1,178✔
732
                        node, err := deserializeLightningNode(nodeReader)
1,178✔
733
                        if err != nil {
1,178✔
734
                                return err
×
735
                        }
×
736

737
                        // Execute the callback, the transaction will abort if
738
                        // this returns an error.
739
                        return cb(tx, &node)
1,178✔
740
                })
741
        }
742

743
        return kvdb.View(c.db, traversal, func() {})
258✔
744
}
745

746
// ForEachNodeCacheable iterates through all the stored vertices/nodes in the
747
// graph, executing the passed callback with each node encountered. If the
748
// callback returns an error, then the transaction is aborted and the iteration
749
// stops early.
750
func (c *KVStore) ForEachNodeCacheable(cb func(route.Vertex,
751
        *lnwire.FeatureVector) error) error {
142✔
752

142✔
753
        traversal := func(tx kvdb.RTx) error {
284✔
754
                // First grab the nodes bucket which stores the mapping from
142✔
755
                // pubKey to node information.
142✔
756
                nodes := tx.ReadBucket(nodeBucket)
142✔
757
                if nodes == nil {
142✔
758
                        return ErrGraphNotFound
×
759
                }
×
760

761
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
546✔
762
                        // If this is the source key, then we skip this
404✔
763
                        // iteration as the value for this key is a pubKey
404✔
764
                        // rather than raw node information.
404✔
765
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
688✔
766
                                return nil
284✔
767
                        }
284✔
768

769
                        nodeReader := bytes.NewReader(nodeBytes)
120✔
770
                        node, features, err := deserializeLightningNodeCacheable( //nolint:ll
120✔
771
                                nodeReader,
120✔
772
                        )
120✔
773
                        if err != nil {
120✔
774
                                return err
×
775
                        }
×
776

777
                        // Execute the callback, the transaction will abort if
778
                        // this returns an error.
779
                        return cb(node, features)
120✔
780
                })
781
        }
782

783
        return kvdb.View(c.db, traversal, func() {})
284✔
784
}
785

786
// SourceNode returns the source node of the graph. The source node is treated
787
// as the center node within a star-graph. This method may be used to kick off
788
// a path finding algorithm in order to explore the reachability of another
789
// node based off the source node.
790
func (c *KVStore) SourceNode() (*models.LightningNode, error) {
231✔
791
        var source *models.LightningNode
231✔
792
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
462✔
793
                // First grab the nodes bucket which stores the mapping from
231✔
794
                // pubKey to node information.
231✔
795
                nodes := tx.ReadBucket(nodeBucket)
231✔
796
                if nodes == nil {
231✔
797
                        return ErrGraphNotFound
×
798
                }
×
799

800
                node, err := c.sourceNode(nodes)
231✔
801
                if err != nil {
232✔
802
                        return err
1✔
803
                }
1✔
804
                source = node
230✔
805

230✔
806
                return nil
230✔
807
        }, func() {
231✔
808
                source = nil
231✔
809
        })
231✔
810
        if err != nil {
232✔
811
                return nil, err
1✔
812
        }
1✔
813

814
        return source, nil
230✔
815
}
816

817
// sourceNode uses an existing database transaction and returns the source node
818
// of the graph. The source node is treated as the center node within a
819
// star-graph. This method may be used to kick off a path finding algorithm in
820
// order to explore the reachability of another node based off the source node.
821
func (c *KVStore) sourceNode(nodes kvdb.RBucket) (*models.LightningNode,
822
        error) {
496✔
823

496✔
824
        selfPub := nodes.Get(sourceKey)
496✔
825
        if selfPub == nil {
497✔
826
                return nil, ErrSourceNodeNotSet
1✔
827
        }
1✔
828

829
        // With the pubKey of the source node retrieved, we're able to
830
        // fetch the full node information.
831
        node, err := fetchLightningNode(nodes, selfPub)
495✔
832
        if err != nil {
495✔
833
                return nil, err
×
834
        }
×
835

836
        return &node, nil
495✔
837
}
838

839
// SetSourceNode sets the source node within the graph database. The source
840
// node is to be used as the center of a star-graph within path finding
841
// algorithms.
842
func (c *KVStore) SetSourceNode(node *models.LightningNode) error {
117✔
843
        nodePubBytes := node.PubKeyBytes[:]
117✔
844

117✔
845
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
234✔
846
                // First grab the nodes bucket which stores the mapping from
117✔
847
                // pubKey to node information.
117✔
848
                nodes, err := tx.CreateTopLevelBucket(nodeBucket)
117✔
849
                if err != nil {
117✔
850
                        return err
×
851
                }
×
852

853
                // Next we create the mapping from source to the targeted
854
                // public key.
855
                if err := nodes.Put(sourceKey, nodePubBytes); err != nil {
117✔
856
                        return err
×
857
                }
×
858

859
                // Finally, we commit the information of the lightning node
860
                // itself.
861
                return addLightningNode(tx, node)
117✔
862
        }, func() {})
117✔
863
}
864

865
// AddLightningNode adds a vertex/node to the graph database. If the node is not
866
// in the database from before, this will add a new, unconnected one to the
867
// graph. If it is present from before, this will update that node's
868
// information. Note that this method is expected to only be called to update an
869
// already present node from a node announcement, or to insert a node found in a
870
// channel update.
871
//
872
// TODO(roasbeef): also need sig of announcement.
873
func (c *KVStore) AddLightningNode(node *models.LightningNode,
874
        op ...batch.SchedulerOption) error {
799✔
875

799✔
876
        r := &batch.Request{
799✔
877
                Update: func(tx kvdb.RwTx) error {
1,598✔
878
                        return addLightningNode(tx, node)
799✔
879
                },
799✔
880
        }
881

882
        for _, f := range op {
799✔
883
                f(r)
×
884
        }
×
885

886
        return c.nodeScheduler.Execute(r)
799✔
887
}
888

889
func addLightningNode(tx kvdb.RwTx, node *models.LightningNode) error {
990✔
890
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
990✔
891
        if err != nil {
990✔
892
                return err
×
893
        }
×
894

895
        aliases, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
990✔
896
        if err != nil {
990✔
897
                return err
×
898
        }
×
899

900
        updateIndex, err := nodes.CreateBucketIfNotExists(
990✔
901
                nodeUpdateIndexBucket,
990✔
902
        )
990✔
903
        if err != nil {
990✔
904
                return err
×
905
        }
×
906

907
        return putLightningNode(nodes, aliases, updateIndex, node)
990✔
908
}
909

910
// LookupAlias attempts to return the alias as advertised by the target node.
911
// TODO(roasbeef): currently assumes that aliases are unique...
912
func (c *KVStore) LookupAlias(pub *btcec.PublicKey) (string, error) {
2✔
913
        var alias string
2✔
914

2✔
915
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
4✔
916
                nodes := tx.ReadBucket(nodeBucket)
2✔
917
                if nodes == nil {
2✔
918
                        return ErrGraphNodesNotFound
×
919
                }
×
920

921
                aliases := nodes.NestedReadBucket(aliasIndexBucket)
2✔
922
                if aliases == nil {
2✔
923
                        return ErrGraphNodesNotFound
×
924
                }
×
925

926
                nodePub := pub.SerializeCompressed()
2✔
927
                a := aliases.Get(nodePub)
2✔
928
                if a == nil {
3✔
929
                        return ErrNodeAliasNotFound
1✔
930
                }
1✔
931

932
                // TODO(roasbeef): should actually be using the utf-8
933
                // package...
934
                alias = string(a)
1✔
935

1✔
936
                return nil
1✔
937
        }, func() {
2✔
938
                alias = ""
2✔
939
        })
2✔
940
        if err != nil {
3✔
941
                return "", err
1✔
942
        }
1✔
943

944
        return alias, nil
1✔
945
}
946

947
// DeleteLightningNode starts a new database transaction to remove a vertex/node
948
// from the database according to the node's public key.
949
func (c *KVStore) DeleteLightningNode(nodePub route.Vertex) error {
3✔
950
        // TODO(roasbeef): ensure dangling edges are removed...
3✔
951
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
6✔
952
                nodes := tx.ReadWriteBucket(nodeBucket)
3✔
953
                if nodes == nil {
3✔
954
                        return ErrGraphNodeNotFound
×
955
                }
×
956

957
                return c.deleteLightningNode(nodes, nodePub[:])
3✔
958
        }, func() {})
3✔
959
}
960

961
// deleteLightningNode uses an existing database transaction to remove a
962
// vertex/node from the database according to the node's public key.
963
func (c *KVStore) deleteLightningNode(nodes kvdb.RwBucket,
964
        compressedPubKey []byte) error {
64✔
965

64✔
966
        aliases := nodes.NestedReadWriteBucket(aliasIndexBucket)
64✔
967
        if aliases == nil {
64✔
968
                return ErrGraphNodesNotFound
×
969
        }
×
970

971
        if err := aliases.Delete(compressedPubKey); err != nil {
64✔
972
                return err
×
973
        }
×
974

975
        // Before we delete the node, we'll fetch its current state so we can
976
        // determine when its last update was to clear out the node update
977
        // index.
978
        node, err := fetchLightningNode(nodes, compressedPubKey)
64✔
979
        if err != nil {
64✔
980
                return err
×
981
        }
×
982

983
        if err := nodes.Delete(compressedPubKey); err != nil {
64✔
984
                return err
×
985
        }
×
986

987
        // Finally, we'll delete the index entry for the node within the
988
        // nodeUpdateIndexBucket as this node is no longer active, so we don't
989
        // need to track its last update.
990
        nodeUpdateIndex := nodes.NestedReadWriteBucket(nodeUpdateIndexBucket)
64✔
991
        if nodeUpdateIndex == nil {
64✔
992
                return ErrGraphNodesNotFound
×
993
        }
×
994

995
        // In order to delete the entry, we'll need to reconstruct the key for
996
        // its last update.
997
        updateUnix := uint64(node.LastUpdate.Unix())
64✔
998
        var indexKey [8 + 33]byte
64✔
999
        byteOrder.PutUint64(indexKey[:8], updateUnix)
64✔
1000
        copy(indexKey[8:], compressedPubKey)
64✔
1001

64✔
1002
        return nodeUpdateIndex.Delete(indexKey[:])
64✔
1003
}
1004

1005
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
1006
// undirected edge from the two target nodes are created. The information stored
1007
// denotes the static attributes of the channel, such as the channelID, the keys
1008
// involved in creation of the channel, and the set of features that the channel
1009
// supports. The chanPoint and chanID are used to uniquely identify the edge
1010
// globally within the database.
1011
func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
1012
        op ...batch.SchedulerOption) error {
1,713✔
1013

1,713✔
1014
        var alreadyExists bool
1,713✔
1015
        r := &batch.Request{
1,713✔
1016
                Reset: func() {
3,426✔
1017
                        alreadyExists = false
1,713✔
1018
                },
1,713✔
1019
                Update: func(tx kvdb.RwTx) error {
1,713✔
1020
                        err := c.addChannelEdge(tx, edge)
1,713✔
1021

1,713✔
1022
                        // Silence ErrEdgeAlreadyExist so that the batch can
1,713✔
1023
                        // succeed, but propagate the error via local state.
1,713✔
1024
                        if errors.Is(err, ErrEdgeAlreadyExist) {
1,947✔
1025
                                alreadyExists = true
234✔
1026
                                return nil
234✔
1027
                        }
234✔
1028

1029
                        return err
1,479✔
1030
                },
1031
                OnCommit: func(err error) error {
1,713✔
1032
                        switch {
1,713✔
1033
                        case err != nil:
×
1034
                                return err
×
1035
                        case alreadyExists:
234✔
1036
                                return ErrEdgeAlreadyExist
234✔
1037
                        default:
1,479✔
1038
                                c.rejectCache.remove(edge.ChannelID)
1,479✔
1039
                                c.chanCache.remove(edge.ChannelID)
1,479✔
1040
                                return nil
1,479✔
1041
                        }
1042
                },
1043
        }
1044

1045
        for _, f := range op {
1,713✔
1046
                if f == nil {
×
1047
                        return fmt.Errorf("nil scheduler option was used")
×
1048
                }
×
1049

1050
                f(r)
×
1051
        }
1052

1053
        return c.chanScheduler.Execute(r)
1,713✔
1054
}
1055

1056
// addChannelEdge is the private form of AddChannelEdge that allows callers to
1057
// utilize an existing db transaction.
1058
func (c *KVStore) addChannelEdge(tx kvdb.RwTx,
1059
        edge *models.ChannelEdgeInfo) error {
1,713✔
1060

1,713✔
1061
        // Construct the channel's primary key which is the 8-byte channel ID.
1,713✔
1062
        var chanKey [8]byte
1,713✔
1063
        binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
1,713✔
1064

1,713✔
1065
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
1,713✔
1066
        if err != nil {
1,713✔
1067
                return err
×
1068
        }
×
1069
        edges, err := tx.CreateTopLevelBucket(edgeBucket)
1,713✔
1070
        if err != nil {
1,713✔
1071
                return err
×
1072
        }
×
1073
        edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
1,713✔
1074
        if err != nil {
1,713✔
1075
                return err
×
1076
        }
×
1077
        chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket)
1,713✔
1078
        if err != nil {
1,713✔
1079
                return err
×
1080
        }
×
1081

1082
        // First, attempt to check if this edge has already been created. If
1083
        // so, then we can exit early as this method is meant to be idempotent.
1084
        if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil {
1,947✔
1085
                return ErrEdgeAlreadyExist
234✔
1086
        }
234✔
1087

1088
        // Before we insert the channel into the database, we'll ensure that
1089
        // both nodes already exist in the channel graph. If either node
1090
        // doesn't, then we'll insert a "shell" node that just includes its
1091
        // public key, so subsequent validation and queries can work properly.
1092
        _, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:])
1,479✔
1093
        switch {
1,479✔
1094
        case errors.Is(node1Err, ErrGraphNodeNotFound):
19✔
1095
                node1Shell := models.LightningNode{
19✔
1096
                        PubKeyBytes:          edge.NodeKey1Bytes,
19✔
1097
                        HaveNodeAnnouncement: false,
19✔
1098
                }
19✔
1099
                err := addLightningNode(tx, &node1Shell)
19✔
1100
                if err != nil {
19✔
1101
                        return fmt.Errorf("unable to create shell node "+
×
1102
                                "for: %x: %w", edge.NodeKey1Bytes, err)
×
1103
                }
×
1104
        case node1Err != nil:
×
1105
                return node1Err
×
1106
        }
1107

1108
        _, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:])
1,479✔
1109
        switch {
1,479✔
1110
        case errors.Is(node2Err, ErrGraphNodeNotFound):
55✔
1111
                node2Shell := models.LightningNode{
55✔
1112
                        PubKeyBytes:          edge.NodeKey2Bytes,
55✔
1113
                        HaveNodeAnnouncement: false,
55✔
1114
                }
55✔
1115
                err := addLightningNode(tx, &node2Shell)
55✔
1116
                if err != nil {
55✔
1117
                        return fmt.Errorf("unable to create shell node "+
×
1118
                                "for: %x: %w", edge.NodeKey2Bytes, err)
×
1119
                }
×
1120
        case node2Err != nil:
×
1121
                return node2Err
×
1122
        }
1123

1124
        // If the edge hasn't been created yet, then we'll first add it to the
1125
        // edge index in order to associate the edge between two nodes and also
1126
        // store the static components of the channel.
1127
        if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil {
1,479✔
1128
                return err
×
1129
        }
×
1130

1131
        // Mark edge policies for both sides as unknown. This is to enable
1132
        // efficient incoming channel lookup for a node.
1133
        keys := []*[33]byte{
1,479✔
1134
                &edge.NodeKey1Bytes,
1,479✔
1135
                &edge.NodeKey2Bytes,
1,479✔
1136
        }
1,479✔
1137
        for _, key := range keys {
4,437✔
1138
                err := putChanEdgePolicyUnknown(edges, edge.ChannelID, key[:])
2,958✔
1139
                if err != nil {
2,958✔
1140
                        return err
×
1141
                }
×
1142
        }
1143

1144
        // Finally we add it to the channel index which maps channel points
1145
        // (outpoints) to the shorter channel ID's.
1146
        var b bytes.Buffer
1,479✔
1147
        if err := WriteOutpoint(&b, &edge.ChannelPoint); err != nil {
1,479✔
1148
                return err
×
1149
        }
×
1150

1151
        return chanIndex.Put(b.Bytes(), chanKey[:])
1,479✔
1152
}
1153

1154
// HasChannelEdge returns true if the database knows of a channel edge with the
1155
// passed channel ID, and false otherwise. If an edge with that ID is found
1156
// within the graph, then two time stamps representing the last time the edge
1157
// was updated for both directed edges are returned along with the boolean. If
1158
// it is not found, then the zombie index is checked and its result is returned
1159
// as the second boolean.
1160
func (c *KVStore) HasChannelEdge(
1161
        chanID uint64) (time.Time, time.Time, bool, bool, error) {
202✔
1162

202✔
1163
        var (
202✔
1164
                upd1Time time.Time
202✔
1165
                upd2Time time.Time
202✔
1166
                exists   bool
202✔
1167
                isZombie bool
202✔
1168
        )
202✔
1169

202✔
1170
        // We'll query the cache with the shared lock held to allow multiple
202✔
1171
        // readers to access values in the cache concurrently if they exist.
202✔
1172
        c.cacheMu.RLock()
202✔
1173
        if entry, ok := c.rejectCache.get(chanID); ok {
265✔
1174
                c.cacheMu.RUnlock()
63✔
1175
                upd1Time = time.Unix(entry.upd1Time, 0)
63✔
1176
                upd2Time = time.Unix(entry.upd2Time, 0)
63✔
1177
                exists, isZombie = entry.flags.unpack()
63✔
1178

63✔
1179
                return upd1Time, upd2Time, exists, isZombie, nil
63✔
1180
        }
63✔
1181
        c.cacheMu.RUnlock()
139✔
1182

139✔
1183
        c.cacheMu.Lock()
139✔
1184
        defer c.cacheMu.Unlock()
139✔
1185

139✔
1186
        // The item was not found with the shared lock, so we'll acquire the
139✔
1187
        // exclusive lock and check the cache again in case another method added
139✔
1188
        // the entry to the cache while no lock was held.
139✔
1189
        if entry, ok := c.rejectCache.get(chanID); ok {
144✔
1190
                upd1Time = time.Unix(entry.upd1Time, 0)
5✔
1191
                upd2Time = time.Unix(entry.upd2Time, 0)
5✔
1192
                exists, isZombie = entry.flags.unpack()
5✔
1193

5✔
1194
                return upd1Time, upd2Time, exists, isZombie, nil
5✔
1195
        }
5✔
1196

1197
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
268✔
1198
                edges := tx.ReadBucket(edgeBucket)
134✔
1199
                if edges == nil {
134✔
1200
                        return ErrGraphNoEdgesFound
×
1201
                }
×
1202
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
134✔
1203
                if edgeIndex == nil {
134✔
1204
                        return ErrGraphNoEdgesFound
×
1205
                }
×
1206

1207
                var channelID [8]byte
134✔
1208
                byteOrder.PutUint64(channelID[:], chanID)
134✔
1209

134✔
1210
                // If the edge doesn't exist, then we'll also check our zombie
134✔
1211
                // index.
134✔
1212
                if edgeIndex.Get(channelID[:]) == nil {
217✔
1213
                        exists = false
83✔
1214
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
83✔
1215
                        if zombieIndex != nil {
166✔
1216
                                isZombie, _, _ = isZombieEdge(
83✔
1217
                                        zombieIndex, chanID,
83✔
1218
                                )
83✔
1219
                        }
83✔
1220

1221
                        return nil
83✔
1222
                }
1223

1224
                exists = true
51✔
1225
                isZombie = false
51✔
1226

51✔
1227
                // If the channel has been found in the graph, then retrieve
51✔
1228
                // the edges itself so we can return the last updated
51✔
1229
                // timestamps.
51✔
1230
                nodes := tx.ReadBucket(nodeBucket)
51✔
1231
                if nodes == nil {
51✔
1232
                        return ErrGraphNodeNotFound
×
1233
                }
×
1234

1235
                e1, e2, err := fetchChanEdgePolicies(
51✔
1236
                        edgeIndex, edges, channelID[:],
51✔
1237
                )
51✔
1238
                if err != nil {
51✔
1239
                        return err
×
1240
                }
×
1241

1242
                // As we may have only one of the edges populated, only set the
1243
                // update time if the edge was found in the database.
1244
                if e1 != nil {
69✔
1245
                        upd1Time = e1.LastUpdate
18✔
1246
                }
18✔
1247
                if e2 != nil {
67✔
1248
                        upd2Time = e2.LastUpdate
16✔
1249
                }
16✔
1250

1251
                return nil
51✔
1252
        }, func() {}); err != nil {
134✔
1253
                return time.Time{}, time.Time{}, exists, isZombie, err
×
1254
        }
×
1255

1256
        c.rejectCache.insert(chanID, rejectCacheEntry{
134✔
1257
                upd1Time: upd1Time.Unix(),
134✔
1258
                upd2Time: upd2Time.Unix(),
134✔
1259
                flags:    packRejectFlags(exists, isZombie),
134✔
1260
        })
134✔
1261

134✔
1262
        return upd1Time, upd2Time, exists, isZombie, nil
134✔
1263
}
1264

1265
// AddEdgeProof sets the proof of an existing edge in the graph database.
1266
func (c *KVStore) AddEdgeProof(chanID lnwire.ShortChannelID,
1267
        proof *models.ChannelAuthProof) error {
1✔
1268

1✔
1269
        // Construct the channel's primary key which is the 8-byte channel ID.
1✔
1270
        var chanKey [8]byte
1✔
1271
        binary.BigEndian.PutUint64(chanKey[:], chanID.ToUint64())
1✔
1272

1✔
1273
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
2✔
1274
                edges := tx.ReadWriteBucket(edgeBucket)
1✔
1275
                if edges == nil {
1✔
1276
                        return ErrEdgeNotFound
×
1277
                }
×
1278

1279
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
1✔
1280
                if edgeIndex == nil {
1✔
1281
                        return ErrEdgeNotFound
×
1282
                }
×
1283

1284
                edge, err := fetchChanEdgeInfo(edgeIndex, chanKey[:])
1✔
1285
                if err != nil {
1✔
1286
                        return err
×
1287
                }
×
1288

1289
                edge.AuthProof = proof
1✔
1290

1✔
1291
                return putChanEdgeInfo(edgeIndex, &edge, chanKey)
1✔
1292
        }, func() {})
1✔
1293
}
1294

1295
// pruneTipBytes is the total size of the value which stores a prune entry of
// the graph in the prune log. The "prune tip" is the last entry in the prune
// log, and indicates if the channel graph is in sync with the current UTXO
// state. The value consists solely of the block hash, which takes 32 bytes.
const pruneTipBytes = 32
1303

1304
// PruneGraph prunes newly closed channels from the channel graph in response
1305
// to a new block being solved on the network. Any transactions which spend the
1306
// funding output of any known channels within he graph will be deleted.
1307
// Additionally, the "prune tip", or the last block which has been used to
1308
// prune the graph is stored so callers can ensure the graph is fully in sync
1309
// with the current UTXO state. A slice of channels that have been closed by
1310
// the target block along with any pruned nodes are returned if the function
1311
// succeeds without error.
1312
func (c *KVStore) PruneGraph(spentOutputs []*wire.OutPoint,
1313
        blockHash *chainhash.Hash, blockHeight uint32) (
1314
        []*models.ChannelEdgeInfo, []route.Vertex, error) {
242✔
1315

242✔
1316
        c.cacheMu.Lock()
242✔
1317
        defer c.cacheMu.Unlock()
242✔
1318

242✔
1319
        var (
242✔
1320
                chansClosed []*models.ChannelEdgeInfo
242✔
1321
                prunedNodes []route.Vertex
242✔
1322
        )
242✔
1323

242✔
1324
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
484✔
1325
                // First grab the edges bucket which houses the information
242✔
1326
                // we'd like to delete
242✔
1327
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
242✔
1328
                if err != nil {
242✔
1329
                        return err
×
1330
                }
×
1331

1332
                // Next grab the two edge indexes which will also need to be
1333
                // updated.
1334
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
242✔
1335
                if err != nil {
242✔
1336
                        return err
×
1337
                }
×
1338
                chanIndex, err := edges.CreateBucketIfNotExists(
242✔
1339
                        channelPointBucket,
242✔
1340
                )
242✔
1341
                if err != nil {
242✔
1342
                        return err
×
1343
                }
×
1344
                nodes := tx.ReadWriteBucket(nodeBucket)
242✔
1345
                if nodes == nil {
242✔
1346
                        return ErrSourceNodeNotSet
×
1347
                }
×
1348
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
242✔
1349
                if err != nil {
242✔
1350
                        return err
×
1351
                }
×
1352

1353
                // For each of the outpoints that have been spent within the
1354
                // block, we attempt to delete them from the graph as if that
1355
                // outpoint was a channel, then it has now been closed.
1356
                for _, chanPoint := range spentOutputs {
378✔
1357
                        // TODO(roasbeef): load channel bloom filter, continue
136✔
1358
                        // if NOT if filter
136✔
1359

136✔
1360
                        var opBytes bytes.Buffer
136✔
1361
                        err := WriteOutpoint(&opBytes, chanPoint)
136✔
1362
                        if err != nil {
136✔
1363
                                return err
×
1364
                        }
×
1365

1366
                        // First attempt to see if the channel exists within
1367
                        // the database, if not, then we can exit early.
1368
                        chanID := chanIndex.Get(opBytes.Bytes())
136✔
1369
                        if chanID == nil {
249✔
1370
                                continue
113✔
1371
                        }
1372

1373
                        // Attempt to delete the channel, an ErrEdgeNotFound
1374
                        // will be returned if that outpoint isn't known to be
1375
                        // a channel. If no error is returned, then a channel
1376
                        // was successfully pruned.
1377
                        edgeInfo, err := c.delChannelEdgeUnsafe(
23✔
1378
                                edges, edgeIndex, chanIndex, zombieIndex,
23✔
1379
                                chanID, false, false,
23✔
1380
                        )
23✔
1381
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
23✔
1382
                                return err
×
1383
                        }
×
1384

1385
                        chansClosed = append(chansClosed, edgeInfo)
23✔
1386
                }
1387

1388
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
242✔
1389
                if err != nil {
242✔
1390
                        return err
×
1391
                }
×
1392

1393
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
242✔
1394
                        pruneLogBucket,
242✔
1395
                )
242✔
1396
                if err != nil {
242✔
1397
                        return err
×
1398
                }
×
1399

1400
                // With the graph pruned, add a new entry to the prune log,
1401
                // which can be used to check if the graph is fully synced with
1402
                // the current UTXO state.
1403
                var blockHeightBytes [4]byte
242✔
1404
                byteOrder.PutUint32(blockHeightBytes[:], blockHeight)
242✔
1405

242✔
1406
                var newTip [pruneTipBytes]byte
242✔
1407
                copy(newTip[:], blockHash[:])
242✔
1408

242✔
1409
                err = pruneBucket.Put(blockHeightBytes[:], newTip[:])
242✔
1410
                if err != nil {
242✔
1411
                        return err
×
1412
                }
×
1413

1414
                // Now that the graph has been pruned, we'll also attempt to
1415
                // prune any nodes that have had a channel closed within the
1416
                // latest block.
1417
                prunedNodes, err = c.pruneGraphNodes(nodes, edgeIndex)
242✔
1418

242✔
1419
                return err
242✔
1420
        }, func() {
242✔
1421
                chansClosed = nil
242✔
1422
                prunedNodes = nil
242✔
1423
        })
242✔
1424
        if err != nil {
242✔
1425
                return nil, nil, err
×
1426
        }
×
1427

1428
        for _, channel := range chansClosed {
265✔
1429
                c.rejectCache.remove(channel.ChannelID)
23✔
1430
                c.chanCache.remove(channel.ChannelID)
23✔
1431
        }
23✔
1432

1433
        return chansClosed, prunedNodes, nil
242✔
1434
}
1435

1436
// PruneGraphNodes is a garbage collection method which attempts to prune out
1437
// any nodes from the channel graph that are currently unconnected. This ensure
1438
// that we only maintain a graph of reachable nodes. In the event that a pruned
1439
// node gains more channels, it will be re-added back to the graph.
1440
func (c *KVStore) PruneGraphNodes() ([]route.Vertex, error) {
23✔
1441
        var prunedNodes []route.Vertex
23✔
1442
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
46✔
1443
                nodes := tx.ReadWriteBucket(nodeBucket)
23✔
1444
                if nodes == nil {
23✔
1445
                        return ErrGraphNodesNotFound
×
1446
                }
×
1447
                edges := tx.ReadWriteBucket(edgeBucket)
23✔
1448
                if edges == nil {
23✔
1449
                        return ErrGraphNotFound
×
1450
                }
×
1451
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
23✔
1452
                if edgeIndex == nil {
23✔
1453
                        return ErrGraphNoEdgesFound
×
1454
                }
×
1455

1456
                var err error
23✔
1457
                prunedNodes, err = c.pruneGraphNodes(nodes, edgeIndex)
23✔
1458
                if err != nil {
23✔
1459
                        return err
×
1460
                }
×
1461

1462
                return nil
23✔
1463
        }, func() {
23✔
1464
                prunedNodes = nil
23✔
1465
        })
23✔
1466

1467
        return prunedNodes, err
23✔
1468
}
1469

1470
// pruneGraphNodes attempts to remove any nodes from the graph who have had a
1471
// channel closed within the current block. If the node still has existing
1472
// channels in the graph, this will act as a no-op.
1473
func (c *KVStore) pruneGraphNodes(nodes kvdb.RwBucket,
1474
        edgeIndex kvdb.RwBucket) ([]route.Vertex, error) {
265✔
1475

265✔
1476
        log.Trace("Pruning nodes from graph with no open channels")
265✔
1477

265✔
1478
        // We'll retrieve the graph's source node to ensure we don't remove it
265✔
1479
        // even if it no longer has any open channels.
265✔
1480
        sourceNode, err := c.sourceNode(nodes)
265✔
1481
        if err != nil {
265✔
1482
                return nil, err
×
1483
        }
×
1484

1485
        // We'll use this map to keep count the number of references to a node
1486
        // in the graph. A node should only be removed once it has no more
1487
        // references in the graph.
1488
        nodeRefCounts := make(map[[33]byte]int)
265✔
1489
        err = nodes.ForEach(func(pubKey, nodeBytes []byte) error {
1,571✔
1490
                // If this is the source key, then we skip this
1,306✔
1491
                // iteration as the value for this key is a pubKey
1,306✔
1492
                // rather than raw node information.
1,306✔
1493
                if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
2,101✔
1494
                        return nil
795✔
1495
                }
795✔
1496

1497
                var nodePub [33]byte
511✔
1498
                copy(nodePub[:], pubKey)
511✔
1499
                nodeRefCounts[nodePub] = 0
511✔
1500

511✔
1501
                return nil
511✔
1502
        })
1503
        if err != nil {
265✔
1504
                return nil, err
×
1505
        }
×
1506

1507
        // To ensure we never delete the source node, we'll start off by
1508
        // bumping its ref count to 1.
1509
        nodeRefCounts[sourceNode.PubKeyBytes] = 1
265✔
1510

265✔
1511
        // Next, we'll run through the edgeIndex which maps a channel ID to the
265✔
1512
        // edge info. We'll use this scan to populate our reference count map
265✔
1513
        // above.
265✔
1514
        err = edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
471✔
1515
                // The first 66 bytes of the edge info contain the pubkeys of
206✔
1516
                // the nodes that this edge attaches. We'll extract them, and
206✔
1517
                // add them to the ref count map.
206✔
1518
                var node1, node2 [33]byte
206✔
1519
                copy(node1[:], edgeInfoBytes[:33])
206✔
1520
                copy(node2[:], edgeInfoBytes[33:])
206✔
1521

206✔
1522
                // With the nodes extracted, we'll increase the ref count of
206✔
1523
                // each of the nodes.
206✔
1524
                nodeRefCounts[node1]++
206✔
1525
                nodeRefCounts[node2]++
206✔
1526

206✔
1527
                return nil
206✔
1528
        })
206✔
1529
        if err != nil {
265✔
1530
                return nil, err
×
1531
        }
×
1532

1533
        // Finally, we'll make a second pass over the set of nodes, and delete
1534
        // any nodes that have a ref count of zero.
1535
        var pruned []route.Vertex
265✔
1536
        for nodePubKey, refCount := range nodeRefCounts {
776✔
1537
                // If the ref count of the node isn't zero, then we can safely
511✔
1538
                // skip it as it still has edges to or from it within the
511✔
1539
                // graph.
511✔
1540
                if refCount != 0 {
961✔
1541
                        continue
450✔
1542
                }
1543

1544
                // If we reach this point, then there are no longer any edges
1545
                // that connect this node, so we can delete it.
1546
                err := c.deleteLightningNode(nodes, nodePubKey[:])
61✔
1547
                if err != nil {
61✔
1548
                        if errors.Is(err, ErrGraphNodeNotFound) ||
×
1549
                                errors.Is(err, ErrGraphNodesNotFound) {
×
1550

×
1551
                                log.Warnf("Unable to prune node %x from the "+
×
1552
                                        "graph: %v", nodePubKey, err)
×
1553
                                continue
×
1554
                        }
1555

1556
                        return nil, err
×
1557
                }
1558

1559
                log.Infof("Pruned unconnected node %x from channel graph",
61✔
1560
                        nodePubKey[:])
61✔
1561

61✔
1562
                pruned = append(pruned, nodePubKey)
61✔
1563
        }
1564

1565
        if len(pruned) > 0 {
310✔
1566
                log.Infof("Pruned %v unconnected nodes from the channel graph",
45✔
1567
                        len(pruned))
45✔
1568
        }
45✔
1569

1570
        return pruned, err
265✔
1571
}
1572

1573
// DisconnectBlockAtHeight is used to indicate that the block specified
1574
// by the passed height has been disconnected from the main chain. This
1575
// will "rewind" the graph back to the height below, deleting channels
1576
// that are no longer confirmed from the graph. The prune log will be
1577
// set to the last prune height valid for the remaining chain.
1578
// Channels that were removed from the graph resulting from the
1579
// disconnected block are returned.
1580
func (c *KVStore) DisconnectBlockAtHeight(height uint32) (
1581
        []*models.ChannelEdgeInfo, error) {
151✔
1582

151✔
1583
        // Every channel having a ShortChannelID starting at 'height'
151✔
1584
        // will no longer be confirmed.
151✔
1585
        startShortChanID := lnwire.ShortChannelID{
151✔
1586
                BlockHeight: height,
151✔
1587
        }
151✔
1588

151✔
1589
        // Delete everything after this height from the db up until the
151✔
1590
        // SCID alias range.
151✔
1591
        endShortChanID := aliasmgr.StartingAlias
151✔
1592

151✔
1593
        // The block height will be the 3 first bytes of the channel IDs.
151✔
1594
        var chanIDStart [8]byte
151✔
1595
        byteOrder.PutUint64(chanIDStart[:], startShortChanID.ToUint64())
151✔
1596
        var chanIDEnd [8]byte
151✔
1597
        byteOrder.PutUint64(chanIDEnd[:], endShortChanID.ToUint64())
151✔
1598

151✔
1599
        c.cacheMu.Lock()
151✔
1600
        defer c.cacheMu.Unlock()
151✔
1601

151✔
1602
        // Keep track of the channels that are removed from the graph.
151✔
1603
        var removedChans []*models.ChannelEdgeInfo
151✔
1604

151✔
1605
        if err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
302✔
1606
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
151✔
1607
                if err != nil {
151✔
1608
                        return err
×
1609
                }
×
1610
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
151✔
1611
                if err != nil {
151✔
1612
                        return err
×
1613
                }
×
1614
                chanIndex, err := edges.CreateBucketIfNotExists(
151✔
1615
                        channelPointBucket,
151✔
1616
                )
151✔
1617
                if err != nil {
151✔
1618
                        return err
×
1619
                }
×
1620
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
151✔
1621
                if err != nil {
151✔
1622
                        return err
×
1623
                }
×
1624

1625
                // Scan from chanIDStart to chanIDEnd, deleting every
1626
                // found edge.
1627
                // NOTE: we must delete the edges after the cursor loop, since
1628
                // modifying the bucket while traversing is not safe.
1629
                // NOTE: We use a < comparison in bytes.Compare instead of <=
1630
                // so that the StartingAlias itself isn't deleted.
1631
                var keys [][]byte
151✔
1632
                cursor := edgeIndex.ReadWriteCursor()
151✔
1633

151✔
1634
                //nolint:ll
151✔
1635
                for k, _ := cursor.Seek(chanIDStart[:]); k != nil &&
151✔
1636
                        bytes.Compare(k, chanIDEnd[:]) < 0; k, _ = cursor.Next() {
240✔
1637
                        keys = append(keys, k)
89✔
1638
                }
89✔
1639

1640
                for _, k := range keys {
240✔
1641
                        edgeInfo, err := c.delChannelEdgeUnsafe(
89✔
1642
                                edges, edgeIndex, chanIndex, zombieIndex,
89✔
1643
                                k, false, false,
89✔
1644
                        )
89✔
1645
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
89✔
1646
                                return err
×
1647
                        }
×
1648

1649
                        removedChans = append(removedChans, edgeInfo)
89✔
1650
                }
1651

1652
                // Delete all the entries in the prune log having a height
1653
                // greater or equal to the block disconnected.
1654
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
151✔
1655
                if err != nil {
151✔
1656
                        return err
×
1657
                }
×
1658

1659
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
151✔
1660
                        pruneLogBucket,
151✔
1661
                )
151✔
1662
                if err != nil {
151✔
1663
                        return err
×
1664
                }
×
1665

1666
                var pruneKeyStart [4]byte
151✔
1667
                byteOrder.PutUint32(pruneKeyStart[:], height)
151✔
1668

151✔
1669
                var pruneKeyEnd [4]byte
151✔
1670
                byteOrder.PutUint32(pruneKeyEnd[:], math.MaxUint32)
151✔
1671

151✔
1672
                // To avoid modifying the bucket while traversing, we delete
151✔
1673
                // the keys in a second loop.
151✔
1674
                var pruneKeys [][]byte
151✔
1675
                pruneCursor := pruneBucket.ReadWriteCursor()
151✔
1676
                //nolint:ll
151✔
1677
                for k, _ := pruneCursor.Seek(pruneKeyStart[:]); k != nil &&
151✔
1678
                        bytes.Compare(k, pruneKeyEnd[:]) <= 0; k, _ = pruneCursor.Next() {
247✔
1679
                        pruneKeys = append(pruneKeys, k)
96✔
1680
                }
96✔
1681

1682
                for _, k := range pruneKeys {
247✔
1683
                        if err := pruneBucket.Delete(k); err != nil {
96✔
1684
                                return err
×
1685
                        }
×
1686
                }
1687

1688
                return nil
151✔
1689
        }, func() {
151✔
1690
                removedChans = nil
151✔
1691
        }); err != nil {
151✔
1692
                return nil, err
×
1693
        }
×
1694

1695
        for _, channel := range removedChans {
240✔
1696
                c.rejectCache.remove(channel.ChannelID)
89✔
1697
                c.chanCache.remove(channel.ChannelID)
89✔
1698
        }
89✔
1699

1700
        return removedChans, nil
151✔
1701
}
1702

1703
// PruneTip returns the block height and hash of the latest block that has been
1704
// used to prune channels in the graph. Knowing the "prune tip" allows callers
1705
// to tell if the graph is currently in sync with the current best known UTXO
1706
// state.
1707
func (c *KVStore) PruneTip() (*chainhash.Hash, uint32, error) {
53✔
1708
        var (
53✔
1709
                tipHash   chainhash.Hash
53✔
1710
                tipHeight uint32
53✔
1711
        )
53✔
1712

53✔
1713
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
106✔
1714
                graphMeta := tx.ReadBucket(graphMetaBucket)
53✔
1715
                if graphMeta == nil {
53✔
1716
                        return ErrGraphNotFound
×
1717
                }
×
1718
                pruneBucket := graphMeta.NestedReadBucket(pruneLogBucket)
53✔
1719
                if pruneBucket == nil {
53✔
1720
                        return ErrGraphNeverPruned
×
1721
                }
×
1722

1723
                pruneCursor := pruneBucket.ReadCursor()
53✔
1724

53✔
1725
                // The prune key with the largest block height will be our
53✔
1726
                // prune tip.
53✔
1727
                k, v := pruneCursor.Last()
53✔
1728
                if k == nil {
71✔
1729
                        return ErrGraphNeverPruned
18✔
1730
                }
18✔
1731

1732
                // Once we have the prune tip, the value will be the block hash,
1733
                // and the key the block height.
1734
                copy(tipHash[:], v)
35✔
1735
                tipHeight = byteOrder.Uint32(k)
35✔
1736

35✔
1737
                return nil
35✔
1738
        }, func() {})
53✔
1739
        if err != nil {
71✔
1740
                return nil, 0, err
18✔
1741
        }
18✔
1742

1743
        return &tipHash, tipHeight, nil
35✔
1744
}
1745

1746
// DeleteChannelEdges removes edges with the given channel IDs from the
1747
// database and marks them as zombies. This ensures that we're unable to re-add
1748
// it to our database once again. If an edge does not exist within the
1749
// database, then ErrEdgeNotFound will be returned. If strictZombiePruning is
1750
// true, then when we mark these edges as zombies, we'll set up the keys such
1751
// that we require the node that failed to send the fresh update to be the one
1752
// that resurrects the channel from its zombie state. The markZombie bool
1753
// denotes whether or not to mark the channel as a zombie.
1754
func (c *KVStore) DeleteChannelEdges(strictZombiePruning, markZombie bool,
1755
        chanIDs ...uint64) ([]*models.ChannelEdgeInfo, error) {
148✔
1756

148✔
1757
        // TODO(roasbeef): possibly delete from node bucket if node has no more
148✔
1758
        // channels
148✔
1759
        // TODO(roasbeef): don't delete both edges?
148✔
1760

148✔
1761
        c.cacheMu.Lock()
148✔
1762
        defer c.cacheMu.Unlock()
148✔
1763

148✔
1764
        var infos []*models.ChannelEdgeInfo
148✔
1765
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
296✔
1766
                edges := tx.ReadWriteBucket(edgeBucket)
148✔
1767
                if edges == nil {
148✔
1768
                        return ErrEdgeNotFound
×
1769
                }
×
1770
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
148✔
1771
                if edgeIndex == nil {
148✔
1772
                        return ErrEdgeNotFound
×
1773
                }
×
1774
                chanIndex := edges.NestedReadWriteBucket(channelPointBucket)
148✔
1775
                if chanIndex == nil {
148✔
1776
                        return ErrEdgeNotFound
×
1777
                }
×
1778
                nodes := tx.ReadWriteBucket(nodeBucket)
148✔
1779
                if nodes == nil {
148✔
1780
                        return ErrGraphNodeNotFound
×
1781
                }
×
1782
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
148✔
1783
                if err != nil {
148✔
1784
                        return err
×
1785
                }
×
1786

1787
                var rawChanID [8]byte
148✔
1788
                for _, chanID := range chanIDs {
233✔
1789
                        byteOrder.PutUint64(rawChanID[:], chanID)
85✔
1790
                        edgeInfo, err := c.delChannelEdgeUnsafe(
85✔
1791
                                edges, edgeIndex, chanIndex, zombieIndex,
85✔
1792
                                rawChanID[:], markZombie, strictZombiePruning,
85✔
1793
                        )
85✔
1794
                        if err != nil {
147✔
1795
                                return err
62✔
1796
                        }
62✔
1797

1798
                        infos = append(infos, edgeInfo)
23✔
1799
                }
1800

1801
                return nil
86✔
1802
        }, func() {
148✔
1803
                infos = nil
148✔
1804
        })
148✔
1805
        if err != nil {
210✔
1806
                return nil, err
62✔
1807
        }
62✔
1808

1809
        for _, chanID := range chanIDs {
109✔
1810
                c.rejectCache.remove(chanID)
23✔
1811
                c.chanCache.remove(chanID)
23✔
1812
        }
23✔
1813

1814
        return infos, nil
86✔
1815
}
1816

1817
// ChannelID attempt to lookup the 8-byte compact channel ID which maps to the
1818
// passed channel point (outpoint). If the passed channel doesn't exist within
1819
// the database, then ErrEdgeNotFound is returned.
1820
func (c *KVStore) ChannelID(chanPoint *wire.OutPoint) (uint64, error) {
1✔
1821
        var chanID uint64
1✔
1822
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
2✔
1823
                var err error
1✔
1824
                chanID, err = getChanID(tx, chanPoint)
1✔
1825
                return err
1✔
1826
        }, func() {
2✔
1827
                chanID = 0
1✔
1828
        }); err != nil {
1✔
1829
                return 0, err
×
1830
        }
×
1831

1832
        return chanID, nil
1✔
1833
}
1834

1835
// getChanID returns the assigned channel ID for a given channel point.
1836
func getChanID(tx kvdb.RTx, chanPoint *wire.OutPoint) (uint64, error) {
1✔
1837
        var b bytes.Buffer
1✔
1838
        if err := WriteOutpoint(&b, chanPoint); err != nil {
1✔
1839
                return 0, err
×
1840
        }
×
1841

1842
        edges := tx.ReadBucket(edgeBucket)
1✔
1843
        if edges == nil {
1✔
1844
                return 0, ErrGraphNoEdgesFound
×
1845
        }
×
1846
        chanIndex := edges.NestedReadBucket(channelPointBucket)
1✔
1847
        if chanIndex == nil {
1✔
1848
                return 0, ErrGraphNoEdgesFound
×
1849
        }
×
1850

1851
        chanIDBytes := chanIndex.Get(b.Bytes())
1✔
1852
        if chanIDBytes == nil {
1✔
1853
                return 0, ErrEdgeNotFound
×
1854
        }
×
1855

1856
        chanID := byteOrder.Uint64(chanIDBytes)
1✔
1857

1✔
1858
        return chanID, nil
1✔
1859
}
1860

1861
// TODO(roasbeef): allow updates to use Batch?
1862

1863
// HighestChanID returns the "highest" known channel ID in the channel graph.
1864
// This represents the "newest" channel from the PoV of the chain. This method
1865
// can be used by peers to quickly determine if they're graphs are in sync.
1866
func (c *KVStore) HighestChanID() (uint64, error) {
3✔
1867
        var cid uint64
3✔
1868

3✔
1869
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
1870
                edges := tx.ReadBucket(edgeBucket)
3✔
1871
                if edges == nil {
3✔
1872
                        return ErrGraphNoEdgesFound
×
1873
                }
×
1874
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
1875
                if edgeIndex == nil {
3✔
1876
                        return ErrGraphNoEdgesFound
×
1877
                }
×
1878

1879
                // In order to find the highest chan ID, we'll fetch a cursor
1880
                // and use that to seek to the "end" of our known rage.
1881
                cidCursor := edgeIndex.ReadCursor()
3✔
1882

3✔
1883
                lastChanID, _ := cidCursor.Last()
3✔
1884

3✔
1885
                // If there's no key, then this means that we don't actually
3✔
1886
                // know of any channels, so we'll return a predicable error.
3✔
1887
                if lastChanID == nil {
4✔
1888
                        return ErrGraphNoEdgesFound
1✔
1889
                }
1✔
1890

1891
                // Otherwise, we'll de serialize the channel ID and return it
1892
                // to the caller.
1893
                cid = byteOrder.Uint64(lastChanID)
2✔
1894

2✔
1895
                return nil
2✔
1896
        }, func() {
3✔
1897
                cid = 0
3✔
1898
        })
3✔
1899
        if err != nil && !errors.Is(err, ErrGraphNoEdgesFound) {
3✔
1900
                return 0, err
×
1901
        }
×
1902

1903
        return cid, nil
3✔
1904
}
1905

1906
// ChannelEdge represents the complete set of information for a channel edge in
1907
// the known channel graph. This struct couples the core information of the
1908
// edge as well as each of the known advertised edge policies.
1909
type ChannelEdge struct {
1910
        // Info contains all the static information describing the channel.
1911
        Info *models.ChannelEdgeInfo
1912

1913
        // Policy1 points to the "first" edge policy of the channel containing
1914
        // the dynamic information required to properly route through the edge.
1915
        Policy1 *models.ChannelEdgePolicy
1916

1917
        // Policy2 points to the "second" edge policy of the channel containing
1918
        // the dynamic information required to properly route through the edge.
1919
        Policy2 *models.ChannelEdgePolicy
1920

1921
        // Node1 is "node 1" in the channel. This is the node that would have
1922
        // produced Policy1 if it exists.
1923
        Node1 *models.LightningNode
1924

1925
        // Node2 is "node 2" in the channel. This is the node that would have
1926
        // produced Policy2 if it exists.
1927
        Node2 *models.LightningNode
1928
}
1929

1930
// ChanUpdatesInHorizon returns all the known channel edges which have at least
1931
// one edge that has an update timestamp within the specified horizon.
1932
func (c *KVStore) ChanUpdatesInHorizon(startTime,
1933
        endTime time.Time) ([]ChannelEdge, error) {
146✔
1934

146✔
1935
        // To ensure we don't return duplicate ChannelEdges, we'll use an
146✔
1936
        // additional map to keep track of the edges already seen to prevent
146✔
1937
        // re-adding it.
146✔
1938
        var edgesSeen map[uint64]struct{}
146✔
1939
        var edgesToCache map[uint64]ChannelEdge
146✔
1940
        var edgesInHorizon []ChannelEdge
146✔
1941

146✔
1942
        c.cacheMu.Lock()
146✔
1943
        defer c.cacheMu.Unlock()
146✔
1944

146✔
1945
        var hits int
146✔
1946
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
292✔
1947
                edges := tx.ReadBucket(edgeBucket)
146✔
1948
                if edges == nil {
146✔
1949
                        return ErrGraphNoEdgesFound
×
1950
                }
×
1951
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
146✔
1952
                if edgeIndex == nil {
146✔
1953
                        return ErrGraphNoEdgesFound
×
1954
                }
×
1955
                edgeUpdateIndex := edges.NestedReadBucket(edgeUpdateIndexBucket)
146✔
1956
                if edgeUpdateIndex == nil {
146✔
1957
                        return ErrGraphNoEdgesFound
×
1958
                }
×
1959

1960
                nodes := tx.ReadBucket(nodeBucket)
146✔
1961
                if nodes == nil {
146✔
1962
                        return ErrGraphNodesNotFound
×
1963
                }
×
1964

1965
                // We'll now obtain a cursor to perform a range query within
1966
                // the index to find all channels within the horizon.
1967
                updateCursor := edgeUpdateIndex.ReadCursor()
146✔
1968

146✔
1969
                var startTimeBytes, endTimeBytes [8 + 8]byte
146✔
1970
                byteOrder.PutUint64(
146✔
1971
                        startTimeBytes[:8], uint64(startTime.Unix()),
146✔
1972
                )
146✔
1973
                byteOrder.PutUint64(
146✔
1974
                        endTimeBytes[:8], uint64(endTime.Unix()),
146✔
1975
                )
146✔
1976

146✔
1977
                // With our start and end times constructed, we'll step through
146✔
1978
                // the index collecting the info and policy of each update of
146✔
1979
                // each channel that has a last update within the time range.
146✔
1980
                //
146✔
1981
                //nolint:ll
146✔
1982
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
146✔
1983
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
192✔
1984
                        // We have a new eligible entry, so we'll slice of the
46✔
1985
                        // chan ID so we can query it in the DB.
46✔
1986
                        chanID := indexKey[8:]
46✔
1987

46✔
1988
                        // If we've already retrieved the info and policies for
46✔
1989
                        // this edge, then we can skip it as we don't need to do
46✔
1990
                        // so again.
46✔
1991
                        chanIDInt := byteOrder.Uint64(chanID)
46✔
1992
                        if _, ok := edgesSeen[chanIDInt]; ok {
65✔
1993
                                continue
19✔
1994
                        }
1995

1996
                        if channel, ok := c.chanCache.get(chanIDInt); ok {
36✔
1997
                                hits++
9✔
1998
                                edgesSeen[chanIDInt] = struct{}{}
9✔
1999
                                edgesInHorizon = append(edgesInHorizon, channel)
9✔
2000

9✔
2001
                                continue
9✔
2002
                        }
2003

2004
                        // First, we'll fetch the static edge information.
2005
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
18✔
2006
                        if err != nil {
18✔
2007
                                chanID := byteOrder.Uint64(chanID)
×
2008
                                return fmt.Errorf("unable to fetch info for "+
×
2009
                                        "edge with chan_id=%v: %v", chanID, err)
×
2010
                        }
×
2011

2012
                        // With the static information obtained, we'll now
2013
                        // fetch the dynamic policy info.
2014
                        edge1, edge2, err := fetchChanEdgePolicies(
18✔
2015
                                edgeIndex, edges, chanID,
18✔
2016
                        )
18✔
2017
                        if err != nil {
18✔
2018
                                chanID := byteOrder.Uint64(chanID)
×
2019
                                return fmt.Errorf("unable to fetch policies "+
×
2020
                                        "for edge with chan_id=%v: %v", chanID,
×
2021
                                        err)
×
2022
                        }
×
2023

2024
                        node1, err := fetchLightningNode(
18✔
2025
                                nodes, edgeInfo.NodeKey1Bytes[:],
18✔
2026
                        )
18✔
2027
                        if err != nil {
18✔
2028
                                return err
×
2029
                        }
×
2030

2031
                        node2, err := fetchLightningNode(
18✔
2032
                                nodes, edgeInfo.NodeKey2Bytes[:],
18✔
2033
                        )
18✔
2034
                        if err != nil {
18✔
2035
                                return err
×
2036
                        }
×
2037

2038
                        // Finally, we'll collate this edge with the rest of
2039
                        // edges to be returned.
2040
                        edgesSeen[chanIDInt] = struct{}{}
18✔
2041
                        channel := ChannelEdge{
18✔
2042
                                Info:    &edgeInfo,
18✔
2043
                                Policy1: edge1,
18✔
2044
                                Policy2: edge2,
18✔
2045
                                Node1:   &node1,
18✔
2046
                                Node2:   &node2,
18✔
2047
                        }
18✔
2048
                        edgesInHorizon = append(edgesInHorizon, channel)
18✔
2049
                        edgesToCache[chanIDInt] = channel
18✔
2050
                }
2051

2052
                return nil
146✔
2053
        }, func() {
146✔
2054
                edgesSeen = make(map[uint64]struct{})
146✔
2055
                edgesToCache = make(map[uint64]ChannelEdge)
146✔
2056
                edgesInHorizon = nil
146✔
2057
        })
146✔
2058
        switch {
146✔
2059
        case errors.Is(err, ErrGraphNoEdgesFound):
×
2060
                fallthrough
×
2061
        case errors.Is(err, ErrGraphNodesNotFound):
×
2062
                break
×
2063

2064
        case err != nil:
×
2065
                return nil, err
×
2066
        }
2067

2068
        // Insert any edges loaded from disk into the cache.
2069
        for chanid, channel := range edgesToCache {
164✔
2070
                c.chanCache.insert(chanid, channel)
18✔
2071
        }
18✔
2072

2073
        log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
146✔
2074
                float64(hits)/float64(len(edgesInHorizon)), hits,
146✔
2075
                len(edgesInHorizon))
146✔
2076

146✔
2077
        return edgesInHorizon, nil
146✔
2078
}
2079

2080
// NodeUpdatesInHorizon returns all the known lightning node which have an
2081
// update timestamp within the passed range. This method can be used by two
2082
// nodes to quickly determine if they have the same set of up to date node
2083
// announcements.
2084
func (c *KVStore) NodeUpdatesInHorizon(startTime,
2085
        endTime time.Time) ([]models.LightningNode, error) {
8✔
2086

8✔
2087
        var nodesInHorizon []models.LightningNode
8✔
2088

8✔
2089
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
16✔
2090
                nodes := tx.ReadBucket(nodeBucket)
8✔
2091
                if nodes == nil {
8✔
2092
                        return ErrGraphNodesNotFound
×
2093
                }
×
2094

2095
                nodeUpdateIndex := nodes.NestedReadBucket(nodeUpdateIndexBucket)
8✔
2096
                if nodeUpdateIndex == nil {
8✔
2097
                        return ErrGraphNodesNotFound
×
2098
                }
×
2099

2100
                // We'll now obtain a cursor to perform a range query within
2101
                // the index to find all node announcements within the horizon.
2102
                updateCursor := nodeUpdateIndex.ReadCursor()
8✔
2103

8✔
2104
                var startTimeBytes, endTimeBytes [8 + 33]byte
8✔
2105
                byteOrder.PutUint64(
8✔
2106
                        startTimeBytes[:8], uint64(startTime.Unix()),
8✔
2107
                )
8✔
2108
                byteOrder.PutUint64(
8✔
2109
                        endTimeBytes[:8], uint64(endTime.Unix()),
8✔
2110
                )
8✔
2111

8✔
2112
                // With our start and end times constructed, we'll step through
8✔
2113
                // the index collecting info for each node within the time
8✔
2114
                // range.
8✔
2115
                //
8✔
2116
                //nolint:ll
8✔
2117
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
8✔
2118
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
37✔
2119
                        nodePub := indexKey[8:]
29✔
2120
                        node, err := fetchLightningNode(nodes, nodePub)
29✔
2121
                        if err != nil {
29✔
2122
                                return err
×
2123
                        }
×
2124

2125
                        nodesInHorizon = append(nodesInHorizon, node)
29✔
2126
                }
2127

2128
                return nil
8✔
2129
        }, func() {
8✔
2130
                nodesInHorizon = nil
8✔
2131
        })
8✔
2132
        switch {
8✔
2133
        case errors.Is(err, ErrGraphNoEdgesFound):
×
2134
                fallthrough
×
2135
        case errors.Is(err, ErrGraphNodesNotFound):
×
2136
                break
×
2137

2138
        case err != nil:
×
2139
                return nil, err
×
2140
        }
2141

2142
        return nodesInHorizon, nil
8✔
2143
}
2144

2145
// FilterKnownChanIDs takes a set of channel IDs and return the subset of chan
2146
// ID's that we don't know and are not known zombies of the passed set. In other
2147
// words, we perform a set difference of our set of chan ID's and the ones
2148
// passed in. This method can be used by callers to determine the set of
2149
// channels another peer knows of that we don't. The ChannelUpdateInfos for the
2150
// known zombies is also returned.
2151
func (c *KVStore) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo) ([]uint64,
2152
        []ChannelUpdateInfo, error) {
126✔
2153

126✔
2154
        var (
126✔
2155
                newChanIDs   []uint64
126✔
2156
                knownZombies []ChannelUpdateInfo
126✔
2157
        )
126✔
2158

126✔
2159
        c.cacheMu.Lock()
126✔
2160
        defer c.cacheMu.Unlock()
126✔
2161

126✔
2162
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
252✔
2163
                edges := tx.ReadBucket(edgeBucket)
126✔
2164
                if edges == nil {
126✔
2165
                        return ErrGraphNoEdgesFound
×
2166
                }
×
2167
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
126✔
2168
                if edgeIndex == nil {
126✔
2169
                        return ErrGraphNoEdgesFound
×
2170
                }
×
2171

2172
                // Fetch the zombie index, it may not exist if no edges have
2173
                // ever been marked as zombies. If the index has been
2174
                // initialized, we will use it later to skip known zombie edges.
2175
                zombieIndex := edges.NestedReadBucket(zombieBucket)
126✔
2176

126✔
2177
                // We'll run through the set of chanIDs and collate only the
126✔
2178
                // set of channel that are unable to be found within our db.
126✔
2179
                var cidBytes [8]byte
126✔
2180
                for _, info := range chansInfo {
228✔
2181
                        scid := info.ShortChannelID.ToUint64()
102✔
2182
                        byteOrder.PutUint64(cidBytes[:], scid)
102✔
2183

102✔
2184
                        // If the edge is already known, skip it.
102✔
2185
                        if v := edgeIndex.Get(cidBytes[:]); v != nil {
121✔
2186
                                continue
19✔
2187
                        }
2188

2189
                        // If the edge is a known zombie, skip it.
2190
                        if zombieIndex != nil {
166✔
2191
                                isZombie, _, _ := isZombieEdge(
83✔
2192
                                        zombieIndex, scid,
83✔
2193
                                )
83✔
2194

83✔
2195
                                if isZombie {
130✔
2196
                                        knownZombies = append(
47✔
2197
                                                knownZombies, info,
47✔
2198
                                        )
47✔
2199

47✔
2200
                                        continue
47✔
2201
                                }
2202
                        }
2203

2204
                        newChanIDs = append(newChanIDs, scid)
36✔
2205
                }
2206

2207
                return nil
126✔
2208
        }, func() {
126✔
2209
                newChanIDs = nil
126✔
2210
                knownZombies = nil
126✔
2211
        })
126✔
2212
        switch {
126✔
2213
        // If we don't know of any edges yet, then we'll return the entire set
2214
        // of chan IDs specified.
2215
        case errors.Is(err, ErrGraphNoEdgesFound):
×
2216
                ogChanIDs := make([]uint64, len(chansInfo))
×
2217
                for i, info := range chansInfo {
×
2218
                        ogChanIDs[i] = info.ShortChannelID.ToUint64()
×
2219
                }
×
2220

2221
                return ogChanIDs, nil, nil
×
2222

2223
        case err != nil:
×
2224
                return nil, nil, err
×
2225
        }
2226

2227
        return newChanIDs, knownZombies, nil
126✔
2228
}
2229

2230
// ChannelUpdateInfo couples the SCID of a channel with the timestamps of the
2231
// latest received channel updates for the channel.
2232
type ChannelUpdateInfo struct {
2233
        // ShortChannelID is the SCID identifier of the channel.
2234
        ShortChannelID lnwire.ShortChannelID
2235

2236
        // Node1UpdateTimestamp is the timestamp of the latest received update
2237
        // from the node 1 channel peer. This will be set to zero time if no
2238
        // update has yet been received from this node.
2239
        Node1UpdateTimestamp time.Time
2240

2241
        // Node2UpdateTimestamp is the timestamp of the latest received update
2242
        // from the node 2 channel peer. This will be set to zero time if no
2243
        // update has yet been received from this node.
2244
        Node2UpdateTimestamp time.Time
2245
}
2246

2247
// NewChannelUpdateInfo is a constructor which makes sure we initialize the
2248
// timestamps with zero seconds unix timestamp which equals
2249
// `January 1, 1970, 00:00:00 UTC` in case the value is `time.Time{}`.
2250
func NewChannelUpdateInfo(scid lnwire.ShortChannelID, node1Timestamp,
2251
        node2Timestamp time.Time) ChannelUpdateInfo {
196✔
2252

196✔
2253
        chanInfo := ChannelUpdateInfo{
196✔
2254
                ShortChannelID:       scid,
196✔
2255
                Node1UpdateTimestamp: node1Timestamp,
196✔
2256
                Node2UpdateTimestamp: node2Timestamp,
196✔
2257
        }
196✔
2258

196✔
2259
        if node1Timestamp.IsZero() {
382✔
2260
                chanInfo.Node1UpdateTimestamp = time.Unix(0, 0)
186✔
2261
        }
186✔
2262

2263
        if node2Timestamp.IsZero() {
382✔
2264
                chanInfo.Node2UpdateTimestamp = time.Unix(0, 0)
186✔
2265
        }
186✔
2266

2267
        return chanInfo
196✔
2268
}
2269

2270
// BlockChannelRange represents a range of channels for a given block height.
2271
type BlockChannelRange struct {
2272
        // Height is the height of the block all of the channels below were
2273
        // included in.
2274
        Height uint32
2275

2276
        // Channels is the list of channels identified by their short ID
2277
        // representation known to us that were included in the block height
2278
        // above. The list may include channel update timestamp information if
2279
        // requested.
2280
        Channels []ChannelUpdateInfo
2281
}
2282

2283
// FilterChannelRange returns the channel ID's of all known channels which were
2284
// mined in a block height within the passed range. The channel IDs are grouped
2285
// by their common block height. This method can be used to quickly share with a
2286
// peer the set of channels we know of within a particular range to catch them
2287
// up after a period of time offline. If withTimestamps is true then the
2288
// timestamp info of the latest received channel update messages of the channel
2289
// will be included in the response.
2290
func (c *KVStore) FilterChannelRange(startHeight,
2291
        endHeight uint32, withTimestamps bool) ([]BlockChannelRange, error) {
11✔
2292

11✔
2293
        startChanID := &lnwire.ShortChannelID{
11✔
2294
                BlockHeight: startHeight,
11✔
2295
        }
11✔
2296

11✔
2297
        endChanID := lnwire.ShortChannelID{
11✔
2298
                BlockHeight: endHeight,
11✔
2299
                TxIndex:     math.MaxUint32 & 0x00ffffff,
11✔
2300
                TxPosition:  math.MaxUint16,
11✔
2301
        }
11✔
2302

11✔
2303
        // As we need to perform a range scan, we'll convert the starting and
11✔
2304
        // ending height to their corresponding values when encoded using short
11✔
2305
        // channel ID's.
11✔
2306
        var chanIDStart, chanIDEnd [8]byte
11✔
2307
        byteOrder.PutUint64(chanIDStart[:], startChanID.ToUint64())
11✔
2308
        byteOrder.PutUint64(chanIDEnd[:], endChanID.ToUint64())
11✔
2309

11✔
2310
        var channelsPerBlock map[uint32][]ChannelUpdateInfo
11✔
2311
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
22✔
2312
                edges := tx.ReadBucket(edgeBucket)
11✔
2313
                if edges == nil {
11✔
2314
                        return ErrGraphNoEdgesFound
×
2315
                }
×
2316
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
11✔
2317
                if edgeIndex == nil {
11✔
2318
                        return ErrGraphNoEdgesFound
×
2319
                }
×
2320

2321
                cursor := edgeIndex.ReadCursor()
11✔
2322

11✔
2323
                // We'll now iterate through the database, and find each
11✔
2324
                // channel ID that resides within the specified range.
11✔
2325
                //
11✔
2326
                //nolint:ll
11✔
2327
                for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
11✔
2328
                        bytes.Compare(k, chanIDEnd[:]) <= 0; k, v = cursor.Next() {
55✔
2329
                        // Don't send alias SCIDs during gossip sync.
44✔
2330
                        edgeReader := bytes.NewReader(v)
44✔
2331
                        edgeInfo, err := deserializeChanEdgeInfo(edgeReader)
44✔
2332
                        if err != nil {
44✔
2333
                                return err
×
2334
                        }
×
2335

2336
                        if edgeInfo.AuthProof == nil {
44✔
2337
                                continue
×
2338
                        }
2339

2340
                        // This channel ID rests within the target range, so
2341
                        // we'll add it to our returned set.
2342
                        rawCid := byteOrder.Uint64(k)
44✔
2343
                        cid := lnwire.NewShortChanIDFromInt(rawCid)
44✔
2344

44✔
2345
                        chanInfo := NewChannelUpdateInfo(
44✔
2346
                                cid, time.Time{}, time.Time{},
44✔
2347
                        )
44✔
2348

44✔
2349
                        if !withTimestamps {
66✔
2350
                                channelsPerBlock[cid.BlockHeight] = append(
22✔
2351
                                        channelsPerBlock[cid.BlockHeight],
22✔
2352
                                        chanInfo,
22✔
2353
                                )
22✔
2354

22✔
2355
                                continue
22✔
2356
                        }
2357

2358
                        node1Key, node2Key := computeEdgePolicyKeys(&edgeInfo)
22✔
2359

22✔
2360
                        rawPolicy := edges.Get(node1Key)
22✔
2361
                        if len(rawPolicy) != 0 {
28✔
2362
                                r := bytes.NewReader(rawPolicy)
6✔
2363

6✔
2364
                                edge, err := deserializeChanEdgePolicyRaw(r)
6✔
2365
                                if err != nil && !errors.Is(
6✔
2366
                                        err, ErrEdgePolicyOptionalFieldNotFound,
6✔
2367
                                ) {
6✔
2368

×
2369
                                        return err
×
2370
                                }
×
2371

2372
                                chanInfo.Node1UpdateTimestamp = edge.LastUpdate
6✔
2373
                        }
2374

2375
                        rawPolicy = edges.Get(node2Key)
22✔
2376
                        if len(rawPolicy) != 0 {
33✔
2377
                                r := bytes.NewReader(rawPolicy)
11✔
2378

11✔
2379
                                edge, err := deserializeChanEdgePolicyRaw(r)
11✔
2380
                                if err != nil && !errors.Is(
11✔
2381
                                        err, ErrEdgePolicyOptionalFieldNotFound,
11✔
2382
                                ) {
11✔
2383

×
2384
                                        return err
×
2385
                                }
×
2386

2387
                                chanInfo.Node2UpdateTimestamp = edge.LastUpdate
11✔
2388
                        }
2389

2390
                        channelsPerBlock[cid.BlockHeight] = append(
22✔
2391
                                channelsPerBlock[cid.BlockHeight], chanInfo,
22✔
2392
                        )
22✔
2393
                }
2394

2395
                return nil
11✔
2396
        }, func() {
11✔
2397
                channelsPerBlock = make(map[uint32][]ChannelUpdateInfo)
11✔
2398
        })
11✔
2399

2400
        switch {
11✔
2401
        // If we don't know of any channels yet, then there's nothing to
2402
        // filter, so we'll return an empty slice.
2403
        case errors.Is(err, ErrGraphNoEdgesFound) || len(channelsPerBlock) == 0:
3✔
2404
                return nil, nil
3✔
2405

2406
        case err != nil:
×
2407
                return nil, err
×
2408
        }
2409

2410
        // Return the channel ranges in ascending block height order.
2411
        blocks := make([]uint32, 0, len(channelsPerBlock))
8✔
2412
        for block := range channelsPerBlock {
30✔
2413
                blocks = append(blocks, block)
22✔
2414
        }
22✔
2415
        sort.Slice(blocks, func(i, j int) bool {
34✔
2416
                return blocks[i] < blocks[j]
26✔
2417
        })
26✔
2418

2419
        channelRanges := make([]BlockChannelRange, 0, len(channelsPerBlock))
8✔
2420
        for _, block := range blocks {
30✔
2421
                channelRanges = append(channelRanges, BlockChannelRange{
22✔
2422
                        Height:   block,
22✔
2423
                        Channels: channelsPerBlock[block],
22✔
2424
                })
22✔
2425
        }
22✔
2426

2427
        return channelRanges, nil
8✔
2428
}
2429

2430
// FetchChanInfos returns the set of channel edges that correspond to the passed
2431
// channel ID's. If an edge is the query is unknown to the database, it will
2432
// skipped and the result will contain only those edges that exist at the time
2433
// of the query. This can be used to respond to peer queries that are seeking to
2434
// fill in gaps in their view of the channel graph.
2435
func (c *KVStore) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) {
4✔
2436
        return c.fetchChanInfos(nil, chanIDs)
4✔
2437
}
4✔
2438

2439
// fetchChanInfos returns the set of channel edges that correspond to the passed
2440
// channel ID's. If an edge is the query is unknown to the database, it will
2441
// skipped and the result will contain only those edges that exist at the time
2442
// of the query. This can be used to respond to peer queries that are seeking to
2443
// fill in gaps in their view of the channel graph.
2444
//
2445
// NOTE: An optional transaction may be provided. If none is provided, then a
2446
// new one will be created.
2447
func (c *KVStore) fetchChanInfos(tx kvdb.RTx, chanIDs []uint64) (
2448
        []ChannelEdge, error) {
4✔
2449
        // TODO(roasbeef): sort cids?
4✔
2450

4✔
2451
        var (
4✔
2452
                chanEdges []ChannelEdge
4✔
2453
                cidBytes  [8]byte
4✔
2454
        )
4✔
2455

4✔
2456
        fetchChanInfos := func(tx kvdb.RTx) error {
8✔
2457
                edges := tx.ReadBucket(edgeBucket)
4✔
2458
                if edges == nil {
4✔
2459
                        return ErrGraphNoEdgesFound
×
2460
                }
×
2461
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
4✔
2462
                if edgeIndex == nil {
4✔
2463
                        return ErrGraphNoEdgesFound
×
2464
                }
×
2465
                nodes := tx.ReadBucket(nodeBucket)
4✔
2466
                if nodes == nil {
4✔
2467
                        return ErrGraphNotFound
×
2468
                }
×
2469

2470
                for _, cid := range chanIDs {
15✔
2471
                        byteOrder.PutUint64(cidBytes[:], cid)
11✔
2472

11✔
2473
                        // First, we'll fetch the static edge information. If
11✔
2474
                        // the edge is unknown, we will skip the edge and
11✔
2475
                        // continue gathering all known edges.
11✔
2476
                        edgeInfo, err := fetchChanEdgeInfo(
11✔
2477
                                edgeIndex, cidBytes[:],
11✔
2478
                        )
11✔
2479
                        switch {
11✔
2480
                        case errors.Is(err, ErrEdgeNotFound):
3✔
2481
                                continue
3✔
2482
                        case err != nil:
×
2483
                                return err
×
2484
                        }
2485

2486
                        // With the static information obtained, we'll now
2487
                        // fetch the dynamic policy info.
2488
                        edge1, edge2, err := fetchChanEdgePolicies(
8✔
2489
                                edgeIndex, edges, cidBytes[:],
8✔
2490
                        )
8✔
2491
                        if err != nil {
8✔
2492
                                return err
×
2493
                        }
×
2494

2495
                        node1, err := fetchLightningNode(
8✔
2496
                                nodes, edgeInfo.NodeKey1Bytes[:],
8✔
2497
                        )
8✔
2498
                        if err != nil {
8✔
2499
                                return err
×
2500
                        }
×
2501

2502
                        node2, err := fetchLightningNode(
8✔
2503
                                nodes, edgeInfo.NodeKey2Bytes[:],
8✔
2504
                        )
8✔
2505
                        if err != nil {
8✔
2506
                                return err
×
2507
                        }
×
2508

2509
                        chanEdges = append(chanEdges, ChannelEdge{
8✔
2510
                                Info:    &edgeInfo,
8✔
2511
                                Policy1: edge1,
8✔
2512
                                Policy2: edge2,
8✔
2513
                                Node1:   &node1,
8✔
2514
                                Node2:   &node2,
8✔
2515
                        })
8✔
2516
                }
2517

2518
                return nil
4✔
2519
        }
2520

2521
        if tx == nil {
8✔
2522
                err := kvdb.View(c.db, fetchChanInfos, func() {
8✔
2523
                        chanEdges = nil
4✔
2524
                })
4✔
2525
                if err != nil {
4✔
2526
                        return nil, err
×
2527
                }
×
2528

2529
                return chanEdges, nil
4✔
2530
        }
2531

2532
        err := fetchChanInfos(tx)
×
2533
        if err != nil {
×
2534
                return nil, err
×
2535
        }
×
2536

2537
        return chanEdges, nil
×
2538
}
2539

2540
func delEdgeUpdateIndexEntry(edgesBucket kvdb.RwBucket, chanID uint64,
2541
        edge1, edge2 *models.ChannelEdgePolicy) error {
135✔
2542

135✔
2543
        // First, we'll fetch the edge update index bucket which currently
135✔
2544
        // stores an entry for the channel we're about to delete.
135✔
2545
        updateIndex := edgesBucket.NestedReadWriteBucket(edgeUpdateIndexBucket)
135✔
2546
        if updateIndex == nil {
135✔
2547
                // No edges in bucket, return early.
×
2548
                return nil
×
2549
        }
×
2550

2551
        // Now that we have the bucket, we'll attempt to construct a template
2552
        // for the index key: updateTime || chanid.
2553
        var indexKey [8 + 8]byte
135✔
2554
        byteOrder.PutUint64(indexKey[8:], chanID)
135✔
2555

135✔
2556
        // With the template constructed, we'll attempt to delete an entry that
135✔
2557
        // would have been created by both edges: we'll alternate the update
135✔
2558
        // times, as one may had overridden the other.
135✔
2559
        if edge1 != nil {
145✔
2560
                byteOrder.PutUint64(
10✔
2561
                        indexKey[:8], uint64(edge1.LastUpdate.Unix()),
10✔
2562
                )
10✔
2563
                if err := updateIndex.Delete(indexKey[:]); err != nil {
10✔
2564
                        return err
×
2565
                }
×
2566
        }
2567

2568
        // We'll also attempt to delete the entry that may have been created by
2569
        // the second edge.
2570
        if edge2 != nil {
147✔
2571
                byteOrder.PutUint64(
12✔
2572
                        indexKey[:8], uint64(edge2.LastUpdate.Unix()),
12✔
2573
                )
12✔
2574
                if err := updateIndex.Delete(indexKey[:]); err != nil {
12✔
2575
                        return err
×
2576
                }
×
2577
        }
2578

2579
        return nil
135✔
2580
}
2581

2582
// delChannelEdgeUnsafe deletes the edge with the given chanID from the graph
2583
// cache. It then goes on to delete any policy info and edge info for this
2584
// channel from the DB and finally, if isZombie is true, it will add an entry
2585
// for this channel in the zombie index.
2586
//
2587
// NOTE: this method MUST only be called if the cacheMu has already been
2588
// acquired.
2589
func (c *KVStore) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex,
2590
        zombieIndex kvdb.RwBucket, chanID []byte, isZombie,
2591
        strictZombie bool) (*models.ChannelEdgeInfo, error) {
197✔
2592

197✔
2593
        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
197✔
2594
        if err != nil {
259✔
2595
                return nil, err
62✔
2596
        }
62✔
2597

2598
        // We'll also remove the entry in the edge update index bucket before
2599
        // we delete the edges themselves so we can access their last update
2600
        // times.
2601
        cid := byteOrder.Uint64(chanID)
135✔
2602
        edge1, edge2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
135✔
2603
        if err != nil {
135✔
2604
                return nil, err
×
2605
        }
×
2606
        err = delEdgeUpdateIndexEntry(edges, cid, edge1, edge2)
135✔
2607
        if err != nil {
135✔
2608
                return nil, err
×
2609
        }
×
2610

2611
        // The edge key is of the format pubKey || chanID. First we construct
2612
        // the latter half, populating the channel ID.
2613
        var edgeKey [33 + 8]byte
135✔
2614
        copy(edgeKey[33:], chanID)
135✔
2615

135✔
2616
        // With the latter half constructed, copy over the first public key to
135✔
2617
        // delete the edge in this direction, then the second to delete the
135✔
2618
        // edge in the opposite direction.
135✔
2619
        copy(edgeKey[:33], edgeInfo.NodeKey1Bytes[:])
135✔
2620
        if edges.Get(edgeKey[:]) != nil {
270✔
2621
                if err := edges.Delete(edgeKey[:]); err != nil {
135✔
2622
                        return nil, err
×
2623
                }
×
2624
        }
2625
        copy(edgeKey[:33], edgeInfo.NodeKey2Bytes[:])
135✔
2626
        if edges.Get(edgeKey[:]) != nil {
270✔
2627
                if err := edges.Delete(edgeKey[:]); err != nil {
135✔
2628
                        return nil, err
×
2629
                }
×
2630
        }
2631

2632
        // As part of deleting the edge we also remove all disabled entries
2633
        // from the edgePolicyDisabledIndex bucket. We do that for both
2634
        // directions.
2635
        err = updateEdgePolicyDisabledIndex(edges, cid, false, false)
135✔
2636
        if err != nil {
135✔
2637
                return nil, err
×
2638
        }
×
2639
        err = updateEdgePolicyDisabledIndex(edges, cid, true, false)
135✔
2640
        if err != nil {
135✔
2641
                return nil, err
×
2642
        }
×
2643

2644
        // With the edge data deleted, we can purge the information from the two
2645
        // edge indexes.
2646
        if err := edgeIndex.Delete(chanID); err != nil {
135✔
2647
                return nil, err
×
2648
        }
×
2649
        var b bytes.Buffer
135✔
2650
        if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
135✔
2651
                return nil, err
×
2652
        }
×
2653
        if err := chanIndex.Delete(b.Bytes()); err != nil {
135✔
2654
                return nil, err
×
2655
        }
×
2656

2657
        // Finally, we'll mark the edge as a zombie within our index if it's
2658
        // being removed due to the channel becoming a zombie. We do this to
2659
        // ensure we don't store unnecessary data for spent channels.
2660
        if !isZombie {
248✔
2661
                return &edgeInfo, nil
113✔
2662
        }
113✔
2663

2664
        nodeKey1, nodeKey2 := edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes
22✔
2665
        if strictZombie {
25✔
2666
                nodeKey1, nodeKey2 = makeZombiePubkeys(&edgeInfo, edge1, edge2)
3✔
2667
        }
3✔
2668

2669
        return &edgeInfo, markEdgeZombie(
22✔
2670
                zombieIndex, byteOrder.Uint64(chanID), nodeKey1, nodeKey2,
22✔
2671
        )
22✔
2672
}
2673

2674
// makeZombiePubkeys derives the node pubkeys to store in the zombie index for a
2675
// particular pair of channel policies. The return values are one of:
2676
//  1. (pubkey1, pubkey2)
2677
//  2. (pubkey1, blank)
2678
//  3. (blank, pubkey2)
2679
//
2680
// A blank pubkey means that corresponding node will be unable to resurrect a
2681
// channel on its own. For example, node1 may continue to publish recent
2682
// updates, but node2 has fallen way behind. After marking an edge as a zombie,
2683
// we don't want another fresh update from node1 to resurrect, as the edge can
2684
// only become live once node2 finally sends something recent.
2685
//
2686
// In the case where we have neither update, we allow either party to resurrect
2687
// the channel. If the channel were to be marked zombie again, it would be
2688
// marked with the correct lagging channel since we received an update from only
2689
// one side.
2690
func makeZombiePubkeys(info *models.ChannelEdgeInfo,
2691
        e1, e2 *models.ChannelEdgePolicy) ([33]byte, [33]byte) {
3✔
2692

3✔
2693
        switch {
3✔
2694
        // If we don't have either edge policy, we'll return both pubkeys so
2695
        // that the channel can be resurrected by either party.
2696
        case e1 == nil && e2 == nil:
×
2697
                return info.NodeKey1Bytes, info.NodeKey2Bytes
×
2698

2699
        // If we're missing edge1, or if both edges are present but edge1 is
2700
        // older, we'll return edge1's pubkey and a blank pubkey for edge2. This
2701
        // means that only an update from edge1 will be able to resurrect the
2702
        // channel.
2703
        case e1 == nil || (e2 != nil && e1.LastUpdate.Before(e2.LastUpdate)):
1✔
2704
                return info.NodeKey1Bytes, [33]byte{}
1✔
2705

2706
        // Otherwise, we're missing edge2 or edge2 is the older side, so we
2707
        // return a blank pubkey for edge1. In this case, only an update from
2708
        // edge2 can resurect the channel.
2709
        default:
2✔
2710
                return [33]byte{}, info.NodeKey2Bytes
2✔
2711
        }
2712
}
2713

2714
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
2715
// within the database for the referenced channel. The `flags` attribute within
2716
// the ChannelEdgePolicy determines which of the directed edges are being
2717
// updated. If the flag is 1, then the first node's information is being
2718
// updated, otherwise it's the second node's information. The node ordering is
2719
// determined by the lexicographical ordering of the identity public keys of the
2720
// nodes on either side of the channel.
2721
func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
2722
        op ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) {
2,663✔
2723

2,663✔
2724
        var (
2,663✔
2725
                isUpdate1    bool
2,663✔
2726
                edgeNotFound bool
2,663✔
2727
                from, to     route.Vertex
2,663✔
2728
        )
2,663✔
2729

2,663✔
2730
        r := &batch.Request{
2,663✔
2731
                Reset: func() {
5,326✔
2732
                        isUpdate1 = false
2,663✔
2733
                        edgeNotFound = false
2,663✔
2734
                },
2,663✔
2735
                Update: func(tx kvdb.RwTx) error {
2,663✔
2736
                        var err error
2,663✔
2737
                        from, to, isUpdate1, err = updateEdgePolicy(tx, edge)
2,663✔
2738
                        if err != nil {
2,666✔
2739
                                log.Errorf("UpdateEdgePolicy faild: %v", err)
3✔
2740
                        }
3✔
2741

2742
                        // Silence ErrEdgeNotFound so that the batch can
2743
                        // succeed, but propagate the error via local state.
2744
                        if errors.Is(err, ErrEdgeNotFound) {
2,666✔
2745
                                edgeNotFound = true
3✔
2746
                                return nil
3✔
2747
                        }
3✔
2748

2749
                        return err
2,660✔
2750
                },
2751
                OnCommit: func(err error) error {
2,663✔
2752
                        switch {
2,663✔
2753
                        case err != nil:
×
2754
                                return err
×
2755
                        case edgeNotFound:
3✔
2756
                                return ErrEdgeNotFound
3✔
2757
                        default:
2,660✔
2758
                                c.updateEdgeCache(edge, isUpdate1)
2,660✔
2759
                                return nil
2,660✔
2760
                        }
2761
                },
2762
        }
2763

2764
        for _, f := range op {
2,663✔
2765
                f(r)
×
2766
        }
×
2767

2768
        err := c.chanScheduler.Execute(r)
2,663✔
2769

2,663✔
2770
        return from, to, err
2,663✔
2771
}
2772

2773
func (c *KVStore) updateEdgeCache(e *models.ChannelEdgePolicy,
2774
        isUpdate1 bool) {
2,660✔
2775

2,660✔
2776
        // If an entry for this channel is found in reject cache, we'll modify
2,660✔
2777
        // the entry with the updated timestamp for the direction that was just
2,660✔
2778
        // written. If the edge doesn't exist, we'll load the cache entry lazily
2,660✔
2779
        // during the next query for this edge.
2,660✔
2780
        if entry, ok := c.rejectCache.get(e.ChannelID); ok {
2,665✔
2781
                if isUpdate1 {
8✔
2782
                        entry.upd1Time = e.LastUpdate.Unix()
3✔
2783
                } else {
5✔
2784
                        entry.upd2Time = e.LastUpdate.Unix()
2✔
2785
                }
2✔
2786
                c.rejectCache.insert(e.ChannelID, entry)
5✔
2787
        }
2788

2789
        // If an entry for this channel is found in channel cache, we'll modify
2790
        // the entry with the updated policy for the direction that was just
2791
        // written. If the edge doesn't exist, we'll defer loading the info and
2792
        // policies and lazily read from disk during the next query.
2793
        if channel, ok := c.chanCache.get(e.ChannelID); ok {
2,660✔
2794
                if isUpdate1 {
×
2795
                        channel.Policy1 = e
×
2796
                } else {
×
2797
                        channel.Policy2 = e
×
2798
                }
×
2799
                c.chanCache.insert(e.ChannelID, channel)
×
2800
        }
2801
}
2802

2803
// updateEdgePolicy attempts to update an edge's policy within the relevant
2804
// buckets using an existing database transaction. The returned boolean will be
2805
// true if the updated policy belongs to node1, and false if the policy belonged
2806
// to node2.
2807
func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy) (
2808
        route.Vertex, route.Vertex, bool, error) {
2,663✔
2809

2,663✔
2810
        var noVertex route.Vertex
2,663✔
2811

2,663✔
2812
        edges := tx.ReadWriteBucket(edgeBucket)
2,663✔
2813
        if edges == nil {
2,663✔
2814
                return noVertex, noVertex, false, ErrEdgeNotFound
×
2815
        }
×
2816
        edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
2,663✔
2817
        if edgeIndex == nil {
2,663✔
2818
                return noVertex, noVertex, false, ErrEdgeNotFound
×
2819
        }
×
2820

2821
        // Create the channelID key be converting the channel ID
2822
        // integer into a byte slice.
2823
        var chanID [8]byte
2,663✔
2824
        byteOrder.PutUint64(chanID[:], edge.ChannelID)
2,663✔
2825

2,663✔
2826
        // With the channel ID, we then fetch the value storing the two
2,663✔
2827
        // nodes which connect this channel edge.
2,663✔
2828
        nodeInfo := edgeIndex.Get(chanID[:])
2,663✔
2829
        if nodeInfo == nil {
2,666✔
2830
                return noVertex, noVertex, false, ErrEdgeNotFound
3✔
2831
        }
3✔
2832

2833
        // Depending on the flags value passed above, either the first
2834
        // or second edge policy is being updated.
2835
        var fromNode, toNode []byte
2,660✔
2836
        var isUpdate1 bool
2,660✔
2837
        if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
3,994✔
2838
                fromNode = nodeInfo[:33]
1,334✔
2839
                toNode = nodeInfo[33:66]
1,334✔
2840
                isUpdate1 = true
1,334✔
2841
        } else {
2,660✔
2842
                fromNode = nodeInfo[33:66]
1,326✔
2843
                toNode = nodeInfo[:33]
1,326✔
2844
                isUpdate1 = false
1,326✔
2845
        }
1,326✔
2846

2847
        // Finally, with the direction of the edge being updated
2848
        // identified, we update the on-disk edge representation.
2849
        err := putChanEdgePolicy(edges, edge, fromNode, toNode)
2,660✔
2850
        if err != nil {
2,660✔
2851
                return noVertex, noVertex, false, err
×
2852
        }
×
2853

2854
        var (
2,660✔
2855
                fromNodePubKey route.Vertex
2,660✔
2856
                toNodePubKey   route.Vertex
2,660✔
2857
        )
2,660✔
2858
        copy(fromNodePubKey[:], fromNode)
2,660✔
2859
        copy(toNodePubKey[:], toNode)
2,660✔
2860

2,660✔
2861
        return fromNodePubKey, toNodePubKey, isUpdate1, nil
2,660✔
2862
}
2863

2864
// isPublic determines whether the node is seen as public within the graph from
2865
// the source node's point of view. An existing database transaction can also be
2866
// specified.
2867
func (c *KVStore) isPublic(tx kvdb.RTx, nodePub route.Vertex,
2868
        sourcePubKey []byte) (bool, error) {
13✔
2869

13✔
2870
        // In order to determine whether this node is publicly advertised within
13✔
2871
        // the graph, we'll need to look at all of its edges and check whether
13✔
2872
        // they extend to any other node than the source node. errDone will be
13✔
2873
        // used to terminate the check early.
13✔
2874
        nodeIsPublic := false
13✔
2875
        errDone := errors.New("done")
13✔
2876
        err := c.ForEachNodeChannelTx(tx, nodePub, func(tx kvdb.RTx,
13✔
2877
                info *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy,
13✔
2878
                _ *models.ChannelEdgePolicy) error {
23✔
2879

10✔
2880
                // If this edge doesn't extend to the source node, we'll
10✔
2881
                // terminate our search as we can now conclude that the node is
10✔
2882
                // publicly advertised within the graph due to the local node
10✔
2883
                // knowing of the current edge.
10✔
2884
                if !bytes.Equal(info.NodeKey1Bytes[:], sourcePubKey) &&
10✔
2885
                        !bytes.Equal(info.NodeKey2Bytes[:], sourcePubKey) {
13✔
2886

3✔
2887
                        nodeIsPublic = true
3✔
2888
                        return errDone
3✔
2889
                }
3✔
2890

2891
                // Since the edge _does_ extend to the source node, we'll also
2892
                // need to ensure that this is a public edge.
2893
                if info.AuthProof != nil {
13✔
2894
                        nodeIsPublic = true
6✔
2895
                        return errDone
6✔
2896
                }
6✔
2897

2898
                // Otherwise, we'll continue our search.
2899
                return nil
1✔
2900
        })
2901
        if err != nil && !errors.Is(err, errDone) {
13✔
2902
                return false, err
×
2903
        }
×
2904

2905
        return nodeIsPublic, nil
13✔
2906
}
2907

2908
// FetchLightningNodeTx attempts to look up a target node by its identity
2909
// public key. If the node isn't found in the database, then
2910
// ErrGraphNodeNotFound is returned. An optional transaction may be provided.
2911
// If none is provided, then a new one will be created.
2912
func (c *KVStore) FetchLightningNodeTx(tx kvdb.RTx, nodePub route.Vertex) (
2913
        *models.LightningNode, error) {
3,630✔
2914

3,630✔
2915
        return c.fetchLightningNode(tx, nodePub)
3,630✔
2916
}
3,630✔
2917

2918
// FetchLightningNode attempts to look up a target node by its identity public
2919
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
2920
// returned.
2921
func (c *KVStore) FetchLightningNode(nodePub route.Vertex) (
2922
        *models.LightningNode, error) {
153✔
2923

153✔
2924
        return c.fetchLightningNode(nil, nodePub)
153✔
2925
}
153✔
2926

2927
// fetchLightningNode attempts to look up a target node by its identity public
2928
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
2929
// returned. An optional transaction may be provided. If none is provided, then
2930
// a new one will be created.
2931
func (c *KVStore) fetchLightningNode(tx kvdb.RTx,
2932
        nodePub route.Vertex) (*models.LightningNode, error) {
3,783✔
2933

3,783✔
2934
        var node *models.LightningNode
3,783✔
2935
        fetch := func(tx kvdb.RTx) error {
7,566✔
2936
                // First grab the nodes bucket which stores the mapping from
3,783✔
2937
                // pubKey to node information.
3,783✔
2938
                nodes := tx.ReadBucket(nodeBucket)
3,783✔
2939
                if nodes == nil {
3,783✔
2940
                        return ErrGraphNotFound
×
2941
                }
×
2942

2943
                // If a key for this serialized public key isn't found, then
2944
                // the target node doesn't exist within the database.
2945
                nodeBytes := nodes.Get(nodePub[:])
3,783✔
2946
                if nodeBytes == nil {
3,797✔
2947
                        return ErrGraphNodeNotFound
14✔
2948
                }
14✔
2949

2950
                // If the node is found, then we can de deserialize the node
2951
                // information to return to the user.
2952
                nodeReader := bytes.NewReader(nodeBytes)
3,769✔
2953
                n, err := deserializeLightningNode(nodeReader)
3,769✔
2954
                if err != nil {
3,769✔
2955
                        return err
×
2956
                }
×
2957

2958
                node = &n
3,769✔
2959

3,769✔
2960
                return nil
3,769✔
2961
        }
2962

2963
        if tx == nil {
3,939✔
2964
                err := kvdb.View(
156✔
2965
                        c.db, fetch, func() {
312✔
2966
                                node = nil
156✔
2967
                        },
156✔
2968
                )
2969
                if err != nil {
159✔
2970
                        return nil, err
3✔
2971
                }
3✔
2972

2973
                return node, nil
153✔
2974
        }
2975

2976
        err := fetch(tx)
3,627✔
2977
        if err != nil {
3,638✔
2978
                return nil, err
11✔
2979
        }
11✔
2980

2981
        return node, nil
3,616✔
2982
}
2983

2984
// HasLightningNode determines if the graph has a vertex identified by the
2985
// target node identity public key. If the node exists in the database, a
2986
// timestamp of when the data for the node was lasted updated is returned along
2987
// with a true boolean. Otherwise, an empty time.Time is returned with a false
2988
// boolean.
2989
func (c *KVStore) HasLightningNode(nodePub [33]byte) (time.Time, bool,
2990
        error) {
17✔
2991

17✔
2992
        var (
17✔
2993
                updateTime time.Time
17✔
2994
                exists     bool
17✔
2995
        )
17✔
2996

17✔
2997
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
34✔
2998
                // First grab the nodes bucket which stores the mapping from
17✔
2999
                // pubKey to node information.
17✔
3000
                nodes := tx.ReadBucket(nodeBucket)
17✔
3001
                if nodes == nil {
17✔
3002
                        return ErrGraphNotFound
×
3003
                }
×
3004

3005
                // If a key for this serialized public key isn't found, we can
3006
                // exit early.
3007
                nodeBytes := nodes.Get(nodePub[:])
17✔
3008
                if nodeBytes == nil {
20✔
3009
                        exists = false
3✔
3010
                        return nil
3✔
3011
                }
3✔
3012

3013
                // Otherwise we continue on to obtain the time stamp
3014
                // representing the last time the data for this node was
3015
                // updated.
3016
                nodeReader := bytes.NewReader(nodeBytes)
14✔
3017
                node, err := deserializeLightningNode(nodeReader)
14✔
3018
                if err != nil {
14✔
3019
                        return err
×
3020
                }
×
3021

3022
                exists = true
14✔
3023
                updateTime = node.LastUpdate
14✔
3024

14✔
3025
                return nil
14✔
3026
        }, func() {
17✔
3027
                updateTime = time.Time{}
17✔
3028
                exists = false
17✔
3029
        })
17✔
3030
        if err != nil {
17✔
3031
                return time.Time{}, exists, err
×
3032
        }
×
3033

3034
        return updateTime, exists, nil
17✔
3035
}
3036

3037
// nodeTraversal is used to traverse all channels of a node given by its
3038
// public key and passes channel information into the specified callback.
3039
func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
3040
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3041
                *models.ChannelEdgePolicy) error) error {
1,266✔
3042

1,266✔
3043
        traversal := func(tx kvdb.RTx) error {
2,532✔
3044
                edges := tx.ReadBucket(edgeBucket)
1,266✔
3045
                if edges == nil {
1,266✔
3046
                        return ErrGraphNotFound
×
3047
                }
×
3048
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
1,266✔
3049
                if edgeIndex == nil {
1,266✔
3050
                        return ErrGraphNoEdgesFound
×
3051
                }
×
3052

3053
                // In order to reach all the edges for this node, we take
3054
                // advantage of the construction of the key-space within the
3055
                // edge bucket. The keys are stored in the form: pubKey ||
3056
                // chanID. Therefore, starting from a chanID of zero, we can
3057
                // scan forward in the bucket, grabbing all the edges for the
3058
                // node. Once the prefix no longer matches, then we know we're
3059
                // done.
3060
                var nodeStart [33 + 8]byte
1,266✔
3061
                copy(nodeStart[:], nodePub)
1,266✔
3062
                copy(nodeStart[33:], chanStart[:])
1,266✔
3063

1,266✔
3064
                // Starting from the key pubKey || 0, we seek forward in the
1,266✔
3065
                // bucket until the retrieved key no longer has the public key
1,266✔
3066
                // as its prefix. This indicates that we've stepped over into
1,266✔
3067
                // another node's edges, so we can terminate our scan.
1,266✔
3068
                edgeCursor := edges.ReadCursor()
1,266✔
3069
                for nodeEdge, _ := edgeCursor.Seek(nodeStart[:]); bytes.HasPrefix(nodeEdge, nodePub); nodeEdge, _ = edgeCursor.Next() { //nolint:ll
5,106✔
3070
                        // If the prefix still matches, the channel id is
3,840✔
3071
                        // returned in nodeEdge. Channel id is used to lookup
3,840✔
3072
                        // the node at the other end of the channel and both
3,840✔
3073
                        // edge policies.
3,840✔
3074
                        chanID := nodeEdge[33:]
3,840✔
3075
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
3,840✔
3076
                        if err != nil {
3,840✔
3077
                                return err
×
3078
                        }
×
3079

3080
                        outgoingPolicy, err := fetchChanEdgePolicy(
3,840✔
3081
                                edges, chanID, nodePub,
3,840✔
3082
                        )
3,840✔
3083
                        if err != nil {
3,840✔
3084
                                return err
×
3085
                        }
×
3086

3087
                        otherNode, err := edgeInfo.OtherNodeKeyBytes(nodePub)
3,840✔
3088
                        if err != nil {
3,840✔
3089
                                return err
×
3090
                        }
×
3091

3092
                        incomingPolicy, err := fetchChanEdgePolicy(
3,840✔
3093
                                edges, chanID, otherNode[:],
3,840✔
3094
                        )
3,840✔
3095
                        if err != nil {
3,840✔
3096
                                return err
×
3097
                        }
×
3098

3099
                        // Finally, we execute the callback.
3100
                        err = cb(tx, &edgeInfo, outgoingPolicy, incomingPolicy)
3,840✔
3101
                        if err != nil {
3,849✔
3102
                                return err
9✔
3103
                        }
9✔
3104
                }
3105

3106
                return nil
1,257✔
3107
        }
3108

3109
        // If no transaction was provided, then we'll create a new transaction
3110
        // to execute the transaction within.
3111
        if tx == nil {
1,275✔
3112
                return kvdb.View(db, traversal, func() {})
18✔
3113
        }
3114

3115
        // Otherwise, we re-use the existing transaction to execute the graph
3116
        // traversal.
3117
        return traversal(tx)
1,257✔
3118
}
3119

3120
// ForEachNodeChannel iterates through all channels of the given node,
3121
// executing the passed callback with an edge info structure and the policies
3122
// of each end of the channel. The first edge policy is the outgoing edge *to*
3123
// the connecting node, while the second is the incoming edge *from* the
3124
// connecting node. If the callback returns an error, then the iteration is
3125
// halted with the error propagated back up to the caller.
3126
//
3127
// Unknown policies are passed into the callback as nil values.
3128
func (c *KVStore) ForEachNodeChannel(nodePub route.Vertex,
3129
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3130
                *models.ChannelEdgePolicy) error) error {
6✔
3131

6✔
3132
        return nodeTraversal(nil, nodePub[:], c.db, cb)
6✔
3133
}
6✔
3134

3135
// ForEachNodeChannelTx iterates through all channels of the given node,
3136
// executing the passed callback with an edge info structure and the policies
3137
// of each end of the channel. The first edge policy is the outgoing edge *to*
3138
// the connecting node, while the second is the incoming edge *from* the
3139
// connecting node. If the callback returns an error, then the iteration is
3140
// halted with the error propagated back up to the caller.
3141
//
3142
// Unknown policies are passed into the callback as nil values.
3143
//
3144
// If the caller wishes to re-use an existing boltdb transaction, then it
3145
// should be passed as the first argument.  Otherwise, the first argument should
3146
// be nil and a fresh transaction will be created to execute the graph
3147
// traversal.
3148
func (c *KVStore) ForEachNodeChannelTx(tx kvdb.RTx,
3149
        nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo,
3150
                *models.ChannelEdgePolicy,
3151
                *models.ChannelEdgePolicy) error) error {
1,018✔
3152

1,018✔
3153
        return nodeTraversal(tx, nodePub[:], c.db, cb)
1,018✔
3154
}
1,018✔
3155

3156
// FetchOtherNode attempts to fetch the full LightningNode that's opposite of
3157
// the target node in the channel. This is useful when one knows the pubkey of
3158
// one of the nodes, and wishes to obtain the full LightningNode for the other
3159
// end of the channel.
3160
func (c *KVStore) FetchOtherNode(tx kvdb.RTx,
3161
        channel *models.ChannelEdgeInfo, thisNodeKey []byte) (
3162
        *models.LightningNode, error) {
×
3163

×
3164
        // Ensure that the node passed in is actually a member of the channel.
×
3165
        var targetNodeBytes [33]byte
×
3166
        switch {
×
3167
        case bytes.Equal(channel.NodeKey1Bytes[:], thisNodeKey):
×
3168
                targetNodeBytes = channel.NodeKey2Bytes
×
3169
        case bytes.Equal(channel.NodeKey2Bytes[:], thisNodeKey):
×
3170
                targetNodeBytes = channel.NodeKey1Bytes
×
3171
        default:
×
3172
                return nil, fmt.Errorf("node not participating in this channel")
×
3173
        }
3174

3175
        var targetNode *models.LightningNode
×
3176
        fetchNodeFunc := func(tx kvdb.RTx) error {
×
3177
                // First grab the nodes bucket which stores the mapping from
×
3178
                // pubKey to node information.
×
3179
                nodes := tx.ReadBucket(nodeBucket)
×
3180
                if nodes == nil {
×
3181
                        return ErrGraphNotFound
×
3182
                }
×
3183

3184
                node, err := fetchLightningNode(nodes, targetNodeBytes[:])
×
3185
                if err != nil {
×
3186
                        return err
×
3187
                }
×
3188

3189
                targetNode = &node
×
3190

×
3191
                return nil
×
3192
        }
3193

3194
        // If the transaction is nil, then we'll need to create a new one,
3195
        // otherwise we can use the existing db transaction.
3196
        var err error
×
3197
        if tx == nil {
×
3198
                err = kvdb.View(c.db, fetchNodeFunc, func() {
×
3199
                        targetNode = nil
×
3200
                })
×
3201
        } else {
×
3202
                err = fetchNodeFunc(tx)
×
3203
        }
×
3204

3205
        return targetNode, err
×
3206
}
3207

3208
// computeEdgePolicyKeys is a helper function that can be used to compute the
3209
// keys used to index the channel edge policy info for the two nodes of the
3210
// edge. The keys for node 1 and node 2 are returned respectively.
3211
func computeEdgePolicyKeys(info *models.ChannelEdgeInfo) ([]byte, []byte) {
22✔
3212
        var (
22✔
3213
                node1Key [33 + 8]byte
22✔
3214
                node2Key [33 + 8]byte
22✔
3215
        )
22✔
3216

22✔
3217
        copy(node1Key[:], info.NodeKey1Bytes[:])
22✔
3218
        copy(node2Key[:], info.NodeKey2Bytes[:])
22✔
3219

22✔
3220
        byteOrder.PutUint64(node1Key[33:], info.ChannelID)
22✔
3221
        byteOrder.PutUint64(node2Key[33:], info.ChannelID)
22✔
3222

22✔
3223
        return node1Key[:], node2Key[:]
22✔
3224
}
22✔
3225

3226
// FetchChannelEdgesByOutpoint attempts to lookup the two directed edges for
3227
// the channel identified by the funding outpoint. If the channel can't be
3228
// found, then ErrEdgeNotFound is returned. A struct which houses the general
3229
// information for the channel itself is returned as well as two structs that
3230
// contain the routing policies for the channel in either direction.
3231
func (c *KVStore) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (
3232
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3233
        *models.ChannelEdgePolicy, error) {
11✔
3234

11✔
3235
        var (
11✔
3236
                edgeInfo *models.ChannelEdgeInfo
11✔
3237
                policy1  *models.ChannelEdgePolicy
11✔
3238
                policy2  *models.ChannelEdgePolicy
11✔
3239
        )
11✔
3240

11✔
3241
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
22✔
3242
                // First, grab the node bucket. This will be used to populate
11✔
3243
                // the Node pointers in each edge read from disk.
11✔
3244
                nodes := tx.ReadBucket(nodeBucket)
11✔
3245
                if nodes == nil {
11✔
3246
                        return ErrGraphNotFound
×
3247
                }
×
3248

3249
                // Next, grab the edge bucket which stores the edges, and also
3250
                // the index itself so we can group the directed edges together
3251
                // logically.
3252
                edges := tx.ReadBucket(edgeBucket)
11✔
3253
                if edges == nil {
11✔
3254
                        return ErrGraphNoEdgesFound
×
3255
                }
×
3256
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
11✔
3257
                if edgeIndex == nil {
11✔
3258
                        return ErrGraphNoEdgesFound
×
3259
                }
×
3260

3261
                // If the channel's outpoint doesn't exist within the outpoint
3262
                // index, then the edge does not exist.
3263
                chanIndex := edges.NestedReadBucket(channelPointBucket)
11✔
3264
                if chanIndex == nil {
11✔
3265
                        return ErrGraphNoEdgesFound
×
3266
                }
×
3267
                var b bytes.Buffer
11✔
3268
                if err := WriteOutpoint(&b, op); err != nil {
11✔
3269
                        return err
×
3270
                }
×
3271
                chanID := chanIndex.Get(b.Bytes())
11✔
3272
                if chanID == nil {
21✔
3273
                        return fmt.Errorf("%w: op=%v", ErrEdgeNotFound, op)
10✔
3274
                }
10✔
3275

3276
                // If the channel is found to exists, then we'll first retrieve
3277
                // the general information for the channel.
3278
                edge, err := fetchChanEdgeInfo(edgeIndex, chanID)
1✔
3279
                if err != nil {
1✔
3280
                        return fmt.Errorf("%w: chanID=%x", err, chanID)
×
3281
                }
×
3282
                edgeInfo = &edge
1✔
3283

1✔
3284
                // Once we have the information about the channels' parameters,
1✔
3285
                // we'll fetch the routing policies for each for the directed
1✔
3286
                // edges.
1✔
3287
                e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
1✔
3288
                if err != nil {
1✔
3289
                        return fmt.Errorf("failed to find policy: %w", err)
×
3290
                }
×
3291

3292
                policy1 = e1
1✔
3293
                policy2 = e2
1✔
3294

1✔
3295
                return nil
1✔
3296
        }, func() {
11✔
3297
                edgeInfo = nil
11✔
3298
                policy1 = nil
11✔
3299
                policy2 = nil
11✔
3300
        })
11✔
3301
        if err != nil {
21✔
3302
                return nil, nil, nil, err
10✔
3303
        }
10✔
3304

3305
        return edgeInfo, policy1, policy2, nil
1✔
3306
}
3307

3308
// FetchChannelEdgesByID attempts to lookup the two directed edges for the
3309
// channel identified by the channel ID. If the channel can't be found, then
3310
// ErrEdgeNotFound is returned. A struct which houses the general information
3311
// for the channel itself is returned as well as two structs that contain the
3312
// routing policies for the channel in either direction.
3313
//
3314
// ErrZombieEdge an be returned if the edge is currently marked as a zombie
3315
// within the database. In this case, the ChannelEdgePolicy's will be nil, and
3316
// the ChannelEdgeInfo will only include the public keys of each node.
3317
func (c *KVStore) FetchChannelEdgesByID(chanID uint64) (
3318
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3319
        *models.ChannelEdgePolicy, error) {
2,679✔
3320

2,679✔
3321
        var (
2,679✔
3322
                edgeInfo  *models.ChannelEdgeInfo
2,679✔
3323
                policy1   *models.ChannelEdgePolicy
2,679✔
3324
                policy2   *models.ChannelEdgePolicy
2,679✔
3325
                channelID [8]byte
2,679✔
3326
        )
2,679✔
3327

2,679✔
3328
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
5,351✔
3329
                // First, grab the node bucket. This will be used to populate
2,672✔
3330
                // the Node pointers in each edge read from disk.
2,672✔
3331
                nodes := tx.ReadBucket(nodeBucket)
2,672✔
3332
                if nodes == nil {
2,672✔
3333
                        return ErrGraphNotFound
×
3334
                }
×
3335

3336
                // Next, grab the edge bucket which stores the edges, and also
3337
                // the index itself so we can group the directed edges together
3338
                // logically.
3339
                edges := tx.ReadBucket(edgeBucket)
2,672✔
3340
                if edges == nil {
2,672✔
3341
                        return ErrGraphNoEdgesFound
×
3342
                }
×
3343
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
2,672✔
3344
                if edgeIndex == nil {
2,672✔
3345
                        return ErrGraphNoEdgesFound
×
3346
                }
×
3347

3348
                byteOrder.PutUint64(channelID[:], chanID)
2,672✔
3349

2,672✔
3350
                // Now, attempt to fetch edge.
2,672✔
3351
                edge, err := fetchChanEdgeInfo(edgeIndex, channelID[:])
2,672✔
3352

2,672✔
3353
                // If it doesn't exist, we'll quickly check our zombie index to
2,672✔
3354
                // see if we've previously marked it as so.
2,672✔
3355
                if errors.Is(err, ErrEdgeNotFound) {
2,673✔
3356
                        // If the zombie index doesn't exist, or the edge is not
1✔
3357
                        // marked as a zombie within it, then we'll return the
1✔
3358
                        // original ErrEdgeNotFound error.
1✔
3359
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
1✔
3360
                        if zombieIndex == nil {
1✔
3361
                                return ErrEdgeNotFound
×
3362
                        }
×
3363

3364
                        isZombie, pubKey1, pubKey2 := isZombieEdge(
1✔
3365
                                zombieIndex, chanID,
1✔
3366
                        )
1✔
3367
                        if !isZombie {
1✔
3368
                                return ErrEdgeNotFound
×
3369
                        }
×
3370

3371
                        // Otherwise, the edge is marked as a zombie, so we'll
3372
                        // populate the edge info with the public keys of each
3373
                        // party as this is the only information we have about
3374
                        // it and return an error signaling so.
3375
                        edgeInfo = &models.ChannelEdgeInfo{
1✔
3376
                                NodeKey1Bytes: pubKey1,
1✔
3377
                                NodeKey2Bytes: pubKey2,
1✔
3378
                        }
1✔
3379

1✔
3380
                        return ErrZombieEdge
1✔
3381
                }
3382

3383
                // Otherwise, we'll just return the error if any.
3384
                if err != nil {
2,671✔
3385
                        return err
×
3386
                }
×
3387

3388
                edgeInfo = &edge
2,671✔
3389

2,671✔
3390
                // Then we'll attempt to fetch the accompanying policies of this
2,671✔
3391
                // edge.
2,671✔
3392
                e1, e2, err := fetchChanEdgePolicies(
2,671✔
3393
                        edgeIndex, edges, channelID[:],
2,671✔
3394
                )
2,671✔
3395
                if err != nil {
2,671✔
3396
                        return err
×
3397
                }
×
3398

3399
                policy1 = e1
2,671✔
3400
                policy2 = e2
2,671✔
3401

2,671✔
3402
                return nil
2,671✔
3403
        }, func() {
2,679✔
3404
                edgeInfo = nil
2,679✔
3405
                policy1 = nil
2,679✔
3406
                policy2 = nil
2,679✔
3407
        })
2,679✔
3408
        if errors.Is(err, ErrZombieEdge) {
2,680✔
3409
                return edgeInfo, nil, nil, err
1✔
3410
        }
1✔
3411
        if err != nil {
2,685✔
3412
                return nil, nil, nil, err
7✔
3413
        }
7✔
3414

3415
        return edgeInfo, policy1, policy2, nil
2,671✔
3416
}
3417

3418
// IsPublicNode is a helper method that determines whether the node with the
3419
// given public key is seen as a public node in the graph from the graph's
3420
// source node's point of view.
3421
func (c *KVStore) IsPublicNode(pubKey [33]byte) (bool, error) {
13✔
3422
        var nodeIsPublic bool
13✔
3423
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
26✔
3424
                nodes := tx.ReadBucket(nodeBucket)
13✔
3425
                if nodes == nil {
13✔
3426
                        return ErrGraphNodesNotFound
×
3427
                }
×
3428
                ourPubKey := nodes.Get(sourceKey)
13✔
3429
                if ourPubKey == nil {
13✔
3430
                        return ErrSourceNodeNotSet
×
3431
                }
×
3432
                node, err := fetchLightningNode(nodes, pubKey[:])
13✔
3433
                if err != nil {
13✔
3434
                        return err
×
3435
                }
×
3436

3437
                nodeIsPublic, err = c.isPublic(tx, node.PubKeyBytes, ourPubKey)
13✔
3438

13✔
3439
                return err
13✔
3440
        }, func() {
13✔
3441
                nodeIsPublic = false
13✔
3442
        })
13✔
3443
        if err != nil {
13✔
3444
                return false, err
×
3445
        }
×
3446

3447
        return nodeIsPublic, nil
13✔
3448
}
3449

3450
// genMultiSigP2WSH generates the p2wsh'd multisig script for 2 of 2 pubkeys.
3451
func genMultiSigP2WSH(aPub, bPub []byte) ([]byte, error) {
46✔
3452
        witnessScript, err := input.GenMultiSigScript(aPub, bPub)
46✔
3453
        if err != nil {
46✔
3454
                return nil, err
×
3455
        }
×
3456

3457
        // With the witness script generated, we'll now turn it into a p2wsh
3458
        // script:
3459
        //  * OP_0 <sha256(script)>
3460
        bldr := txscript.NewScriptBuilder(
46✔
3461
                txscript.WithScriptAllocSize(input.P2WSHSize),
46✔
3462
        )
46✔
3463
        bldr.AddOp(txscript.OP_0)
46✔
3464
        scriptHash := sha256.Sum256(witnessScript)
46✔
3465
        bldr.AddData(scriptHash[:])
46✔
3466

46✔
3467
        return bldr.Script()
46✔
3468
}
3469

3470
// EdgePoint couples the outpoint of a channel with the funding script that it
3471
// creates. The FilteredChainView will use this to watch for spends of this
3472
// edge point on chain. We require both of these values as depending on the
3473
// concrete implementation, either the pkScript, or the out point will be used.
3474
type EdgePoint struct {
3475
        // FundingPkScript is the p2wsh multi-sig script of the target channel.
3476
        FundingPkScript []byte
3477

3478
        // OutPoint is the outpoint of the target channel.
3479
        OutPoint wire.OutPoint
3480
}
3481

3482
// String returns a human readable version of the target EdgePoint. We return
3483
// the outpoint directly as it is enough to uniquely identify the edge point.
3484
func (e *EdgePoint) String() string {
×
3485
        return e.OutPoint.String()
×
3486
}
×
3487

3488
// ChannelView returns the verifiable edge information for each active channel
3489
// within the known channel graph. The set of UTXO's (along with their scripts)
3490
// returned are the ones that need to be watched on chain to detect channel
3491
// closes on the resident blockchain.
3492
func (c *KVStore) ChannelView() ([]EdgePoint, error) {
22✔
3493
        var edgePoints []EdgePoint
22✔
3494
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
44✔
3495
                // We're going to iterate over the entire channel index, so
22✔
3496
                // we'll need to fetch the edgeBucket to get to the index as
22✔
3497
                // it's a sub-bucket.
22✔
3498
                edges := tx.ReadBucket(edgeBucket)
22✔
3499
                if edges == nil {
22✔
3500
                        return ErrGraphNoEdgesFound
×
3501
                }
×
3502
                chanIndex := edges.NestedReadBucket(channelPointBucket)
22✔
3503
                if chanIndex == nil {
22✔
3504
                        return ErrGraphNoEdgesFound
×
3505
                }
×
3506
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
22✔
3507
                if edgeIndex == nil {
22✔
3508
                        return ErrGraphNoEdgesFound
×
3509
                }
×
3510

3511
                // Once we have the proper bucket, we'll range over each key
3512
                // (which is the channel point for the channel) and decode it,
3513
                // accumulating each entry.
3514
                return chanIndex.ForEach(
22✔
3515
                        func(chanPointBytes, chanID []byte) error {
64✔
3516
                                chanPointReader := bytes.NewReader(
42✔
3517
                                        chanPointBytes,
42✔
3518
                                )
42✔
3519

42✔
3520
                                var chanPoint wire.OutPoint
42✔
3521
                                err := ReadOutpoint(chanPointReader, &chanPoint)
42✔
3522
                                if err != nil {
42✔
3523
                                        return err
×
3524
                                }
×
3525

3526
                                edgeInfo, err := fetchChanEdgeInfo(
42✔
3527
                                        edgeIndex, chanID,
42✔
3528
                                )
42✔
3529
                                if err != nil {
42✔
3530
                                        return err
×
3531
                                }
×
3532

3533
                                pkScript, err := genMultiSigP2WSH(
42✔
3534
                                        edgeInfo.BitcoinKey1Bytes[:],
42✔
3535
                                        edgeInfo.BitcoinKey2Bytes[:],
42✔
3536
                                )
42✔
3537
                                if err != nil {
42✔
3538
                                        return err
×
3539
                                }
×
3540

3541
                                edgePoints = append(edgePoints, EdgePoint{
42✔
3542
                                        FundingPkScript: pkScript,
42✔
3543
                                        OutPoint:        chanPoint,
42✔
3544
                                })
42✔
3545

42✔
3546
                                return nil
42✔
3547
                        },
3548
                )
3549
        }, func() {
22✔
3550
                edgePoints = nil
22✔
3551
        }); err != nil {
22✔
3552
                return nil, err
×
3553
        }
×
3554

3555
        return edgePoints, nil
22✔
3556
}
3557

3558
// MarkEdgeZombie attempts to mark a channel identified by its channel ID as a
3559
// zombie. This method is used on an ad-hoc basis, when channels need to be
3560
// marked as zombies outside the normal pruning cycle.
3561
func (c *KVStore) MarkEdgeZombie(chanID uint64,
3562
        pubKey1, pubKey2 [33]byte) error {
129✔
3563

129✔
3564
        c.cacheMu.Lock()
129✔
3565
        defer c.cacheMu.Unlock()
129✔
3566

129✔
3567
        err := kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
258✔
3568
                edges := tx.ReadWriteBucket(edgeBucket)
129✔
3569
                if edges == nil {
129✔
3570
                        return ErrGraphNoEdgesFound
×
3571
                }
×
3572
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
129✔
3573
                if err != nil {
129✔
3574
                        return fmt.Errorf("unable to create zombie "+
×
3575
                                "bucket: %w", err)
×
3576
                }
×
3577

3578
                return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
129✔
3579
        })
3580
        if err != nil {
129✔
3581
                return err
×
3582
        }
×
3583

3584
        c.rejectCache.remove(chanID)
129✔
3585
        c.chanCache.remove(chanID)
129✔
3586

129✔
3587
        return nil
129✔
3588
}
3589

3590
// markEdgeZombie marks an edge as a zombie within our zombie index. The public
3591
// keys should represent the node public keys of the two parties involved in the
3592
// edge.
3593
func markEdgeZombie(zombieIndex kvdb.RwBucket, chanID uint64, pubKey1,
3594
        pubKey2 [33]byte) error {
151✔
3595

151✔
3596
        var k [8]byte
151✔
3597
        byteOrder.PutUint64(k[:], chanID)
151✔
3598

151✔
3599
        var v [66]byte
151✔
3600
        copy(v[:33], pubKey1[:])
151✔
3601
        copy(v[33:], pubKey2[:])
151✔
3602

151✔
3603
        return zombieIndex.Put(k[:], v[:])
151✔
3604
}
151✔
3605

3606
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
3607
func (c *KVStore) MarkEdgeLive(chanID uint64) error {
16✔
3608
        c.cacheMu.Lock()
16✔
3609
        defer c.cacheMu.Unlock()
16✔
3610

16✔
3611
        return c.markEdgeLiveUnsafe(nil, chanID)
16✔
3612
}
16✔
3613

3614
// markEdgeLiveUnsafe clears an edge from the zombie index. This method can be
3615
// called with an existing kvdb.RwTx or the argument can be set to nil in which
3616
// case a new transaction will be created.
3617
//
3618
// NOTE: this method MUST only be called if the cacheMu has already been
3619
// acquired.
3620
func (c *KVStore) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
16✔
3621
        dbFn := func(tx kvdb.RwTx) error {
32✔
3622
                edges := tx.ReadWriteBucket(edgeBucket)
16✔
3623
                if edges == nil {
16✔
3624
                        return ErrGraphNoEdgesFound
×
3625
                }
×
3626
                zombieIndex := edges.NestedReadWriteBucket(zombieBucket)
16✔
3627
                if zombieIndex == nil {
16✔
3628
                        return nil
×
3629
                }
×
3630

3631
                var k [8]byte
16✔
3632
                byteOrder.PutUint64(k[:], chanID)
16✔
3633

16✔
3634
                if len(zombieIndex.Get(k[:])) == 0 {
17✔
3635
                        return ErrZombieEdgeNotFound
1✔
3636
                }
1✔
3637

3638
                return zombieIndex.Delete(k[:])
15✔
3639
        }
3640

3641
        // If the transaction is nil, we'll create a new one. Otherwise, we use
3642
        // the existing transaction
3643
        var err error
16✔
3644
        if tx == nil {
32✔
3645
                err = kvdb.Update(c.db, dbFn, func() {})
32✔
3646
        } else {
×
3647
                err = dbFn(tx)
×
3648
        }
×
3649
        if err != nil {
17✔
3650
                return err
1✔
3651
        }
1✔
3652

3653
        c.rejectCache.remove(chanID)
15✔
3654
        c.chanCache.remove(chanID)
15✔
3655

15✔
3656
        return nil
15✔
3657
}
3658

3659
// IsZombieEdge returns whether the edge is considered zombie. If it is a
3660
// zombie, then the two node public keys corresponding to this edge are also
3661
// returned.
3662
func (c *KVStore) IsZombieEdge(chanID uint64) (bool, [33]byte, [33]byte) {
14✔
3663
        var (
14✔
3664
                isZombie         bool
14✔
3665
                pubKey1, pubKey2 [33]byte
14✔
3666
        )
14✔
3667

14✔
3668
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
28✔
3669
                edges := tx.ReadBucket(edgeBucket)
14✔
3670
                if edges == nil {
14✔
3671
                        return ErrGraphNoEdgesFound
×
3672
                }
×
3673
                zombieIndex := edges.NestedReadBucket(zombieBucket)
14✔
3674
                if zombieIndex == nil {
14✔
3675
                        return nil
×
3676
                }
×
3677

3678
                isZombie, pubKey1, pubKey2 = isZombieEdge(zombieIndex, chanID)
14✔
3679

14✔
3680
                return nil
14✔
3681
        }, func() {
14✔
3682
                isZombie = false
14✔
3683
                pubKey1 = [33]byte{}
14✔
3684
                pubKey2 = [33]byte{}
14✔
3685
        })
14✔
3686
        if err != nil {
14✔
3687
                return false, [33]byte{}, [33]byte{}
×
3688
        }
×
3689

3690
        return isZombie, pubKey1, pubKey2
14✔
3691
}
3692

3693
// isZombieEdge returns whether an entry exists for the given channel in the
3694
// zombie index. If an entry exists, then the two node public keys corresponding
3695
// to this edge are also returned.
3696
func isZombieEdge(zombieIndex kvdb.RBucket,
3697
        chanID uint64) (bool, [33]byte, [33]byte) {
181✔
3698

181✔
3699
        var k [8]byte
181✔
3700
        byteOrder.PutUint64(k[:], chanID)
181✔
3701

181✔
3702
        v := zombieIndex.Get(k[:])
181✔
3703
        if v == nil {
275✔
3704
                return false, [33]byte{}, [33]byte{}
94✔
3705
        }
94✔
3706

3707
        var pubKey1, pubKey2 [33]byte
87✔
3708
        copy(pubKey1[:], v[:33])
87✔
3709
        copy(pubKey2[:], v[33:])
87✔
3710

87✔
3711
        return true, pubKey1, pubKey2
87✔
3712
}
3713

3714
// NumZombies returns the current number of zombie channels in the graph.
3715
func (c *KVStore) NumZombies() (uint64, error) {
4✔
3716
        var numZombies uint64
4✔
3717
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
3718
                edges := tx.ReadBucket(edgeBucket)
4✔
3719
                if edges == nil {
4✔
3720
                        return nil
×
3721
                }
×
3722
                zombieIndex := edges.NestedReadBucket(zombieBucket)
4✔
3723
                if zombieIndex == nil {
4✔
3724
                        return nil
×
3725
                }
×
3726

3727
                return zombieIndex.ForEach(func(_, _ []byte) error {
6✔
3728
                        numZombies++
2✔
3729
                        return nil
2✔
3730
                })
2✔
3731
        }, func() {
4✔
3732
                numZombies = 0
4✔
3733
        })
4✔
3734
        if err != nil {
4✔
3735
                return 0, err
×
3736
        }
×
3737

3738
        return numZombies, nil
4✔
3739
}
3740

3741
// PutClosedScid stores a SCID for a closed channel in the database. This is so
3742
// that we can ignore channel announcements that we know to be closed without
3743
// having to validate them and fetch a block.
3744
func (c *KVStore) PutClosedScid(scid lnwire.ShortChannelID) error {
1✔
3745
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
2✔
3746
                closedScids, err := tx.CreateTopLevelBucket(closedScidBucket)
1✔
3747
                if err != nil {
1✔
3748
                        return err
×
3749
                }
×
3750

3751
                var k [8]byte
1✔
3752
                byteOrder.PutUint64(k[:], scid.ToUint64())
1✔
3753

1✔
3754
                return closedScids.Put(k[:], []byte{})
1✔
3755
        }, func() {})
1✔
3756
}
3757

3758
// IsClosedScid checks whether a channel identified by the passed in scid is
3759
// closed. This helps avoid having to perform expensive validation checks.
3760
// TODO: Add an LRU cache to cut down on disc reads.
3761
func (c *KVStore) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
2✔
3762
        var isClosed bool
2✔
3763
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
4✔
3764
                closedScids := tx.ReadBucket(closedScidBucket)
2✔
3765
                if closedScids == nil {
2✔
3766
                        return ErrClosedScidsNotFound
×
3767
                }
×
3768

3769
                var k [8]byte
2✔
3770
                byteOrder.PutUint64(k[:], scid.ToUint64())
2✔
3771

2✔
3772
                if closedScids.Get(k[:]) != nil {
3✔
3773
                        isClosed = true
1✔
3774
                        return nil
1✔
3775
                }
1✔
3776

3777
                return nil
1✔
3778
        }, func() {
2✔
3779
                isClosed = false
2✔
3780
        })
2✔
3781
        if err != nil {
2✔
3782
                return false, err
×
3783
        }
×
3784

3785
        return isClosed, nil
2✔
3786
}
3787

3788
// GraphSession will provide the call-back with access to a NodeTraverser
3789
// instance which can be used to perform queries against the channel graph.
3790
func (c *KVStore) GraphSession(cb func(graph NodeTraverser) error) error {
54✔
3791
        return c.db.View(func(tx walletdb.ReadTx) error {
108✔
3792
                return cb(&nodeTraverserSession{
54✔
3793
                        db: c,
54✔
3794
                        tx: tx,
54✔
3795
                })
54✔
3796
        }, func() {})
108✔
3797
}
3798

3799
// nodeTraverserSession implements the NodeTraverser interface but with a
3800
// backing read only transaction for a consistent view of the graph.
3801
type nodeTraverserSession struct {
3802
        tx kvdb.RTx
3803
        db *KVStore
3804
}
3805

3806
// ForEachNodeDirectedChannel calls the callback for every channel of the given
3807
// node.
3808
//
3809
// NOTE: Part of the NodeTraverser interface.
3810
func (c *nodeTraverserSession) ForEachNodeDirectedChannel(nodePub route.Vertex,
3811
        cb func(channel *DirectedChannel) error) error {
239✔
3812

239✔
3813
        return c.db.forEachNodeDirectedChannel(c.tx, nodePub, cb)
239✔
3814
}
239✔
3815

3816
// FetchNodeFeatures returns the features of the given node. If the node is
3817
// unknown, assume no additional features are supported.
3818
//
3819
// NOTE: Part of the NodeTraverser interface.
3820
func (c *nodeTraverserSession) FetchNodeFeatures(nodePub route.Vertex) (
3821
        *lnwire.FeatureVector, error) {
254✔
3822

254✔
3823
        return c.db.fetchNodeFeatures(c.tx, nodePub)
254✔
3824
}
254✔
3825

3826
func putLightningNode(nodeBucket, aliasBucket, updateIndex kvdb.RwBucket,
3827
        node *models.LightningNode) error {
990✔
3828

990✔
3829
        var (
990✔
3830
                scratch [16]byte
990✔
3831
                b       bytes.Buffer
990✔
3832
        )
990✔
3833

990✔
3834
        pub, err := node.PubKey()
990✔
3835
        if err != nil {
990✔
3836
                return err
×
3837
        }
×
3838
        nodePub := pub.SerializeCompressed()
990✔
3839

990✔
3840
        // If the node has the update time set, write it, else write 0.
990✔
3841
        updateUnix := uint64(0)
990✔
3842
        if node.LastUpdate.Unix() > 0 {
1,853✔
3843
                updateUnix = uint64(node.LastUpdate.Unix())
863✔
3844
        }
863✔
3845

3846
        byteOrder.PutUint64(scratch[:8], updateUnix)
990✔
3847
        if _, err := b.Write(scratch[:8]); err != nil {
990✔
3848
                return err
×
3849
        }
×
3850

3851
        if _, err := b.Write(nodePub); err != nil {
990✔
3852
                return err
×
3853
        }
×
3854

3855
        // If we got a node announcement for this node, we will have the rest
3856
        // of the data available. If not we don't have more data to write.
3857
        if !node.HaveNodeAnnouncement {
1,064✔
3858
                // Write HaveNodeAnnouncement=0.
74✔
3859
                byteOrder.PutUint16(scratch[:2], 0)
74✔
3860
                if _, err := b.Write(scratch[:2]); err != nil {
74✔
3861
                        return err
×
3862
                }
×
3863

3864
                return nodeBucket.Put(nodePub, b.Bytes())
74✔
3865
        }
3866

3867
        // Write HaveNodeAnnouncement=1.
3868
        byteOrder.PutUint16(scratch[:2], 1)
916✔
3869
        if _, err := b.Write(scratch[:2]); err != nil {
916✔
3870
                return err
×
3871
        }
×
3872

3873
        if err := binary.Write(&b, byteOrder, node.Color.R); err != nil {
916✔
3874
                return err
×
3875
        }
×
3876
        if err := binary.Write(&b, byteOrder, node.Color.G); err != nil {
916✔
3877
                return err
×
3878
        }
×
3879
        if err := binary.Write(&b, byteOrder, node.Color.B); err != nil {
916✔
3880
                return err
×
3881
        }
×
3882

3883
        if err := wire.WriteVarString(&b, 0, node.Alias); err != nil {
916✔
3884
                return err
×
3885
        }
×
3886

3887
        if err := node.Features.Encode(&b); err != nil {
916✔
3888
                return err
×
3889
        }
×
3890

3891
        numAddresses := uint16(len(node.Addresses))
916✔
3892
        byteOrder.PutUint16(scratch[:2], numAddresses)
916✔
3893
        if _, err := b.Write(scratch[:2]); err != nil {
916✔
3894
                return err
×
3895
        }
×
3896

3897
        for _, address := range node.Addresses {
2,060✔
3898
                if err := SerializeAddr(&b, address); err != nil {
1,144✔
3899
                        return err
×
3900
                }
×
3901
        }
3902

3903
        sigLen := len(node.AuthSigBytes)
916✔
3904
        if sigLen > 80 {
916✔
3905
                return fmt.Errorf("max sig len allowed is 80, had %v",
×
3906
                        sigLen)
×
3907
        }
×
3908

3909
        err = wire.WriteVarBytes(&b, 0, node.AuthSigBytes)
916✔
3910
        if err != nil {
916✔
3911
                return err
×
3912
        }
×
3913

3914
        if len(node.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
916✔
3915
                return ErrTooManyExtraOpaqueBytes(len(node.ExtraOpaqueData))
×
3916
        }
×
3917
        err = wire.WriteVarBytes(&b, 0, node.ExtraOpaqueData)
916✔
3918
        if err != nil {
916✔
3919
                return err
×
3920
        }
×
3921

3922
        if err := aliasBucket.Put(nodePub, []byte(node.Alias)); err != nil {
916✔
3923
                return err
×
3924
        }
×
3925

3926
        // With the alias bucket updated, we'll now update the index that
3927
        // tracks the time series of node updates.
3928
        var indexKey [8 + 33]byte
916✔
3929
        byteOrder.PutUint64(indexKey[:8], updateUnix)
916✔
3930
        copy(indexKey[8:], nodePub)
916✔
3931

916✔
3932
        // If there was already an old index entry for this node, then we'll
916✔
3933
        // delete the old one before we write the new entry.
916✔
3934
        if nodeBytes := nodeBucket.Get(nodePub); nodeBytes != nil {
1,020✔
3935
                // Extract out the old update time to we can reconstruct the
104✔
3936
                // prior index key to delete it from the index.
104✔
3937
                oldUpdateTime := nodeBytes[:8]
104✔
3938

104✔
3939
                var oldIndexKey [8 + 33]byte
104✔
3940
                copy(oldIndexKey[:8], oldUpdateTime)
104✔
3941
                copy(oldIndexKey[8:], nodePub)
104✔
3942

104✔
3943
                if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
104✔
3944
                        return err
×
3945
                }
×
3946
        }
3947

3948
        if err := updateIndex.Put(indexKey[:], nil); err != nil {
916✔
3949
                return err
×
3950
        }
×
3951

3952
        return nodeBucket.Put(nodePub, b.Bytes())
916✔
3953
}
3954

3955
func fetchLightningNode(nodeBucket kvdb.RBucket,
3956
        nodePub []byte) (models.LightningNode, error) {
3,611✔
3957

3,611✔
3958
        nodeBytes := nodeBucket.Get(nodePub)
3,611✔
3959
        if nodeBytes == nil {
3,685✔
3960
                return models.LightningNode{}, ErrGraphNodeNotFound
74✔
3961
        }
74✔
3962

3963
        nodeReader := bytes.NewReader(nodeBytes)
3,537✔
3964

3,537✔
3965
        return deserializeLightningNode(nodeReader)
3,537✔
3966
}
3967

3968
func deserializeLightningNodeCacheable(r io.Reader) (route.Vertex,
3969
        *lnwire.FeatureVector, error) {
120✔
3970

120✔
3971
        var (
120✔
3972
                pubKey      route.Vertex
120✔
3973
                features    = lnwire.EmptyFeatureVector()
120✔
3974
                nodeScratch [8]byte
120✔
3975
        )
120✔
3976

120✔
3977
        // Skip ahead:
120✔
3978
        // - LastUpdate (8 bytes)
120✔
3979
        if _, err := r.Read(nodeScratch[:]); err != nil {
120✔
3980
                return pubKey, nil, err
×
3981
        }
×
3982

3983
        if _, err := io.ReadFull(r, pubKey[:]); err != nil {
120✔
3984
                return pubKey, nil, err
×
3985
        }
×
3986

3987
        // Read the node announcement flag.
3988
        if _, err := r.Read(nodeScratch[:2]); err != nil {
120✔
3989
                return pubKey, nil, err
×
3990
        }
×
3991
        hasNodeAnn := byteOrder.Uint16(nodeScratch[:2])
120✔
3992

120✔
3993
        // The rest of the data is optional, and will only be there if we got a
120✔
3994
        // node announcement for this node.
120✔
3995
        if hasNodeAnn == 0 {
120✔
3996
                return pubKey, features, nil
×
3997
        }
×
3998

3999
        // We did get a node announcement for this node, so we'll have the rest
4000
        // of the data available.
4001
        var rgb uint8
120✔
4002
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4003
                return pubKey, nil, err
×
4004
        }
×
4005
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4006
                return pubKey, nil, err
×
4007
        }
×
4008
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4009
                return pubKey, nil, err
×
4010
        }
×
4011

4012
        if _, err := wire.ReadVarString(r, 0); err != nil {
120✔
4013
                return pubKey, nil, err
×
4014
        }
×
4015

4016
        if err := features.Decode(r); err != nil {
120✔
4017
                return pubKey, nil, err
×
4018
        }
×
4019

4020
        return pubKey, features, nil
120✔
4021
}
4022

4023
func deserializeLightningNode(r io.Reader) (models.LightningNode, error) {
8,498✔
4024
        var (
8,498✔
4025
                node    models.LightningNode
8,498✔
4026
                scratch [8]byte
8,498✔
4027
                err     error
8,498✔
4028
        )
8,498✔
4029

8,498✔
4030
        // Always populate a feature vector, even if we don't have a node
8,498✔
4031
        // announcement and short circuit below.
8,498✔
4032
        node.Features = lnwire.EmptyFeatureVector()
8,498✔
4033

8,498✔
4034
        if _, err := r.Read(scratch[:]); err != nil {
8,498✔
4035
                return models.LightningNode{}, err
×
4036
        }
×
4037

4038
        unix := int64(byteOrder.Uint64(scratch[:]))
8,498✔
4039
        node.LastUpdate = time.Unix(unix, 0)
8,498✔
4040

8,498✔
4041
        if _, err := io.ReadFull(r, node.PubKeyBytes[:]); err != nil {
8,498✔
4042
                return models.LightningNode{}, err
×
4043
        }
×
4044

4045
        if _, err := r.Read(scratch[:2]); err != nil {
8,498✔
4046
                return models.LightningNode{}, err
×
4047
        }
×
4048

4049
        hasNodeAnn := byteOrder.Uint16(scratch[:2])
8,498✔
4050
        if hasNodeAnn == 1 {
16,857✔
4051
                node.HaveNodeAnnouncement = true
8,359✔
4052
        } else {
8,498✔
4053
                node.HaveNodeAnnouncement = false
139✔
4054
        }
139✔
4055

4056
        // The rest of the data is optional, and will only be there if we got a
4057
        // node announcement for this node.
4058
        if !node.HaveNodeAnnouncement {
8,637✔
4059
                return node, nil
139✔
4060
        }
139✔
4061

4062
        // We did get a node announcement for this node, so we'll have the rest
4063
        // of the data available.
4064
        if err := binary.Read(r, byteOrder, &node.Color.R); err != nil {
8,359✔
4065
                return models.LightningNode{}, err
×
4066
        }
×
4067
        if err := binary.Read(r, byteOrder, &node.Color.G); err != nil {
8,359✔
4068
                return models.LightningNode{}, err
×
4069
        }
×
4070
        if err := binary.Read(r, byteOrder, &node.Color.B); err != nil {
8,359✔
4071
                return models.LightningNode{}, err
×
4072
        }
×
4073

4074
        node.Alias, err = wire.ReadVarString(r, 0)
8,359✔
4075
        if err != nil {
8,359✔
4076
                return models.LightningNode{}, err
×
4077
        }
×
4078

4079
        err = node.Features.Decode(r)
8,359✔
4080
        if err != nil {
8,359✔
4081
                return models.LightningNode{}, err
×
4082
        }
×
4083

4084
        if _, err := r.Read(scratch[:2]); err != nil {
8,359✔
4085
                return models.LightningNode{}, err
×
4086
        }
×
4087
        numAddresses := int(byteOrder.Uint16(scratch[:2]))
8,359✔
4088

8,359✔
4089
        var addresses []net.Addr
8,359✔
4090
        for i := 0; i < numAddresses; i++ {
18,948✔
4091
                address, err := DeserializeAddr(r)
10,589✔
4092
                if err != nil {
10,589✔
4093
                        return models.LightningNode{}, err
×
4094
                }
×
4095
                addresses = append(addresses, address)
10,589✔
4096
        }
4097
        node.Addresses = addresses
8,359✔
4098

8,359✔
4099
        node.AuthSigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
8,359✔
4100
        if err != nil {
8,359✔
4101
                return models.LightningNode{}, err
×
4102
        }
×
4103

4104
        // We'll try and see if there are any opaque bytes left, if not, then
4105
        // we'll ignore the EOF error and return the node as is.
4106
        node.ExtraOpaqueData, err = wire.ReadVarBytes(
8,359✔
4107
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
8,359✔
4108
        )
8,359✔
4109
        switch {
8,359✔
4110
        case errors.Is(err, io.ErrUnexpectedEOF):
×
4111
        case errors.Is(err, io.EOF):
×
4112
        case err != nil:
×
4113
                return models.LightningNode{}, err
×
4114
        }
4115

4116
        return node, nil
8,359✔
4117
}
4118

4119
func putChanEdgeInfo(edgeIndex kvdb.RwBucket,
4120
        edgeInfo *models.ChannelEdgeInfo, chanID [8]byte) error {
1,480✔
4121

1,480✔
4122
        var b bytes.Buffer
1,480✔
4123

1,480✔
4124
        if _, err := b.Write(edgeInfo.NodeKey1Bytes[:]); err != nil {
1,480✔
4125
                return err
×
4126
        }
×
4127
        if _, err := b.Write(edgeInfo.NodeKey2Bytes[:]); err != nil {
1,480✔
4128
                return err
×
4129
        }
×
4130
        if _, err := b.Write(edgeInfo.BitcoinKey1Bytes[:]); err != nil {
1,480✔
4131
                return err
×
4132
        }
×
4133
        if _, err := b.Write(edgeInfo.BitcoinKey2Bytes[:]); err != nil {
1,480✔
4134
                return err
×
4135
        }
×
4136

4137
        if err := wire.WriteVarBytes(&b, 0, edgeInfo.Features); err != nil {
1,480✔
4138
                return err
×
4139
        }
×
4140

4141
        authProof := edgeInfo.AuthProof
1,480✔
4142
        var nodeSig1, nodeSig2, bitcoinSig1, bitcoinSig2 []byte
1,480✔
4143
        if authProof != nil {
2,877✔
4144
                nodeSig1 = authProof.NodeSig1Bytes
1,397✔
4145
                nodeSig2 = authProof.NodeSig2Bytes
1,397✔
4146
                bitcoinSig1 = authProof.BitcoinSig1Bytes
1,397✔
4147
                bitcoinSig2 = authProof.BitcoinSig2Bytes
1,397✔
4148
        }
1,397✔
4149

4150
        if err := wire.WriteVarBytes(&b, 0, nodeSig1); err != nil {
1,480✔
4151
                return err
×
4152
        }
×
4153
        if err := wire.WriteVarBytes(&b, 0, nodeSig2); err != nil {
1,480✔
4154
                return err
×
4155
        }
×
4156
        if err := wire.WriteVarBytes(&b, 0, bitcoinSig1); err != nil {
1,480✔
4157
                return err
×
4158
        }
×
4159
        if err := wire.WriteVarBytes(&b, 0, bitcoinSig2); err != nil {
1,480✔
4160
                return err
×
4161
        }
×
4162

4163
        if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
1,480✔
4164
                return err
×
4165
        }
×
4166
        err := binary.Write(&b, byteOrder, uint64(edgeInfo.Capacity))
1,480✔
4167
        if err != nil {
1,480✔
4168
                return err
×
4169
        }
×
4170
        if _, err := b.Write(chanID[:]); err != nil {
1,480✔
4171
                return err
×
4172
        }
×
4173
        if _, err := b.Write(edgeInfo.ChainHash[:]); err != nil {
1,480✔
4174
                return err
×
4175
        }
×
4176

4177
        if len(edgeInfo.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
1,480✔
4178
                return ErrTooManyExtraOpaqueBytes(len(edgeInfo.ExtraOpaqueData))
×
4179
        }
×
4180
        err = wire.WriteVarBytes(&b, 0, edgeInfo.ExtraOpaqueData)
1,480✔
4181
        if err != nil {
1,480✔
4182
                return err
×
4183
        }
×
4184

4185
        return edgeIndex.Put(chanID[:], b.Bytes())
1,480✔
4186
}
4187

4188
func fetchChanEdgeInfo(edgeIndex kvdb.RBucket,
4189
        chanID []byte) (models.ChannelEdgeInfo, error) {
6,782✔
4190

6,782✔
4191
        edgeInfoBytes := edgeIndex.Get(chanID)
6,782✔
4192
        if edgeInfoBytes == nil {
6,848✔
4193
                return models.ChannelEdgeInfo{}, ErrEdgeNotFound
66✔
4194
        }
66✔
4195

4196
        edgeInfoReader := bytes.NewReader(edgeInfoBytes)
6,716✔
4197

6,716✔
4198
        return deserializeChanEdgeInfo(edgeInfoReader)
6,716✔
4199
}
4200

4201
func deserializeChanEdgeInfo(r io.Reader) (models.ChannelEdgeInfo, error) {
7,255✔
4202
        var (
7,255✔
4203
                err      error
7,255✔
4204
                edgeInfo models.ChannelEdgeInfo
7,255✔
4205
        )
7,255✔
4206

7,255✔
4207
        if _, err := io.ReadFull(r, edgeInfo.NodeKey1Bytes[:]); err != nil {
7,255✔
4208
                return models.ChannelEdgeInfo{}, err
×
4209
        }
×
4210
        if _, err := io.ReadFull(r, edgeInfo.NodeKey2Bytes[:]); err != nil {
7,255✔
4211
                return models.ChannelEdgeInfo{}, err
×
4212
        }
×
4213
        if _, err := io.ReadFull(r, edgeInfo.BitcoinKey1Bytes[:]); err != nil {
7,255✔
4214
                return models.ChannelEdgeInfo{}, err
×
4215
        }
×
4216
        if _, err := io.ReadFull(r, edgeInfo.BitcoinKey2Bytes[:]); err != nil {
7,255✔
4217
                return models.ChannelEdgeInfo{}, err
×
4218
        }
×
4219

4220
        edgeInfo.Features, err = wire.ReadVarBytes(r, 0, 900, "features")
7,255✔
4221
        if err != nil {
7,255✔
4222
                return models.ChannelEdgeInfo{}, err
×
4223
        }
×
4224

4225
        proof := &models.ChannelAuthProof{}
7,255✔
4226

7,255✔
4227
        proof.NodeSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
7,255✔
4228
        if err != nil {
7,255✔
4229
                return models.ChannelEdgeInfo{}, err
×
4230
        }
×
4231
        proof.NodeSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
7,255✔
4232
        if err != nil {
7,255✔
4233
                return models.ChannelEdgeInfo{}, err
×
4234
        }
×
4235
        proof.BitcoinSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
7,255✔
4236
        if err != nil {
7,255✔
4237
                return models.ChannelEdgeInfo{}, err
×
4238
        }
×
4239
        proof.BitcoinSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
7,255✔
4240
        if err != nil {
7,255✔
4241
                return models.ChannelEdgeInfo{}, err
×
4242
        }
×
4243

4244
        if !proof.IsEmpty() {
11,409✔
4245
                edgeInfo.AuthProof = proof
4,154✔
4246
        }
4,154✔
4247

4248
        edgeInfo.ChannelPoint = wire.OutPoint{}
7,255✔
4249
        if err := ReadOutpoint(r, &edgeInfo.ChannelPoint); err != nil {
7,255✔
4250
                return models.ChannelEdgeInfo{}, err
×
4251
        }
×
4252
        if err := binary.Read(r, byteOrder, &edgeInfo.Capacity); err != nil {
7,255✔
4253
                return models.ChannelEdgeInfo{}, err
×
4254
        }
×
4255
        if err := binary.Read(r, byteOrder, &edgeInfo.ChannelID); err != nil {
7,255✔
4256
                return models.ChannelEdgeInfo{}, err
×
4257
        }
×
4258

4259
        if _, err := io.ReadFull(r, edgeInfo.ChainHash[:]); err != nil {
7,255✔
4260
                return models.ChannelEdgeInfo{}, err
×
4261
        }
×
4262

4263
        // We'll try and see if there are any opaque bytes left, if not, then
4264
        // we'll ignore the EOF error and return the edge as is.
4265
        edgeInfo.ExtraOpaqueData, err = wire.ReadVarBytes(
7,255✔
4266
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
7,255✔
4267
        )
7,255✔
4268
        switch {
7,255✔
4269
        case errors.Is(err, io.ErrUnexpectedEOF):
×
4270
        case errors.Is(err, io.EOF):
×
4271
        case err != nil:
×
4272
                return models.ChannelEdgeInfo{}, err
×
4273
        }
4274

4275
        return edgeInfo, nil
7,255✔
4276
}
4277

4278
func putChanEdgePolicy(edges kvdb.RwBucket, edge *models.ChannelEdgePolicy,
4279
        from, to []byte) error {
2,660✔
4280

2,660✔
4281
        var edgeKey [33 + 8]byte
2,660✔
4282
        copy(edgeKey[:], from)
2,660✔
4283
        byteOrder.PutUint64(edgeKey[33:], edge.ChannelID)
2,660✔
4284

2,660✔
4285
        var b bytes.Buffer
2,660✔
4286
        if err := serializeChanEdgePolicy(&b, edge, to); err != nil {
2,660✔
4287
                return err
×
4288
        }
×
4289

4290
        // Before we write out the new edge, we'll create a new entry in the
4291
        // update index in order to keep it fresh.
4292
        updateUnix := uint64(edge.LastUpdate.Unix())
2,660✔
4293
        var indexKey [8 + 8]byte
2,660✔
4294
        byteOrder.PutUint64(indexKey[:8], updateUnix)
2,660✔
4295
        byteOrder.PutUint64(indexKey[8:], edge.ChannelID)
2,660✔
4296

2,660✔
4297
        updateIndex, err := edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
2,660✔
4298
        if err != nil {
2,660✔
4299
                return err
×
4300
        }
×
4301

4302
        // If there was already an entry for this edge, then we'll need to
4303
        // delete the old one to ensure we don't leave around any after-images.
4304
        // An unknown policy value does not have a update time recorded, so
4305
        // it also does not need to be removed.
4306
        if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil &&
2,660✔
4307
                !bytes.Equal(edgeBytes, unknownPolicy) {
2,684✔
4308

24✔
4309
                // In order to delete the old entry, we'll need to obtain the
24✔
4310
                // *prior* update time in order to delete it. To do this, we'll
24✔
4311
                // need to deserialize the existing policy within the database
24✔
4312
                // (now outdated by the new one), and delete its corresponding
24✔
4313
                // entry within the update index. We'll ignore any
24✔
4314
                // ErrEdgePolicyOptionalFieldNotFound error, as we only need
24✔
4315
                // the channel ID and update time to delete the entry.
24✔
4316
                // TODO(halseth): get rid of these invalid policies in a
24✔
4317
                // migration.
24✔
4318
                oldEdgePolicy, err := deserializeChanEdgePolicy(
24✔
4319
                        bytes.NewReader(edgeBytes),
24✔
4320
                )
24✔
4321
                if err != nil &&
24✔
4322
                        !errors.Is(err, ErrEdgePolicyOptionalFieldNotFound) {
24✔
4323

×
4324
                        return err
×
4325
                }
×
4326

4327
                oldUpdateTime := uint64(oldEdgePolicy.LastUpdate.Unix())
24✔
4328

24✔
4329
                var oldIndexKey [8 + 8]byte
24✔
4330
                byteOrder.PutUint64(oldIndexKey[:8], oldUpdateTime)
24✔
4331
                byteOrder.PutUint64(oldIndexKey[8:], edge.ChannelID)
24✔
4332

24✔
4333
                if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
24✔
4334
                        return err
×
4335
                }
×
4336
        }
4337

4338
        if err := updateIndex.Put(indexKey[:], nil); err != nil {
2,660✔
4339
                return err
×
4340
        }
×
4341

4342
        err = updateEdgePolicyDisabledIndex(
2,660✔
4343
                edges, edge.ChannelID,
2,660✔
4344
                edge.ChannelFlags&lnwire.ChanUpdateDirection > 0,
2,660✔
4345
                edge.IsDisabled(),
2,660✔
4346
        )
2,660✔
4347
        if err != nil {
2,660✔
4348
                return err
×
4349
        }
×
4350

4351
        return edges.Put(edgeKey[:], b.Bytes())
2,660✔
4352
}
4353

4354
// updateEdgePolicyDisabledIndex is used to update the disabledEdgePolicyIndex
4355
// bucket by either add a new disabled ChannelEdgePolicy or remove an existing
4356
// one.
4357
// The direction represents the direction of the edge and disabled is used for
4358
// deciding whether to remove or add an entry to the bucket.
4359
// In general a channel is disabled if two entries for the same chanID exist
4360
// in this bucket.
4361
// Maintaining the bucket this way allows a fast retrieval of disabled
4362
// channels, for example when prune is needed.
4363
func updateEdgePolicyDisabledIndex(edges kvdb.RwBucket, chanID uint64,
4364
        direction bool, disabled bool) error {
2,930✔
4365

2,930✔
4366
        var disabledEdgeKey [8 + 1]byte
2,930✔
4367
        byteOrder.PutUint64(disabledEdgeKey[0:], chanID)
2,930✔
4368
        if direction {
4,391✔
4369
                disabledEdgeKey[8] = 1
1,461✔
4370
        }
1,461✔
4371

4372
        disabledEdgePolicyIndex, err := edges.CreateBucketIfNotExists(
2,930✔
4373
                disabledEdgePolicyBucket,
2,930✔
4374
        )
2,930✔
4375
        if err != nil {
2,930✔
4376
                return err
×
4377
        }
×
4378

4379
        if disabled {
2,956✔
4380
                return disabledEdgePolicyIndex.Put(disabledEdgeKey[:], []byte{})
26✔
4381
        }
26✔
4382

4383
        return disabledEdgePolicyIndex.Delete(disabledEdgeKey[:])
2,904✔
4384
}
4385

4386
// putChanEdgePolicyUnknown marks the edge policy as unknown
4387
// in the edges bucket.
4388
func putChanEdgePolicyUnknown(edges kvdb.RwBucket, channelID uint64,
4389
        from []byte) error {
2,958✔
4390

2,958✔
4391
        var edgeKey [33 + 8]byte
2,958✔
4392
        copy(edgeKey[:], from)
2,958✔
4393
        byteOrder.PutUint64(edgeKey[33:], channelID)
2,958✔
4394

2,958✔
4395
        if edges.Get(edgeKey[:]) != nil {
2,958✔
4396
                return fmt.Errorf("cannot write unknown policy for channel %v "+
×
4397
                        " when there is already a policy present", channelID)
×
4398
        }
×
4399

4400
        return edges.Put(edgeKey[:], unknownPolicy)
2,958✔
4401
}
4402

4403
func fetchChanEdgePolicy(edges kvdb.RBucket, chanID []byte,
4404
        nodePub []byte) (*models.ChannelEdgePolicy, error) {
13,448✔
4405

13,448✔
4406
        var edgeKey [33 + 8]byte
13,448✔
4407
        copy(edgeKey[:], nodePub)
13,448✔
4408
        copy(edgeKey[33:], chanID)
13,448✔
4409

13,448✔
4410
        edgeBytes := edges.Get(edgeKey[:])
13,448✔
4411
        if edgeBytes == nil {
13,448✔
4412
                return nil, ErrEdgeNotFound
×
4413
        }
×
4414

4415
        // No need to deserialize unknown policy.
4416
        if bytes.Equal(edgeBytes, unknownPolicy) {
14,961✔
4417
                return nil, nil
1,513✔
4418
        }
1,513✔
4419

4420
        edgeReader := bytes.NewReader(edgeBytes)
11,935✔
4421

11,935✔
4422
        ep, err := deserializeChanEdgePolicy(edgeReader)
11,935✔
4423
        switch {
11,935✔
4424
        // If the db policy was missing an expected optional field, we return
4425
        // nil as if the policy was unknown.
4426
        case errors.Is(err, ErrEdgePolicyOptionalFieldNotFound):
2✔
4427
                return nil, nil
2✔
4428

4429
        case err != nil:
×
4430
                return nil, err
×
4431
        }
4432

4433
        return ep, nil
11,933✔
4434
}
4435

4436
func fetchChanEdgePolicies(edgeIndex kvdb.RBucket, edges kvdb.RBucket,
4437
        chanID []byte) (*models.ChannelEdgePolicy, *models.ChannelEdgePolicy,
4438
        error) {
2,884✔
4439

2,884✔
4440
        edgeInfo := edgeIndex.Get(chanID)
2,884✔
4441
        if edgeInfo == nil {
2,884✔
4442
                return nil, nil, fmt.Errorf("%w: chanID=%x", ErrEdgeNotFound,
×
4443
                        chanID)
×
4444
        }
×
4445

4446
        // The first node is contained within the first half of the edge
4447
        // information. We only propagate the error here and below if it's
4448
        // something other than edge non-existence.
4449
        node1Pub := edgeInfo[:33]
2,884✔
4450
        edge1, err := fetchChanEdgePolicy(edges, chanID, node1Pub)
2,884✔
4451
        if err != nil {
2,884✔
4452
                return nil, nil, fmt.Errorf("%w: node1Pub=%x", ErrEdgeNotFound,
×
4453
                        node1Pub)
×
4454
        }
×
4455

4456
        // Similarly, the second node is contained within the latter
4457
        // half of the edge information.
4458
        node2Pub := edgeInfo[33:66]
2,884✔
4459
        edge2, err := fetchChanEdgePolicy(edges, chanID, node2Pub)
2,884✔
4460
        if err != nil {
2,884✔
4461
                return nil, nil, fmt.Errorf("%w: node2Pub=%x", ErrEdgeNotFound,
×
4462
                        node2Pub)
×
4463
        }
×
4464

4465
        return edge1, edge2, nil
2,884✔
4466
}
4467

4468
func serializeChanEdgePolicy(w io.Writer, edge *models.ChannelEdgePolicy,
4469
        to []byte) error {
2,662✔
4470

2,662✔
4471
        err := wire.WriteVarBytes(w, 0, edge.SigBytes)
2,662✔
4472
        if err != nil {
2,662✔
4473
                return err
×
4474
        }
×
4475

4476
        if err := binary.Write(w, byteOrder, edge.ChannelID); err != nil {
2,662✔
4477
                return err
×
4478
        }
×
4479

4480
        var scratch [8]byte
2,662✔
4481
        updateUnix := uint64(edge.LastUpdate.Unix())
2,662✔
4482
        byteOrder.PutUint64(scratch[:], updateUnix)
2,662✔
4483
        if _, err := w.Write(scratch[:]); err != nil {
2,662✔
4484
                return err
×
4485
        }
×
4486

4487
        if err := binary.Write(w, byteOrder, edge.MessageFlags); err != nil {
2,662✔
4488
                return err
×
4489
        }
×
4490
        if err := binary.Write(w, byteOrder, edge.ChannelFlags); err != nil {
2,662✔
4491
                return err
×
4492
        }
×
4493
        if err := binary.Write(w, byteOrder, edge.TimeLockDelta); err != nil {
2,662✔
4494
                return err
×
4495
        }
×
4496
        if err := binary.Write(w, byteOrder, uint64(edge.MinHTLC)); err != nil {
2,662✔
4497
                return err
×
4498
        }
×
4499
        err = binary.Write(w, byteOrder, uint64(edge.FeeBaseMSat))
2,662✔
4500
        if err != nil {
2,662✔
4501
                return err
×
4502
        }
×
4503
        err = binary.Write(
2,662✔
4504
                w, byteOrder, uint64(edge.FeeProportionalMillionths),
2,662✔
4505
        )
2,662✔
4506
        if err != nil {
2,662✔
4507
                return err
×
4508
        }
×
4509

4510
        if _, err := w.Write(to); err != nil {
2,662✔
4511
                return err
×
4512
        }
×
4513

4514
        // If the max_htlc field is present, we write it. To be compatible with
4515
        // older versions that wasn't aware of this field, we write it as part
4516
        // of the opaque data.
4517
        // TODO(halseth): clean up when moving to TLV.
4518
        var opaqueBuf bytes.Buffer
2,662✔
4519
        if edge.MessageFlags.HasMaxHtlc() {
4,940✔
4520
                err := binary.Write(&opaqueBuf, byteOrder, uint64(edge.MaxHTLC))
2,278✔
4521
                if err != nil {
2,278✔
4522
                        return err
×
4523
                }
×
4524
        }
4525

4526
        if len(edge.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
2,662✔
4527
                return ErrTooManyExtraOpaqueBytes(len(edge.ExtraOpaqueData))
×
4528
        }
×
4529
        if _, err := opaqueBuf.Write(edge.ExtraOpaqueData); err != nil {
2,662✔
4530
                return err
×
4531
        }
×
4532

4533
        if err := wire.WriteVarBytes(w, 0, opaqueBuf.Bytes()); err != nil {
2,662✔
4534
                return err
×
4535
        }
×
4536

4537
        return nil
2,662✔
4538
}
4539

4540
func deserializeChanEdgePolicy(r io.Reader) (*models.ChannelEdgePolicy, error) {
11,960✔
4541
        // Deserialize the policy. Note that in case an optional field is not
11,960✔
4542
        // found, both an error and a populated policy object are returned.
11,960✔
4543
        edge, deserializeErr := deserializeChanEdgePolicyRaw(r)
11,960✔
4544
        if deserializeErr != nil &&
11,960✔
4545
                !errors.Is(deserializeErr, ErrEdgePolicyOptionalFieldNotFound) {
11,960✔
4546

×
4547
                return nil, deserializeErr
×
4548
        }
×
4549

4550
        return edge, deserializeErr
11,960✔
4551
}
4552

4553
func deserializeChanEdgePolicyRaw(r io.Reader) (*models.ChannelEdgePolicy,
4554
        error) {
12,967✔
4555

12,967✔
4556
        edge := &models.ChannelEdgePolicy{}
12,967✔
4557

12,967✔
4558
        var err error
12,967✔
4559
        edge.SigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
12,967✔
4560
        if err != nil {
12,967✔
4561
                return nil, err
×
4562
        }
×
4563

4564
        if err := binary.Read(r, byteOrder, &edge.ChannelID); err != nil {
12,967✔
4565
                return nil, err
×
4566
        }
×
4567

4568
        var scratch [8]byte
12,967✔
4569
        if _, err := r.Read(scratch[:]); err != nil {
12,967✔
4570
                return nil, err
×
4571
        }
×
4572
        unix := int64(byteOrder.Uint64(scratch[:]))
12,967✔
4573
        edge.LastUpdate = time.Unix(unix, 0)
12,967✔
4574

12,967✔
4575
        if err := binary.Read(r, byteOrder, &edge.MessageFlags); err != nil {
12,967✔
4576
                return nil, err
×
4577
        }
×
4578
        if err := binary.Read(r, byteOrder, &edge.ChannelFlags); err != nil {
12,967✔
4579
                return nil, err
×
4580
        }
×
4581
        if err := binary.Read(r, byteOrder, &edge.TimeLockDelta); err != nil {
12,967✔
4582
                return nil, err
×
4583
        }
×
4584

4585
        var n uint64
12,967✔
4586
        if err := binary.Read(r, byteOrder, &n); err != nil {
12,967✔
4587
                return nil, err
×
4588
        }
×
4589
        edge.MinHTLC = lnwire.MilliSatoshi(n)
12,967✔
4590

12,967✔
4591
        if err := binary.Read(r, byteOrder, &n); err != nil {
12,967✔
4592
                return nil, err
×
4593
        }
×
4594
        edge.FeeBaseMSat = lnwire.MilliSatoshi(n)
12,967✔
4595

12,967✔
4596
        if err := binary.Read(r, byteOrder, &n); err != nil {
12,967✔
4597
                return nil, err
×
4598
        }
×
4599
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(n)
12,967✔
4600

12,967✔
4601
        if _, err := r.Read(edge.ToNode[:]); err != nil {
12,967✔
4602
                return nil, err
×
4603
        }
×
4604

4605
        // We'll try and see if there are any opaque bytes left, if not, then
4606
        // we'll ignore the EOF error and return the edge as is.
4607
        edge.ExtraOpaqueData, err = wire.ReadVarBytes(
12,967✔
4608
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
12,967✔
4609
        )
12,967✔
4610
        switch {
12,967✔
4611
        case errors.Is(err, io.ErrUnexpectedEOF):
×
4612
        case errors.Is(err, io.EOF):
4✔
4613
        case err != nil:
×
4614
                return nil, err
×
4615
        }
4616

4617
        // See if optional fields are present.
4618
        if edge.MessageFlags.HasMaxHtlc() {
24,958✔
4619
                // The max_htlc field should be at the beginning of the opaque
11,991✔
4620
                // bytes.
11,991✔
4621
                opq := edge.ExtraOpaqueData
11,991✔
4622

11,991✔
4623
                // If the max_htlc field is not present, it might be old data
11,991✔
4624
                // stored before this field was validated. We'll return the
11,991✔
4625
                // edge along with an error.
11,991✔
4626
                if len(opq) < 8 {
11,995✔
4627
                        return edge, ErrEdgePolicyOptionalFieldNotFound
4✔
4628
                }
4✔
4629

4630
                maxHtlc := byteOrder.Uint64(opq[:8])
11,987✔
4631
                edge.MaxHTLC = lnwire.MilliSatoshi(maxHtlc)
11,987✔
4632

11,987✔
4633
                // Exclude the parsed field from the rest of the opaque data.
11,987✔
4634
                edge.ExtraOpaqueData = opq[8:]
11,987✔
4635
        }
4636

4637
        return edge, nil
12,963✔
4638
}
4639

4640
// chanGraphNodeTx is an implementation of the NodeRTx interface backed by the
4641
// KVStore and a kvdb.RTx.
4642
type chanGraphNodeTx struct {
4643
        tx   kvdb.RTx
4644
        db   *KVStore
4645
        node *models.LightningNode
4646
}
4647

4648
// A compile-time constraint to ensure chanGraphNodeTx implements the NodeRTx
4649
// interface.
4650
var _ NodeRTx = (*chanGraphNodeTx)(nil)
4651

4652
func newChanGraphNodeTx(tx kvdb.RTx, db *KVStore,
4653
        node *models.LightningNode) *chanGraphNodeTx {
3,914✔
4654

3,914✔
4655
        return &chanGraphNodeTx{
3,914✔
4656
                tx:   tx,
3,914✔
4657
                db:   db,
3,914✔
4658
                node: node,
3,914✔
4659
        }
3,914✔
4660
}
3,914✔
4661

4662
// Node returns the raw information of the node.
4663
//
4664
// NOTE: This is a part of the NodeRTx interface.
4665
func (c *chanGraphNodeTx) Node() *models.LightningNode {
4,839✔
4666
        return c.node
4,839✔
4667
}
4,839✔
4668

4669
// FetchNode fetches the node with the given pub key under the same transaction
4670
// used to fetch the current node. The returned node is also a NodeRTx and any
4671
// operations on that NodeRTx will also be done under the same transaction.
4672
//
4673
// NOTE: This is a part of the NodeRTx interface.
4674
func (c *chanGraphNodeTx) FetchNode(nodePub route.Vertex) (NodeRTx, error) {
2,944✔
4675
        node, err := c.db.FetchLightningNodeTx(c.tx, nodePub)
2,944✔
4676
        if err != nil {
2,944✔
4677
                return nil, err
×
4678
        }
×
4679

4680
        return newChanGraphNodeTx(c.tx, c.db, node), nil
2,944✔
4681
}
4682

4683
// ForEachChannel can be used to iterate over the node's channels under
4684
// the same transaction used to fetch the node.
4685
//
4686
// NOTE: This is a part of the NodeRTx interface.
4687
func (c *chanGraphNodeTx) ForEachChannel(f func(*models.ChannelEdgeInfo,
4688
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
965✔
4689

965✔
4690
        return c.db.ForEachNodeChannelTx(c.tx, c.node.PubKeyBytes,
965✔
4691
                func(_ kvdb.RTx, info *models.ChannelEdgeInfo, policy1,
965✔
4692
                        policy2 *models.ChannelEdgePolicy) error {
3,909✔
4693

2,944✔
4694
                        return f(info, policy1, policy2)
2,944✔
4695
                },
2,944✔
4696
        )
4697
}
4698

4699
// MakeTestGraph creates a new instance of the KVStore for testing
4700
// purposes.
4701
func MakeTestGraph(t testing.TB, modifiers ...KVStoreOptionModifier) (
4702
        *ChannelGraph, error) {
41✔
4703

41✔
4704
        opts := DefaultOptions()
41✔
4705
        for _, modifier := range modifiers {
41✔
4706
                modifier(opts)
×
4707
        }
×
4708

4709
        // Next, create KVStore for the first time.
4710
        backend, backendCleanup, err := kvdb.GetTestBackend(t.TempDir(), "cgr")
41✔
4711
        if err != nil {
41✔
4712
                backendCleanup()
×
4713

×
4714
                return nil, err
×
4715
        }
×
4716

4717
        graph, err := NewChannelGraph(&Config{
41✔
4718
                KVDB:        backend,
41✔
4719
                KVStoreOpts: modifiers,
41✔
4720
        })
41✔
4721
        if err != nil {
41✔
4722
                backendCleanup()
×
4723

×
4724
                return nil, err
×
4725
        }
×
4726
        require.NoError(t, graph.Start())
41✔
4727

41✔
4728
        t.Cleanup(func() {
82✔
4729
                _ = backend.Close()
41✔
4730
                backendCleanup()
41✔
4731
                require.NoError(t, graph.Stop())
41✔
4732
        })
41✔
4733

4734
        return graph, nil
41✔
4735
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc