• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12986279612

27 Jan 2025 09:51AM UTC coverage: 57.652% (-1.1%) from 58.788%
12986279612

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19578 existing lines in 256 files now uncovered.

103448 of 179434 relevant lines covered (57.65%)

24884.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.51
/graph/db/graph.go
1
package graphdb
2

3
import (
4
        "bytes"
5
        "crypto/sha256"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "math"
11
        "net"
12
        "sort"
13
        "sync"
14
        "testing"
15
        "time"
16

17
        "github.com/btcsuite/btcd/btcec/v2"
18
        "github.com/btcsuite/btcd/chaincfg/chainhash"
19
        "github.com/btcsuite/btcd/txscript"
20
        "github.com/btcsuite/btcd/wire"
21
        "github.com/lightningnetwork/lnd/aliasmgr"
22
        "github.com/lightningnetwork/lnd/batch"
23
        "github.com/lightningnetwork/lnd/graph/db/models"
24
        "github.com/lightningnetwork/lnd/input"
25
        "github.com/lightningnetwork/lnd/kvdb"
26
        "github.com/lightningnetwork/lnd/lnwire"
27
        "github.com/lightningnetwork/lnd/routing/route"
28
)
29

30
var (
31
        // nodeBucket is a bucket which houses all the vertices or nodes within
32
        // the channel graph. This bucket has a single-sub bucket which adds an
33
        // additional index from pubkey -> alias. Within the top-level of this
34
        // bucket, the key space maps a node's compressed public key to the
35
        // serialized information for that node. Additionally, there's a
36
        // special key "source" which stores the pubkey of the source node. The
37
        // source node is used as the starting point for all graph/queries and
38
        // traversals. The graph is formed as a star-graph with the source node
39
        // at the center.
40
        //
41
        // maps: pubKey -> nodeInfo
42
        // maps: source -> selfPubKey
43
        nodeBucket = []byte("graph-node")
44

45
        // nodeUpdateIndexBucket is a sub-bucket of the nodeBucket. This bucket
46
        // will be used to quickly look up the "freshness" of a node's last
47
        // update to the network. The bucket only contains keys, and no values,
48
        // its mapping:
49
        //
50
        // maps: updateTime || nodeID -> nil
51
        nodeUpdateIndexBucket = []byte("graph-node-update-index")
52

53
        // sourceKey is a special key that resides within the nodeBucket. The
54
        // sourceKey maps a key to the public key of the "self node".
55
        sourceKey = []byte("source")
56

57
        // aliasIndexBucket is a sub-bucket that's nested within the main
58
        // nodeBucket. This bucket maps the public key of a node to its
59
        // current alias. This bucket is provided as it can be used within a
60
        // future UI layer to add an additional degree of confirmation.
61
        aliasIndexBucket = []byte("alias")
62

63
        // edgeBucket is a bucket which houses all of the edge or channel
64
        // information within the channel graph. This bucket essentially acts
65
        // as an adjacency list, which in conjunction with a range scan, can be
66
        // used to iterate over all the incoming and outgoing edges for a
67
        // particular node. Keys in the bucket use a prefix scheme which leads
68
        // with the node's public key and ends with the compact edge ID.
69
        // For each chanID, there will be two entries within the bucket, as the
70
        // graph is directed: nodes may have different policies w.r.t. fees
71
        // for their respective directions.
72
        //
73
        // maps: pubKey || chanID -> channel edge policy for node
74
        edgeBucket = []byte("graph-edge")
75

76
        // unknownPolicy is represented as an empty slice. It is
77
        // used as the value in edgeBucket for unknown channel edge policies.
78
        // Unknown policies are still stored in the database to enable efficient
79
        // lookup of incoming channel edges.
80
        unknownPolicy = []byte{}
81

82
        // chanStart is an array of all zero bytes which is used to perform
83
        // range scans within the edgeBucket to obtain all of the outgoing
84
        // edges for a particular node.
85
        chanStart [8]byte
86

87
        // edgeIndexBucket is an index which can be used to iterate all edges
88
        // in the bucket, grouping them according to their in/out nodes.
89
        // Additionally, the items in this bucket also contain the complete
90
        // edge information for a channel. The edge information includes the
91
        // capacity of the channel, the nodes that made the channel, etc. This
92
        // bucket resides within the edgeBucket above. Creation of an edge
93
        // proceeds in two phases: first the edge is added to the edge index,
94
        // afterwards the edgeBucket can be updated with the latest details of
95
        // the edge as they are announced on the network.
96
        //
97
        // maps: chanID -> pubKey1 || pubKey2 || restofEdgeInfo
98
        edgeIndexBucket = []byte("edge-index")
99

100
        // edgeUpdateIndexBucket is a sub-bucket of the main edgeBucket. This
101
        // bucket contains an index which allows us to gauge the "freshness" of
102
        // a channel's last updates.
103
        //
104
        // maps: updateTime || chanID -> nil
105
        edgeUpdateIndexBucket = []byte("edge-update-index")
106

107
        // channelPointBucket maps a channel's full outpoint (txid:index) to
108
        // its short 8-byte channel ID. This bucket resides within the
109
        // edgeBucket above, and can be used to quickly remove an edge due to
110
        // the outpoint being spent, or to query for existence of a channel.
111
        //
112
        // maps: outPoint -> chanID
113
        channelPointBucket = []byte("chan-index")
114

115
        // zombieBucket is a sub-bucket of the main edgeBucket bucket
116
        // responsible for maintaining an index of zombie channels. Each entry
117
        // exists within the bucket as follows:
118
        //
119
        // maps: chanID -> pubKey1 || pubKey2
120
        //
121
        // The chanID represents the channel ID of the edge that is marked as a
122
        // zombie and is used as the key, which maps to the public keys of the
123
        // edge's participants.
124
        zombieBucket = []byte("zombie-index")
125

126
        // disabledEdgePolicyBucket is a sub-bucket of the main edgeBucket
127
        // bucket responsible for maintaining an index of disabled edge
128
        // policies. Each entry exists within the bucket as follows:
129
        //
130
        // maps: <chanID><direction> -> []byte{}
131
        //
132
        // The chanID represents the channel ID of the edge and the direction is
133
        // one byte representing the direction of the edge. The main purpose of
134
        // this index is to allow pruning disabled channels in a fast way without
135
        // the need to iterate all over the graph.
136
        disabledEdgePolicyBucket = []byte("disabled-edge-policy-index")
137

138
        // graphMetaBucket is a top-level bucket which stores various metadata
139
        // related to the on-disk channel graph. Data stored in this bucket
140
        // includes the block to which the graph has been synced, the total
141
        // number of channels, etc.
142
        graphMetaBucket = []byte("graph-meta")
143

144
        // pruneLogBucket is a bucket within the graphMetaBucket that stores
145
        // a mapping from the block height to the hash for the blocks used to
146
        // prune the graph.
147
        // Once a new block is discovered, any channels that have been closed
148
        // (by spending the outpoint) can safely be removed from the graph, and
149
        // the block is added to the prune log. We need to keep such a log for
150
        // the case where a reorg happens, and we must "rewind" the state of the
151
        // graph by removing channels that were previously confirmed. In such a
152
        // case we'll remove all entries from the prune log with a block height
153
        // that no longer exists.
154
        pruneLogBucket = []byte("prune-log")
155

156
        // closedScidBucket is a top-level bucket that stores scids for
157
        // channels that we know to be closed. This is used so that we don't
158
        // need to perform expensive validation checks if we receive a channel
159
        // announcement for the channel again.
160
        //
161
        // maps: scid -> []byte{}
162
        closedScidBucket = []byte("closed-scid")
163
)
164

165
const (
166
        // MaxAllowedExtraOpaqueBytes is the largest amount of opaque bytes that
167
        // we'll permit to be written to disk. We limit this as otherwise, it
168
        // would be possible for a node to create a ton of updates and slowly
169
        // fill our disk, and also waste bandwidth due to relaying.
170
        MaxAllowedExtraOpaqueBytes = 10000
171
)
172

173
// ChannelGraph is a persistent, on-disk graph representation of the Lightning
174
// Network. This struct can be used to implement path finding algorithms on top
175
// of, and also to update a node's view based on information received from the
176
// p2p network. Internally, the graph is stored using a modified adjacency list
177
// representation with some added object interaction possible with each
178
// serialized edge/node. The stored graph is directed, meaning there are two
179
// edges stored for each channel: an inbound/outbound edge for each node pair.
180
// Nodes, edges, and edge information can all be added to the graph
181
// independently. Edge removal results in the deletion of all edge information
182
// for that edge.
183
type ChannelGraph struct {
184
        db kvdb.Backend
185

186
        // cacheMu guards all caches (rejectCache, chanCache, graphCache). If
187
        // this mutex will be acquired at the same time as the DB mutex then
188
        // the cacheMu MUST be acquired first to prevent deadlock.
189
        cacheMu     sync.RWMutex
190
        rejectCache *rejectCache
191
        chanCache   *channelCache
192
        graphCache  *GraphCache
193

194
        chanScheduler batch.Scheduler
195
        nodeScheduler batch.Scheduler
196
}
197

198
// NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The
199
// returned instance has its own unique reject cache and channel cache.
200
func NewChannelGraph(db kvdb.Backend, options ...OptionModifier) (*ChannelGraph,
201
        error) {
173✔
202

173✔
203
        opts := DefaultOptions()
173✔
204
        for _, o := range options {
276✔
205
                o(opts)
103✔
206
        }
103✔
207

208
        if !opts.NoMigration {
346✔
209
                if err := initChannelGraph(db); err != nil {
173✔
210
                        return nil, err
×
211
                }
×
212
        }
213

214
        g := &ChannelGraph{
173✔
215
                db:          db,
173✔
216
                rejectCache: newRejectCache(opts.RejectCacheSize),
173✔
217
                chanCache:   newChannelCache(opts.ChannelCacheSize),
173✔
218
        }
173✔
219
        g.chanScheduler = batch.NewTimeScheduler(
173✔
220
                db, &g.cacheMu, opts.BatchCommitInterval,
173✔
221
        )
173✔
222
        g.nodeScheduler = batch.NewTimeScheduler(
173✔
223
                db, nil, opts.BatchCommitInterval,
173✔
224
        )
173✔
225

173✔
226
        // The graph cache can be turned off (e.g. for mobile users) for a
173✔
227
        // speed/memory usage tradeoff.
173✔
228
        if opts.UseGraphCache {
313✔
229
                g.graphCache = NewGraphCache(opts.PreAllocCacheNumNodes)
140✔
230
                startTime := time.Now()
140✔
231
                log.Debugf("Populating in-memory channel graph, this might " +
140✔
232
                        "take a while...")
140✔
233

140✔
234
                err := g.ForEachNodeCacheable(
140✔
235
                        func(tx kvdb.RTx, node GraphCacheNode) error {
240✔
236
                                g.graphCache.AddNodeFeatures(node)
100✔
237

100✔
238
                                return nil
100✔
239
                        },
100✔
240
                )
241
                if err != nil {
140✔
242
                        return nil, err
×
243
                }
×
244

245
                err = g.ForEachChannel(func(info *models.ChannelEdgeInfo,
140✔
246
                        policy1, policy2 *models.ChannelEdgePolicy) error {
536✔
247

396✔
248
                        g.graphCache.AddChannel(info, policy1, policy2)
396✔
249

396✔
250
                        return nil
396✔
251
                })
396✔
252
                if err != nil {
140✔
253
                        return nil, err
×
254
                }
×
255

256
                log.Debugf("Finished populating in-memory channel graph (took "+
140✔
257
                        "%v, %s)", time.Since(startTime), g.graphCache.Stats())
140✔
258
        }
259

260
        return g, nil
173✔
261
}
262

263
// channelMapKey is the key structure used for storing channel edge policies.
264
type channelMapKey struct {
265
        nodeKey route.Vertex
266
        chanID  [8]byte
267
}
268

269
// getChannelMap loads all channel edge policies from the database and stores
270
// them in a map.
271
func (c *ChannelGraph) getChannelMap(edges kvdb.RBucket) (
272
        map[channelMapKey]*models.ChannelEdgePolicy, error) {
144✔
273

144✔
274
        // Create a map to store all channel edge policies.
144✔
275
        channelMap := make(map[channelMapKey]*models.ChannelEdgePolicy)
144✔
276

144✔
277
        err := kvdb.ForAll(edges, func(k, edgeBytes []byte) error {
1,715✔
278
                // Skip embedded buckets.
1,571✔
279
                if bytes.Equal(k, edgeIndexBucket) ||
1,571✔
280
                        bytes.Equal(k, edgeUpdateIndexBucket) ||
1,571✔
281
                        bytes.Equal(k, zombieBucket) ||
1,571✔
282
                        bytes.Equal(k, disabledEdgePolicyBucket) ||
1,571✔
283
                        bytes.Equal(k, channelPointBucket) {
2,152✔
284

581✔
285
                        return nil
581✔
286
                }
581✔
287

288
                // Validate key length.
289
                if len(k) != 33+8 {
990✔
290
                        return fmt.Errorf("invalid edge key %x encountered", k)
×
291
                }
×
292

293
                var key channelMapKey
990✔
294
                copy(key.nodeKey[:], k[:33])
990✔
295
                copy(key.chanID[:], k[33:])
990✔
296

990✔
297
                // No need to deserialize unknown policy.
990✔
298
                if bytes.Equal(edgeBytes, unknownPolicy) {
990✔
299
                        return nil
×
300
                }
×
301

302
                edgeReader := bytes.NewReader(edgeBytes)
990✔
303
                edge, err := deserializeChanEdgePolicyRaw(
990✔
304
                        edgeReader,
990✔
305
                )
990✔
306

990✔
307
                switch {
990✔
308
                // If the db policy was missing an expected optional field, we
309
                // return nil as if the policy was unknown.
310
                case err == ErrEdgePolicyOptionalFieldNotFound:
×
311
                        return nil
×
312

313
                case err != nil:
×
314
                        return err
×
315
                }
316

317
                channelMap[key] = edge
990✔
318

990✔
319
                return nil
990✔
320
        })
321
        if err != nil {
144✔
322
                return nil, err
×
323
        }
×
324

325
        return channelMap, nil
144✔
326
}
327

328
var graphTopLevelBuckets = [][]byte{
329
        nodeBucket,
330
        edgeBucket,
331
        graphMetaBucket,
332
        closedScidBucket,
333
}
334

335
// Wipe completely deletes all saved state within all used buckets within the
336
// database. The deletion is done in a single transaction, therefore this
337
// operation is fully atomic.
338
func (c *ChannelGraph) Wipe() error {
×
339
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
×
340
                for _, tlb := range graphTopLevelBuckets {
×
341
                        err := tx.DeleteTopLevelBucket(tlb)
×
342
                        if err != nil && err != kvdb.ErrBucketNotFound {
×
343
                                return err
×
344
                        }
×
345
                }
346
                return nil
×
347
        }, func() {})
×
348
        if err != nil {
×
349
                return err
×
350
        }
×
351

352
        return initChannelGraph(c.db)
×
353
}
354

355
// initChannelGraph creates and initializes a fresh channel graph. In
356
// the case that the target path has not yet been created or doesn't yet exist,
357
// then the path is created. Additionally, all required top-level buckets used
358
// within the database are created.
359
func initChannelGraph(db kvdb.Backend) error {
173✔
360
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
346✔
361
                for _, tlb := range graphTopLevelBuckets {
865✔
362
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
692✔
363
                                return err
×
364
                        }
×
365
                }
366

367
                nodes := tx.ReadWriteBucket(nodeBucket)
173✔
368
                _, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
173✔
369
                if err != nil {
173✔
370
                        return err
×
371
                }
×
372
                _, err = nodes.CreateBucketIfNotExists(nodeUpdateIndexBucket)
173✔
373
                if err != nil {
173✔
374
                        return err
×
375
                }
×
376

377
                edges := tx.ReadWriteBucket(edgeBucket)
173✔
378
                _, err = edges.CreateBucketIfNotExists(edgeIndexBucket)
173✔
379
                if err != nil {
173✔
380
                        return err
×
381
                }
×
382
                _, err = edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
173✔
383
                if err != nil {
173✔
384
                        return err
×
385
                }
×
386
                _, err = edges.CreateBucketIfNotExists(channelPointBucket)
173✔
387
                if err != nil {
173✔
388
                        return err
×
389
                }
×
390
                _, err = edges.CreateBucketIfNotExists(zombieBucket)
173✔
391
                if err != nil {
173✔
392
                        return err
×
393
                }
×
394

395
                graphMeta := tx.ReadWriteBucket(graphMetaBucket)
173✔
396
                _, err = graphMeta.CreateBucketIfNotExists(pruneLogBucket)
173✔
397
                return err
173✔
398
        }, func() {})
173✔
399
        if err != nil {
173✔
400
                return fmt.Errorf("unable to create new channel graph: %w", err)
×
401
        }
×
402

403
        return nil
173✔
404
}
405

406
// NewPathFindTx returns a new read transaction that can be used for a single
407
// path finding session. Will return nil if the graph cache is enabled.
408
func (c *ChannelGraph) NewPathFindTx() (kvdb.RTx, error) {
133✔
409
        if c.graphCache != nil {
212✔
410
                return nil, nil
79✔
411
        }
79✔
412

413
        return c.db.BeginReadTx()
54✔
414
}
415

416
// AddrsForNode returns all known addresses for the target node public key that
417
// the graph DB is aware of. The returned boolean indicates if the given node is
418
// known to the graph DB (true) or not (false).
419
//
420
// NOTE: this is part of the channeldb.AddrSource interface.
421
func (c *ChannelGraph) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr,
422
        error) {
1✔
423

1✔
424
        pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed())
1✔
425
        if err != nil {
1✔
426
                return false, nil, err
×
427
        }
×
428

429
        node, err := c.FetchLightningNode(pubKey)
1✔
430
        // We don't consider it an error if the graph is unaware of the node.
1✔
431
        switch {
1✔
432
        case err != nil && !errors.Is(err, ErrGraphNodeNotFound):
×
433
                return false, nil, err
×
434

UNCOV
435
        case errors.Is(err, ErrGraphNodeNotFound):
×
UNCOV
436
                return false, nil, nil
×
437
        }
438

439
        return true, node.Addresses, nil
1✔
440
}
441

442
// ForEachChannel iterates through all the channel edges stored within the
443
// graph and invokes the passed callback for each edge. The callback takes two
444
// edges because this is a directed graph: both the in/out edges are visited.
445
// If the callback returns an error, then the transaction is aborted and the
446
// iteration stops early.
447
//
448
// NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer
449
// for that particular channel edge routing policy will be passed into the
450
// callback.
451
func (c *ChannelGraph) ForEachChannel(cb func(*models.ChannelEdgeInfo,
452
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
144✔
453

144✔
454
        return c.db.View(func(tx kvdb.RTx) error {
288✔
455
                edges := tx.ReadBucket(edgeBucket)
144✔
456
                if edges == nil {
144✔
457
                        return ErrGraphNoEdgesFound
×
458
                }
×
459

460
                // First, load all edges in memory indexed by node and channel
461
                // id.
462
                channelMap, err := c.getChannelMap(edges)
144✔
463
                if err != nil {
144✔
464
                        return err
×
465
                }
×
466

467
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
144✔
468
                if edgeIndex == nil {
144✔
469
                        return ErrGraphNoEdgesFound
×
470
                }
×
471

472
                // Load edge index, recombine each channel with the policies
473
                // loaded above and invoke the callback.
474
                return kvdb.ForAll(
144✔
475
                        edgeIndex, func(k, edgeInfoBytes []byte) error {
639✔
476
                                var chanID [8]byte
495✔
477
                                copy(chanID[:], k)
495✔
478

495✔
479
                                edgeInfoReader := bytes.NewReader(edgeInfoBytes)
495✔
480
                                info, err := deserializeChanEdgeInfo(
495✔
481
                                        edgeInfoReader,
495✔
482
                                )
495✔
483
                                if err != nil {
495✔
484
                                        return err
×
485
                                }
×
486

487
                                policy1 := channelMap[channelMapKey{
495✔
488
                                        nodeKey: info.NodeKey1Bytes,
495✔
489
                                        chanID:  chanID,
495✔
490
                                }]
495✔
491

495✔
492
                                policy2 := channelMap[channelMapKey{
495✔
493
                                        nodeKey: info.NodeKey2Bytes,
495✔
494
                                        chanID:  chanID,
495✔
495
                                }]
495✔
496

495✔
497
                                return cb(&info, policy1, policy2)
495✔
498
                        },
499
                )
500
        }, func() {})
144✔
501
}
502

503
// ForEachNodeDirectedChannel iterates through all channels of a given node,
504
// executing the passed callback on the directed edge representing the channel
505
// and its incoming policy. If the callback returns an error, then the iteration
506
// is halted with the error propagated back up to the caller.
507
//
508
// Unknown policies are passed into the callback as nil values.
509
func (c *ChannelGraph) ForEachNodeDirectedChannel(tx kvdb.RTx,
510
        node route.Vertex, cb func(channel *DirectedChannel) error) error {
703✔
511

703✔
512
        if c.graphCache != nil {
1,164✔
513
                return c.graphCache.ForEachChannel(node, cb)
461✔
514
        }
461✔
515

516
        // Fallback that uses the database.
517
        toNodeCallback := func() route.Vertex {
374✔
518
                return node
132✔
519
        }
132✔
520
        toNodeFeatures, err := c.FetchNodeFeatures(node)
242✔
521
        if err != nil {
242✔
522
                return err
×
523
        }
×
524

525
        dbCallback := func(tx kvdb.RTx, e *models.ChannelEdgeInfo, p1,
242✔
526
                p2 *models.ChannelEdgePolicy) error {
738✔
527

496✔
528
                var cachedInPolicy *models.CachedEdgePolicy
496✔
529
                if p2 != nil {
989✔
530
                        cachedInPolicy = models.NewCachedPolicy(p2)
493✔
531
                        cachedInPolicy.ToNodePubKey = toNodeCallback
493✔
532
                        cachedInPolicy.ToNodeFeatures = toNodeFeatures
493✔
533
                }
493✔
534

535
                var inboundFee lnwire.Fee
496✔
536
                if p1 != nil {
991✔
537
                        // Extract inbound fee. If there is a decoding error,
495✔
538
                        // skip this edge.
495✔
539
                        _, err := p1.ExtraOpaqueData.ExtractRecords(&inboundFee)
495✔
540
                        if err != nil {
496✔
541
                                return nil
1✔
542
                        }
1✔
543
                }
544

545
                directedChannel := &DirectedChannel{
495✔
546
                        ChannelID:    e.ChannelID,
495✔
547
                        IsNode1:      node == e.NodeKey1Bytes,
495✔
548
                        OtherNode:    e.NodeKey2Bytes,
495✔
549
                        Capacity:     e.Capacity,
495✔
550
                        OutPolicySet: p1 != nil,
495✔
551
                        InPolicy:     cachedInPolicy,
495✔
552
                        InboundFee:   inboundFee,
495✔
553
                }
495✔
554

495✔
555
                if node == e.NodeKey2Bytes {
743✔
556
                        directedChannel.OtherNode = e.NodeKey1Bytes
248✔
557
                }
248✔
558

559
                return cb(directedChannel)
495✔
560
        }
561
        return nodeTraversal(tx, node[:], c.db, dbCallback)
242✔
562
}
563

564
// FetchNodeFeatures returns the features of a given node. If no features are
565
// known for the node, an empty feature vector is returned.
566
func (c *ChannelGraph) FetchNodeFeatures(
567
        node route.Vertex) (*lnwire.FeatureVector, error) {
1,139✔
568

1,139✔
569
        if c.graphCache != nil {
1,592✔
570
                return c.graphCache.GetFeatures(node), nil
453✔
571
        }
453✔
572

573
        // Fallback that uses the database.
574
        targetNode, err := c.FetchLightningNode(node)
686✔
575
        switch err {
686✔
576
        // If the node exists and has features, return them directly.
577
        case nil:
675✔
578
                return targetNode.Features, nil
675✔
579

580
        // If we couldn't find a node announcement, populate a blank feature
581
        // vector.
582
        case ErrGraphNodeNotFound:
11✔
583
                return lnwire.EmptyFeatureVector(), nil
11✔
584

585
        // Otherwise, bubble the error up.
586
        default:
×
587
                return nil, err
×
588
        }
589
}
590

591
// ForEachNodeCached is similar to ForEachNode, but it utilizes the channel
592
// graph cache instead. Note that this doesn't return all the information the
593
// regular ForEachNode method does.
594
//
595
// NOTE: The callback contents MUST not be modified.
596
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
597
        chans map[uint64]*DirectedChannel) error) error {
1✔
598

1✔
599
        if c.graphCache != nil {
1✔
600
                return c.graphCache.ForEachNode(cb)
×
601
        }
×
602

603
        // Otherwise fall back to a version that uses the database directly.
604
        // We'll iterate over each node, then the set of channels for each
605
        // node, and construct a similar callback function signature as the
606
        // main function expects.
607
        return c.ForEachNode(func(tx kvdb.RTx,
1✔
608
                node *models.LightningNode) error {
21✔
609

20✔
610
                channels := make(map[uint64]*DirectedChannel)
20✔
611

20✔
612
                err := c.ForEachNodeChannelTx(tx, node.PubKeyBytes,
20✔
613
                        func(tx kvdb.RTx, e *models.ChannelEdgeInfo,
20✔
614
                                p1 *models.ChannelEdgePolicy,
20✔
615
                                p2 *models.ChannelEdgePolicy) error {
210✔
616

190✔
617
                                toNodeCallback := func() route.Vertex {
190✔
618
                                        return node.PubKeyBytes
×
619
                                }
×
620
                                toNodeFeatures, err := c.FetchNodeFeatures(
190✔
621
                                        node.PubKeyBytes,
190✔
622
                                )
190✔
623
                                if err != nil {
190✔
624
                                        return err
×
625
                                }
×
626

627
                                var cachedInPolicy *models.CachedEdgePolicy
190✔
628
                                if p2 != nil {
380✔
629
                                        cachedInPolicy =
190✔
630
                                                models.NewCachedPolicy(p2)
190✔
631
                                        cachedInPolicy.ToNodePubKey =
190✔
632
                                                toNodeCallback
190✔
633
                                        cachedInPolicy.ToNodeFeatures =
190✔
634
                                                toNodeFeatures
190✔
635
                                }
190✔
636

637
                                directedChannel := &DirectedChannel{
190✔
638
                                        ChannelID: e.ChannelID,
190✔
639
                                        IsNode1: node.PubKeyBytes ==
190✔
640
                                                e.NodeKey1Bytes,
190✔
641
                                        OtherNode:    e.NodeKey2Bytes,
190✔
642
                                        Capacity:     e.Capacity,
190✔
643
                                        OutPolicySet: p1 != nil,
190✔
644
                                        InPolicy:     cachedInPolicy,
190✔
645
                                }
190✔
646

190✔
647
                                if node.PubKeyBytes == e.NodeKey2Bytes {
285✔
648
                                        directedChannel.OtherNode =
95✔
649
                                                e.NodeKey1Bytes
95✔
650
                                }
95✔
651

652
                                channels[e.ChannelID] = directedChannel
190✔
653

190✔
654
                                return nil
190✔
655
                        })
656
                if err != nil {
20✔
657
                        return err
×
658
                }
×
659

660
                return cb(node.PubKeyBytes, channels)
20✔
661
        })
662
}
663

664
// DisabledChannelIDs returns the channel ids of disabled channels.
665
// A channel is disabled when two of the associated ChanelEdgePolicies
666
// have their disabled bit on.
667
func (c *ChannelGraph) DisabledChannelIDs() ([]uint64, error) {
6✔
668
        var disabledChanIDs []uint64
6✔
669
        var chanEdgeFound map[uint64]struct{}
6✔
670

6✔
671
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
12✔
672
                edges := tx.ReadBucket(edgeBucket)
6✔
673
                if edges == nil {
6✔
674
                        return ErrGraphNoEdgesFound
×
675
                }
×
676

677
                disabledEdgePolicyIndex := edges.NestedReadBucket(
6✔
678
                        disabledEdgePolicyBucket,
6✔
679
                )
6✔
680
                if disabledEdgePolicyIndex == nil {
7✔
681
                        return nil
1✔
682
                }
1✔
683

684
                // We iterate over all disabled policies and we add each channel
685
                // that has more than one disabled policy to disabledChanIDs
686
                // array.
687
                return disabledEdgePolicyIndex.ForEach(
5✔
688
                        func(k, v []byte) error {
16✔
689
                                chanID := byteOrder.Uint64(k[:8])
11✔
690
                                _, edgeFound := chanEdgeFound[chanID]
11✔
691
                                if edgeFound {
15✔
692
                                        delete(chanEdgeFound, chanID)
4✔
693
                                        disabledChanIDs = append(
4✔
694
                                                disabledChanIDs, chanID,
4✔
695
                                        )
4✔
696

4✔
697
                                        return nil
4✔
698
                                }
4✔
699

700
                                chanEdgeFound[chanID] = struct{}{}
7✔
701

7✔
702
                                return nil
7✔
703
                        },
704
                )
705
        }, func() {
6✔
706
                disabledChanIDs = nil
6✔
707
                chanEdgeFound = make(map[uint64]struct{})
6✔
708
        })
6✔
709
        if err != nil {
6✔
710
                return nil, err
×
711
        }
×
712

713
        return disabledChanIDs, nil
6✔
714
}
715

716
// ForEachNode iterates through all the stored vertices/nodes in the graph,
717
// executing the passed callback with each node encountered. If the callback
718
// returns an error, then the transaction is aborted and the iteration stops
719
// early.
720
//
721
// TODO(roasbeef): add iterator interface to allow for memory efficient graph
722
// traversal when graph gets mega
723
func (c *ChannelGraph) ForEachNode(
724
        cb func(kvdb.RTx, *models.LightningNode) error) error {
129✔
725

129✔
726
        traversal := func(tx kvdb.RTx) error {
258✔
727
                // First grab the nodes bucket which stores the mapping from
129✔
728
                // pubKey to node information.
129✔
729
                nodes := tx.ReadBucket(nodeBucket)
129✔
730
                if nodes == nil {
129✔
731
                        return ErrGraphNotFound
×
732
                }
×
733

734
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
1,568✔
735
                        // If this is the source key, then we skip this
1,439✔
736
                        // iteration as the value for this key is a pubKey
1,439✔
737
                        // rather than raw node information.
1,439✔
738
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
1,700✔
739
                                return nil
261✔
740
                        }
261✔
741

742
                        nodeReader := bytes.NewReader(nodeBytes)
1,178✔
743
                        node, err := deserializeLightningNode(nodeReader)
1,178✔
744
                        if err != nil {
1,178✔
745
                                return err
×
746
                        }
×
747

748
                        // Execute the callback, the transaction will abort if
749
                        // this returns an error.
750
                        return cb(tx, &node)
1,178✔
751
                })
752
        }
753

754
        return kvdb.View(c.db, traversal, func() {})
258✔
755
}
756

757
// ForEachNodeCacheable iterates through all the stored vertices/nodes in the
758
// graph, executing the passed callback with each node encountered. If the
759
// callback returns an error, then the transaction is aborted and the iteration
760
// stops early.
761
func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx,
762
        GraphCacheNode) error) error {
141✔
763

141✔
764
        traversal := func(tx kvdb.RTx) error {
282✔
765
                // First grab the nodes bucket which stores the mapping from
141✔
766
                // pubKey to node information.
141✔
767
                nodes := tx.ReadBucket(nodeBucket)
141✔
768
                if nodes == nil {
141✔
769
                        return ErrGraphNotFound
×
770
                }
×
771

772
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
543✔
773
                        // If this is the source key, then we skip this
402✔
774
                        // iteration as the value for this key is a pubKey
402✔
775
                        // rather than raw node information.
402✔
776
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
684✔
777
                                return nil
282✔
778
                        }
282✔
779

780
                        nodeReader := bytes.NewReader(nodeBytes)
120✔
781
                        cacheableNode, err := deserializeLightningNodeCacheable(
120✔
782
                                nodeReader,
120✔
783
                        )
120✔
784
                        if err != nil {
120✔
785
                                return err
×
786
                        }
×
787

788
                        // Execute the callback, the transaction will abort if
789
                        // this returns an error.
790
                        return cb(tx, cacheableNode)
120✔
791
                })
792
        }
793

794
        return kvdb.View(c.db, traversal, func() {})
282✔
795
}
796

797
// SourceNode returns the source node of the graph. The source node is treated
798
// as the center node within a star-graph. This method may be used to kick off
799
// a path finding algorithm in order to explore the reachability of another
800
// node based off the source node.
801
func (c *ChannelGraph) SourceNode() (*models.LightningNode, error) {
232✔
802
        var source *models.LightningNode
232✔
803
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
464✔
804
                // First grab the nodes bucket which stores the mapping from
232✔
805
                // pubKey to node information.
232✔
806
                nodes := tx.ReadBucket(nodeBucket)
232✔
807
                if nodes == nil {
232✔
808
                        return ErrGraphNotFound
×
809
                }
×
810

811
                node, err := c.sourceNode(nodes)
232✔
812
                if err != nil {
233✔
813
                        return err
1✔
814
                }
1✔
815
                source = node
231✔
816

231✔
817
                return nil
231✔
818
        }, func() {
232✔
819
                source = nil
232✔
820
        })
232✔
821
        if err != nil {
233✔
822
                return nil, err
1✔
823
        }
1✔
824

825
        return source, nil
231✔
826
}
827

828
// sourceNode uses an existing database transaction and returns the source node
829
// of the graph. The source node is treated as the center node within a
830
// star-graph. This method may be used to kick off a path finding algorithm in
831
// order to explore the reachability of another node based off the source node.
832
func (c *ChannelGraph) sourceNode(nodes kvdb.RBucket) (*models.LightningNode,
833
        error) {
490✔
834

490✔
835
        selfPub := nodes.Get(sourceKey)
490✔
836
        if selfPub == nil {
491✔
837
                return nil, ErrSourceNodeNotSet
1✔
838
        }
1✔
839

840
        // With the pubKey of the source node retrieved, we're able to
841
        // fetch the full node information.
842
        node, err := fetchLightningNode(nodes, selfPub)
489✔
843
        if err != nil {
489✔
844
                return nil, err
×
845
        }
×
846

847
        return &node, nil
489✔
848
}
849

850
// SetSourceNode sets the source node within the graph database. The source
851
// node is to be used as the center of a star-graph within path finding
852
// algorithms.
853
func (c *ChannelGraph) SetSourceNode(node *models.LightningNode) error {
118✔
854
        nodePubBytes := node.PubKeyBytes[:]
118✔
855

118✔
856
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
236✔
857
                // First grab the nodes bucket which stores the mapping from
118✔
858
                // pubKey to node information.
118✔
859
                nodes, err := tx.CreateTopLevelBucket(nodeBucket)
118✔
860
                if err != nil {
118✔
861
                        return err
×
862
                }
×
863

864
                // Next we create the mapping from source to the targeted
865
                // public key.
866
                if err := nodes.Put(sourceKey, nodePubBytes); err != nil {
118✔
867
                        return err
×
868
                }
×
869

870
                // Finally, we commit the information of the lightning node
871
                // itself.
872
                return addLightningNode(tx, node)
118✔
873
        }, func() {})
118✔
874
}
875

876
// AddLightningNode adds a vertex/node to the graph database. If the node is not
877
// in the database from before, this will add a new, unconnected one to the
878
// graph. If it is present from before, this will update that node's
879
// information. Note that this method is expected to only be called to update an
880
// already present node from a node announcement, or to insert a node found in a
881
// channel update.
882
//
883
// TODO(roasbeef): also need sig of announcement
884
func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
885
        op ...batch.SchedulerOption) error {
799✔
886

799✔
887
        r := &batch.Request{
799✔
888
                Update: func(tx kvdb.RwTx) error {
1,598✔
889
                        if c.graphCache != nil {
1,411✔
890
                                cNode := newGraphCacheNode(
612✔
891
                                        node.PubKeyBytes, node.Features,
612✔
892
                                )
612✔
893
                                err := c.graphCache.AddNode(tx, cNode)
612✔
894
                                if err != nil {
612✔
895
                                        return err
×
896
                                }
×
897
                        }
898

899
                        return addLightningNode(tx, node)
799✔
900
                },
901
        }
902

903
        for _, f := range op {
799✔
UNCOV
904
                f(r)
×
UNCOV
905
        }
×
906

907
        return c.nodeScheduler.Execute(r)
799✔
908
}
909

910
func addLightningNode(tx kvdb.RwTx, node *models.LightningNode) error {
991✔
911
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
991✔
912
        if err != nil {
991✔
913
                return err
×
914
        }
×
915

916
        aliases, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
991✔
917
        if err != nil {
991✔
918
                return err
×
919
        }
×
920

921
        updateIndex, err := nodes.CreateBucketIfNotExists(
991✔
922
                nodeUpdateIndexBucket,
991✔
923
        )
991✔
924
        if err != nil {
991✔
925
                return err
×
926
        }
×
927

928
        return putLightningNode(nodes, aliases, updateIndex, node)
991✔
929
}
930

931
// LookupAlias attempts to return the alias as advertised by the target node.
932
// TODO(roasbeef): currently assumes that aliases are unique...
933
func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) {
2✔
934
        var alias string
2✔
935

2✔
936
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
4✔
937
                nodes := tx.ReadBucket(nodeBucket)
2✔
938
                if nodes == nil {
2✔
939
                        return ErrGraphNodesNotFound
×
940
                }
×
941

942
                aliases := nodes.NestedReadBucket(aliasIndexBucket)
2✔
943
                if aliases == nil {
2✔
944
                        return ErrGraphNodesNotFound
×
945
                }
×
946

947
                nodePub := pub.SerializeCompressed()
2✔
948
                a := aliases.Get(nodePub)
2✔
949
                if a == nil {
3✔
950
                        return ErrNodeAliasNotFound
1✔
951
                }
1✔
952

953
                // TODO(roasbeef): should actually be using the utf-8
954
                // package...
955
                alias = string(a)
1✔
956
                return nil
1✔
957
        }, func() {
2✔
958
                alias = ""
2✔
959
        })
2✔
960
        if err != nil {
3✔
961
                return "", err
1✔
962
        }
1✔
963

964
        return alias, nil
1✔
965
}
966

967
// DeleteLightningNode starts a new database transaction to remove a vertex/node
968
// from the database according to the node's public key.
969
func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
3✔
970
        // TODO(roasbeef): ensure dangling edges are removed...
3✔
971
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
6✔
972
                nodes := tx.ReadWriteBucket(nodeBucket)
3✔
973
                if nodes == nil {
3✔
974
                        return ErrGraphNodeNotFound
×
975
                }
×
976

977
                if c.graphCache != nil {
6✔
978
                        c.graphCache.RemoveNode(nodePub)
3✔
979
                }
3✔
980

981
                return c.deleteLightningNode(nodes, nodePub[:])
3✔
982
        }, func() {})
3✔
983
}
984

985
// deleteLightningNode uses an existing database transaction to remove a
986
// vertex/node from the database according to the node's public key.
987
func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket,
988
        compressedPubKey []byte) error {
67✔
989

67✔
990
        aliases := nodes.NestedReadWriteBucket(aliasIndexBucket)
67✔
991
        if aliases == nil {
67✔
992
                return ErrGraphNodesNotFound
×
993
        }
×
994

995
        if err := aliases.Delete(compressedPubKey); err != nil {
67✔
996
                return err
×
997
        }
×
998

999
        // Before we delete the node, we'll fetch its current state so we can
1000
        // determine when its last update was to clear out the node update
1001
        // index.
1002
        node, err := fetchLightningNode(nodes, compressedPubKey)
67✔
1003
        if err != nil {
67✔
1004
                return err
×
1005
        }
×
1006

1007
        if err := nodes.Delete(compressedPubKey); err != nil {
67✔
1008
                return err
×
1009
        }
×
1010

1011
        // Finally, we'll delete the index entry for the node within the
1012
        // nodeUpdateIndexBucket as this node is no longer active, so we don't
1013
        // need to track its last update.
1014
        nodeUpdateIndex := nodes.NestedReadWriteBucket(nodeUpdateIndexBucket)
67✔
1015
        if nodeUpdateIndex == nil {
67✔
1016
                return ErrGraphNodesNotFound
×
1017
        }
×
1018

1019
        // In order to delete the entry, we'll need to reconstruct the key for
1020
        // its last update.
1021
        updateUnix := uint64(node.LastUpdate.Unix())
67✔
1022
        var indexKey [8 + 33]byte
67✔
1023
        byteOrder.PutUint64(indexKey[:8], updateUnix)
67✔
1024
        copy(indexKey[8:], compressedPubKey)
67✔
1025

67✔
1026
        return nodeUpdateIndex.Delete(indexKey[:])
67✔
1027
}
1028

1029
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
1030
// undirected edge from the two target nodes are created. The information stored
1031
// denotes the static attributes of the channel, such as the channelID, the keys
1032
// involved in creation of the channel, and the set of features that the channel
1033
// supports. The chanPoint and chanID are used to uniquely identify the edge
1034
// globally within the database.
1035
func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
1036
        op ...batch.SchedulerOption) error {
1,705✔
1037

1,705✔
1038
        var alreadyExists bool
1,705✔
1039
        r := &batch.Request{
1,705✔
1040
                Reset: func() {
3,410✔
1041
                        alreadyExists = false
1,705✔
1042
                },
1,705✔
1043
                Update: func(tx kvdb.RwTx) error {
1,705✔
1044
                        err := c.addChannelEdge(tx, edge)
1,705✔
1045

1,705✔
1046
                        // Silence ErrEdgeAlreadyExist so that the batch can
1,705✔
1047
                        // succeed, but propagate the error via local state.
1,705✔
1048
                        if err == ErrEdgeAlreadyExist {
1,939✔
1049
                                alreadyExists = true
234✔
1050
                                return nil
234✔
1051
                        }
234✔
1052

1053
                        return err
1,471✔
1054
                },
1055
                OnCommit: func(err error) error {
1,705✔
1056
                        switch {
1,705✔
1057
                        case err != nil:
×
1058
                                return err
×
1059
                        case alreadyExists:
234✔
1060
                                return ErrEdgeAlreadyExist
234✔
1061
                        default:
1,471✔
1062
                                c.rejectCache.remove(edge.ChannelID)
1,471✔
1063
                                c.chanCache.remove(edge.ChannelID)
1,471✔
1064
                                return nil
1,471✔
1065
                        }
1066
                },
1067
        }
1068

1069
        for _, f := range op {
1,705✔
UNCOV
1070
                if f == nil {
×
1071
                        return fmt.Errorf("nil scheduler option was used")
×
1072
                }
×
1073

UNCOV
1074
                f(r)
×
1075
        }
1076

1077
        return c.chanScheduler.Execute(r)
1,705✔
1078
}
1079

1080
// addChannelEdge is the private form of AddChannelEdge that allows callers to
1081
// utilize an existing db transaction.
1082
func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx,
1083
        edge *models.ChannelEdgeInfo) error {
1,705✔
1084

1,705✔
1085
        // Construct the channel's primary key which is the 8-byte channel ID.
1,705✔
1086
        var chanKey [8]byte
1,705✔
1087
        binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
1,705✔
1088

1,705✔
1089
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
1,705✔
1090
        if err != nil {
1,705✔
1091
                return err
×
1092
        }
×
1093
        edges, err := tx.CreateTopLevelBucket(edgeBucket)
1,705✔
1094
        if err != nil {
1,705✔
1095
                return err
×
1096
        }
×
1097
        edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
1,705✔
1098
        if err != nil {
1,705✔
1099
                return err
×
1100
        }
×
1101
        chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket)
1,705✔
1102
        if err != nil {
1,705✔
1103
                return err
×
1104
        }
×
1105

1106
        // First, attempt to check if this edge has already been created. If
1107
        // so, then we can exit early as this method is meant to be idempotent.
1108
        if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil {
1,939✔
1109
                return ErrEdgeAlreadyExist
234✔
1110
        }
234✔
1111

1112
        if c.graphCache != nil {
2,752✔
1113
                c.graphCache.AddChannel(edge, nil, nil)
1,281✔
1114
        }
1,281✔
1115

1116
        // Before we insert the channel into the database, we'll ensure that
1117
        // both nodes already exist in the channel graph. If either node
1118
        // doesn't, then we'll insert a "shell" node that just includes its
1119
        // public key, so subsequent validation and queries can work properly.
1120
        _, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:])
1,471✔
1121
        switch {
1,471✔
1122
        case node1Err == ErrGraphNodeNotFound:
18✔
1123
                node1Shell := models.LightningNode{
18✔
1124
                        PubKeyBytes:          edge.NodeKey1Bytes,
18✔
1125
                        HaveNodeAnnouncement: false,
18✔
1126
                }
18✔
1127
                err := addLightningNode(tx, &node1Shell)
18✔
1128
                if err != nil {
18✔
1129
                        return fmt.Errorf("unable to create shell node "+
×
1130
                                "for: %x: %w", edge.NodeKey1Bytes, err)
×
1131
                }
×
1132
        case node1Err != nil:
×
1133
                return node1Err
×
1134
        }
1135

1136
        _, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:])
1,471✔
1137
        switch {
1,471✔
1138
        case node2Err == ErrGraphNodeNotFound:
56✔
1139
                node2Shell := models.LightningNode{
56✔
1140
                        PubKeyBytes:          edge.NodeKey2Bytes,
56✔
1141
                        HaveNodeAnnouncement: false,
56✔
1142
                }
56✔
1143
                err := addLightningNode(tx, &node2Shell)
56✔
1144
                if err != nil {
56✔
1145
                        return fmt.Errorf("unable to create shell node "+
×
1146
                                "for: %x: %w", edge.NodeKey2Bytes, err)
×
1147
                }
×
1148
        case node2Err != nil:
×
1149
                return node2Err
×
1150
        }
1151

1152
        // If the edge hasn't been created yet, then we'll first add it to the
1153
        // edge index in order to associate the edge between two nodes and also
1154
        // store the static components of the channel.
1155
        if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil {
1,471✔
1156
                return err
×
1157
        }
×
1158

1159
        // Mark edge policies for both sides as unknown. This is to enable
1160
        // efficient incoming channel lookup for a node.
1161
        keys := []*[33]byte{
1,471✔
1162
                &edge.NodeKey1Bytes,
1,471✔
1163
                &edge.NodeKey2Bytes,
1,471✔
1164
        }
1,471✔
1165
        for _, key := range keys {
4,413✔
1166
                err := putChanEdgePolicyUnknown(edges, edge.ChannelID, key[:])
2,942✔
1167
                if err != nil {
2,942✔
1168
                        return err
×
1169
                }
×
1170
        }
1171

1172
        // Finally we add it to the channel index which maps channel points
1173
        // (outpoints) to the shorter channel ID's.
1174
        var b bytes.Buffer
1,471✔
1175
        if err := WriteOutpoint(&b, &edge.ChannelPoint); err != nil {
1,471✔
1176
                return err
×
1177
        }
×
1178
        return chanIndex.Put(b.Bytes(), chanKey[:])
1,471✔
1179
}
1180

1181
// HasChannelEdge returns true if the database knows of a channel edge with the
1182
// passed channel ID, and false otherwise. If an edge with that ID is found
1183
// within the graph, then two time stamps representing the last time the edge
1184
// was updated for both directed edges are returned along with the boolean. If
1185
// it is not found, then the zombie index is checked and its result is returned
1186
// as the second boolean.
1187
func (c *ChannelGraph) HasChannelEdge(
1188
        chanID uint64) (time.Time, time.Time, bool, bool, error) {
224✔
1189

224✔
1190
        var (
224✔
1191
                upd1Time time.Time
224✔
1192
                upd2Time time.Time
224✔
1193
                exists   bool
224✔
1194
                isZombie bool
224✔
1195
        )
224✔
1196

224✔
1197
        // We'll query the cache with the shared lock held to allow multiple
224✔
1198
        // readers to access values in the cache concurrently if they exist.
224✔
1199
        c.cacheMu.RLock()
224✔
1200
        if entry, ok := c.rejectCache.get(chanID); ok {
304✔
1201
                c.cacheMu.RUnlock()
80✔
1202
                upd1Time = time.Unix(entry.upd1Time, 0)
80✔
1203
                upd2Time = time.Unix(entry.upd2Time, 0)
80✔
1204
                exists, isZombie = entry.flags.unpack()
80✔
1205
                return upd1Time, upd2Time, exists, isZombie, nil
80✔
1206
        }
80✔
1207
        c.cacheMu.RUnlock()
144✔
1208

144✔
1209
        c.cacheMu.Lock()
144✔
1210
        defer c.cacheMu.Unlock()
144✔
1211

144✔
1212
        // The item was not found with the shared lock, so we'll acquire the
144✔
1213
        // exclusive lock and check the cache again in case another method added
144✔
1214
        // the entry to the cache while no lock was held.
144✔
1215
        if entry, ok := c.rejectCache.get(chanID); ok {
149✔
1216
                upd1Time = time.Unix(entry.upd1Time, 0)
5✔
1217
                upd2Time = time.Unix(entry.upd2Time, 0)
5✔
1218
                exists, isZombie = entry.flags.unpack()
5✔
1219
                return upd1Time, upd2Time, exists, isZombie, nil
5✔
1220
        }
5✔
1221

1222
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
278✔
1223
                edges := tx.ReadBucket(edgeBucket)
139✔
1224
                if edges == nil {
139✔
1225
                        return ErrGraphNoEdgesFound
×
1226
                }
×
1227
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
139✔
1228
                if edgeIndex == nil {
139✔
1229
                        return ErrGraphNoEdgesFound
×
1230
                }
×
1231

1232
                var channelID [8]byte
139✔
1233
                byteOrder.PutUint64(channelID[:], chanID)
139✔
1234

139✔
1235
                // If the edge doesn't exist, then we'll also check our zombie
139✔
1236
                // index.
139✔
1237
                if edgeIndex.Get(channelID[:]) == nil {
232✔
1238
                        exists = false
93✔
1239
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
93✔
1240
                        if zombieIndex != nil {
186✔
1241
                                isZombie, _, _ = isZombieEdge(
93✔
1242
                                        zombieIndex, chanID,
93✔
1243
                                )
93✔
1244
                        }
93✔
1245

1246
                        return nil
93✔
1247
                }
1248

1249
                exists = true
46✔
1250
                isZombie = false
46✔
1251

46✔
1252
                // If the channel has been found in the graph, then retrieve
46✔
1253
                // the edges itself so we can return the last updated
46✔
1254
                // timestamps.
46✔
1255
                nodes := tx.ReadBucket(nodeBucket)
46✔
1256
                if nodes == nil {
46✔
1257
                        return ErrGraphNodeNotFound
×
1258
                }
×
1259

1260
                e1, e2, err := fetchChanEdgePolicies(
46✔
1261
                        edgeIndex, edges, channelID[:],
46✔
1262
                )
46✔
1263
                if err != nil {
46✔
1264
                        return err
×
1265
                }
×
1266

1267
                // As we may have only one of the edges populated, only set the
1268
                // update time if the edge was found in the database.
1269
                if e1 != nil {
64✔
1270
                        upd1Time = e1.LastUpdate
18✔
1271
                }
18✔
1272
                if e2 != nil {
62✔
1273
                        upd2Time = e2.LastUpdate
16✔
1274
                }
16✔
1275

1276
                return nil
46✔
1277
        }, func() {}); err != nil {
139✔
1278
                return time.Time{}, time.Time{}, exists, isZombie, err
×
1279
        }
×
1280

1281
        c.rejectCache.insert(chanID, rejectCacheEntry{
139✔
1282
                upd1Time: upd1Time.Unix(),
139✔
1283
                upd2Time: upd2Time.Unix(),
139✔
1284
                flags:    packRejectFlags(exists, isZombie),
139✔
1285
        })
139✔
1286

139✔
1287
        return upd1Time, upd2Time, exists, isZombie, nil
139✔
1288
}
1289

1290
// UpdateChannelEdge retrieves and update edge of the graph database. Method
1291
// only reserved for updating an edge info after its already been created.
1292
// In order to maintain this constraints, we return an error in the scenario
1293
// that an edge info hasn't yet been created yet, but someone attempts to update
1294
// it.
1295
func (c *ChannelGraph) UpdateChannelEdge(edge *models.ChannelEdgeInfo) error {
1✔
1296
        // Construct the channel's primary key which is the 8-byte channel ID.
1✔
1297
        var chanKey [8]byte
1✔
1298
        binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
1✔
1299

1✔
1300
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
2✔
1301
                edges := tx.ReadWriteBucket(edgeBucket)
1✔
1302
                if edge == nil {
1✔
1303
                        return ErrEdgeNotFound
×
1304
                }
×
1305

1306
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
1✔
1307
                if edgeIndex == nil {
1✔
1308
                        return ErrEdgeNotFound
×
1309
                }
×
1310

1311
                if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo == nil {
1✔
1312
                        return ErrEdgeNotFound
×
1313
                }
×
1314

1315
                if c.graphCache != nil {
2✔
1316
                        c.graphCache.UpdateChannel(edge)
1✔
1317
                }
1✔
1318

1319
                return putChanEdgeInfo(edgeIndex, edge, chanKey)
1✔
1320
        }, func() {})
1✔
1321
}
1322

1323
// pruneTipBytes is the total size of the value which stores a prune entry of
// the graph in the prune log. The "prune tip" is the last entry in the prune
// log, and indicates if the channel graph is in sync with the current UTXO
// state. The value consists of a single block hash, which takes up 32 bytes.
const pruneTipBytes = 32
1331

1332
// PruneGraph prunes newly closed channels from the channel graph in response
1333
// to a new block being solved on the network. Any transactions which spend the
1334
// funding output of any known channels within the graph will be deleted.
1335
// Additionally, the "prune tip", or the last block which has been used to
1336
// prune the graph is stored so callers can ensure the graph is fully in sync
1337
// with the current UTXO state. A slice of channels that have been closed by
1338
// the target block are returned if the function succeeds without error.
1339
func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
1340
        blockHash *chainhash.Hash, blockHeight uint32) (
1341
        []*models.ChannelEdgeInfo, error) {
234✔
1342

234✔
1343
        c.cacheMu.Lock()
234✔
1344
        defer c.cacheMu.Unlock()
234✔
1345

234✔
1346
        var chansClosed []*models.ChannelEdgeInfo
234✔
1347

234✔
1348
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
468✔
1349
                // First grab the edges bucket which houses the information
234✔
1350
                // we'd like to delete
234✔
1351
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
234✔
1352
                if err != nil {
234✔
1353
                        return err
×
1354
                }
×
1355

1356
                // Next grab the two edge indexes which will also need to be
1357
                // updated.
1358
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
234✔
1359
                if err != nil {
234✔
1360
                        return err
×
1361
                }
×
1362
                chanIndex, err := edges.CreateBucketIfNotExists(
234✔
1363
                        channelPointBucket,
234✔
1364
                )
234✔
1365
                if err != nil {
234✔
1366
                        return err
×
1367
                }
×
1368
                nodes := tx.ReadWriteBucket(nodeBucket)
234✔
1369
                if nodes == nil {
234✔
1370
                        return ErrSourceNodeNotSet
×
1371
                }
×
1372
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
234✔
1373
                if err != nil {
234✔
1374
                        return err
×
1375
                }
×
1376

1377
                // For each of the outpoints that have been spent within the
1378
                // block, we attempt to delete them from the graph as if that
1379
                // outpoint was a channel, then it has now been closed.
1380
                for _, chanPoint := range spentOutputs {
354✔
1381
                        // TODO(roasbeef): load channel bloom filter, continue
120✔
1382
                        // if NOT if filter
120✔
1383

120✔
1384
                        var opBytes bytes.Buffer
120✔
1385
                        err := WriteOutpoint(&opBytes, chanPoint)
120✔
1386
                        if err != nil {
120✔
1387
                                return err
×
1388
                        }
×
1389

1390
                        // First attempt to see if the channel exists within
1391
                        // the database, if not, then we can exit early.
1392
                        chanID := chanIndex.Get(opBytes.Bytes())
120✔
1393
                        if chanID == nil {
219✔
1394
                                continue
99✔
1395
                        }
1396

1397
                        // However, if it does, then we'll read out the full
1398
                        // version so we can add it to the set of deleted
1399
                        // channels.
1400
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
21✔
1401
                        if err != nil {
21✔
1402
                                return err
×
1403
                        }
×
1404

1405
                        // Attempt to delete the channel, an ErrEdgeNotFound
1406
                        // will be returned if that outpoint isn't known to be
1407
                        // a channel. If no error is returned, then a channel
1408
                        // was successfully pruned.
1409
                        err = c.delChannelEdgeUnsafe(
21✔
1410
                                edges, edgeIndex, chanIndex, zombieIndex,
21✔
1411
                                chanID, false, false,
21✔
1412
                        )
21✔
1413
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
21✔
1414
                                return err
×
1415
                        }
×
1416

1417
                        chansClosed = append(chansClosed, &edgeInfo)
21✔
1418
                }
1419

1420
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
234✔
1421
                if err != nil {
234✔
1422
                        return err
×
1423
                }
×
1424

1425
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
234✔
1426
                        pruneLogBucket,
234✔
1427
                )
234✔
1428
                if err != nil {
234✔
1429
                        return err
×
1430
                }
×
1431

1432
                // With the graph pruned, add a new entry to the prune log,
1433
                // which can be used to check if the graph is fully synced with
1434
                // the current UTXO state.
1435
                var blockHeightBytes [4]byte
234✔
1436
                byteOrder.PutUint32(blockHeightBytes[:], blockHeight)
234✔
1437

234✔
1438
                var newTip [pruneTipBytes]byte
234✔
1439
                copy(newTip[:], blockHash[:])
234✔
1440

234✔
1441
                err = pruneBucket.Put(blockHeightBytes[:], newTip[:])
234✔
1442
                if err != nil {
234✔
1443
                        return err
×
1444
                }
×
1445

1446
                // Now that the graph has been pruned, we'll also attempt to
1447
                // prune any nodes that have had a channel closed within the
1448
                // latest block.
1449
                return c.pruneGraphNodes(nodes, edgeIndex)
234✔
1450
        }, func() {
234✔
1451
                chansClosed = nil
234✔
1452
        })
234✔
1453
        if err != nil {
234✔
1454
                return nil, err
×
1455
        }
×
1456

1457
        for _, channel := range chansClosed {
255✔
1458
                c.rejectCache.remove(channel.ChannelID)
21✔
1459
                c.chanCache.remove(channel.ChannelID)
21✔
1460
        }
21✔
1461

1462
        if c.graphCache != nil {
468✔
1463
                log.Debugf("Pruned graph, cache now has %s",
234✔
1464
                        c.graphCache.Stats())
234✔
1465
        }
234✔
1466

1467
        return chansClosed, nil
234✔
1468
}
1469

1470
// PruneGraphNodes is a garbage collection method which attempts to prune out
1471
// any nodes from the channel graph that are currently unconnected. This ensure
1472
// that we only maintain a graph of reachable nodes. In the event that a pruned
1473
// node gains more channels, it will be re-added back to the graph.
1474
func (c *ChannelGraph) PruneGraphNodes() error {
24✔
1475
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
48✔
1476
                nodes := tx.ReadWriteBucket(nodeBucket)
24✔
1477
                if nodes == nil {
24✔
1478
                        return ErrGraphNodesNotFound
×
1479
                }
×
1480
                edges := tx.ReadWriteBucket(edgeBucket)
24✔
1481
                if edges == nil {
24✔
1482
                        return ErrGraphNotFound
×
1483
                }
×
1484
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
24✔
1485
                if edgeIndex == nil {
24✔
1486
                        return ErrGraphNoEdgesFound
×
1487
                }
×
1488

1489
                return c.pruneGraphNodes(nodes, edgeIndex)
24✔
1490
        }, func() {})
24✔
1491
}
1492

1493
// pruneGraphNodes attempts to remove any nodes from the graph who have had a
1494
// channel closed within the current block. If the node still has existing
1495
// channels in the graph, this will act as a no-op.
1496
func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket,
1497
        edgeIndex kvdb.RwBucket) error {
258✔
1498

258✔
1499
        log.Trace("Pruning nodes from graph with no open channels")
258✔
1500

258✔
1501
        // We'll retrieve the graph's source node to ensure we don't remove it
258✔
1502
        // even if it no longer has any open channels.
258✔
1503
        sourceNode, err := c.sourceNode(nodes)
258✔
1504
        if err != nil {
258✔
1505
                return err
×
1506
        }
×
1507

1508
        // We'll use this map to keep count the number of references to a node
1509
        // in the graph. A node should only be removed once it has no more
1510
        // references in the graph.
1511
        nodeRefCounts := make(map[[33]byte]int)
258✔
1512
        err = nodes.ForEach(func(pubKey, nodeBytes []byte) error {
1,527✔
1513
                // If this is the source key, then we skip this
1,269✔
1514
                // iteration as the value for this key is a pubKey
1,269✔
1515
                // rather than raw node information.
1,269✔
1516
                if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
2,043✔
1517
                        return nil
774✔
1518
                }
774✔
1519

1520
                var nodePub [33]byte
495✔
1521
                copy(nodePub[:], pubKey)
495✔
1522
                nodeRefCounts[nodePub] = 0
495✔
1523

495✔
1524
                return nil
495✔
1525
        })
1526
        if err != nil {
258✔
1527
                return err
×
1528
        }
×
1529

1530
        // To ensure we never delete the source node, we'll start off by
1531
        // bumping its ref count to 1.
1532
        nodeRefCounts[sourceNode.PubKeyBytes] = 1
258✔
1533

258✔
1534
        // Next, we'll run through the edgeIndex which maps a channel ID to the
258✔
1535
        // edge info. We'll use this scan to populate our reference count map
258✔
1536
        // above.
258✔
1537
        err = edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
437✔
1538
                // The first 66 bytes of the edge info contain the pubkeys of
179✔
1539
                // the nodes that this edge attaches. We'll extract them, and
179✔
1540
                // add them to the ref count map.
179✔
1541
                var node1, node2 [33]byte
179✔
1542
                copy(node1[:], edgeInfoBytes[:33])
179✔
1543
                copy(node2[:], edgeInfoBytes[33:])
179✔
1544

179✔
1545
                // With the nodes extracted, we'll increase the ref count of
179✔
1546
                // each of the nodes.
179✔
1547
                nodeRefCounts[node1]++
179✔
1548
                nodeRefCounts[node2]++
179✔
1549

179✔
1550
                return nil
179✔
1551
        })
179✔
1552
        if err != nil {
258✔
1553
                return err
×
1554
        }
×
1555

1556
        // Finally, we'll make a second pass over the set of nodes, and delete
1557
        // any nodes that have a ref count of zero.
1558
        var numNodesPruned int
258✔
1559
        for nodePubKey, refCount := range nodeRefCounts {
753✔
1560
                // If the ref count of the node isn't zero, then we can safely
495✔
1561
                // skip it as it still has edges to or from it within the
495✔
1562
                // graph.
495✔
1563
                if refCount != 0 {
926✔
1564
                        continue
431✔
1565
                }
1566

1567
                if c.graphCache != nil {
128✔
1568
                        c.graphCache.RemoveNode(nodePubKey)
64✔
1569
                }
64✔
1570

1571
                // If we reach this point, then there are no longer any edges
1572
                // that connect this node, so we can delete it.
1573
                err := c.deleteLightningNode(nodes, nodePubKey[:])
64✔
1574
                if err != nil {
64✔
1575
                        if errors.Is(err, ErrGraphNodeNotFound) ||
×
1576
                                errors.Is(err, ErrGraphNodesNotFound) {
×
1577

×
1578
                                log.Warnf("Unable to prune node %x from the "+
×
1579
                                        "graph: %v", nodePubKey, err)
×
1580
                                continue
×
1581
                        }
1582

1583
                        return err
×
1584
                }
1585

1586
                log.Infof("Pruned unconnected node %x from channel graph",
64✔
1587
                        nodePubKey[:])
64✔
1588

64✔
1589
                numNodesPruned++
64✔
1590
        }
1591

1592
        if numNodesPruned > 0 {
306✔
1593
                log.Infof("Pruned %v unconnected nodes from the channel graph",
48✔
1594
                        numNodesPruned)
48✔
1595
        }
48✔
1596

1597
        return nil
258✔
1598
}
1599

1600
// DisconnectBlockAtHeight is used to indicate that the block specified
1601
// by the passed height has been disconnected from the main chain. This
1602
// will "rewind" the graph back to the height below, deleting channels
1603
// that are no longer confirmed from the graph. The prune log will be
1604
// set to the last prune height valid for the remaining chain.
1605
// Channels that were removed from the graph resulting from the
1606
// disconnected block are returned.
1607
func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) (
1608
        []*models.ChannelEdgeInfo, error) {
159✔
1609

159✔
1610
        // Every channel having a ShortChannelID starting at 'height'
159✔
1611
        // will no longer be confirmed.
159✔
1612
        startShortChanID := lnwire.ShortChannelID{
159✔
1613
                BlockHeight: height,
159✔
1614
        }
159✔
1615

159✔
1616
        // Delete everything after this height from the db up until the
159✔
1617
        // SCID alias range.
159✔
1618
        endShortChanID := aliasmgr.StartingAlias
159✔
1619

159✔
1620
        // The block height will be the 3 first bytes of the channel IDs.
159✔
1621
        var chanIDStart [8]byte
159✔
1622
        byteOrder.PutUint64(chanIDStart[:], startShortChanID.ToUint64())
159✔
1623
        var chanIDEnd [8]byte
159✔
1624
        byteOrder.PutUint64(chanIDEnd[:], endShortChanID.ToUint64())
159✔
1625

159✔
1626
        c.cacheMu.Lock()
159✔
1627
        defer c.cacheMu.Unlock()
159✔
1628

159✔
1629
        // Keep track of the channels that are removed from the graph.
159✔
1630
        var removedChans []*models.ChannelEdgeInfo
159✔
1631

159✔
1632
        if err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
318✔
1633
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
159✔
1634
                if err != nil {
159✔
1635
                        return err
×
1636
                }
×
1637
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
159✔
1638
                if err != nil {
159✔
1639
                        return err
×
1640
                }
×
1641
                chanIndex, err := edges.CreateBucketIfNotExists(
159✔
1642
                        channelPointBucket,
159✔
1643
                )
159✔
1644
                if err != nil {
159✔
1645
                        return err
×
1646
                }
×
1647
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
159✔
1648
                if err != nil {
159✔
1649
                        return err
×
1650
                }
×
1651

1652
                // Scan from chanIDStart to chanIDEnd, deleting every
1653
                // found edge.
1654
                // NOTE: we must delete the edges after the cursor loop, since
1655
                // modifying the bucket while traversing is not safe.
1656
                // NOTE: We use a < comparison in bytes.Compare instead of <=
1657
                // so that the StartingAlias itself isn't deleted.
1658
                var keys [][]byte
159✔
1659
                cursor := edgeIndex.ReadWriteCursor()
159✔
1660

159✔
1661
                //nolint:ll
159✔
1662
                for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
159✔
1663
                        bytes.Compare(k, chanIDEnd[:]) < 0; k, v = cursor.Next() {
240✔
1664
                        edgeInfoReader := bytes.NewReader(v)
81✔
1665
                        edgeInfo, err := deserializeChanEdgeInfo(edgeInfoReader)
81✔
1666
                        if err != nil {
81✔
1667
                                return err
×
1668
                        }
×
1669

1670
                        keys = append(keys, k)
81✔
1671
                        removedChans = append(removedChans, &edgeInfo)
81✔
1672
                }
1673

1674
                for _, k := range keys {
240✔
1675
                        err = c.delChannelEdgeUnsafe(
81✔
1676
                                edges, edgeIndex, chanIndex, zombieIndex,
81✔
1677
                                k, false, false,
81✔
1678
                        )
81✔
1679
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
81✔
1680
                                return err
×
1681
                        }
×
1682
                }
1683

1684
                // Delete all the entries in the prune log having a height
1685
                // greater or equal to the block disconnected.
1686
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
159✔
1687
                if err != nil {
159✔
1688
                        return err
×
1689
                }
×
1690

1691
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
159✔
1692
                        pruneLogBucket,
159✔
1693
                )
159✔
1694
                if err != nil {
159✔
1695
                        return err
×
1696
                }
×
1697

1698
                var pruneKeyStart [4]byte
159✔
1699
                byteOrder.PutUint32(pruneKeyStart[:], height)
159✔
1700

159✔
1701
                var pruneKeyEnd [4]byte
159✔
1702
                byteOrder.PutUint32(pruneKeyEnd[:], math.MaxUint32)
159✔
1703

159✔
1704
                // To avoid modifying the bucket while traversing, we delete
159✔
1705
                // the keys in a second loop.
159✔
1706
                var pruneKeys [][]byte
159✔
1707
                pruneCursor := pruneBucket.ReadWriteCursor()
159✔
1708
                //nolint:ll
159✔
1709
                for k, _ := pruneCursor.Seek(pruneKeyStart[:]); k != nil &&
159✔
1710
                        bytes.Compare(k, pruneKeyEnd[:]) <= 0; k, _ = pruneCursor.Next() {
254✔
1711
                        pruneKeys = append(pruneKeys, k)
95✔
1712
                }
95✔
1713

1714
                for _, k := range pruneKeys {
254✔
1715
                        if err := pruneBucket.Delete(k); err != nil {
95✔
1716
                                return err
×
1717
                        }
×
1718
                }
1719

1720
                return nil
159✔
1721
        }, func() {
159✔
1722
                removedChans = nil
159✔
1723
        }); err != nil {
159✔
1724
                return nil, err
×
1725
        }
×
1726

1727
        for _, channel := range removedChans {
240✔
1728
                c.rejectCache.remove(channel.ChannelID)
81✔
1729
                c.chanCache.remove(channel.ChannelID)
81✔
1730
        }
81✔
1731

1732
        return removedChans, nil
159✔
1733
}
1734

1735
// PruneTip returns the block height and hash of the latest block that has been
1736
// used to prune channels in the graph. Knowing the "prune tip" allows callers
1737
// to tell if the graph is currently in sync with the current best known UTXO
1738
// state.
1739
func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) {
55✔
1740
        var (
55✔
1741
                tipHash   chainhash.Hash
55✔
1742
                tipHeight uint32
55✔
1743
        )
55✔
1744

55✔
1745
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
110✔
1746
                graphMeta := tx.ReadBucket(graphMetaBucket)
55✔
1747
                if graphMeta == nil {
55✔
1748
                        return ErrGraphNotFound
×
1749
                }
×
1750
                pruneBucket := graphMeta.NestedReadBucket(pruneLogBucket)
55✔
1751
                if pruneBucket == nil {
55✔
1752
                        return ErrGraphNeverPruned
×
1753
                }
×
1754

1755
                pruneCursor := pruneBucket.ReadCursor()
55✔
1756

55✔
1757
                // The prune key with the largest block height will be our
55✔
1758
                // prune tip.
55✔
1759
                k, v := pruneCursor.Last()
55✔
1760
                if k == nil {
74✔
1761
                        return ErrGraphNeverPruned
19✔
1762
                }
19✔
1763

1764
                // Once we have the prune tip, the value will be the block hash,
1765
                // and the key the block height.
1766
                copy(tipHash[:], v[:])
36✔
1767
                tipHeight = byteOrder.Uint32(k[:])
36✔
1768

36✔
1769
                return nil
36✔
1770
        }, func() {})
55✔
1771
        if err != nil {
74✔
1772
                return nil, 0, err
19✔
1773
        }
19✔
1774

1775
        return &tipHash, tipHeight, nil
36✔
1776
}
1777

1778
// DeleteChannelEdges removes edges with the given channel IDs from the
1779
// database and marks them as zombies. This ensures that we're unable to re-add
1780
// it to our database once again. If an edge does not exist within the
1781
// database, then ErrEdgeNotFound will be returned. If strictZombiePruning is
1782
// true, then when we mark these edges as zombies, we'll set up the keys such
1783
// that we require the node that failed to send the fresh update to be the one
1784
// that resurrects the channel from its zombie state. The markZombie bool
1785
// denotes whether or not to mark the channel as a zombie.
1786
func (c *ChannelGraph) DeleteChannelEdges(strictZombiePruning, markZombie bool,
1787
        chanIDs ...uint64) error {
147✔
1788

147✔
1789
        // TODO(roasbeef): possibly delete from node bucket if node has no more
147✔
1790
        // channels
147✔
1791
        // TODO(roasbeef): don't delete both edges?
147✔
1792

147✔
1793
        c.cacheMu.Lock()
147✔
1794
        defer c.cacheMu.Unlock()
147✔
1795

147✔
1796
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
294✔
1797
                edges := tx.ReadWriteBucket(edgeBucket)
147✔
1798
                if edges == nil {
147✔
1799
                        return ErrEdgeNotFound
×
1800
                }
×
1801
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
147✔
1802
                if edgeIndex == nil {
147✔
1803
                        return ErrEdgeNotFound
×
1804
                }
×
1805
                chanIndex := edges.NestedReadWriteBucket(channelPointBucket)
147✔
1806
                if chanIndex == nil {
147✔
1807
                        return ErrEdgeNotFound
×
1808
                }
×
1809
                nodes := tx.ReadWriteBucket(nodeBucket)
147✔
1810
                if nodes == nil {
147✔
1811
                        return ErrGraphNodeNotFound
×
1812
                }
×
1813
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
147✔
1814
                if err != nil {
147✔
1815
                        return err
×
1816
                }
×
1817

1818
                var rawChanID [8]byte
147✔
1819
                for _, chanID := range chanIDs {
237✔
1820
                        byteOrder.PutUint64(rawChanID[:], chanID)
90✔
1821
                        err := c.delChannelEdgeUnsafe(
90✔
1822
                                edges, edgeIndex, chanIndex, zombieIndex,
90✔
1823
                                rawChanID[:], markZombie, strictZombiePruning,
90✔
1824
                        )
90✔
1825
                        if err != nil {
152✔
1826
                                return err
62✔
1827
                        }
62✔
1828
                }
1829

1830
                return nil
85✔
1831
        }, func() {})
147✔
1832
        if err != nil {
209✔
1833
                return err
62✔
1834
        }
62✔
1835

1836
        for _, chanID := range chanIDs {
113✔
1837
                c.rejectCache.remove(chanID)
28✔
1838
                c.chanCache.remove(chanID)
28✔
1839
        }
28✔
1840

1841
        return nil
85✔
1842
}
1843

1844
// ChannelID attempt to lookup the 8-byte compact channel ID which maps to the
1845
// passed channel point (outpoint). If the passed channel doesn't exist within
1846
// the database, then ErrEdgeNotFound is returned.
1847
func (c *ChannelGraph) ChannelID(chanPoint *wire.OutPoint) (uint64, error) {
1✔
1848
        var chanID uint64
1✔
1849
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
2✔
1850
                var err error
1✔
1851
                chanID, err = getChanID(tx, chanPoint)
1✔
1852
                return err
1✔
1853
        }, func() {
2✔
1854
                chanID = 0
1✔
1855
        }); err != nil {
1✔
UNCOV
1856
                return 0, err
×
UNCOV
1857
        }
×
1858

1859
        return chanID, nil
1✔
1860
}
1861

1862
// getChanID returns the assigned channel ID for a given channel point.
1863
func getChanID(tx kvdb.RTx, chanPoint *wire.OutPoint) (uint64, error) {
1✔
1864
        var b bytes.Buffer
1✔
1865
        if err := WriteOutpoint(&b, chanPoint); err != nil {
1✔
1866
                return 0, err
×
1867
        }
×
1868

1869
        edges := tx.ReadBucket(edgeBucket)
1✔
1870
        if edges == nil {
1✔
1871
                return 0, ErrGraphNoEdgesFound
×
1872
        }
×
1873
        chanIndex := edges.NestedReadBucket(channelPointBucket)
1✔
1874
        if chanIndex == nil {
1✔
1875
                return 0, ErrGraphNoEdgesFound
×
1876
        }
×
1877

1878
        chanIDBytes := chanIndex.Get(b.Bytes())
1✔
1879
        if chanIDBytes == nil {
1✔
UNCOV
1880
                return 0, ErrEdgeNotFound
×
UNCOV
1881
        }
×
1882

1883
        chanID := byteOrder.Uint64(chanIDBytes)
1✔
1884

1✔
1885
        return chanID, nil
1✔
1886
}
1887

1888
// TODO(roasbeef): allow updates to use Batch?
1889

1890
// HighestChanID returns the "highest" known channel ID in the channel graph.
1891
// This represents the "newest" channel from the PoV of the chain. This method
1892
// can be used by peers to quickly determine if they're graphs are in sync.
1893
func (c *ChannelGraph) HighestChanID() (uint64, error) {
3✔
1894
        var cid uint64
3✔
1895

3✔
1896
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
1897
                edges := tx.ReadBucket(edgeBucket)
3✔
1898
                if edges == nil {
3✔
1899
                        return ErrGraphNoEdgesFound
×
1900
                }
×
1901
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
1902
                if edgeIndex == nil {
3✔
1903
                        return ErrGraphNoEdgesFound
×
1904
                }
×
1905

1906
                // In order to find the highest chan ID, we'll fetch a cursor
1907
                // and use that to seek to the "end" of our known rage.
1908
                cidCursor := edgeIndex.ReadCursor()
3✔
1909

3✔
1910
                lastChanID, _ := cidCursor.Last()
3✔
1911

3✔
1912
                // If there's no key, then this means that we don't actually
3✔
1913
                // know of any channels, so we'll return a predicable error.
3✔
1914
                if lastChanID == nil {
4✔
1915
                        return ErrGraphNoEdgesFound
1✔
1916
                }
1✔
1917

1918
                // Otherwise, we'll de serialize the channel ID and return it
1919
                // to the caller.
1920
                cid = byteOrder.Uint64(lastChanID)
2✔
1921
                return nil
2✔
1922
        }, func() {
3✔
1923
                cid = 0
3✔
1924
        })
3✔
1925
        if err != nil && err != ErrGraphNoEdgesFound {
3✔
1926
                return 0, err
×
1927
        }
×
1928

1929
        return cid, nil
3✔
1930
}
1931

1932
// ChannelEdge represents the complete set of information for a channel edge in
1933
// the known channel graph. This struct couples the core information of the
1934
// edge as well as each of the known advertised edge policies.
1935
type ChannelEdge struct {
1936
        // Info contains all the static information describing the channel.
1937
        Info *models.ChannelEdgeInfo
1938

1939
        // Policy1 points to the "first" edge policy of the channel containing
1940
        // the dynamic information required to properly route through the edge.
1941
        Policy1 *models.ChannelEdgePolicy
1942

1943
        // Policy2 points to the "second" edge policy of the channel containing
1944
        // the dynamic information required to properly route through the edge.
1945
        Policy2 *models.ChannelEdgePolicy
1946

1947
        // Node1 is "node 1" in the channel. This is the node that would have
1948
        // produced Policy1 if it exists.
1949
        Node1 *models.LightningNode
1950

1951
        // Node2 is "node 2" in the channel. This is the node that would have
1952
        // produced Policy2 if it exists.
1953
        Node2 *models.LightningNode
1954
}
1955

1956
// ChanUpdatesInHorizon returns all the known channel edges which have at least
1957
// one edge that has an update timestamp within the specified horizon.
1958
func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
1959
        endTime time.Time) ([]ChannelEdge, error) {
145✔
1960

145✔
1961
        // To ensure we don't return duplicate ChannelEdges, we'll use an
145✔
1962
        // additional map to keep track of the edges already seen to prevent
145✔
1963
        // re-adding it.
145✔
1964
        var edgesSeen map[uint64]struct{}
145✔
1965
        var edgesToCache map[uint64]ChannelEdge
145✔
1966
        var edgesInHorizon []ChannelEdge
145✔
1967

145✔
1968
        c.cacheMu.Lock()
145✔
1969
        defer c.cacheMu.Unlock()
145✔
1970

145✔
1971
        var hits int
145✔
1972
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
290✔
1973
                edges := tx.ReadBucket(edgeBucket)
145✔
1974
                if edges == nil {
145✔
1975
                        return ErrGraphNoEdgesFound
×
1976
                }
×
1977
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
145✔
1978
                if edgeIndex == nil {
145✔
1979
                        return ErrGraphNoEdgesFound
×
1980
                }
×
1981
                edgeUpdateIndex := edges.NestedReadBucket(edgeUpdateIndexBucket)
145✔
1982
                if edgeUpdateIndex == nil {
145✔
1983
                        return ErrGraphNoEdgesFound
×
1984
                }
×
1985

1986
                nodes := tx.ReadBucket(nodeBucket)
145✔
1987
                if nodes == nil {
145✔
1988
                        return ErrGraphNodesNotFound
×
1989
                }
×
1990

1991
                // We'll now obtain a cursor to perform a range query within
1992
                // the index to find all channels within the horizon.
1993
                updateCursor := edgeUpdateIndex.ReadCursor()
145✔
1994

145✔
1995
                var startTimeBytes, endTimeBytes [8 + 8]byte
145✔
1996
                byteOrder.PutUint64(
145✔
1997
                        startTimeBytes[:8], uint64(startTime.Unix()),
145✔
1998
                )
145✔
1999
                byteOrder.PutUint64(
145✔
2000
                        endTimeBytes[:8], uint64(endTime.Unix()),
145✔
2001
                )
145✔
2002

145✔
2003
                // With our start and end times constructed, we'll step through
145✔
2004
                // the index collecting the info and policy of each update of
145✔
2005
                // each channel that has a last update within the time range.
145✔
2006
                //
145✔
2007
                //nolint:ll
145✔
2008
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
145✔
2009
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
191✔
2010

46✔
2011
                        // We have a new eligible entry, so we'll slice of the
46✔
2012
                        // chan ID so we can query it in the DB.
46✔
2013
                        chanID := indexKey[8:]
46✔
2014

46✔
2015
                        // If we've already retrieved the info and policies for
46✔
2016
                        // this edge, then we can skip it as we don't need to do
46✔
2017
                        // so again.
46✔
2018
                        chanIDInt := byteOrder.Uint64(chanID)
46✔
2019
                        if _, ok := edgesSeen[chanIDInt]; ok {
65✔
2020
                                continue
19✔
2021
                        }
2022

2023
                        if channel, ok := c.chanCache.get(chanIDInt); ok {
36✔
2024
                                hits++
9✔
2025
                                edgesSeen[chanIDInt] = struct{}{}
9✔
2026
                                edgesInHorizon = append(edgesInHorizon, channel)
9✔
2027
                                continue
9✔
2028
                        }
2029

2030
                        // First, we'll fetch the static edge information.
2031
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
18✔
2032
                        if err != nil {
18✔
2033
                                chanID := byteOrder.Uint64(chanID)
×
2034
                                return fmt.Errorf("unable to fetch info for "+
×
2035
                                        "edge with chan_id=%v: %v", chanID, err)
×
2036
                        }
×
2037

2038
                        // With the static information obtained, we'll now
2039
                        // fetch the dynamic policy info.
2040
                        edge1, edge2, err := fetchChanEdgePolicies(
18✔
2041
                                edgeIndex, edges, chanID,
18✔
2042
                        )
18✔
2043
                        if err != nil {
18✔
2044
                                chanID := byteOrder.Uint64(chanID)
×
2045
                                return fmt.Errorf("unable to fetch policies "+
×
2046
                                        "for edge with chan_id=%v: %v", chanID,
×
2047
                                        err)
×
2048
                        }
×
2049

2050
                        node1, err := fetchLightningNode(
18✔
2051
                                nodes, edgeInfo.NodeKey1Bytes[:],
18✔
2052
                        )
18✔
2053
                        if err != nil {
18✔
2054
                                return err
×
2055
                        }
×
2056

2057
                        node2, err := fetchLightningNode(
18✔
2058
                                nodes, edgeInfo.NodeKey2Bytes[:],
18✔
2059
                        )
18✔
2060
                        if err != nil {
18✔
2061
                                return err
×
2062
                        }
×
2063

2064
                        // Finally, we'll collate this edge with the rest of
2065
                        // edges to be returned.
2066
                        edgesSeen[chanIDInt] = struct{}{}
18✔
2067
                        channel := ChannelEdge{
18✔
2068
                                Info:    &edgeInfo,
18✔
2069
                                Policy1: edge1,
18✔
2070
                                Policy2: edge2,
18✔
2071
                                Node1:   &node1,
18✔
2072
                                Node2:   &node2,
18✔
2073
                        }
18✔
2074
                        edgesInHorizon = append(edgesInHorizon, channel)
18✔
2075
                        edgesToCache[chanIDInt] = channel
18✔
2076
                }
2077

2078
                return nil
145✔
2079
        }, func() {
145✔
2080
                edgesSeen = make(map[uint64]struct{})
145✔
2081
                edgesToCache = make(map[uint64]ChannelEdge)
145✔
2082
                edgesInHorizon = nil
145✔
2083
        })
145✔
2084
        switch {
145✔
2085
        case err == ErrGraphNoEdgesFound:
×
2086
                fallthrough
×
2087
        case err == ErrGraphNodesNotFound:
×
2088
                break
×
2089

2090
        case err != nil:
×
2091
                return nil, err
×
2092
        }
2093

2094
        // Insert any edges loaded from disk into the cache.
2095
        for chanid, channel := range edgesToCache {
163✔
2096
                c.chanCache.insert(chanid, channel)
18✔
2097
        }
18✔
2098

2099
        log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
145✔
2100
                float64(hits)/float64(len(edgesInHorizon)), hits,
145✔
2101
                len(edgesInHorizon))
145✔
2102

145✔
2103
        return edgesInHorizon, nil
145✔
2104
}
2105

2106
// NodeUpdatesInHorizon returns all the known lightning node which have an
2107
// update timestamp within the passed range. This method can be used by two
2108
// nodes to quickly determine if they have the same set of up to date node
2109
// announcements.
2110
func (c *ChannelGraph) NodeUpdatesInHorizon(startTime,
2111
        endTime time.Time) ([]models.LightningNode, error) {
8✔
2112

8✔
2113
        var nodesInHorizon []models.LightningNode
8✔
2114

8✔
2115
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
16✔
2116
                nodes := tx.ReadBucket(nodeBucket)
8✔
2117
                if nodes == nil {
8✔
2118
                        return ErrGraphNodesNotFound
×
2119
                }
×
2120

2121
                nodeUpdateIndex := nodes.NestedReadBucket(nodeUpdateIndexBucket)
8✔
2122
                if nodeUpdateIndex == nil {
8✔
2123
                        return ErrGraphNodesNotFound
×
2124
                }
×
2125

2126
                // We'll now obtain a cursor to perform a range query within
2127
                // the index to find all node announcements within the horizon.
2128
                updateCursor := nodeUpdateIndex.ReadCursor()
8✔
2129

8✔
2130
                var startTimeBytes, endTimeBytes [8 + 33]byte
8✔
2131
                byteOrder.PutUint64(
8✔
2132
                        startTimeBytes[:8], uint64(startTime.Unix()),
8✔
2133
                )
8✔
2134
                byteOrder.PutUint64(
8✔
2135
                        endTimeBytes[:8], uint64(endTime.Unix()),
8✔
2136
                )
8✔
2137

8✔
2138
                // With our start and end times constructed, we'll step through
8✔
2139
                // the index collecting info for each node within the time
8✔
2140
                // range.
8✔
2141
                //
8✔
2142
                //nolint:ll
8✔
2143
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
8✔
2144
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
37✔
2145

29✔
2146
                        nodePub := indexKey[8:]
29✔
2147
                        node, err := fetchLightningNode(nodes, nodePub)
29✔
2148
                        if err != nil {
29✔
2149
                                return err
×
2150
                        }
×
2151

2152
                        nodesInHorizon = append(nodesInHorizon, node)
29✔
2153
                }
2154

2155
                return nil
8✔
2156
        }, func() {
8✔
2157
                nodesInHorizon = nil
8✔
2158
        })
8✔
2159
        switch {
8✔
2160
        case err == ErrGraphNoEdgesFound:
×
2161
                fallthrough
×
2162
        case err == ErrGraphNodesNotFound:
×
2163
                break
×
2164

2165
        case err != nil:
×
2166
                return nil, err
×
2167
        }
2168

2169
        return nodesInHorizon, nil
8✔
2170
}
2171

2172
// FilterKnownChanIDs takes a set of channel IDs and return the subset of chan
2173
// ID's that we don't know and are not known zombies of the passed set. In other
2174
// words, we perform a set difference of our set of chan ID's and the ones
2175
// passed in. This method can be used by callers to determine the set of
2176
// channels another peer knows of that we don't.
2177
func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
2178
        isZombieChan func(time.Time, time.Time) bool) ([]uint64, error) {
113✔
2179

113✔
2180
        var newChanIDs []uint64
113✔
2181

113✔
2182
        c.cacheMu.Lock()
113✔
2183
        defer c.cacheMu.Unlock()
113✔
2184

113✔
2185
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
226✔
2186
                edges := tx.ReadBucket(edgeBucket)
113✔
2187
                if edges == nil {
113✔
2188
                        return ErrGraphNoEdgesFound
×
2189
                }
×
2190
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
113✔
2191
                if edgeIndex == nil {
113✔
2192
                        return ErrGraphNoEdgesFound
×
2193
                }
×
2194

2195
                // Fetch the zombie index, it may not exist if no edges have
2196
                // ever been marked as zombies. If the index has been
2197
                // initialized, we will use it later to skip known zombie edges.
2198
                zombieIndex := edges.NestedReadBucket(zombieBucket)
113✔
2199

113✔
2200
                // We'll run through the set of chanIDs and collate only the
113✔
2201
                // set of channel that are unable to be found within our db.
113✔
2202
                var cidBytes [8]byte
113✔
2203
                for _, info := range chansInfo {
198✔
2204
                        scid := info.ShortChannelID.ToUint64()
85✔
2205
                        byteOrder.PutUint64(cidBytes[:], scid)
85✔
2206

85✔
2207
                        // If the edge is already known, skip it.
85✔
2208
                        if v := edgeIndex.Get(cidBytes[:]); v != nil {
101✔
2209
                                continue
16✔
2210
                        }
2211

2212
                        // If the edge is a known zombie, skip it.
2213
                        if zombieIndex != nil {
138✔
2214
                                isZombie, _, _ := isZombieEdge(
69✔
2215
                                        zombieIndex, scid,
69✔
2216
                                )
69✔
2217

69✔
2218
                                // TODO(ziggie): Make sure that for the strict
69✔
2219
                                // pruning case we compare the pubkeys and
69✔
2220
                                // whether the right timestamp is not older than
69✔
2221
                                // the `ChannelPruneExpiry`.
69✔
2222
                                //
69✔
2223
                                // NOTE: The timestamp data has no verification
69✔
2224
                                // attached to it in the `ReplyChannelRange` msg
69✔
2225
                                // so we are trusting this data at this point.
69✔
2226
                                // However it is not critical because we are
69✔
2227
                                // just removing the channel from the db when
69✔
2228
                                // the timestamps are more recent. During the
69✔
2229
                                // querying of the gossip msg verification
69✔
2230
                                // happens as usual.
69✔
2231
                                // However we should start punishing peers when
69✔
2232
                                // they don't provide us honest data ?
69✔
2233
                                isStillZombie := isZombieChan(
69✔
2234
                                        info.Node1UpdateTimestamp,
69✔
2235
                                        info.Node2UpdateTimestamp,
69✔
2236
                                )
69✔
2237

69✔
2238
                                switch {
69✔
2239
                                // If the edge is a known zombie and if we
2240
                                // would still consider it a zombie given the
2241
                                // latest update timestamps, then we skip this
2242
                                // channel.
2243
                                case isZombie && isStillZombie:
23✔
2244
                                        continue
23✔
2245

2246
                                // Otherwise, if we have marked it as a zombie
2247
                                // but the latest update timestamps could bring
2248
                                // it back from the dead, then we mark it alive,
2249
                                // and we let it be added to the set of IDs to
2250
                                // query our peer for.
2251
                                case isZombie && !isStillZombie:
16✔
2252
                                        err := c.markEdgeLiveUnsafe(tx, scid)
16✔
2253
                                        if err != nil {
16✔
2254
                                                return err
×
2255
                                        }
×
2256
                                }
2257
                        }
2258

2259
                        newChanIDs = append(newChanIDs, scid)
46✔
2260
                }
2261

2262
                return nil
113✔
2263
        }, func() {
113✔
2264
                newChanIDs = nil
113✔
2265
        })
113✔
2266
        switch {
113✔
2267
        // If we don't know of any edges yet, then we'll return the entire set
2268
        // of chan IDs specified.
2269
        case err == ErrGraphNoEdgesFound:
×
2270
                ogChanIDs := make([]uint64, len(chansInfo))
×
2271
                for i, info := range chansInfo {
×
2272
                        ogChanIDs[i] = info.ShortChannelID.ToUint64()
×
2273
                }
×
2274

2275
                return ogChanIDs, nil
×
2276

2277
        case err != nil:
×
2278
                return nil, err
×
2279
        }
2280

2281
        return newChanIDs, nil
113✔
2282
}
2283

2284
// ChannelUpdateInfo couples the SCID of a channel with the timestamps of the
2285
// latest received channel updates for the channel.
2286
type ChannelUpdateInfo struct {
2287
        // ShortChannelID is the SCID identifier of the channel.
2288
        ShortChannelID lnwire.ShortChannelID
2289

2290
        // Node1UpdateTimestamp is the timestamp of the latest received update
2291
        // from the node 1 channel peer. This will be set to zero time if no
2292
        // update has yet been received from this node.
2293
        Node1UpdateTimestamp time.Time
2294

2295
        // Node2UpdateTimestamp is the timestamp of the latest received update
2296
        // from the node 2 channel peer. This will be set to zero time if no
2297
        // update has yet been received from this node.
2298
        Node2UpdateTimestamp time.Time
2299
}
2300

2301
// NewChannelUpdateInfo is a constructor which makes sure we initialize the
2302
// timestamps with zero seconds unix timestamp which equals
2303
// `January 1, 1970, 00:00:00 UTC` in case the value is `time.Time{}`.
2304
func NewChannelUpdateInfo(scid lnwire.ShortChannelID, node1Timestamp,
2305
        node2Timestamp time.Time) ChannelUpdateInfo {
218✔
2306

218✔
2307
        chanInfo := ChannelUpdateInfo{
218✔
2308
                ShortChannelID:       scid,
218✔
2309
                Node1UpdateTimestamp: node1Timestamp,
218✔
2310
                Node2UpdateTimestamp: node2Timestamp,
218✔
2311
        }
218✔
2312

218✔
2313
        if node1Timestamp.IsZero() {
426✔
2314
                chanInfo.Node1UpdateTimestamp = time.Unix(0, 0)
208✔
2315
        }
208✔
2316

2317
        if node2Timestamp.IsZero() {
426✔
2318
                chanInfo.Node2UpdateTimestamp = time.Unix(0, 0)
208✔
2319
        }
208✔
2320

2321
        return chanInfo
218✔
2322
}
2323

2324
// BlockChannelRange represents a range of channels for a given block height.
2325
type BlockChannelRange struct {
2326
        // Height is the height of the block all of the channels below were
2327
        // included in.
2328
        Height uint32
2329

2330
        // Channels is the list of channels identified by their short ID
2331
        // representation known to us that were included in the block height
2332
        // above. The list may include channel update timestamp information if
2333
        // requested.
2334
        Channels []ChannelUpdateInfo
2335
}
2336

2337
// FilterChannelRange returns the channel ID's of all known channels which were
2338
// mined in a block height within the passed range. The channel IDs are grouped
2339
// by their common block height. This method can be used to quickly share with a
2340
// peer the set of channels we know of within a particular range to catch them
2341
// up after a period of time offline. If withTimestamps is true then the
2342
// timestamp info of the latest received channel update messages of the channel
2343
// will be included in the response.
2344
func (c *ChannelGraph) FilterChannelRange(startHeight,
2345
        endHeight uint32, withTimestamps bool) ([]BlockChannelRange, error) {
11✔
2346

11✔
2347
        startChanID := &lnwire.ShortChannelID{
11✔
2348
                BlockHeight: startHeight,
11✔
2349
        }
11✔
2350

11✔
2351
        endChanID := lnwire.ShortChannelID{
11✔
2352
                BlockHeight: endHeight,
11✔
2353
                TxIndex:     math.MaxUint32 & 0x00ffffff,
11✔
2354
                TxPosition:  math.MaxUint16,
11✔
2355
        }
11✔
2356

11✔
2357
        // As we need to perform a range scan, we'll convert the starting and
11✔
2358
        // ending height to their corresponding values when encoded using short
11✔
2359
        // channel ID's.
11✔
2360
        var chanIDStart, chanIDEnd [8]byte
11✔
2361
        byteOrder.PutUint64(chanIDStart[:], startChanID.ToUint64())
11✔
2362
        byteOrder.PutUint64(chanIDEnd[:], endChanID.ToUint64())
11✔
2363

11✔
2364
        var channelsPerBlock map[uint32][]ChannelUpdateInfo
11✔
2365
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
22✔
2366
                edges := tx.ReadBucket(edgeBucket)
11✔
2367
                if edges == nil {
11✔
2368
                        return ErrGraphNoEdgesFound
×
2369
                }
×
2370
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
11✔
2371
                if edgeIndex == nil {
11✔
2372
                        return ErrGraphNoEdgesFound
×
2373
                }
×
2374

2375
                cursor := edgeIndex.ReadCursor()
11✔
2376

11✔
2377
                // We'll now iterate through the database, and find each
11✔
2378
                // channel ID that resides within the specified range.
11✔
2379
                //
11✔
2380
                //nolint:ll
11✔
2381
                for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
11✔
2382
                        bytes.Compare(k, chanIDEnd[:]) <= 0; k, v = cursor.Next() {
55✔
2383
                        // Don't send alias SCIDs during gossip sync.
44✔
2384
                        edgeReader := bytes.NewReader(v)
44✔
2385
                        edgeInfo, err := deserializeChanEdgeInfo(edgeReader)
44✔
2386
                        if err != nil {
44✔
2387
                                return err
×
2388
                        }
×
2389

2390
                        if edgeInfo.AuthProof == nil {
44✔
UNCOV
2391
                                continue
×
2392
                        }
2393

2394
                        // This channel ID rests within the target range, so
2395
                        // we'll add it to our returned set.
2396
                        rawCid := byteOrder.Uint64(k)
44✔
2397
                        cid := lnwire.NewShortChanIDFromInt(rawCid)
44✔
2398

44✔
2399
                        chanInfo := NewChannelUpdateInfo(
44✔
2400
                                cid, time.Time{}, time.Time{},
44✔
2401
                        )
44✔
2402

44✔
2403
                        if !withTimestamps {
66✔
2404
                                channelsPerBlock[cid.BlockHeight] = append(
22✔
2405
                                        channelsPerBlock[cid.BlockHeight],
22✔
2406
                                        chanInfo,
22✔
2407
                                )
22✔
2408

22✔
2409
                                continue
22✔
2410
                        }
2411

2412
                        node1Key, node2Key := computeEdgePolicyKeys(&edgeInfo)
22✔
2413

22✔
2414
                        rawPolicy := edges.Get(node1Key)
22✔
2415
                        if len(rawPolicy) != 0 {
28✔
2416
                                r := bytes.NewReader(rawPolicy)
6✔
2417

6✔
2418
                                edge, err := deserializeChanEdgePolicyRaw(r)
6✔
2419
                                if err != nil && !errors.Is(
6✔
2420
                                        err, ErrEdgePolicyOptionalFieldNotFound,
6✔
2421
                                ) {
6✔
2422

×
2423
                                        return err
×
2424
                                }
×
2425

2426
                                chanInfo.Node1UpdateTimestamp = edge.LastUpdate
6✔
2427
                        }
2428

2429
                        rawPolicy = edges.Get(node2Key)
22✔
2430
                        if len(rawPolicy) != 0 {
33✔
2431
                                r := bytes.NewReader(rawPolicy)
11✔
2432

11✔
2433
                                edge, err := deserializeChanEdgePolicyRaw(r)
11✔
2434
                                if err != nil && !errors.Is(
11✔
2435
                                        err, ErrEdgePolicyOptionalFieldNotFound,
11✔
2436
                                ) {
11✔
2437

×
2438
                                        return err
×
2439
                                }
×
2440

2441
                                chanInfo.Node2UpdateTimestamp = edge.LastUpdate
11✔
2442
                        }
2443

2444
                        channelsPerBlock[cid.BlockHeight] = append(
22✔
2445
                                channelsPerBlock[cid.BlockHeight], chanInfo,
22✔
2446
                        )
22✔
2447
                }
2448

2449
                return nil
11✔
2450
        }, func() {
11✔
2451
                channelsPerBlock = make(map[uint32][]ChannelUpdateInfo)
11✔
2452
        })
11✔
2453

2454
        switch {
11✔
2455
        // If we don't know of any channels yet, then there's nothing to
2456
        // filter, so we'll return an empty slice.
2457
        case err == ErrGraphNoEdgesFound || len(channelsPerBlock) == 0:
3✔
2458
                return nil, nil
3✔
2459

2460
        case err != nil:
×
2461
                return nil, err
×
2462
        }
2463

2464
        // Return the channel ranges in ascending block height order.
2465
        blocks := make([]uint32, 0, len(channelsPerBlock))
8✔
2466
        for block := range channelsPerBlock {
30✔
2467
                blocks = append(blocks, block)
22✔
2468
        }
22✔
2469
        sort.Slice(blocks, func(i, j int) bool {
24✔
2470
                return blocks[i] < blocks[j]
16✔
2471
        })
16✔
2472

2473
        channelRanges := make([]BlockChannelRange, 0, len(channelsPerBlock))
8✔
2474
        for _, block := range blocks {
30✔
2475
                channelRanges = append(channelRanges, BlockChannelRange{
22✔
2476
                        Height:   block,
22✔
2477
                        Channels: channelsPerBlock[block],
22✔
2478
                })
22✔
2479
        }
22✔
2480

2481
        return channelRanges, nil
8✔
2482
}
2483

2484
// FetchChanInfos returns the set of channel edges that correspond to the passed
2485
// channel ID's. If an edge is the query is unknown to the database, it will
2486
// skipped and the result will contain only those edges that exist at the time
2487
// of the query. This can be used to respond to peer queries that are seeking to
2488
// fill in gaps in their view of the channel graph.
2489
func (c *ChannelGraph) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) {
3✔
2490
        return c.fetchChanInfos(nil, chanIDs)
3✔
2491
}
3✔
2492

2493
// fetchChanInfos returns the set of channel edges that correspond to the passed
2494
// channel ID's. If an edge is the query is unknown to the database, it will
2495
// skipped and the result will contain only those edges that exist at the time
2496
// of the query. This can be used to respond to peer queries that are seeking to
2497
// fill in gaps in their view of the channel graph.
2498
//
2499
// NOTE: An optional transaction may be provided. If none is provided, then a
2500
// new one will be created.
2501
func (c *ChannelGraph) fetchChanInfos(tx kvdb.RTx, chanIDs []uint64) (
2502
        []ChannelEdge, error) {
20✔
2503
        // TODO(roasbeef): sort cids?
20✔
2504

20✔
2505
        var (
20✔
2506
                chanEdges []ChannelEdge
20✔
2507
                cidBytes  [8]byte
20✔
2508
        )
20✔
2509

20✔
2510
        fetchChanInfos := func(tx kvdb.RTx) error {
40✔
2511
                edges := tx.ReadBucket(edgeBucket)
20✔
2512
                if edges == nil {
20✔
2513
                        return ErrGraphNoEdgesFound
×
2514
                }
×
2515
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
20✔
2516
                if edgeIndex == nil {
20✔
2517
                        return ErrGraphNoEdgesFound
×
2518
                }
×
2519
                nodes := tx.ReadBucket(nodeBucket)
20✔
2520
                if nodes == nil {
20✔
2521
                        return ErrGraphNotFound
×
2522
                }
×
2523

2524
                for _, cid := range chanIDs {
47✔
2525
                        byteOrder.PutUint64(cidBytes[:], cid)
27✔
2526

27✔
2527
                        // First, we'll fetch the static edge information. If
27✔
2528
                        // the edge is unknown, we will skip the edge and
27✔
2529
                        // continue gathering all known edges.
27✔
2530
                        edgeInfo, err := fetchChanEdgeInfo(
27✔
2531
                                edgeIndex, cidBytes[:],
27✔
2532
                        )
27✔
2533
                        switch {
27✔
2534
                        case errors.Is(err, ErrEdgeNotFound):
19✔
2535
                                continue
19✔
2536
                        case err != nil:
×
2537
                                return err
×
2538
                        }
2539

2540
                        // With the static information obtained, we'll now
2541
                        // fetch the dynamic policy info.
2542
                        edge1, edge2, err := fetchChanEdgePolicies(
8✔
2543
                                edgeIndex, edges, cidBytes[:],
8✔
2544
                        )
8✔
2545
                        if err != nil {
8✔
2546
                                return err
×
2547
                        }
×
2548

2549
                        node1, err := fetchLightningNode(
8✔
2550
                                nodes, edgeInfo.NodeKey1Bytes[:],
8✔
2551
                        )
8✔
2552
                        if err != nil {
8✔
2553
                                return err
×
2554
                        }
×
2555

2556
                        node2, err := fetchLightningNode(
8✔
2557
                                nodes, edgeInfo.NodeKey2Bytes[:],
8✔
2558
                        )
8✔
2559
                        if err != nil {
8✔
2560
                                return err
×
2561
                        }
×
2562

2563
                        chanEdges = append(chanEdges, ChannelEdge{
8✔
2564
                                Info:    &edgeInfo,
8✔
2565
                                Policy1: edge1,
8✔
2566
                                Policy2: edge2,
8✔
2567
                                Node1:   &node1,
8✔
2568
                                Node2:   &node2,
8✔
2569
                        })
8✔
2570
                }
2571
                return nil
20✔
2572
        }
2573

2574
        if tx == nil {
24✔
2575
                err := kvdb.View(c.db, fetchChanInfos, func() {
8✔
2576
                        chanEdges = nil
4✔
2577
                })
4✔
2578
                if err != nil {
4✔
2579
                        return nil, err
×
2580
                }
×
2581

2582
                return chanEdges, nil
4✔
2583
        }
2584

2585
        err := fetchChanInfos(tx)
16✔
2586
        if err != nil {
16✔
2587
                return nil, err
×
2588
        }
×
2589

2590
        return chanEdges, nil
16✔
2591
}
2592

2593
func delEdgeUpdateIndexEntry(edgesBucket kvdb.RwBucket, chanID uint64,
2594
        edge1, edge2 *models.ChannelEdgePolicy) error {
130✔
2595

130✔
2596
        // First, we'll fetch the edge update index bucket which currently
130✔
2597
        // stores an entry for the channel we're about to delete.
130✔
2598
        updateIndex := edgesBucket.NestedReadWriteBucket(edgeUpdateIndexBucket)
130✔
2599
        if updateIndex == nil {
130✔
2600
                // No edges in bucket, return early.
×
2601
                return nil
×
2602
        }
×
2603

2604
        // Now that we have the bucket, we'll attempt to construct a template
2605
        // for the index key: updateTime || chanid.
2606
        var indexKey [8 + 8]byte
130✔
2607
        byteOrder.PutUint64(indexKey[8:], chanID)
130✔
2608

130✔
2609
        // With the template constructed, we'll attempt to delete an entry that
130✔
2610
        // would have been created by both edges: we'll alternate the update
130✔
2611
        // times, as one may had overridden the other.
130✔
2612
        if edge1 != nil {
140✔
2613
                byteOrder.PutUint64(
10✔
2614
                        indexKey[:8], uint64(edge1.LastUpdate.Unix()),
10✔
2615
                )
10✔
2616
                if err := updateIndex.Delete(indexKey[:]); err != nil {
10✔
2617
                        return err
×
2618
                }
×
2619
        }
2620

2621
        // We'll also attempt to delete the entry that may have been created by
2622
        // the second edge.
2623
        if edge2 != nil {
142✔
2624
                byteOrder.PutUint64(
12✔
2625
                        indexKey[:8], uint64(edge2.LastUpdate.Unix()),
12✔
2626
                )
12✔
2627
                if err := updateIndex.Delete(indexKey[:]); err != nil {
12✔
2628
                        return err
×
2629
                }
×
2630
        }
2631

2632
        return nil
130✔
2633
}
2634

2635
// delChannelEdgeUnsafe deletes the edge with the given chanID from the graph
2636
// cache. It then goes on to delete any policy info and edge info for this
2637
// channel from the DB and finally, if isZombie is true, it will add an entry
2638
// for this channel in the zombie index.
2639
//
2640
// NOTE: this method MUST only be called if the cacheMu has already been
2641
// acquired.
2642
func (c *ChannelGraph) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex,
2643
        zombieIndex kvdb.RwBucket, chanID []byte, isZombie,
2644
        strictZombie bool) error {
192✔
2645

192✔
2646
        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
192✔
2647
        if err != nil {
254✔
2648
                return err
62✔
2649
        }
62✔
2650

2651
        if c.graphCache != nil {
260✔
2652
                c.graphCache.RemoveChannel(
130✔
2653
                        edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes,
130✔
2654
                        edgeInfo.ChannelID,
130✔
2655
                )
130✔
2656
        }
130✔
2657

2658
        // We'll also remove the entry in the edge update index bucket before
2659
        // we delete the edges themselves so we can access their last update
2660
        // times.
2661
        cid := byteOrder.Uint64(chanID)
130✔
2662
        edge1, edge2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
130✔
2663
        if err != nil {
130✔
2664
                return err
×
2665
        }
×
2666
        err = delEdgeUpdateIndexEntry(edges, cid, edge1, edge2)
130✔
2667
        if err != nil {
130✔
2668
                return err
×
2669
        }
×
2670

2671
        // The edge key is of the format pubKey || chanID. First we construct
2672
        // the latter half, populating the channel ID.
2673
        var edgeKey [33 + 8]byte
130✔
2674
        copy(edgeKey[33:], chanID)
130✔
2675

130✔
2676
        // With the latter half constructed, copy over the first public key to
130✔
2677
        // delete the edge in this direction, then the second to delete the
130✔
2678
        // edge in the opposite direction.
130✔
2679
        copy(edgeKey[:33], edgeInfo.NodeKey1Bytes[:])
130✔
2680
        if edges.Get(edgeKey[:]) != nil {
260✔
2681
                if err := edges.Delete(edgeKey[:]); err != nil {
130✔
2682
                        return err
×
2683
                }
×
2684
        }
2685
        copy(edgeKey[:33], edgeInfo.NodeKey2Bytes[:])
130✔
2686
        if edges.Get(edgeKey[:]) != nil {
260✔
2687
                if err := edges.Delete(edgeKey[:]); err != nil {
130✔
2688
                        return err
×
2689
                }
×
2690
        }
2691

2692
        // As part of deleting the edge we also remove all disabled entries
2693
        // from the edgePolicyDisabledIndex bucket. We do that for both
2694
        // directions.
2695
        err = updateEdgePolicyDisabledIndex(edges, cid, false, false)
130✔
2696
        if err != nil {
130✔
2697
                return err
×
2698
        }
×
2699
        err = updateEdgePolicyDisabledIndex(edges, cid, true, false)
130✔
2700
        if err != nil {
130✔
2701
                return err
×
2702
        }
×
2703

2704
        // With the edge data deleted, we can purge the information from the two
2705
        // edge indexes.
2706
        if err := edgeIndex.Delete(chanID); err != nil {
130✔
2707
                return err
×
2708
        }
×
2709
        var b bytes.Buffer
130✔
2710
        if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
130✔
2711
                return err
×
2712
        }
×
2713
        if err := chanIndex.Delete(b.Bytes()); err != nil {
130✔
2714
                return err
×
2715
        }
×
2716

2717
        // Finally, we'll mark the edge as a zombie within our index if it's
2718
        // being removed due to the channel becoming a zombie. We do this to
2719
        // ensure we don't store unnecessary data for spent channels.
2720
        if !isZombie {
236✔
2721
                return nil
106✔
2722
        }
106✔
2723

2724
        nodeKey1, nodeKey2 := edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes
24✔
2725
        if strictZombie {
27✔
2726
                nodeKey1, nodeKey2 = makeZombiePubkeys(&edgeInfo, edge1, edge2)
3✔
2727
        }
3✔
2728

2729
        return markEdgeZombie(
24✔
2730
                zombieIndex, byteOrder.Uint64(chanID), nodeKey1, nodeKey2,
24✔
2731
        )
24✔
2732
}
2733

2734
// makeZombiePubkeys derives the node pubkeys to store in the zombie index for a
2735
// particular pair of channel policies. The return values are one of:
2736
//  1. (pubkey1, pubkey2)
2737
//  2. (pubkey1, blank)
2738
//  3. (blank, pubkey2)
2739
//
2740
// A blank pubkey means that corresponding node will be unable to resurrect a
2741
// channel on its own. For example, node1 may continue to publish recent
2742
// updates, but node2 has fallen way behind. After marking an edge as a zombie,
2743
// we don't want another fresh update from node1 to resurrect, as the edge can
2744
// only become live once node2 finally sends something recent.
2745
//
2746
// In the case where we have neither update, we allow either party to resurrect
2747
// the channel. If the channel were to be marked zombie again, it would be
2748
// marked with the correct lagging channel since we received an update from only
2749
// one side.
2750
func makeZombiePubkeys(info *models.ChannelEdgeInfo,
2751
        e1, e2 *models.ChannelEdgePolicy) ([33]byte, [33]byte) {
3✔
2752

3✔
2753
        switch {
3✔
2754
        // If we don't have either edge policy, we'll return both pubkeys so
2755
        // that the channel can be resurrected by either party.
2756
        case e1 == nil && e2 == nil:
×
2757
                return info.NodeKey1Bytes, info.NodeKey2Bytes
×
2758

2759
        // If we're missing edge1, or if both edges are present but edge1 is
2760
        // older, we'll return edge1's pubkey and a blank pubkey for edge2. This
2761
        // means that only an update from edge1 will be able to resurrect the
2762
        // channel.
2763
        case e1 == nil || (e2 != nil && e1.LastUpdate.Before(e2.LastUpdate)):
1✔
2764
                return info.NodeKey1Bytes, [33]byte{}
1✔
2765

2766
        // Otherwise, we're missing edge2 or edge2 is the older side, so we
2767
        // return a blank pubkey for edge1. In this case, only an update from
2768
        // edge2 can resurect the channel.
2769
        default:
2✔
2770
                return [33]byte{}, info.NodeKey2Bytes
2✔
2771
        }
2772
}
2773

2774
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
2775
// within the database for the referenced channel. The `flags` attribute within
2776
// the ChannelEdgePolicy determines which of the directed edges are being
2777
// updated. If the flag is 1, then the first node's information is being
2778
// updated, otherwise it's the second node's information. The node ordering is
2779
// determined by the lexicographical ordering of the identity public keys of the
2780
// nodes on either side of the channel.
2781
func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
2782
        op ...batch.SchedulerOption) error {
2,663✔
2783

2,663✔
2784
        var (
2,663✔
2785
                isUpdate1    bool
2,663✔
2786
                edgeNotFound bool
2,663✔
2787
        )
2,663✔
2788

2,663✔
2789
        r := &batch.Request{
2,663✔
2790
                Reset: func() {
5,326✔
2791
                        isUpdate1 = false
2,663✔
2792
                        edgeNotFound = false
2,663✔
2793
                },
2,663✔
2794
                Update: func(tx kvdb.RwTx) error {
2,663✔
2795
                        var err error
2,663✔
2796
                        isUpdate1, err = updateEdgePolicy(
2,663✔
2797
                                tx, edge, c.graphCache,
2,663✔
2798
                        )
2,663✔
2799

2,663✔
2800
                        if err != nil {
2,666✔
2801
                                log.Errorf("UpdateEdgePolicy faild: %v", err)
3✔
2802
                        }
3✔
2803

2804
                        // Silence ErrEdgeNotFound so that the batch can
2805
                        // succeed, but propagate the error via local state.
2806
                        if errors.Is(err, ErrEdgeNotFound) {
2,666✔
2807
                                edgeNotFound = true
3✔
2808
                                return nil
3✔
2809
                        }
3✔
2810

2811
                        return err
2,660✔
2812
                },
2813
                OnCommit: func(err error) error {
2,663✔
2814
                        switch {
2,663✔
2815
                        case err != nil:
×
2816
                                return err
×
2817
                        case edgeNotFound:
3✔
2818
                                return ErrEdgeNotFound
3✔
2819
                        default:
2,660✔
2820
                                c.updateEdgeCache(edge, isUpdate1)
2,660✔
2821
                                return nil
2,660✔
2822
                        }
2823
                },
2824
        }
2825

2826
        for _, f := range op {
2,663✔
UNCOV
2827
                f(r)
×
UNCOV
2828
        }
×
2829

2830
        return c.chanScheduler.Execute(r)
2,663✔
2831
}
2832

2833
func (c *ChannelGraph) updateEdgeCache(e *models.ChannelEdgePolicy,
2834
        isUpdate1 bool) {
2,660✔
2835

2,660✔
2836
        // If an entry for this channel is found in reject cache, we'll modify
2,660✔
2837
        // the entry with the updated timestamp for the direction that was just
2,660✔
2838
        // written. If the edge doesn't exist, we'll load the cache entry lazily
2,660✔
2839
        // during the next query for this edge.
2,660✔
2840
        if entry, ok := c.rejectCache.get(e.ChannelID); ok {
2,665✔
2841
                if isUpdate1 {
8✔
2842
                        entry.upd1Time = e.LastUpdate.Unix()
3✔
2843
                } else {
5✔
2844
                        entry.upd2Time = e.LastUpdate.Unix()
2✔
2845
                }
2✔
2846
                c.rejectCache.insert(e.ChannelID, entry)
5✔
2847
        }
2848

2849
        // If an entry for this channel is found in channel cache, we'll modify
2850
        // the entry with the updated policy for the direction that was just
2851
        // written. If the edge doesn't exist, we'll defer loading the info and
2852
        // policies and lazily read from disk during the next query.
2853
        if channel, ok := c.chanCache.get(e.ChannelID); ok {
2,660✔
UNCOV
2854
                if isUpdate1 {
×
UNCOV
2855
                        channel.Policy1 = e
×
UNCOV
2856
                } else {
×
UNCOV
2857
                        channel.Policy2 = e
×
UNCOV
2858
                }
×
UNCOV
2859
                c.chanCache.insert(e.ChannelID, channel)
×
2860
        }
2861
}
2862

2863
// updateEdgePolicy attempts to update an edge's policy within the relevant
2864
// buckets using an existing database transaction. The returned boolean will be
2865
// true if the updated policy belongs to node1, and false if the policy belonged
2866
// to node2.
2867
func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy,
2868
        graphCache *GraphCache) (bool, error) {
2,663✔
2869

2,663✔
2870
        edges := tx.ReadWriteBucket(edgeBucket)
2,663✔
2871
        if edges == nil {
2,663✔
2872
                return false, ErrEdgeNotFound
×
2873
        }
×
2874
        edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
2,663✔
2875
        if edgeIndex == nil {
2,663✔
2876
                return false, ErrEdgeNotFound
×
2877
        }
×
2878

2879
        // Create the channelID key be converting the channel ID
2880
        // integer into a byte slice.
2881
        var chanID [8]byte
2,663✔
2882
        byteOrder.PutUint64(chanID[:], edge.ChannelID)
2,663✔
2883

2,663✔
2884
        // With the channel ID, we then fetch the value storing the two
2,663✔
2885
        // nodes which connect this channel edge.
2,663✔
2886
        nodeInfo := edgeIndex.Get(chanID[:])
2,663✔
2887
        if nodeInfo == nil {
2,666✔
2888
                return false, ErrEdgeNotFound
3✔
2889
        }
3✔
2890

2891
        // Depending on the flags value passed above, either the first
2892
        // or second edge policy is being updated.
2893
        var fromNode, toNode []byte
2,660✔
2894
        var isUpdate1 bool
2,660✔
2895
        if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
3,994✔
2896
                fromNode = nodeInfo[:33]
1,334✔
2897
                toNode = nodeInfo[33:66]
1,334✔
2898
                isUpdate1 = true
1,334✔
2899
        } else {
2,660✔
2900
                fromNode = nodeInfo[33:66]
1,326✔
2901
                toNode = nodeInfo[:33]
1,326✔
2902
                isUpdate1 = false
1,326✔
2903
        }
1,326✔
2904

2905
        // Finally, with the direction of the edge being updated
2906
        // identified, we update the on-disk edge representation.
2907
        err := putChanEdgePolicy(edges, edge, fromNode, toNode)
2,660✔
2908
        if err != nil {
2,660✔
2909
                return false, err
×
2910
        }
×
2911

2912
        var (
2,660✔
2913
                fromNodePubKey route.Vertex
2,660✔
2914
                toNodePubKey   route.Vertex
2,660✔
2915
        )
2,660✔
2916
        copy(fromNodePubKey[:], fromNode)
2,660✔
2917
        copy(toNodePubKey[:], toNode)
2,660✔
2918

2,660✔
2919
        if graphCache != nil {
4,934✔
2920
                graphCache.UpdatePolicy(
2,274✔
2921
                        edge, fromNodePubKey, toNodePubKey, isUpdate1,
2,274✔
2922
                )
2,274✔
2923
        }
2,274✔
2924

2925
        return isUpdate1, nil
2,660✔
2926
}
2927

2928
// isPublic determines whether the node is seen as public within the graph from
2929
// the source node's point of view. An existing database transaction can also be
2930
// specified.
2931
func (c *ChannelGraph) isPublic(tx kvdb.RTx, nodePub route.Vertex,
2932
        sourcePubKey []byte) (bool, error) {
13✔
2933

13✔
2934
        // In order to determine whether this node is publicly advertised within
13✔
2935
        // the graph, we'll need to look at all of its edges and check whether
13✔
2936
        // they extend to any other node than the source node. errDone will be
13✔
2937
        // used to terminate the check early.
13✔
2938
        nodeIsPublic := false
13✔
2939
        errDone := errors.New("done")
13✔
2940
        err := c.ForEachNodeChannelTx(tx, nodePub, func(tx kvdb.RTx,
13✔
2941
                info *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy,
13✔
2942
                _ *models.ChannelEdgePolicy) error {
23✔
2943

10✔
2944
                // If this edge doesn't extend to the source node, we'll
10✔
2945
                // terminate our search as we can now conclude that the node is
10✔
2946
                // publicly advertised within the graph due to the local node
10✔
2947
                // knowing of the current edge.
10✔
2948
                if !bytes.Equal(info.NodeKey1Bytes[:], sourcePubKey) &&
10✔
2949
                        !bytes.Equal(info.NodeKey2Bytes[:], sourcePubKey) {
13✔
2950

3✔
2951
                        nodeIsPublic = true
3✔
2952
                        return errDone
3✔
2953
                }
3✔
2954

2955
                // Since the edge _does_ extend to the source node, we'll also
2956
                // need to ensure that this is a public edge.
2957
                if info.AuthProof != nil {
13✔
2958
                        nodeIsPublic = true
6✔
2959
                        return errDone
6✔
2960
                }
6✔
2961

2962
                // Otherwise, we'll continue our search.
2963
                return nil
1✔
2964
        })
2965
        if err != nil && err != errDone {
13✔
2966
                return false, err
×
2967
        }
×
2968

2969
        return nodeIsPublic, nil
13✔
2970
}
2971

2972
// FetchLightningNodeTx attempts to look up a target node by its identity
2973
// public key. If the node isn't found in the database, then
2974
// ErrGraphNodeNotFound is returned. An optional transaction may be provided.
2975
// If none is provided, then a new one will be created.
2976
func (c *ChannelGraph) FetchLightningNodeTx(tx kvdb.RTx, nodePub route.Vertex) (
2977
        *models.LightningNode, error) {
2,944✔
2978

2,944✔
2979
        return c.fetchLightningNode(tx, nodePub)
2,944✔
2980
}
2,944✔
2981

2982
// FetchLightningNode attempts to look up a target node by its identity public
2983
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
2984
// returned.
2985
func (c *ChannelGraph) FetchLightningNode(nodePub route.Vertex) (
2986
        *models.LightningNode, error) {
837✔
2987

837✔
2988
        return c.fetchLightningNode(nil, nodePub)
837✔
2989
}
837✔
2990

2991
// fetchLightningNode attempts to look up a target node by its identity public
2992
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
2993
// returned. An optional transaction may be provided. If none is provided, then
2994
// a new one will be created.
2995
func (c *ChannelGraph) fetchLightningNode(tx kvdb.RTx,
2996
        nodePub route.Vertex) (*models.LightningNode, error) {
3,781✔
2997

3,781✔
2998
        var node *models.LightningNode
3,781✔
2999
        fetch := func(tx kvdb.RTx) error {
7,562✔
3000
                // First grab the nodes bucket which stores the mapping from
3,781✔
3001
                // pubKey to node information.
3,781✔
3002
                nodes := tx.ReadBucket(nodeBucket)
3,781✔
3003
                if nodes == nil {
3,781✔
3004
                        return ErrGraphNotFound
×
3005
                }
×
3006

3007
                // If a key for this serialized public key isn't found, then
3008
                // the target node doesn't exist within the database.
3009
                nodeBytes := nodes.Get(nodePub[:])
3,781✔
3010
                if nodeBytes == nil {
3,795✔
3011
                        return ErrGraphNodeNotFound
14✔
3012
                }
14✔
3013

3014
                // If the node is found, then we can de deserialize the node
3015
                // information to return to the user.
3016
                nodeReader := bytes.NewReader(nodeBytes)
3,767✔
3017
                n, err := deserializeLightningNode(nodeReader)
3,767✔
3018
                if err != nil {
3,767✔
3019
                        return err
×
3020
                }
×
3021

3022
                node = &n
3,767✔
3023

3,767✔
3024
                return nil
3,767✔
3025
        }
3026

3027
        if tx == nil {
4,618✔
3028
                err := kvdb.View(
837✔
3029
                        c.db, fetch, func() {
1,674✔
3030
                                node = nil
837✔
3031
                        },
837✔
3032
                )
3033
                if err != nil {
851✔
3034
                        return nil, err
14✔
3035
                }
14✔
3036

3037
                return node, nil
823✔
3038
        }
3039

3040
        err := fetch(tx)
2,944✔
3041
        if err != nil {
2,944✔
3042
                return nil, err
×
3043
        }
×
3044

3045
        return node, nil
2,944✔
3046
}
3047

3048
// graphCacheNode is a struct that wraps a LightningNode in a way that it can be
3049
// cached in the graph cache.
3050
type graphCacheNode struct {
3051
        pubKeyBytes route.Vertex
3052
        features    *lnwire.FeatureVector
3053
}
3054

3055
// newGraphCacheNode returns a new cache optimized node.
3056
func newGraphCacheNode(pubKey route.Vertex,
3057
        features *lnwire.FeatureVector) *graphCacheNode {
732✔
3058

732✔
3059
        return &graphCacheNode{
732✔
3060
                pubKeyBytes: pubKey,
732✔
3061
                features:    features,
732✔
3062
        }
732✔
3063
}
732✔
3064

3065
// PubKey returns the node's public identity key.
3066
func (n *graphCacheNode) PubKey() route.Vertex {
732✔
3067
        return n.pubKeyBytes
732✔
3068
}
732✔
3069

3070
// Features returns the node's features.
3071
func (n *graphCacheNode) Features() *lnwire.FeatureVector {
712✔
3072
        return n.features
712✔
3073
}
712✔
3074

3075
// ForEachChannel iterates through all channels of this node, executing the
3076
// passed callback with an edge info structure and the policies of each end
3077
// of the channel. The first edge policy is the outgoing edge *to* the
3078
// connecting node, while the second is the incoming edge *from* the
3079
// connecting node. If the callback returns an error, then the iteration is
3080
// halted with the error propagated back up to the caller.
3081
//
3082
// Unknown policies are passed into the callback as nil values.
3083
func (n *graphCacheNode) ForEachChannel(tx kvdb.RTx,
3084
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3085
                *models.ChannelEdgePolicy) error) error {
632✔
3086

632✔
3087
        return nodeTraversal(tx, n.pubKeyBytes[:], nil, cb)
632✔
3088
}
632✔
3089

3090
var _ GraphCacheNode = (*graphCacheNode)(nil)
3091

3092
// HasLightningNode determines if the graph has a vertex identified by the
3093
// target node identity public key. If the node exists in the database, a
3094
// timestamp of when the data for the node was lasted updated is returned along
3095
// with a true boolean. Otherwise, an empty time.Time is returned with a false
3096
// boolean.
3097
func (c *ChannelGraph) HasLightningNode(nodePub [33]byte) (time.Time, bool,
3098
        error) {
16✔
3099

16✔
3100
        var (
16✔
3101
                updateTime time.Time
16✔
3102
                exists     bool
16✔
3103
        )
16✔
3104

16✔
3105
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
32✔
3106
                // First grab the nodes bucket which stores the mapping from
16✔
3107
                // pubKey to node information.
16✔
3108
                nodes := tx.ReadBucket(nodeBucket)
16✔
3109
                if nodes == nil {
16✔
3110
                        return ErrGraphNotFound
×
3111
                }
×
3112

3113
                // If a key for this serialized public key isn't found, we can
3114
                // exit early.
3115
                nodeBytes := nodes.Get(nodePub[:])
16✔
3116
                if nodeBytes == nil {
19✔
3117
                        exists = false
3✔
3118
                        return nil
3✔
3119
                }
3✔
3120

3121
                // Otherwise we continue on to obtain the time stamp
3122
                // representing the last time the data for this node was
3123
                // updated.
3124
                nodeReader := bytes.NewReader(nodeBytes)
13✔
3125
                node, err := deserializeLightningNode(nodeReader)
13✔
3126
                if err != nil {
13✔
3127
                        return err
×
3128
                }
×
3129

3130
                exists = true
13✔
3131
                updateTime = node.LastUpdate
13✔
3132
                return nil
13✔
3133
        }, func() {
16✔
3134
                updateTime = time.Time{}
16✔
3135
                exists = false
16✔
3136
        })
16✔
3137
        if err != nil {
16✔
3138
                return time.Time{}, exists, err
×
3139
        }
×
3140

3141
        return updateTime, exists, nil
16✔
3142
}
3143

3144
// nodeTraversal is used to traverse all channels of a node given by its
3145
// public key and passes channel information into the specified callback.
3146
func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
3147
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3148
                *models.ChannelEdgePolicy) error) error {
1,878✔
3149

1,878✔
3150
        traversal := func(tx kvdb.RTx) error {
3,756✔
3151
                edges := tx.ReadBucket(edgeBucket)
1,878✔
3152
                if edges == nil {
1,878✔
3153
                        return ErrGraphNotFound
×
3154
                }
×
3155
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
1,878✔
3156
                if edgeIndex == nil {
1,878✔
3157
                        return ErrGraphNoEdgesFound
×
3158
                }
×
3159

3160
                // In order to reach all the edges for this node, we take
3161
                // advantage of the construction of the key-space within the
3162
                // edge bucket. The keys are stored in the form: pubKey ||
3163
                // chanID. Therefore, starting from a chanID of zero, we can
3164
                // scan forward in the bucket, grabbing all the edges for the
3165
                // node. Once the prefix no longer matches, then we know we're
3166
                // done.
3167
                var nodeStart [33 + 8]byte
1,878✔
3168
                copy(nodeStart[:], nodePub)
1,878✔
3169
                copy(nodeStart[33:], chanStart[:])
1,878✔
3170

1,878✔
3171
                // Starting from the key pubKey || 0, we seek forward in the
1,878✔
3172
                // bucket until the retrieved key no longer has the public key
1,878✔
3173
                // as its prefix. This indicates that we've stepped over into
1,878✔
3174
                // another node's edges, so we can terminate our scan.
1,878✔
3175
                edgeCursor := edges.ReadCursor()
1,878✔
3176
                for nodeEdge, _ := edgeCursor.Seek(nodeStart[:]); bytes.HasPrefix(nodeEdge, nodePub); nodeEdge, _ = edgeCursor.Next() { //nolint:ll
5,727✔
3177
                        // If the prefix still matches, the channel id is
3,849✔
3178
                        // returned in nodeEdge. Channel id is used to lookup
3,849✔
3179
                        // the node at the other end of the channel and both
3,849✔
3180
                        // edge policies.
3,849✔
3181
                        chanID := nodeEdge[33:]
3,849✔
3182
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
3,849✔
3183
                        if err != nil {
3,849✔
3184
                                return err
×
3185
                        }
×
3186

3187
                        outgoingPolicy, err := fetchChanEdgePolicy(
3,849✔
3188
                                edges, chanID, nodePub,
3,849✔
3189
                        )
3,849✔
3190
                        if err != nil {
3,849✔
3191
                                return err
×
3192
                        }
×
3193

3194
                        otherNode, err := edgeInfo.OtherNodeKeyBytes(nodePub)
3,849✔
3195
                        if err != nil {
3,849✔
3196
                                return err
×
3197
                        }
×
3198

3199
                        incomingPolicy, err := fetchChanEdgePolicy(
3,849✔
3200
                                edges, chanID, otherNode[:],
3,849✔
3201
                        )
3,849✔
3202
                        if err != nil {
3,849✔
3203
                                return err
×
3204
                        }
×
3205

3206
                        // Finally, we execute the callback.
3207
                        err = cb(tx, &edgeInfo, outgoingPolicy, incomingPolicy)
3,849✔
3208
                        if err != nil {
3,858✔
3209
                                return err
9✔
3210
                        }
9✔
3211
                }
3212

3213
                return nil
1,869✔
3214
        }
3215

3216
        // If no transaction was provided, then we'll create a new transaction
3217
        // to execute the transaction within.
3218
        if tx == nil {
1,887✔
3219
                return kvdb.View(db, traversal, func() {})
18✔
3220
        }
3221

3222
        // Otherwise, we re-use the existing transaction to execute the graph
3223
        // traversal.
3224
        return traversal(tx)
1,869✔
3225
}
3226

3227
// ForEachNodeChannel iterates through all channels of the given node,
3228
// executing the passed callback with an edge info structure and the policies
3229
// of each end of the channel. The first edge policy is the outgoing edge *to*
3230
// the connecting node, while the second is the incoming edge *from* the
3231
// connecting node. If the callback returns an error, then the iteration is
3232
// halted with the error propagated back up to the caller.
3233
//
3234
// Unknown policies are passed into the callback as nil values.
3235
func (c *ChannelGraph) ForEachNodeChannel(nodePub route.Vertex,
3236
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3237
                *models.ChannelEdgePolicy) error) error {
6✔
3238

6✔
3239
        return nodeTraversal(nil, nodePub[:], c.db, cb)
6✔
3240
}
6✔
3241

3242
// ForEachNodeChannelTx iterates through all channels of the given node,
3243
// executing the passed callback with an edge info structure and the policies
3244
// of each end of the channel. The first edge policy is the outgoing edge *to*
3245
// the connecting node, while the second is the incoming edge *from* the
3246
// connecting node. If the callback returns an error, then the iteration is
3247
// halted with the error propagated back up to the caller.
3248
//
3249
// Unknown policies are passed into the callback as nil values.
3250
//
3251
// If the caller wishes to re-use an existing boltdb transaction, then it
3252
// should be passed as the first argument.  Otherwise, the first argument should
3253
// be nil and a fresh transaction will be created to execute the graph
3254
// traversal.
3255
func (c *ChannelGraph) ForEachNodeChannelTx(tx kvdb.RTx,
3256
        nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo,
3257
                *models.ChannelEdgePolicy,
3258
                *models.ChannelEdgePolicy) error) error {
998✔
3259

998✔
3260
        return nodeTraversal(tx, nodePub[:], c.db, cb)
998✔
3261
}
998✔
3262

3263
// FetchOtherNode attempts to fetch the full LightningNode that's opposite of
3264
// the target node in the channel. This is useful when one knows the pubkey of
3265
// one of the nodes, and wishes to obtain the full LightningNode for the other
3266
// end of the channel.
3267
func (c *ChannelGraph) FetchOtherNode(tx kvdb.RTx,
3268
        channel *models.ChannelEdgeInfo, thisNodeKey []byte) (
UNCOV
3269
        *models.LightningNode, error) {
×
UNCOV
3270

×
UNCOV
3271
        // Ensure that the node passed in is actually a member of the channel.
×
UNCOV
3272
        var targetNodeBytes [33]byte
×
UNCOV
3273
        switch {
×
UNCOV
3274
        case bytes.Equal(channel.NodeKey1Bytes[:], thisNodeKey):
×
UNCOV
3275
                targetNodeBytes = channel.NodeKey2Bytes
×
UNCOV
3276
        case bytes.Equal(channel.NodeKey2Bytes[:], thisNodeKey):
×
UNCOV
3277
                targetNodeBytes = channel.NodeKey1Bytes
×
3278
        default:
×
3279
                return nil, fmt.Errorf("node not participating in this channel")
×
3280
        }
3281

UNCOV
3282
        var targetNode *models.LightningNode
×
UNCOV
3283
        fetchNodeFunc := func(tx kvdb.RTx) error {
×
UNCOV
3284
                // First grab the nodes bucket which stores the mapping from
×
UNCOV
3285
                // pubKey to node information.
×
UNCOV
3286
                nodes := tx.ReadBucket(nodeBucket)
×
UNCOV
3287
                if nodes == nil {
×
3288
                        return ErrGraphNotFound
×
3289
                }
×
3290

UNCOV
3291
                node, err := fetchLightningNode(nodes, targetNodeBytes[:])
×
UNCOV
3292
                if err != nil {
×
3293
                        return err
×
3294
                }
×
3295

UNCOV
3296
                targetNode = &node
×
UNCOV
3297

×
UNCOV
3298
                return nil
×
3299
        }
3300

3301
        // If the transaction is nil, then we'll need to create a new one,
3302
        // otherwise we can use the existing db transaction.
UNCOV
3303
        var err error
×
UNCOV
3304
        if tx == nil {
×
3305
                err = kvdb.View(c.db, fetchNodeFunc, func() {
×
3306
                        targetNode = nil
×
3307
                })
×
UNCOV
3308
        } else {
×
UNCOV
3309
                err = fetchNodeFunc(tx)
×
UNCOV
3310
        }
×
3311

UNCOV
3312
        return targetNode, err
×
3313
}
3314

3315
// computeEdgePolicyKeys is a helper function that can be used to compute the
3316
// keys used to index the channel edge policy info for the two nodes of the
3317
// edge. The keys for node 1 and node 2 are returned respectively.
3318
func computeEdgePolicyKeys(info *models.ChannelEdgeInfo) ([]byte, []byte) {
22✔
3319
        var (
22✔
3320
                node1Key [33 + 8]byte
22✔
3321
                node2Key [33 + 8]byte
22✔
3322
        )
22✔
3323

22✔
3324
        copy(node1Key[:], info.NodeKey1Bytes[:])
22✔
3325
        copy(node2Key[:], info.NodeKey2Bytes[:])
22✔
3326

22✔
3327
        byteOrder.PutUint64(node1Key[33:], info.ChannelID)
22✔
3328
        byteOrder.PutUint64(node2Key[33:], info.ChannelID)
22✔
3329

22✔
3330
        return node1Key[:], node2Key[:]
22✔
3331
}
22✔
3332

3333
// FetchChannelEdgesByOutpoint attempts to lookup the two directed edges for
3334
// the channel identified by the funding outpoint. If the channel can't be
3335
// found, then ErrEdgeNotFound is returned. A struct which houses the general
3336
// information for the channel itself is returned as well as two structs that
3337
// contain the routing policies for the channel in either direction.
3338
func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (
3339
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3340
        *models.ChannelEdgePolicy, error) {
11✔
3341

11✔
3342
        var (
11✔
3343
                edgeInfo *models.ChannelEdgeInfo
11✔
3344
                policy1  *models.ChannelEdgePolicy
11✔
3345
                policy2  *models.ChannelEdgePolicy
11✔
3346
        )
11✔
3347

11✔
3348
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
22✔
3349
                // First, grab the node bucket. This will be used to populate
11✔
3350
                // the Node pointers in each edge read from disk.
11✔
3351
                nodes := tx.ReadBucket(nodeBucket)
11✔
3352
                if nodes == nil {
11✔
3353
                        return ErrGraphNotFound
×
3354
                }
×
3355

3356
                // Next, grab the edge bucket which stores the edges, and also
3357
                // the index itself so we can group the directed edges together
3358
                // logically.
3359
                edges := tx.ReadBucket(edgeBucket)
11✔
3360
                if edges == nil {
11✔
3361
                        return ErrGraphNoEdgesFound
×
3362
                }
×
3363
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
11✔
3364
                if edgeIndex == nil {
11✔
3365
                        return ErrGraphNoEdgesFound
×
3366
                }
×
3367

3368
                // If the channel's outpoint doesn't exist within the outpoint
3369
                // index, then the edge does not exist.
3370
                chanIndex := edges.NestedReadBucket(channelPointBucket)
11✔
3371
                if chanIndex == nil {
11✔
3372
                        return ErrGraphNoEdgesFound
×
3373
                }
×
3374
                var b bytes.Buffer
11✔
3375
                if err := WriteOutpoint(&b, op); err != nil {
11✔
3376
                        return err
×
3377
                }
×
3378
                chanID := chanIndex.Get(b.Bytes())
11✔
3379
                if chanID == nil {
21✔
3380
                        return fmt.Errorf("%w: op=%v", ErrEdgeNotFound, op)
10✔
3381
                }
10✔
3382

3383
                // If the channel is found to exists, then we'll first retrieve
3384
                // the general information for the channel.
3385
                edge, err := fetchChanEdgeInfo(edgeIndex, chanID)
1✔
3386
                if err != nil {
1✔
3387
                        return fmt.Errorf("%w: chanID=%x", err, chanID)
×
3388
                }
×
3389
                edgeInfo = &edge
1✔
3390

1✔
3391
                // Once we have the information about the channels' parameters,
1✔
3392
                // we'll fetch the routing policies for each for the directed
1✔
3393
                // edges.
1✔
3394
                e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
1✔
3395
                if err != nil {
1✔
3396
                        return fmt.Errorf("failed to find policy: %w", err)
×
3397
                }
×
3398

3399
                policy1 = e1
1✔
3400
                policy2 = e2
1✔
3401
                return nil
1✔
3402
        }, func() {
11✔
3403
                edgeInfo = nil
11✔
3404
                policy1 = nil
11✔
3405
                policy2 = nil
11✔
3406
        })
11✔
3407
        if err != nil {
21✔
3408
                return nil, nil, nil, err
10✔
3409
        }
10✔
3410

3411
        return edgeInfo, policy1, policy2, nil
1✔
3412
}
3413

3414
// FetchChannelEdgesByID attempts to lookup the two directed edges for the
3415
// channel identified by the channel ID. If the channel can't be found, then
3416
// ErrEdgeNotFound is returned. A struct which houses the general information
3417
// for the channel itself is returned as well as two structs that contain the
3418
// routing policies for the channel in either direction.
3419
//
3420
// ErrZombieEdge an be returned if the edge is currently marked as a zombie
3421
// within the database. In this case, the ChannelEdgePolicy's will be nil, and
3422
// the ChannelEdgeInfo will only include the public keys of each node.
3423
func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (
3424
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3425
        *models.ChannelEdgePolicy, error) {
25✔
3426

25✔
3427
        var (
25✔
3428
                edgeInfo  *models.ChannelEdgeInfo
25✔
3429
                policy1   *models.ChannelEdgePolicy
25✔
3430
                policy2   *models.ChannelEdgePolicy
25✔
3431
                channelID [8]byte
25✔
3432
        )
25✔
3433

25✔
3434
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
50✔
3435
                // First, grab the node bucket. This will be used to populate
25✔
3436
                // the Node pointers in each edge read from disk.
25✔
3437
                nodes := tx.ReadBucket(nodeBucket)
25✔
3438
                if nodes == nil {
25✔
3439
                        return ErrGraphNotFound
×
3440
                }
×
3441

3442
                // Next, grab the edge bucket which stores the edges, and also
3443
                // the index itself so we can group the directed edges together
3444
                // logically.
3445
                edges := tx.ReadBucket(edgeBucket)
25✔
3446
                if edges == nil {
25✔
3447
                        return ErrGraphNoEdgesFound
×
3448
                }
×
3449
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
25✔
3450
                if edgeIndex == nil {
25✔
3451
                        return ErrGraphNoEdgesFound
×
3452
                }
×
3453

3454
                byteOrder.PutUint64(channelID[:], chanID)
25✔
3455

25✔
3456
                // Now, attempt to fetch edge.
25✔
3457
                edge, err := fetchChanEdgeInfo(edgeIndex, channelID[:])
25✔
3458

25✔
3459
                // If it doesn't exist, we'll quickly check our zombie index to
25✔
3460
                // see if we've previously marked it as so.
25✔
3461
                if errors.Is(err, ErrEdgeNotFound) {
26✔
3462
                        // If the zombie index doesn't exist, or the edge is not
1✔
3463
                        // marked as a zombie within it, then we'll return the
1✔
3464
                        // original ErrEdgeNotFound error.
1✔
3465
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
1✔
3466
                        if zombieIndex == nil {
1✔
3467
                                return ErrEdgeNotFound
×
3468
                        }
×
3469

3470
                        isZombie, pubKey1, pubKey2 := isZombieEdge(
1✔
3471
                                zombieIndex, chanID,
1✔
3472
                        )
1✔
3473
                        if !isZombie {
1✔
UNCOV
3474
                                return ErrEdgeNotFound
×
UNCOV
3475
                        }
×
3476

3477
                        // Otherwise, the edge is marked as a zombie, so we'll
3478
                        // populate the edge info with the public keys of each
3479
                        // party as this is the only information we have about
3480
                        // it and return an error signaling so.
3481
                        edgeInfo = &models.ChannelEdgeInfo{
1✔
3482
                                NodeKey1Bytes: pubKey1,
1✔
3483
                                NodeKey2Bytes: pubKey2,
1✔
3484
                        }
1✔
3485
                        return ErrZombieEdge
1✔
3486
                }
3487

3488
                // Otherwise, we'll just return the error if any.
3489
                if err != nil {
24✔
3490
                        return err
×
3491
                }
×
3492

3493
                edgeInfo = &edge
24✔
3494

24✔
3495
                // Then we'll attempt to fetch the accompanying policies of this
24✔
3496
                // edge.
24✔
3497
                e1, e2, err := fetchChanEdgePolicies(
24✔
3498
                        edgeIndex, edges, channelID[:],
24✔
3499
                )
24✔
3500
                if err != nil {
24✔
3501
                        return err
×
3502
                }
×
3503

3504
                policy1 = e1
24✔
3505
                policy2 = e2
24✔
3506
                return nil
24✔
3507
        }, func() {
25✔
3508
                edgeInfo = nil
25✔
3509
                policy1 = nil
25✔
3510
                policy2 = nil
25✔
3511
        })
25✔
3512
        if err == ErrZombieEdge {
26✔
3513
                return edgeInfo, nil, nil, err
1✔
3514
        }
1✔
3515
        if err != nil {
24✔
UNCOV
3516
                return nil, nil, nil, err
×
UNCOV
3517
        }
×
3518

3519
        return edgeInfo, policy1, policy2, nil
24✔
3520
}
3521

3522
// IsPublicNode is a helper method that determines whether the node with the
3523
// given public key is seen as a public node in the graph from the graph's
3524
// source node's point of view.
3525
func (c *ChannelGraph) IsPublicNode(pubKey [33]byte) (bool, error) {
13✔
3526
        var nodeIsPublic bool
13✔
3527
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
26✔
3528
                nodes := tx.ReadBucket(nodeBucket)
13✔
3529
                if nodes == nil {
13✔
3530
                        return ErrGraphNodesNotFound
×
3531
                }
×
3532
                ourPubKey := nodes.Get(sourceKey)
13✔
3533
                if ourPubKey == nil {
13✔
3534
                        return ErrSourceNodeNotSet
×
3535
                }
×
3536
                node, err := fetchLightningNode(nodes, pubKey[:])
13✔
3537
                if err != nil {
13✔
3538
                        return err
×
3539
                }
×
3540

3541
                nodeIsPublic, err = c.isPublic(tx, node.PubKeyBytes, ourPubKey)
13✔
3542
                return err
13✔
3543
        }, func() {
13✔
3544
                nodeIsPublic = false
13✔
3545
        })
13✔
3546
        if err != nil {
13✔
3547
                return false, err
×
3548
        }
×
3549

3550
        return nodeIsPublic, nil
13✔
3551
}
3552

3553
// genMultiSigP2WSH generates the p2wsh'd multisig script for 2 of 2 pubkeys.
3554
func genMultiSigP2WSH(aPub, bPub []byte) ([]byte, error) {
46✔
3555
        witnessScript, err := input.GenMultiSigScript(aPub, bPub)
46✔
3556
        if err != nil {
46✔
3557
                return nil, err
×
3558
        }
×
3559

3560
        // With the witness script generated, we'll now turn it into a p2wsh
3561
        // script:
3562
        //  * OP_0 <sha256(script)>
3563
        bldr := txscript.NewScriptBuilder(
46✔
3564
                txscript.WithScriptAllocSize(input.P2WSHSize),
46✔
3565
        )
46✔
3566
        bldr.AddOp(txscript.OP_0)
46✔
3567
        scriptHash := sha256.Sum256(witnessScript)
46✔
3568
        bldr.AddData(scriptHash[:])
46✔
3569

46✔
3570
        return bldr.Script()
46✔
3571
}
3572

3573
// EdgePoint couples the outpoint of a channel with the funding script that it
3574
// creates. The FilteredChainView will use this to watch for spends of this
3575
// edge point on chain. We require both of these values as depending on the
3576
// concrete implementation, either the pkScript, or the out point will be used.
3577
type EdgePoint struct {
3578
        // FundingPkScript is the p2wsh multi-sig script of the target channel.
3579
        FundingPkScript []byte
3580

3581
        // OutPoint is the outpoint of the target channel.
3582
        OutPoint wire.OutPoint
3583
}
3584

3585
// String returns a human readable version of the target EdgePoint. We return
3586
// the outpoint directly as it is enough to uniquely identify the edge point.
3587
func (e *EdgePoint) String() string {
×
3588
        return e.OutPoint.String()
×
3589
}
×
3590

3591
// ChannelView returns the verifiable edge information for each active channel
3592
// within the known channel graph. The set of UTXO's (along with their scripts)
3593
// returned are the ones that need to be watched on chain to detect channel
3594
// closes on the resident blockchain.
3595
func (c *ChannelGraph) ChannelView() ([]EdgePoint, error) {
23✔
3596
        var edgePoints []EdgePoint
23✔
3597
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
46✔
3598
                // We're going to iterate over the entire channel index, so
23✔
3599
                // we'll need to fetch the edgeBucket to get to the index as
23✔
3600
                // it's a sub-bucket.
23✔
3601
                edges := tx.ReadBucket(edgeBucket)
23✔
3602
                if edges == nil {
23✔
3603
                        return ErrGraphNoEdgesFound
×
3604
                }
×
3605
                chanIndex := edges.NestedReadBucket(channelPointBucket)
23✔
3606
                if chanIndex == nil {
23✔
3607
                        return ErrGraphNoEdgesFound
×
3608
                }
×
3609
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
23✔
3610
                if edgeIndex == nil {
23✔
3611
                        return ErrGraphNoEdgesFound
×
3612
                }
×
3613

3614
                // Once we have the proper bucket, we'll range over each key
3615
                // (which is the channel point for the channel) and decode it,
3616
                // accumulating each entry.
3617
                return chanIndex.ForEach(
23✔
3618
                        func(chanPointBytes, chanID []byte) error {
65✔
3619
                                chanPointReader := bytes.NewReader(
42✔
3620
                                        chanPointBytes,
42✔
3621
                                )
42✔
3622

42✔
3623
                                var chanPoint wire.OutPoint
42✔
3624
                                err := ReadOutpoint(chanPointReader, &chanPoint)
42✔
3625
                                if err != nil {
42✔
3626
                                        return err
×
3627
                                }
×
3628

3629
                                edgeInfo, err := fetchChanEdgeInfo(
42✔
3630
                                        edgeIndex, chanID,
42✔
3631
                                )
42✔
3632
                                if err != nil {
42✔
3633
                                        return err
×
3634
                                }
×
3635

3636
                                pkScript, err := genMultiSigP2WSH(
42✔
3637
                                        edgeInfo.BitcoinKey1Bytes[:],
42✔
3638
                                        edgeInfo.BitcoinKey2Bytes[:],
42✔
3639
                                )
42✔
3640
                                if err != nil {
42✔
3641
                                        return err
×
3642
                                }
×
3643

3644
                                edgePoints = append(edgePoints, EdgePoint{
42✔
3645
                                        FundingPkScript: pkScript,
42✔
3646
                                        OutPoint:        chanPoint,
42✔
3647
                                })
42✔
3648

42✔
3649
                                return nil
42✔
3650
                        },
3651
                )
3652
        }, func() {
23✔
3653
                edgePoints = nil
23✔
3654
        }); err != nil {
23✔
3655
                return nil, err
×
3656
        }
×
3657

3658
        return edgePoints, nil
23✔
3659
}
3660

3661
// MarkEdgeZombie attempts to mark a channel identified by its channel ID as a
3662
// zombie. This method is used on an ad-hoc basis, when channels need to be
3663
// marked as zombies outside the normal pruning cycle.
3664
func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
3665
        pubKey1, pubKey2 [33]byte) error {
135✔
3666

135✔
3667
        c.cacheMu.Lock()
135✔
3668
        defer c.cacheMu.Unlock()
135✔
3669

135✔
3670
        err := kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
270✔
3671
                edges := tx.ReadWriteBucket(edgeBucket)
135✔
3672
                if edges == nil {
135✔
3673
                        return ErrGraphNoEdgesFound
×
3674
                }
×
3675
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
135✔
3676
                if err != nil {
135✔
3677
                        return fmt.Errorf("unable to create zombie "+
×
3678
                                "bucket: %w", err)
×
3679
                }
×
3680

3681
                if c.graphCache != nil {
270✔
3682
                        c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
135✔
3683
                }
135✔
3684

3685
                return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
135✔
3686
        })
3687
        if err != nil {
135✔
3688
                return err
×
3689
        }
×
3690

3691
        c.rejectCache.remove(chanID)
135✔
3692
        c.chanCache.remove(chanID)
135✔
3693

135✔
3694
        return nil
135✔
3695
}
3696

3697
// markEdgeZombie marks an edge as a zombie within our zombie index. The public
3698
// keys should represent the node public keys of the two parties involved in the
3699
// edge.
3700
func markEdgeZombie(zombieIndex kvdb.RwBucket, chanID uint64, pubKey1,
3701
        pubKey2 [33]byte) error {
159✔
3702

159✔
3703
        var k [8]byte
159✔
3704
        byteOrder.PutUint64(k[:], chanID)
159✔
3705

159✔
3706
        var v [66]byte
159✔
3707
        copy(v[:33], pubKey1[:])
159✔
3708
        copy(v[33:], pubKey2[:])
159✔
3709

159✔
3710
        return zombieIndex.Put(k[:], v[:])
159✔
3711
}
159✔
3712

3713
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
3714
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
2✔
3715
        c.cacheMu.Lock()
2✔
3716
        defer c.cacheMu.Unlock()
2✔
3717

2✔
3718
        return c.markEdgeLiveUnsafe(nil, chanID)
2✔
3719
}
2✔
3720

3721
// markEdgeLiveUnsafe clears an edge from the zombie index. This method can be
3722
// called with an existing kvdb.RwTx or the argument can be set to nil in which
3723
// case a new transaction will be created.
3724
//
3725
// NOTE: this method MUST only be called if the cacheMu has already been
3726
// acquired.
3727
func (c *ChannelGraph) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
18✔
3728
        dbFn := func(tx kvdb.RwTx) error {
36✔
3729
                edges := tx.ReadWriteBucket(edgeBucket)
18✔
3730
                if edges == nil {
18✔
3731
                        return ErrGraphNoEdgesFound
×
3732
                }
×
3733
                zombieIndex := edges.NestedReadWriteBucket(zombieBucket)
18✔
3734
                if zombieIndex == nil {
18✔
3735
                        return nil
×
3736
                }
×
3737

3738
                var k [8]byte
18✔
3739
                byteOrder.PutUint64(k[:], chanID)
18✔
3740

18✔
3741
                if len(zombieIndex.Get(k[:])) == 0 {
19✔
3742
                        return ErrZombieEdgeNotFound
1✔
3743
                }
1✔
3744

3745
                return zombieIndex.Delete(k[:])
17✔
3746
        }
3747

3748
        // If the transaction is nil, we'll create a new one. Otherwise, we use
3749
        // the existing transaction
3750
        var err error
18✔
3751
        if tx == nil {
20✔
3752
                err = kvdb.Update(c.db, dbFn, func() {})
4✔
3753
        } else {
16✔
3754
                err = dbFn(tx)
16✔
3755
        }
16✔
3756
        if err != nil {
19✔
3757
                return err
1✔
3758
        }
1✔
3759

3760
        c.rejectCache.remove(chanID)
17✔
3761
        c.chanCache.remove(chanID)
17✔
3762

17✔
3763
        // We need to add the channel back into our graph cache, otherwise we
17✔
3764
        // won't use it for path finding.
17✔
3765
        if c.graphCache != nil {
34✔
3766
                edgeInfos, err := c.fetchChanInfos(tx, []uint64{chanID})
17✔
3767
                if err != nil {
17✔
3768
                        return err
×
3769
                }
×
3770

3771
                for _, edgeInfo := range edgeInfos {
17✔
3772
                        c.graphCache.AddChannel(
×
3773
                                edgeInfo.Info, edgeInfo.Policy1,
×
3774
                                edgeInfo.Policy2,
×
3775
                        )
×
3776
                }
×
3777
        }
3778

3779
        return nil
17✔
3780
}
3781

3782
// IsZombieEdge returns whether the edge is considered zombie. If it is a
3783
// zombie, then the two node public keys corresponding to this edge are also
3784
// returned.
3785
func (c *ChannelGraph) IsZombieEdge(chanID uint64) (bool, [33]byte, [33]byte) {
5✔
3786
        var (
5✔
3787
                isZombie         bool
5✔
3788
                pubKey1, pubKey2 [33]byte
5✔
3789
        )
5✔
3790

5✔
3791
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
10✔
3792
                edges := tx.ReadBucket(edgeBucket)
5✔
3793
                if edges == nil {
5✔
3794
                        return ErrGraphNoEdgesFound
×
3795
                }
×
3796
                zombieIndex := edges.NestedReadBucket(zombieBucket)
5✔
3797
                if zombieIndex == nil {
5✔
3798
                        return nil
×
3799
                }
×
3800

3801
                isZombie, pubKey1, pubKey2 = isZombieEdge(zombieIndex, chanID)
5✔
3802
                return nil
5✔
3803
        }, func() {
5✔
3804
                isZombie = false
5✔
3805
                pubKey1 = [33]byte{}
5✔
3806
                pubKey2 = [33]byte{}
5✔
3807
        })
5✔
3808
        if err != nil {
5✔
3809
                return false, [33]byte{}, [33]byte{}
×
3810
        }
×
3811

3812
        return isZombie, pubKey1, pubKey2
5✔
3813
}
3814

3815
// isZombieEdge returns whether an entry exists for the given channel in the
3816
// zombie index. If an entry exists, then the two node public keys corresponding
3817
// to this edge are also returned.
3818
func isZombieEdge(zombieIndex kvdb.RBucket,
3819
        chanID uint64) (bool, [33]byte, [33]byte) {
168✔
3820

168✔
3821
        var k [8]byte
168✔
3822
        byteOrder.PutUint64(k[:], chanID)
168✔
3823

168✔
3824
        v := zombieIndex.Get(k[:])
168✔
3825
        if v == nil {
255✔
3826
                return false, [33]byte{}, [33]byte{}
87✔
3827
        }
87✔
3828

3829
        var pubKey1, pubKey2 [33]byte
81✔
3830
        copy(pubKey1[:], v[:33])
81✔
3831
        copy(pubKey2[:], v[33:])
81✔
3832

81✔
3833
        return true, pubKey1, pubKey2
81✔
3834
}
3835

3836
// NumZombies returns the current number of zombie channels in the graph.
3837
func (c *ChannelGraph) NumZombies() (uint64, error) {
4✔
3838
        var numZombies uint64
4✔
3839
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
3840
                edges := tx.ReadBucket(edgeBucket)
4✔
3841
                if edges == nil {
4✔
3842
                        return nil
×
3843
                }
×
3844
                zombieIndex := edges.NestedReadBucket(zombieBucket)
4✔
3845
                if zombieIndex == nil {
4✔
3846
                        return nil
×
3847
                }
×
3848

3849
                return zombieIndex.ForEach(func(_, _ []byte) error {
6✔
3850
                        numZombies++
2✔
3851
                        return nil
2✔
3852
                })
2✔
3853
        }, func() {
4✔
3854
                numZombies = 0
4✔
3855
        })
4✔
3856
        if err != nil {
4✔
3857
                return 0, err
×
3858
        }
×
3859

3860
        return numZombies, nil
4✔
3861
}
3862

3863
// PutClosedScid stores a SCID for a closed channel in the database. This is so
3864
// that we can ignore channel announcements that we know to be closed without
3865
// having to validate them and fetch a block.
3866
func (c *ChannelGraph) PutClosedScid(scid lnwire.ShortChannelID) error {
1✔
3867
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
2✔
3868
                closedScids, err := tx.CreateTopLevelBucket(closedScidBucket)
1✔
3869
                if err != nil {
1✔
3870
                        return err
×
3871
                }
×
3872

3873
                var k [8]byte
1✔
3874
                byteOrder.PutUint64(k[:], scid.ToUint64())
1✔
3875

1✔
3876
                return closedScids.Put(k[:], []byte{})
1✔
3877
        }, func() {})
1✔
3878
}
3879

3880
// IsClosedScid checks whether a channel identified by the passed in scid is
3881
// closed. This helps avoid having to perform expensive validation checks.
3882
// TODO: Add an LRU cache to cut down on disc reads.
3883
func (c *ChannelGraph) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
2✔
3884
        var isClosed bool
2✔
3885
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
4✔
3886
                closedScids := tx.ReadBucket(closedScidBucket)
2✔
3887
                if closedScids == nil {
2✔
3888
                        return ErrClosedScidsNotFound
×
3889
                }
×
3890

3891
                var k [8]byte
2✔
3892
                byteOrder.PutUint64(k[:], scid.ToUint64())
2✔
3893

2✔
3894
                if closedScids.Get(k[:]) != nil {
3✔
3895
                        isClosed = true
1✔
3896
                        return nil
1✔
3897
                }
1✔
3898

3899
                return nil
1✔
3900
        }, func() {
2✔
3901
                isClosed = false
2✔
3902
        })
2✔
3903
        if err != nil {
2✔
3904
                return false, err
×
3905
        }
×
3906

3907
        return isClosed, nil
2✔
3908
}
3909

3910
func putLightningNode(nodeBucket kvdb.RwBucket, aliasBucket kvdb.RwBucket, // nolint:dupl
3911
        updateIndex kvdb.RwBucket, node *models.LightningNode) error {
991✔
3912

991✔
3913
        var (
991✔
3914
                scratch [16]byte
991✔
3915
                b       bytes.Buffer
991✔
3916
        )
991✔
3917

991✔
3918
        pub, err := node.PubKey()
991✔
3919
        if err != nil {
991✔
3920
                return err
×
3921
        }
×
3922
        nodePub := pub.SerializeCompressed()
991✔
3923

991✔
3924
        // If the node has the update time set, write it, else write 0.
991✔
3925
        updateUnix := uint64(0)
991✔
3926
        if node.LastUpdate.Unix() > 0 {
1,854✔
3927
                updateUnix = uint64(node.LastUpdate.Unix())
863✔
3928
        }
863✔
3929

3930
        byteOrder.PutUint64(scratch[:8], updateUnix)
991✔
3931
        if _, err := b.Write(scratch[:8]); err != nil {
991✔
3932
                return err
×
3933
        }
×
3934

3935
        if _, err := b.Write(nodePub); err != nil {
991✔
3936
                return err
×
3937
        }
×
3938

3939
        // If we got a node announcement for this node, we will have the rest
3940
        // of the data available. If not we don't have more data to write.
3941
        if !node.HaveNodeAnnouncement {
1,066✔
3942
                // Write HaveNodeAnnouncement=0.
75✔
3943
                byteOrder.PutUint16(scratch[:2], 0)
75✔
3944
                if _, err := b.Write(scratch[:2]); err != nil {
75✔
3945
                        return err
×
3946
                }
×
3947

3948
                return nodeBucket.Put(nodePub, b.Bytes())
75✔
3949
        }
3950

3951
        // Write HaveNodeAnnouncement=1.
3952
        byteOrder.PutUint16(scratch[:2], 1)
916✔
3953
        if _, err := b.Write(scratch[:2]); err != nil {
916✔
3954
                return err
×
3955
        }
×
3956

3957
        if err := binary.Write(&b, byteOrder, node.Color.R); err != nil {
916✔
3958
                return err
×
3959
        }
×
3960
        if err := binary.Write(&b, byteOrder, node.Color.G); err != nil {
916✔
3961
                return err
×
3962
        }
×
3963
        if err := binary.Write(&b, byteOrder, node.Color.B); err != nil {
916✔
3964
                return err
×
3965
        }
×
3966

3967
        if err := wire.WriteVarString(&b, 0, node.Alias); err != nil {
916✔
3968
                return err
×
3969
        }
×
3970

3971
        if err := node.Features.Encode(&b); err != nil {
916✔
3972
                return err
×
3973
        }
×
3974

3975
        numAddresses := uint16(len(node.Addresses))
916✔
3976
        byteOrder.PutUint16(scratch[:2], numAddresses)
916✔
3977
        if _, err := b.Write(scratch[:2]); err != nil {
916✔
3978
                return err
×
3979
        }
×
3980

3981
        for _, address := range node.Addresses {
2,058✔
3982
                if err := SerializeAddr(&b, address); err != nil {
1,142✔
3983
                        return err
×
3984
                }
×
3985
        }
3986

3987
        sigLen := len(node.AuthSigBytes)
916✔
3988
        if sigLen > 80 {
916✔
3989
                return fmt.Errorf("max sig len allowed is 80, had %v",
×
3990
                        sigLen)
×
3991
        }
×
3992

3993
        err = wire.WriteVarBytes(&b, 0, node.AuthSigBytes)
916✔
3994
        if err != nil {
916✔
3995
                return err
×
3996
        }
×
3997

3998
        if len(node.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
916✔
3999
                return ErrTooManyExtraOpaqueBytes(len(node.ExtraOpaqueData))
×
4000
        }
×
4001
        err = wire.WriteVarBytes(&b, 0, node.ExtraOpaqueData)
916✔
4002
        if err != nil {
916✔
4003
                return err
×
4004
        }
×
4005

4006
        if err := aliasBucket.Put(nodePub, []byte(node.Alias)); err != nil {
916✔
4007
                return err
×
4008
        }
×
4009

4010
        // With the alias bucket updated, we'll now update the index that
4011
        // tracks the time series of node updates.
4012
        var indexKey [8 + 33]byte
916✔
4013
        byteOrder.PutUint64(indexKey[:8], updateUnix)
916✔
4014
        copy(indexKey[8:], nodePub)
916✔
4015

916✔
4016
        // If there was already an old index entry for this node, then we'll
916✔
4017
        // delete the old one before we write the new entry.
916✔
4018
        if nodeBytes := nodeBucket.Get(nodePub); nodeBytes != nil {
1,020✔
4019
                // Extract out the old update time to we can reconstruct the
104✔
4020
                // prior index key to delete it from the index.
104✔
4021
                oldUpdateTime := nodeBytes[:8]
104✔
4022

104✔
4023
                var oldIndexKey [8 + 33]byte
104✔
4024
                copy(oldIndexKey[:8], oldUpdateTime)
104✔
4025
                copy(oldIndexKey[8:], nodePub)
104✔
4026

104✔
4027
                if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
104✔
4028
                        return err
×
4029
                }
×
4030
        }
4031

4032
        if err := updateIndex.Put(indexKey[:], nil); err != nil {
916✔
4033
                return err
×
4034
        }
×
4035

4036
        return nodeBucket.Put(nodePub, b.Bytes())
916✔
4037
}
4038

4039
func fetchLightningNode(nodeBucket kvdb.RBucket,
4040
        nodePub []byte) (models.LightningNode, error) {
3,592✔
4041

3,592✔
4042
        nodeBytes := nodeBucket.Get(nodePub)
3,592✔
4043
        if nodeBytes == nil {
3,666✔
4044
                return models.LightningNode{}, ErrGraphNodeNotFound
74✔
4045
        }
74✔
4046

4047
        nodeReader := bytes.NewReader(nodeBytes)
3,518✔
4048
        return deserializeLightningNode(nodeReader)
3,518✔
4049
}
4050

4051
func deserializeLightningNodeCacheable(r io.Reader) (*graphCacheNode, error) {
120✔
4052
        // Always populate a feature vector, even if we don't have a node
120✔
4053
        // announcement and short circuit below.
120✔
4054
        node := newGraphCacheNode(
120✔
4055
                route.Vertex{},
120✔
4056
                lnwire.EmptyFeatureVector(),
120✔
4057
        )
120✔
4058

120✔
4059
        var nodeScratch [8]byte
120✔
4060

120✔
4061
        // Skip ahead:
120✔
4062
        // - LastUpdate (8 bytes)
120✔
4063
        if _, err := r.Read(nodeScratch[:]); err != nil {
120✔
4064
                return nil, err
×
4065
        }
×
4066

4067
        if _, err := io.ReadFull(r, node.pubKeyBytes[:]); err != nil {
120✔
4068
                return nil, err
×
4069
        }
×
4070

4071
        // Read the node announcement flag.
4072
        if _, err := r.Read(nodeScratch[:2]); err != nil {
120✔
4073
                return nil, err
×
4074
        }
×
4075
        hasNodeAnn := byteOrder.Uint16(nodeScratch[:2])
120✔
4076

120✔
4077
        // The rest of the data is optional, and will only be there if we got a
120✔
4078
        // node announcement for this node.
120✔
4079
        if hasNodeAnn == 0 {
120✔
UNCOV
4080
                return node, nil
×
UNCOV
4081
        }
×
4082

4083
        // We did get a node announcement for this node, so we'll have the rest
4084
        // of the data available.
4085
        var rgb uint8
120✔
4086
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4087
                return nil, err
×
4088
        }
×
4089
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4090
                return nil, err
×
4091
        }
×
4092
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4093
                return nil, err
×
4094
        }
×
4095

4096
        if _, err := wire.ReadVarString(r, 0); err != nil {
120✔
4097
                return nil, err
×
4098
        }
×
4099

4100
        if err := node.features.Decode(r); err != nil {
120✔
4101
                return nil, err
×
4102
        }
×
4103

4104
        return node, nil
120✔
4105
}
4106

4107
func deserializeLightningNode(r io.Reader) (models.LightningNode, error) {
8,476✔
4108
        var (
8,476✔
4109
                node    models.LightningNode
8,476✔
4110
                scratch [8]byte
8,476✔
4111
                err     error
8,476✔
4112
        )
8,476✔
4113

8,476✔
4114
        // Always populate a feature vector, even if we don't have a node
8,476✔
4115
        // announcement and short circuit below.
8,476✔
4116
        node.Features = lnwire.EmptyFeatureVector()
8,476✔
4117

8,476✔
4118
        if _, err := r.Read(scratch[:]); err != nil {
8,476✔
4119
                return models.LightningNode{}, err
×
4120
        }
×
4121

4122
        unix := int64(byteOrder.Uint64(scratch[:]))
8,476✔
4123
        node.LastUpdate = time.Unix(unix, 0)
8,476✔
4124

8,476✔
4125
        if _, err := io.ReadFull(r, node.PubKeyBytes[:]); err != nil {
8,476✔
4126
                return models.LightningNode{}, err
×
4127
        }
×
4128

4129
        if _, err := r.Read(scratch[:2]); err != nil {
8,476✔
4130
                return models.LightningNode{}, err
×
4131
        }
×
4132

4133
        hasNodeAnn := byteOrder.Uint16(scratch[:2])
8,476✔
4134
        if hasNodeAnn == 1 {
16,821✔
4135
                node.HaveNodeAnnouncement = true
8,345✔
4136
        } else {
8,476✔
4137
                node.HaveNodeAnnouncement = false
131✔
4138
        }
131✔
4139

4140
        // The rest of the data is optional, and will only be there if we got a
4141
        // node announcement for this node.
4142
        if !node.HaveNodeAnnouncement {
8,607✔
4143
                return node, nil
131✔
4144
        }
131✔
4145

4146
        // We did get a node announcement for this node, so we'll have the rest
4147
        // of the data available.
4148
        if err := binary.Read(r, byteOrder, &node.Color.R); err != nil {
8,345✔
4149
                return models.LightningNode{}, err
×
4150
        }
×
4151
        if err := binary.Read(r, byteOrder, &node.Color.G); err != nil {
8,345✔
4152
                return models.LightningNode{}, err
×
4153
        }
×
4154
        if err := binary.Read(r, byteOrder, &node.Color.B); err != nil {
8,345✔
4155
                return models.LightningNode{}, err
×
4156
        }
×
4157

4158
        node.Alias, err = wire.ReadVarString(r, 0)
8,345✔
4159
        if err != nil {
8,345✔
4160
                return models.LightningNode{}, err
×
4161
        }
×
4162

4163
        err = node.Features.Decode(r)
8,345✔
4164
        if err != nil {
8,345✔
4165
                return models.LightningNode{}, err
×
4166
        }
×
4167

4168
        if _, err := r.Read(scratch[:2]); err != nil {
8,345✔
4169
                return models.LightningNode{}, err
×
4170
        }
×
4171
        numAddresses := int(byteOrder.Uint16(scratch[:2]))
8,345✔
4172

8,345✔
4173
        var addresses []net.Addr
8,345✔
4174
        for i := 0; i < numAddresses; i++ {
18,902✔
4175
                address, err := DeserializeAddr(r)
10,557✔
4176
                if err != nil {
10,557✔
4177
                        return models.LightningNode{}, err
×
4178
                }
×
4179
                addresses = append(addresses, address)
10,557✔
4180
        }
4181
        node.Addresses = addresses
8,345✔
4182

8,345✔
4183
        node.AuthSigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
8,345✔
4184
        if err != nil {
8,345✔
4185
                return models.LightningNode{}, err
×
4186
        }
×
4187

4188
        // We'll try and see if there are any opaque bytes left, if not, then
4189
        // we'll ignore the EOF error and return the node as is.
4190
        node.ExtraOpaqueData, err = wire.ReadVarBytes(
8,345✔
4191
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
8,345✔
4192
        )
8,345✔
4193
        switch {
8,345✔
4194
        case err == io.ErrUnexpectedEOF:
×
4195
        case err == io.EOF:
×
4196
        case err != nil:
×
4197
                return models.LightningNode{}, err
×
4198
        }
4199

4200
        return node, nil
8,345✔
4201
}
4202

4203
func putChanEdgeInfo(edgeIndex kvdb.RwBucket,
4204
        edgeInfo *models.ChannelEdgeInfo, chanID [8]byte) error {
1,472✔
4205

1,472✔
4206
        var b bytes.Buffer
1,472✔
4207

1,472✔
4208
        if _, err := b.Write(edgeInfo.NodeKey1Bytes[:]); err != nil {
1,472✔
4209
                return err
×
4210
        }
×
4211
        if _, err := b.Write(edgeInfo.NodeKey2Bytes[:]); err != nil {
1,472✔
4212
                return err
×
4213
        }
×
4214
        if _, err := b.Write(edgeInfo.BitcoinKey1Bytes[:]); err != nil {
1,472✔
4215
                return err
×
4216
        }
×
4217
        if _, err := b.Write(edgeInfo.BitcoinKey2Bytes[:]); err != nil {
1,472✔
4218
                return err
×
4219
        }
×
4220

4221
        if err := wire.WriteVarBytes(&b, 0, edgeInfo.Features); err != nil {
1,472✔
4222
                return err
×
4223
        }
×
4224

4225
        authProof := edgeInfo.AuthProof
1,472✔
4226
        var nodeSig1, nodeSig2, bitcoinSig1, bitcoinSig2 []byte
1,472✔
4227
        if authProof != nil {
2,861✔
4228
                nodeSig1 = authProof.NodeSig1Bytes
1,389✔
4229
                nodeSig2 = authProof.NodeSig2Bytes
1,389✔
4230
                bitcoinSig1 = authProof.BitcoinSig1Bytes
1,389✔
4231
                bitcoinSig2 = authProof.BitcoinSig2Bytes
1,389✔
4232
        }
1,389✔
4233

4234
        if err := wire.WriteVarBytes(&b, 0, nodeSig1); err != nil {
1,472✔
4235
                return err
×
4236
        }
×
4237
        if err := wire.WriteVarBytes(&b, 0, nodeSig2); err != nil {
1,472✔
4238
                return err
×
4239
        }
×
4240
        if err := wire.WriteVarBytes(&b, 0, bitcoinSig1); err != nil {
1,472✔
4241
                return err
×
4242
        }
×
4243
        if err := wire.WriteVarBytes(&b, 0, bitcoinSig2); err != nil {
1,472✔
4244
                return err
×
4245
        }
×
4246

4247
        if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
1,472✔
4248
                return err
×
4249
        }
×
4250
        err := binary.Write(&b, byteOrder, uint64(edgeInfo.Capacity))
1,472✔
4251
        if err != nil {
1,472✔
4252
                return err
×
4253
        }
×
4254
        if _, err := b.Write(chanID[:]); err != nil {
1,472✔
4255
                return err
×
4256
        }
×
4257
        if _, err := b.Write(edgeInfo.ChainHash[:]); err != nil {
1,472✔
4258
                return err
×
4259
        }
×
4260

4261
        if len(edgeInfo.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
1,472✔
4262
                return ErrTooManyExtraOpaqueBytes(len(edgeInfo.ExtraOpaqueData))
×
4263
        }
×
4264
        err = wire.WriteVarBytes(&b, 0, edgeInfo.ExtraOpaqueData)
1,472✔
4265
        if err != nil {
1,472✔
4266
                return err
×
4267
        }
×
4268

4269
        return edgeIndex.Put(chanID[:], b.Bytes())
1,472✔
4270
}
4271

4272
func fetchChanEdgeInfo(edgeIndex kvdb.RBucket,
4273
        chanID []byte) (models.ChannelEdgeInfo, error) {
4,175✔
4274

4,175✔
4275
        edgeInfoBytes := edgeIndex.Get(chanID)
4,175✔
4276
        if edgeInfoBytes == nil {
4,257✔
4277
                return models.ChannelEdgeInfo{}, ErrEdgeNotFound
82✔
4278
        }
82✔
4279

4280
        edgeInfoReader := bytes.NewReader(edgeInfoBytes)
4,093✔
4281
        return deserializeChanEdgeInfo(edgeInfoReader)
4,093✔
4282
}
4283

4284
func deserializeChanEdgeInfo(r io.Reader) (models.ChannelEdgeInfo, error) {
4,713✔
4285
        var (
4,713✔
4286
                err      error
4,713✔
4287
                edgeInfo models.ChannelEdgeInfo
4,713✔
4288
        )
4,713✔
4289

4,713✔
4290
        if _, err := io.ReadFull(r, edgeInfo.NodeKey1Bytes[:]); err != nil {
4,713✔
4291
                return models.ChannelEdgeInfo{}, err
×
4292
        }
×
4293
        if _, err := io.ReadFull(r, edgeInfo.NodeKey2Bytes[:]); err != nil {
4,713✔
4294
                return models.ChannelEdgeInfo{}, err
×
4295
        }
×
4296
        if _, err := io.ReadFull(r, edgeInfo.BitcoinKey1Bytes[:]); err != nil {
4,713✔
4297
                return models.ChannelEdgeInfo{}, err
×
4298
        }
×
4299
        if _, err := io.ReadFull(r, edgeInfo.BitcoinKey2Bytes[:]); err != nil {
4,713✔
4300
                return models.ChannelEdgeInfo{}, err
×
4301
        }
×
4302

4303
        edgeInfo.Features, err = wire.ReadVarBytes(r, 0, 900, "features")
4,713✔
4304
        if err != nil {
4,713✔
4305
                return models.ChannelEdgeInfo{}, err
×
4306
        }
×
4307

4308
        proof := &models.ChannelAuthProof{}
4,713✔
4309

4,713✔
4310
        proof.NodeSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,713✔
4311
        if err != nil {
4,713✔
4312
                return models.ChannelEdgeInfo{}, err
×
4313
        }
×
4314
        proof.NodeSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,713✔
4315
        if err != nil {
4,713✔
4316
                return models.ChannelEdgeInfo{}, err
×
4317
        }
×
4318
        proof.BitcoinSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,713✔
4319
        if err != nil {
4,713✔
4320
                return models.ChannelEdgeInfo{}, err
×
4321
        }
×
4322
        proof.BitcoinSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,713✔
4323
        if err != nil {
4,713✔
4324
                return models.ChannelEdgeInfo{}, err
×
4325
        }
×
4326

4327
        if !proof.IsEmpty() {
6,473✔
4328
                edgeInfo.AuthProof = proof
1,760✔
4329
        }
1,760✔
4330

4331
        edgeInfo.ChannelPoint = wire.OutPoint{}
4,713✔
4332
        if err := ReadOutpoint(r, &edgeInfo.ChannelPoint); err != nil {
4,713✔
4333
                return models.ChannelEdgeInfo{}, err
×
4334
        }
×
4335
        if err := binary.Read(r, byteOrder, &edgeInfo.Capacity); err != nil {
4,713✔
4336
                return models.ChannelEdgeInfo{}, err
×
4337
        }
×
4338
        if err := binary.Read(r, byteOrder, &edgeInfo.ChannelID); err != nil {
4,713✔
4339
                return models.ChannelEdgeInfo{}, err
×
4340
        }
×
4341

4342
        if _, err := io.ReadFull(r, edgeInfo.ChainHash[:]); err != nil {
4,713✔
4343
                return models.ChannelEdgeInfo{}, err
×
4344
        }
×
4345

4346
        // We'll try and see if there are any opaque bytes left, if not, then
4347
        // we'll ignore the EOF error and return the edge as is.
4348
        edgeInfo.ExtraOpaqueData, err = wire.ReadVarBytes(
4,713✔
4349
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
4,713✔
4350
        )
4,713✔
4351
        switch {
4,713✔
4352
        case err == io.ErrUnexpectedEOF:
×
4353
        case err == io.EOF:
×
4354
        case err != nil:
×
4355
                return models.ChannelEdgeInfo{}, err
×
4356
        }
4357

4358
        return edgeInfo, nil
4,713✔
4359
}
4360

4361
func putChanEdgePolicy(edges kvdb.RwBucket, edge *models.ChannelEdgePolicy,
4362
        from, to []byte) error {
2,660✔
4363

2,660✔
4364
        var edgeKey [33 + 8]byte
2,660✔
4365
        copy(edgeKey[:], from)
2,660✔
4366
        byteOrder.PutUint64(edgeKey[33:], edge.ChannelID)
2,660✔
4367

2,660✔
4368
        var b bytes.Buffer
2,660✔
4369
        if err := serializeChanEdgePolicy(&b, edge, to); err != nil {
2,660✔
4370
                return err
×
4371
        }
×
4372

4373
        // Before we write out the new edge, we'll create a new entry in the
4374
        // update index in order to keep it fresh.
4375
        updateUnix := uint64(edge.LastUpdate.Unix())
2,660✔
4376
        var indexKey [8 + 8]byte
2,660✔
4377
        byteOrder.PutUint64(indexKey[:8], updateUnix)
2,660✔
4378
        byteOrder.PutUint64(indexKey[8:], edge.ChannelID)
2,660✔
4379

2,660✔
4380
        updateIndex, err := edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
2,660✔
4381
        if err != nil {
2,660✔
4382
                return err
×
4383
        }
×
4384

4385
        // If there was already an entry for this edge, then we'll need to
4386
        // delete the old one to ensure we don't leave around any after-images.
4387
        // An unknown policy value does not have a update time recorded, so
4388
        // it also does not need to be removed.
4389
        if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil &&
2,660✔
4390
                !bytes.Equal(edgeBytes[:], unknownPolicy) {
2,684✔
4391

24✔
4392
                // In order to delete the old entry, we'll need to obtain the
24✔
4393
                // *prior* update time in order to delete it. To do this, we'll
24✔
4394
                // need to deserialize the existing policy within the database
24✔
4395
                // (now outdated by the new one), and delete its corresponding
24✔
4396
                // entry within the update index. We'll ignore any
24✔
4397
                // ErrEdgePolicyOptionalFieldNotFound error, as we only need
24✔
4398
                // the channel ID and update time to delete the entry.
24✔
4399
                // TODO(halseth): get rid of these invalid policies in a
24✔
4400
                // migration.
24✔
4401
                oldEdgePolicy, err := deserializeChanEdgePolicy(
24✔
4402
                        bytes.NewReader(edgeBytes),
24✔
4403
                )
24✔
4404
                if err != nil && err != ErrEdgePolicyOptionalFieldNotFound {
24✔
4405
                        return err
×
4406
                }
×
4407

4408
                oldUpdateTime := uint64(oldEdgePolicy.LastUpdate.Unix())
24✔
4409

24✔
4410
                var oldIndexKey [8 + 8]byte
24✔
4411
                byteOrder.PutUint64(oldIndexKey[:8], oldUpdateTime)
24✔
4412
                byteOrder.PutUint64(oldIndexKey[8:], edge.ChannelID)
24✔
4413

24✔
4414
                if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
24✔
4415
                        return err
×
4416
                }
×
4417
        }
4418

4419
        if err := updateIndex.Put(indexKey[:], nil); err != nil {
2,660✔
4420
                return err
×
4421
        }
×
4422

4423
        err = updateEdgePolicyDisabledIndex(
2,660✔
4424
                edges, edge.ChannelID,
2,660✔
4425
                edge.ChannelFlags&lnwire.ChanUpdateDirection > 0,
2,660✔
4426
                edge.IsDisabled(),
2,660✔
4427
        )
2,660✔
4428
        if err != nil {
2,660✔
4429
                return err
×
4430
        }
×
4431

4432
        return edges.Put(edgeKey[:], b.Bytes()[:])
2,660✔
4433
}
4434

4435
// updateEdgePolicyDisabledIndex is used to update the disabledEdgePolicyIndex
4436
// bucket by either add a new disabled ChannelEdgePolicy or remove an existing
4437
// one.
4438
// The direction represents the direction of the edge and disabled is used for
4439
// deciding whether to remove or add an entry to the bucket.
4440
// In general a channel is disabled if two entries for the same chanID exist
4441
// in this bucket.
4442
// Maintaining the bucket this way allows a fast retrieval of disabled
4443
// channels, for example when prune is needed.
4444
func updateEdgePolicyDisabledIndex(edges kvdb.RwBucket, chanID uint64,
4445
        direction bool, disabled bool) error {
2,920✔
4446

2,920✔
4447
        var disabledEdgeKey [8 + 1]byte
2,920✔
4448
        byteOrder.PutUint64(disabledEdgeKey[0:], chanID)
2,920✔
4449
        if direction {
4,376✔
4450
                disabledEdgeKey[8] = 1
1,456✔
4451
        }
1,456✔
4452

4453
        disabledEdgePolicyIndex, err := edges.CreateBucketIfNotExists(
2,920✔
4454
                disabledEdgePolicyBucket,
2,920✔
4455
        )
2,920✔
4456
        if err != nil {
2,920✔
4457
                return err
×
4458
        }
×
4459

4460
        if disabled {
2,946✔
4461
                return disabledEdgePolicyIndex.Put(disabledEdgeKey[:], []byte{})
26✔
4462
        }
26✔
4463

4464
        return disabledEdgePolicyIndex.Delete(disabledEdgeKey[:])
2,894✔
4465
}
4466

4467
// putChanEdgePolicyUnknown marks the edge policy as unknown
4468
// in the edges bucket.
4469
func putChanEdgePolicyUnknown(edges kvdb.RwBucket, channelID uint64,
4470
        from []byte) error {
2,942✔
4471

2,942✔
4472
        var edgeKey [33 + 8]byte
2,942✔
4473
        copy(edgeKey[:], from)
2,942✔
4474
        byteOrder.PutUint64(edgeKey[33:], channelID)
2,942✔
4475

2,942✔
4476
        if edges.Get(edgeKey[:]) != nil {
2,942✔
4477
                return fmt.Errorf("cannot write unknown policy for channel %v "+
×
4478
                        " when there is already a policy present", channelID)
×
4479
        }
×
4480

4481
        return edges.Put(edgeKey[:], unknownPolicy)
2,942✔
4482
}
4483

4484
func fetchChanEdgePolicy(edges kvdb.RBucket, chanID []byte,
4485
        nodePub []byte) (*models.ChannelEdgePolicy, error) {
8,152✔
4486

8,152✔
4487
        var edgeKey [33 + 8]byte
8,152✔
4488
        copy(edgeKey[:], nodePub)
8,152✔
4489
        copy(edgeKey[33:], chanID[:])
8,152✔
4490

8,152✔
4491
        edgeBytes := edges.Get(edgeKey[:])
8,152✔
4492
        if edgeBytes == nil {
8,152✔
4493
                return nil, ErrEdgeNotFound
×
4494
        }
×
4495

4496
        // No need to deserialize unknown policy.
4497
        if bytes.Equal(edgeBytes[:], unknownPolicy) {
8,501✔
4498
                return nil, nil
349✔
4499
        }
349✔
4500

4501
        edgeReader := bytes.NewReader(edgeBytes)
7,803✔
4502

7,803✔
4503
        ep, err := deserializeChanEdgePolicy(edgeReader)
7,803✔
4504
        switch {
7,803✔
4505
        // If the db policy was missing an expected optional field, we return
4506
        // nil as if the policy was unknown.
4507
        case err == ErrEdgePolicyOptionalFieldNotFound:
1✔
4508
                return nil, nil
1✔
4509

4510
        case err != nil:
×
4511
                return nil, err
×
4512
        }
4513

4514
        return ep, nil
7,802✔
4515
}
4516

4517
func fetchChanEdgePolicies(edgeIndex kvdb.RBucket, edges kvdb.RBucket,
4518
        chanID []byte) (*models.ChannelEdgePolicy, *models.ChannelEdgePolicy,
4519
        error) {
227✔
4520

227✔
4521
        edgeInfo := edgeIndex.Get(chanID)
227✔
4522
        if edgeInfo == nil {
227✔
4523
                return nil, nil, fmt.Errorf("%w: chanID=%x", ErrEdgeNotFound,
×
4524
                        chanID)
×
4525
        }
×
4526

4527
        // The first node is contained within the first half of the edge
4528
        // information. We only propagate the error here and below if it's
4529
        // something other than edge non-existence.
4530
        node1Pub := edgeInfo[:33]
227✔
4531
        edge1, err := fetchChanEdgePolicy(edges, chanID, node1Pub)
227✔
4532
        if err != nil {
227✔
4533
                return nil, nil, fmt.Errorf("%w: node1Pub=%x", ErrEdgeNotFound,
×
4534
                        node1Pub)
×
4535
        }
×
4536

4537
        // Similarly, the second node is contained within the latter
4538
        // half of the edge information.
4539
        node2Pub := edgeInfo[33:66]
227✔
4540
        edge2, err := fetchChanEdgePolicy(edges, chanID, node2Pub)
227✔
4541
        if err != nil {
227✔
4542
                return nil, nil, fmt.Errorf("%w: node2Pub=%x", ErrEdgeNotFound,
×
4543
                        node2Pub)
×
4544
        }
×
4545

4546
        return edge1, edge2, nil
227✔
4547
}
4548

4549
func serializeChanEdgePolicy(w io.Writer, edge *models.ChannelEdgePolicy,
4550
        to []byte) error {
2,662✔
4551

2,662✔
4552
        err := wire.WriteVarBytes(w, 0, edge.SigBytes)
2,662✔
4553
        if err != nil {
2,662✔
4554
                return err
×
4555
        }
×
4556

4557
        if err := binary.Write(w, byteOrder, edge.ChannelID); err != nil {
2,662✔
4558
                return err
×
4559
        }
×
4560

4561
        var scratch [8]byte
2,662✔
4562
        updateUnix := uint64(edge.LastUpdate.Unix())
2,662✔
4563
        byteOrder.PutUint64(scratch[:], updateUnix)
2,662✔
4564
        if _, err := w.Write(scratch[:]); err != nil {
2,662✔
4565
                return err
×
4566
        }
×
4567

4568
        if err := binary.Write(w, byteOrder, edge.MessageFlags); err != nil {
2,662✔
4569
                return err
×
4570
        }
×
4571
        if err := binary.Write(w, byteOrder, edge.ChannelFlags); err != nil {
2,662✔
4572
                return err
×
4573
        }
×
4574
        if err := binary.Write(w, byteOrder, edge.TimeLockDelta); err != nil {
2,662✔
4575
                return err
×
4576
        }
×
4577
        if err := binary.Write(w, byteOrder, uint64(edge.MinHTLC)); err != nil {
2,662✔
4578
                return err
×
4579
        }
×
4580
        err = binary.Write(w, byteOrder, uint64(edge.FeeBaseMSat))
2,662✔
4581
        if err != nil {
2,662✔
4582
                return err
×
4583
        }
×
4584
        err = binary.Write(
2,662✔
4585
                w, byteOrder, uint64(edge.FeeProportionalMillionths),
2,662✔
4586
        )
2,662✔
4587
        if err != nil {
2,662✔
4588
                return err
×
4589
        }
×
4590

4591
        if _, err := w.Write(to); err != nil {
2,662✔
4592
                return err
×
4593
        }
×
4594

4595
        // If the max_htlc field is present, we write it. To be compatible with
4596
        // older versions that wasn't aware of this field, we write it as part
4597
        // of the opaque data.
4598
        // TODO(halseth): clean up when moving to TLV.
4599
        var opaqueBuf bytes.Buffer
2,662✔
4600
        if edge.MessageFlags.HasMaxHtlc() {
4,940✔
4601
                err := binary.Write(&opaqueBuf, byteOrder, uint64(edge.MaxHTLC))
2,278✔
4602
                if err != nil {
2,278✔
4603
                        return err
×
4604
                }
×
4605
        }
4606

4607
        if len(edge.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
2,662✔
4608
                return ErrTooManyExtraOpaqueBytes(len(edge.ExtraOpaqueData))
×
4609
        }
×
4610
        if _, err := opaqueBuf.Write(edge.ExtraOpaqueData); err != nil {
2,662✔
4611
                return err
×
4612
        }
×
4613

4614
        if err := wire.WriteVarBytes(w, 0, opaqueBuf.Bytes()); err != nil {
2,662✔
4615
                return err
×
4616
        }
×
4617
        return nil
2,662✔
4618
}
4619

4620
func deserializeChanEdgePolicy(r io.Reader) (*models.ChannelEdgePolicy, error) {
7,828✔
4621
        // Deserialize the policy. Note that in case an optional field is not
7,828✔
4622
        // found, both an error and a populated policy object are returned.
7,828✔
4623
        edge, deserializeErr := deserializeChanEdgePolicyRaw(r)
7,828✔
4624
        if deserializeErr != nil &&
7,828✔
4625
                deserializeErr != ErrEdgePolicyOptionalFieldNotFound {
7,828✔
4626

×
4627
                return nil, deserializeErr
×
4628
        }
×
4629

4630
        return edge, deserializeErr
7,828✔
4631
}
4632

4633
func deserializeChanEdgePolicyRaw(r io.Reader) (*models.ChannelEdgePolicy,
4634
        error) {
8,835✔
4635

8,835✔
4636
        edge := &models.ChannelEdgePolicy{}
8,835✔
4637

8,835✔
4638
        var err error
8,835✔
4639
        edge.SigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
8,835✔
4640
        if err != nil {
8,835✔
4641
                return nil, err
×
4642
        }
×
4643

4644
        if err := binary.Read(r, byteOrder, &edge.ChannelID); err != nil {
8,835✔
4645
                return nil, err
×
4646
        }
×
4647

4648
        var scratch [8]byte
8,835✔
4649
        if _, err := r.Read(scratch[:]); err != nil {
8,835✔
4650
                return nil, err
×
4651
        }
×
4652
        unix := int64(byteOrder.Uint64(scratch[:]))
8,835✔
4653
        edge.LastUpdate = time.Unix(unix, 0)
8,835✔
4654

8,835✔
4655
        if err := binary.Read(r, byteOrder, &edge.MessageFlags); err != nil {
8,835✔
4656
                return nil, err
×
4657
        }
×
4658
        if err := binary.Read(r, byteOrder, &edge.ChannelFlags); err != nil {
8,835✔
4659
                return nil, err
×
4660
        }
×
4661
        if err := binary.Read(r, byteOrder, &edge.TimeLockDelta); err != nil {
8,835✔
4662
                return nil, err
×
4663
        }
×
4664

4665
        var n uint64
8,835✔
4666
        if err := binary.Read(r, byteOrder, &n); err != nil {
8,835✔
4667
                return nil, err
×
4668
        }
×
4669
        edge.MinHTLC = lnwire.MilliSatoshi(n)
8,835✔
4670

8,835✔
4671
        if err := binary.Read(r, byteOrder, &n); err != nil {
8,835✔
4672
                return nil, err
×
4673
        }
×
4674
        edge.FeeBaseMSat = lnwire.MilliSatoshi(n)
8,835✔
4675

8,835✔
4676
        if err := binary.Read(r, byteOrder, &n); err != nil {
8,835✔
4677
                return nil, err
×
4678
        }
×
4679
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(n)
8,835✔
4680

8,835✔
4681
        if _, err := r.Read(edge.ToNode[:]); err != nil {
8,835✔
4682
                return nil, err
×
4683
        }
×
4684

4685
        // We'll try and see if there are any opaque bytes left, if not, then
4686
        // we'll ignore the EOF error and return the edge as is.
4687
        edge.ExtraOpaqueData, err = wire.ReadVarBytes(
8,835✔
4688
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
8,835✔
4689
        )
8,835✔
4690
        switch {
8,835✔
4691
        case err == io.ErrUnexpectedEOF:
×
4692
        case err == io.EOF:
3✔
4693
        case err != nil:
×
4694
                return nil, err
×
4695
        }
4696

4697
        // See if optional fields are present.
4698
        if edge.MessageFlags.HasMaxHtlc() {
17,290✔
4699
                // The max_htlc field should be at the beginning of the opaque
8,455✔
4700
                // bytes.
8,455✔
4701
                opq := edge.ExtraOpaqueData
8,455✔
4702

8,455✔
4703
                // If the max_htlc field is not present, it might be old data
8,455✔
4704
                // stored before this field was validated. We'll return the
8,455✔
4705
                // edge along with an error.
8,455✔
4706
                if len(opq) < 8 {
8,458✔
4707
                        return edge, ErrEdgePolicyOptionalFieldNotFound
3✔
4708
                }
3✔
4709

4710
                maxHtlc := byteOrder.Uint64(opq[:8])
8,452✔
4711
                edge.MaxHTLC = lnwire.MilliSatoshi(maxHtlc)
8,452✔
4712

8,452✔
4713
                // Exclude the parsed field from the rest of the opaque data.
8,452✔
4714
                edge.ExtraOpaqueData = opq[8:]
8,452✔
4715
        }
4716

4717
        return edge, nil
8,832✔
4718
}
4719

4720
// MakeTestGraph creates a new instance of the ChannelGraph for testing
4721
// purposes.
4722
func MakeTestGraph(t testing.TB, modifiers ...OptionModifier) (*ChannelGraph,
4723
        error) {
39✔
4724

39✔
4725
        opts := DefaultOptions()
39✔
4726
        for _, modifier := range modifiers {
39✔
4727
                modifier(opts)
×
4728
        }
×
4729

4730
        // Next, create channelgraph for the first time.
4731
        backend, backendCleanup, err := kvdb.GetTestBackend(t.TempDir(), "cgr")
39✔
4732
        if err != nil {
39✔
4733
                backendCleanup()
×
4734
                return nil, err
×
4735
        }
×
4736

4737
        graph, err := NewChannelGraph(backend)
39✔
4738
        if err != nil {
39✔
4739
                backendCleanup()
×
4740
                return nil, err
×
4741
        }
×
4742

4743
        t.Cleanup(func() {
78✔
4744
                _ = backend.Close()
39✔
4745
                backendCleanup()
39✔
4746
        })
39✔
4747

4748
        return graph, nil
39✔
4749
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc