• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%
12312390362

Pull #9343

github

ellemouton
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is re-worked such that the
context that its new main WithCtx method provides is cancelled in sync
with a parent context being cancelled or with its quit channel being
cancelled. Tests are added to assert the behaviour. In order for the
close of the quit channel to be consistent with the cancelling of the
derived context, the quit channel _must_ be contained internal to the
ContextGuard so that callers are only able to close the channel via the
exposed Quit method which will then take care to first cancel any
derived context that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.71
/graph/db/graph.go
1
package graphdb
2

3
import (
4
        "bytes"
5
        "crypto/sha256"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "math"
11
        "net"
12
        "sort"
13
        "sync"
14
        "testing"
15
        "time"
16

17
        "github.com/btcsuite/btcd/btcec/v2"
18
        "github.com/btcsuite/btcd/chaincfg/chainhash"
19
        "github.com/btcsuite/btcd/txscript"
20
        "github.com/btcsuite/btcd/wire"
21
        "github.com/lightningnetwork/lnd/aliasmgr"
22
        "github.com/lightningnetwork/lnd/batch"
23
        "github.com/lightningnetwork/lnd/graph/db/models"
24
        "github.com/lightningnetwork/lnd/input"
25
        "github.com/lightningnetwork/lnd/kvdb"
26
        "github.com/lightningnetwork/lnd/lnwire"
27
        "github.com/lightningnetwork/lnd/routing/route"
28
)
29

30
var (
31
        // nodeBucket is a bucket which houses all the vertices or nodes within
32
        // the channel graph. This bucket has a single sub-bucket which adds an
33
        // additional index from pubkey -> alias. Within the top-level of this
34
        // bucket, the key space maps a node's compressed public key to the
35
        // serialized information for that node. Additionally, there's a
36
        // special key "source" which stores the pubkey of the source node. The
37
        // source node is used as the starting point for all graph/queries and
38
        // traversals. The graph is formed as a star-graph with the source node
39
        // at the center.
40
        //
41
        // maps: pubKey -> nodeInfo
42
        // maps: source -> selfPubKey
43
        nodeBucket = []byte("graph-node")
44

45
        // nodeUpdateIndexBucket is a sub-bucket of the nodeBucket. This bucket
46
        // will be used to quickly look up the "freshness" of a node's last
47
        // update to the network. The bucket only contains keys, and no values,
48
        // its mapping:
49
        //
50
        // maps: updateTime || nodeID -> nil
51
        nodeUpdateIndexBucket = []byte("graph-node-update-index")
52

53
        // sourceKey is a special key that resides within the nodeBucket. The
54
        // sourceKey maps a key to the public key of the "self node".
55
        sourceKey = []byte("source")
56

57
        // aliasIndexBucket is a sub-bucket that's nested within the main
58
        // nodeBucket. This bucket maps the public key of a node to its
59
        // current alias. This bucket is provided as it can be used within a
60
        // future UI layer to add an additional degree of confirmation.
61
        aliasIndexBucket = []byte("alias")
62

63
        // edgeBucket is a bucket which houses all of the edge or channel
64
        // information within the channel graph. This bucket essentially acts
65
        // as an adjacency list, which in conjunction with a range scan, can be
66
        // used to iterate over all the incoming and outgoing edges for a
67
        // particular node. Keys in the bucket use a prefix scheme which leads
68
        // with the node's public key and ends with the compact edge ID.
69
        // For each chanID, there will be two entries within the bucket, as the
70
        // graph is directed: nodes may have different policies w.r.t. fees
71
        // for their respective directions.
72
        //
73
        // maps: pubKey || chanID -> channel edge policy for node
74
        edgeBucket = []byte("graph-edge")
75

76
        // unknownPolicy is represented as an empty slice. It is
77
        // used as the value in edgeBucket for unknown channel edge policies.
78
        // Unknown policies are still stored in the database to enable efficient
79
        // lookup of incoming channel edges.
80
        unknownPolicy = []byte{}
81

82
        // chanStart is an array of all zero bytes which is used to perform
83
        // range scans within the edgeBucket to obtain all of the outgoing
84
        // edges for a particular node.
85
        chanStart [8]byte
86

87
        // edgeIndexBucket is an index which can be used to iterate all edges
88
        // in the bucket, grouping them according to their in/out nodes.
89
        // Additionally, the items in this bucket also contain the complete
90
        // edge information for a channel. The edge information includes the
91
        // capacity of the channel, the nodes that made the channel, etc. This
92
        // bucket resides within the edgeBucket above. Creation of an edge
93
        // proceeds in two phases: first the edge is added to the edge index,
94
        // afterwards the edgeBucket can be updated with the latest details of
95
        // the edge as they are announced on the network.
96
        //
97
        // maps: chanID -> pubKey1 || pubKey2 || restofEdgeInfo
98
        edgeIndexBucket = []byte("edge-index")
99

100
        // edgeUpdateIndexBucket is a sub-bucket of the main edgeBucket. This
101
        // bucket contains an index which allows us to gauge the "freshness" of
102
        // a channel's last updates.
103
        //
104
        // maps: updateTime || chanID -> nil
105
        edgeUpdateIndexBucket = []byte("edge-update-index")
106

107
        // channelPointBucket maps a channel's full outpoint (txid:index) to
108
        // its short 8-byte channel ID. This bucket resides within the
109
        // edgeBucket above, and can be used to quickly remove an edge due to
110
        // the outpoint being spent, or to query for existence of a channel.
111
        //
112
        // maps: outPoint -> chanID
113
        channelPointBucket = []byte("chan-index")
114

115
        // zombieBucket is a sub-bucket of the main edgeBucket bucket
116
        // responsible for maintaining an index of zombie channels. Each entry
117
        // exists within the bucket as follows:
118
        //
119
        // maps: chanID -> pubKey1 || pubKey2
120
        //
121
        // The chanID represents the channel ID of the edge that is marked as a
122
        // zombie and is used as the key, which maps to the public keys of the
123
        // edge's participants.
124
        zombieBucket = []byte("zombie-index")
125

126
        // disabledEdgePolicyBucket is a sub-bucket of the main edgeBucket
127
        // bucket responsible for maintaining an index of disabled edge
128
        // policies. Each entry exists within the bucket as follows:
129
        //
130
        // maps: <chanID><direction> -> []byte{}
131
        //
132
        // The chanID represents the channel ID of the edge and the direction is
133
        // one byte representing the direction of the edge. The main purpose of
134
        // this index is to allow pruning disabled channels in a fast way without
135
        // the need to iterate all over the graph.
136
        disabledEdgePolicyBucket = []byte("disabled-edge-policy-index")
137

138
        // graphMetaBucket is a top-level bucket which stores various metadata
139
        // related to the on-disk channel graph. Data stored in this bucket
140
        // includes the block to which the graph has been synced, the total
141
        // number of channels, etc.
142
        graphMetaBucket = []byte("graph-meta")
143

144
        // pruneLogBucket is a bucket within the graphMetaBucket that stores
145
        // a mapping from the block height to the hash for the blocks used to
146
        // prune the graph.
147
        // Once a new block is discovered, any channels that have been closed
148
        // (by spending the outpoint) can safely be removed from the graph, and
149
        // the block is added to the prune log. We need to keep such a log for
150
        // the case where a reorg happens, and we must "rewind" the state of the
151
        // graph by removing channels that were previously confirmed. In such a
152
        // case we'll remove all entries from the prune log with a block height
153
        // that no longer exists.
154
        pruneLogBucket = []byte("prune-log")
155

156
        // closedScidBucket is a top-level bucket that stores scids for
157
        // channels that we know to be closed. This is used so that we don't
158
        // need to perform expensive validation checks if we receive a channel
159
        // announcement for the channel again.
160
        //
161
        // maps: scid -> []byte{}
162
        closedScidBucket = []byte("closed-scid")
163
)
164

165
const (
166
        // MaxAllowedExtraOpaqueBytes is the largest amount of opaque bytes that
167
        // we'll permit to be written to disk. We limit this as otherwise, it
168
        // would be possible for a node to create a ton of updates and slowly
169
        // fill our disk, and also waste bandwidth due to relaying.
170
        MaxAllowedExtraOpaqueBytes = 10000
171
)
172

173
// ChannelGraph is a persistent, on-disk graph representation of the Lightning
174
// Network. This struct can be used to implement path finding algorithms on top
175
// of, and also to update a node's view based on information received from the
176
// p2p network. Internally, the graph is stored using a modified adjacency list
177
// representation with some added object interaction possible with each
178
// serialized edge/node. The stored graph is directed, meaning that there are two
179
// edges stored for each channel: an inbound/outbound edge for each node pair.
180
// Nodes, edges, and edge information can all be added to the graph
181
// independently. Edge removal results in the deletion of all edge information
182
// for that edge.
183
type ChannelGraph struct {
184
        db kvdb.Backend
185

186
        // cacheMu guards all caches (rejectCache, chanCache, graphCache). If
187
        // this mutex will be acquired at the same time as the DB mutex then
188
        // the cacheMu MUST be acquired first to prevent deadlock.
189
        cacheMu     sync.RWMutex
190
        rejectCache *rejectCache
191
        chanCache   *channelCache
192
        graphCache  *GraphCache
193

194
        chanScheduler batch.Scheduler
195
        nodeScheduler batch.Scheduler
196
}
197

198
// NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The
199
// returned instance has its own unique reject cache and channel cache.
200
func NewChannelGraph(db kvdb.Backend, options ...OptionModifier) (*ChannelGraph,
201
        error) {
173✔
202

173✔
203
        opts := DefaultOptions()
173✔
204
        for _, o := range options {
276✔
205
                o(opts)
103✔
206
        }
103✔
207

208
        if !opts.NoMigration {
346✔
209
                if err := initChannelGraph(db); err != nil {
173✔
210
                        return nil, err
×
211
                }
×
212
        }
213

214
        g := &ChannelGraph{
173✔
215
                db:          db,
173✔
216
                rejectCache: newRejectCache(opts.RejectCacheSize),
173✔
217
                chanCache:   newChannelCache(opts.ChannelCacheSize),
173✔
218
        }
173✔
219
        g.chanScheduler = batch.NewTimeScheduler(
173✔
220
                db, &g.cacheMu, opts.BatchCommitInterval,
173✔
221
        )
173✔
222
        g.nodeScheduler = batch.NewTimeScheduler(
173✔
223
                db, nil, opts.BatchCommitInterval,
173✔
224
        )
173✔
225

173✔
226
        // The graph cache can be turned off (e.g. for mobile users) for a
173✔
227
        // speed/memory usage tradeoff.
173✔
228
        if opts.UseGraphCache {
313✔
229
                g.graphCache = NewGraphCache(opts.PreAllocCacheNumNodes)
140✔
230
                startTime := time.Now()
140✔
231
                log.Debugf("Populating in-memory channel graph, this might " +
140✔
232
                        "take a while...")
140✔
233

140✔
234
                err := g.ForEachNodeCacheable(
140✔
235
                        func(tx kvdb.RTx, node GraphCacheNode) error {
240✔
236
                                g.graphCache.AddNodeFeatures(node)
100✔
237

100✔
238
                                return nil
100✔
239
                        },
100✔
240
                )
241
                if err != nil {
140✔
242
                        return nil, err
×
243
                }
×
244

245
                err = g.ForEachChannel(func(info *models.ChannelEdgeInfo,
140✔
246
                        policy1, policy2 *models.ChannelEdgePolicy) error {
536✔
247

396✔
248
                        g.graphCache.AddChannel(info, policy1, policy2)
396✔
249

396✔
250
                        return nil
396✔
251
                })
396✔
252
                if err != nil {
140✔
253
                        return nil, err
×
254
                }
×
255

256
                log.Debugf("Finished populating in-memory channel graph (took "+
140✔
257
                        "%v, %s)", time.Since(startTime), g.graphCache.Stats())
140✔
258
        }
259

260
        return g, nil
173✔
261
}
262

263
// channelMapKey is the key structure used for storing channel edge policies.
264
type channelMapKey struct {
265
        nodeKey route.Vertex
266
        chanID  [8]byte
267
}
268

269
// getChannelMap loads all channel edge policies from the database and stores
270
// them in a map.
271
func (c *ChannelGraph) getChannelMap(edges kvdb.RBucket) (
272
        map[channelMapKey]*models.ChannelEdgePolicy, error) {
144✔
273

144✔
274
        // Create a map to store all channel edge policies.
144✔
275
        channelMap := make(map[channelMapKey]*models.ChannelEdgePolicy)
144✔
276

144✔
277
        err := kvdb.ForAll(edges, func(k, edgeBytes []byte) error {
1,715✔
278
                // Skip embedded buckets.
1,571✔
279
                if bytes.Equal(k, edgeIndexBucket) ||
1,571✔
280
                        bytes.Equal(k, edgeUpdateIndexBucket) ||
1,571✔
281
                        bytes.Equal(k, zombieBucket) ||
1,571✔
282
                        bytes.Equal(k, disabledEdgePolicyBucket) ||
1,571✔
283
                        bytes.Equal(k, channelPointBucket) {
2,152✔
284

581✔
285
                        return nil
581✔
286
                }
581✔
287

288
                // Validate key length.
289
                if len(k) != 33+8 {
990✔
290
                        return fmt.Errorf("invalid edge key %x encountered", k)
×
291
                }
×
292

293
                var key channelMapKey
990✔
294
                copy(key.nodeKey[:], k[:33])
990✔
295
                copy(key.chanID[:], k[33:])
990✔
296

990✔
297
                // No need to deserialize unknown policy.
990✔
298
                if bytes.Equal(edgeBytes, unknownPolicy) {
990✔
299
                        return nil
×
300
                }
×
301

302
                edgeReader := bytes.NewReader(edgeBytes)
990✔
303
                edge, err := deserializeChanEdgePolicyRaw(
990✔
304
                        edgeReader,
990✔
305
                )
990✔
306

990✔
307
                switch {
990✔
308
                // If the db policy was missing an expected optional field, we
309
                // return nil as if the policy was unknown.
310
                case err == ErrEdgePolicyOptionalFieldNotFound:
×
311
                        return nil
×
312

313
                case err != nil:
×
314
                        return err
×
315
                }
316

317
                channelMap[key] = edge
990✔
318

990✔
319
                return nil
990✔
320
        })
321
        if err != nil {
144✔
322
                return nil, err
×
323
        }
×
324

325
        return channelMap, nil
144✔
326
}
327

328
var graphTopLevelBuckets = [][]byte{
329
        nodeBucket,
330
        edgeBucket,
331
        graphMetaBucket,
332
        closedScidBucket,
333
}
334

335
// Wipe completely deletes all saved state within all used buckets within the
336
// database. The deletion is done in a single transaction, therefore this
337
// operation is fully atomic.
338
func (c *ChannelGraph) Wipe() error {
×
339
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
×
340
                for _, tlb := range graphTopLevelBuckets {
×
341
                        err := tx.DeleteTopLevelBucket(tlb)
×
342
                        if err != nil && err != kvdb.ErrBucketNotFound {
×
343
                                return err
×
344
                        }
×
345
                }
346
                return nil
×
347
        }, func() {})
×
348
        if err != nil {
×
349
                return err
×
350
        }
×
351

352
        return initChannelGraph(c.db)
×
353
}
354

355
// initChannelGraph creates and initializes a fresh version of the channel graph. In
356
// the case that the target path has not yet been created or doesn't yet exist,
357
// then the path is created. Additionally, all required top-level buckets used
358
// within the database are created.
359
func initChannelGraph(db kvdb.Backend) error {
173✔
360
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
346✔
361
                for _, tlb := range graphTopLevelBuckets {
865✔
362
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
692✔
363
                                return err
×
364
                        }
×
365
                }
366

367
                nodes := tx.ReadWriteBucket(nodeBucket)
173✔
368
                _, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
173✔
369
                if err != nil {
173✔
370
                        return err
×
371
                }
×
372
                _, err = nodes.CreateBucketIfNotExists(nodeUpdateIndexBucket)
173✔
373
                if err != nil {
173✔
374
                        return err
×
375
                }
×
376

377
                edges := tx.ReadWriteBucket(edgeBucket)
173✔
378
                _, err = edges.CreateBucketIfNotExists(edgeIndexBucket)
173✔
379
                if err != nil {
173✔
380
                        return err
×
381
                }
×
382
                _, err = edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
173✔
383
                if err != nil {
173✔
384
                        return err
×
385
                }
×
386
                _, err = edges.CreateBucketIfNotExists(channelPointBucket)
173✔
387
                if err != nil {
173✔
388
                        return err
×
389
                }
×
390
                _, err = edges.CreateBucketIfNotExists(zombieBucket)
173✔
391
                if err != nil {
173✔
392
                        return err
×
393
                }
×
394

395
                graphMeta := tx.ReadWriteBucket(graphMetaBucket)
173✔
396
                _, err = graphMeta.CreateBucketIfNotExists(pruneLogBucket)
173✔
397
                return err
173✔
398
        }, func() {})
173✔
399
        if err != nil {
173✔
400
                return fmt.Errorf("unable to create new channel graph: %w", err)
×
401
        }
×
402

403
        return nil
173✔
404
}
405

406
// NewPathFindTx returns a new read transaction that can be used for a single
407
// path finding session. Will return nil if the graph cache is enabled.
408
func (c *ChannelGraph) NewPathFindTx() (kvdb.RTx, error) {
133✔
409
        if c.graphCache != nil {
212✔
410
                return nil, nil
79✔
411
        }
79✔
412

413
        return c.db.BeginReadTx()
54✔
414
}
415

416
// AddrsForNode returns all known addresses for the target node public key that
417
// the graph DB is aware of. The returned boolean indicates whether the given
418
// node is known to the graph DB.
419
//
420
// NOTE: this is part of the channeldb.AddrSource interface.
421
func (c *ChannelGraph) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr,
422
        error) {
1✔
423

1✔
424
        pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed())
1✔
425
        if err != nil {
1✔
426
                return false, nil, err
×
427
        }
×
428

429
        node, err := c.FetchLightningNode(pubKey)
1✔
430
        // We don't consider it an error if the graph is unaware of the node.
1✔
431
        switch {
1✔
432
        case err != nil && !errors.Is(err, ErrGraphNodeNotFound):
×
433
                return false, nil, err
×
434

435
        case errors.Is(err, ErrGraphNodeNotFound):
×
436
                return false, nil, nil
×
437
        }
438

439
        return true, node.Addresses, nil
1✔
440
}
441

442
// ForEachChannel iterates through all the channel edges stored within the
443
// graph and invokes the passed callback for each edge. The callback takes two
444
// edges: since this is a directed graph, both the in/out edges are visited.
445
// If the callback returns an error, then the transaction is aborted and the
446
// iteration stops early.
447
//
448
// NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer
449
// for that particular channel edge routing policy will be passed into the
450
// callback.
451
func (c *ChannelGraph) ForEachChannel(cb func(*models.ChannelEdgeInfo,
452
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
144✔
453

144✔
454
        return c.db.View(func(tx kvdb.RTx) error {
288✔
455
                edges := tx.ReadBucket(edgeBucket)
144✔
456
                if edges == nil {
144✔
457
                        return ErrGraphNoEdgesFound
×
458
                }
×
459

460
                // First, load all edges in memory indexed by node and channel
461
                // id.
462
                channelMap, err := c.getChannelMap(edges)
144✔
463
                if err != nil {
144✔
464
                        return err
×
465
                }
×
466

467
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
144✔
468
                if edgeIndex == nil {
144✔
469
                        return ErrGraphNoEdgesFound
×
470
                }
×
471

472
                // Load edge index, recombine each channel with the policies
473
                // loaded above and invoke the callback.
474
                return kvdb.ForAll(
144✔
475
                        edgeIndex, func(k, edgeInfoBytes []byte) error {
639✔
476
                                var chanID [8]byte
495✔
477
                                copy(chanID[:], k)
495✔
478

495✔
479
                                edgeInfoReader := bytes.NewReader(edgeInfoBytes)
495✔
480
                                info, err := deserializeChanEdgeInfo(
495✔
481
                                        edgeInfoReader,
495✔
482
                                )
495✔
483
                                if err != nil {
495✔
484
                                        return err
×
485
                                }
×
486

487
                                policy1 := channelMap[channelMapKey{
495✔
488
                                        nodeKey: info.NodeKey1Bytes,
495✔
489
                                        chanID:  chanID,
495✔
490
                                }]
495✔
491

495✔
492
                                policy2 := channelMap[channelMapKey{
495✔
493
                                        nodeKey: info.NodeKey2Bytes,
495✔
494
                                        chanID:  chanID,
495✔
495
                                }]
495✔
496

495✔
497
                                return cb(&info, policy1, policy2)
495✔
498
                        },
499
                )
500
        }, func() {})
144✔
501
}
502

503
// ForEachNodeDirectedChannel iterates through all channels of a given node,
504
// executing the passed callback on the directed edge representing the channel
505
// and its incoming policy. If the callback returns an error, then the iteration
506
// is halted with the error propagated back up to the caller.
507
//
508
// Unknown policies are passed into the callback as nil values.
509
func (c *ChannelGraph) ForEachNodeDirectedChannel(tx kvdb.RTx,
510
        node route.Vertex, cb func(channel *DirectedChannel) error) error {
704✔
511

704✔
512
        if c.graphCache != nil {
1,166✔
513
                return c.graphCache.ForEachChannel(node, cb)
462✔
514
        }
462✔
515

516
        // Fallback that uses the database.
517
        toNodeCallback := func() route.Vertex {
374✔
518
                return node
132✔
519
        }
132✔
520
        toNodeFeatures, err := c.FetchNodeFeatures(node)
242✔
521
        if err != nil {
242✔
522
                return err
×
523
        }
×
524

525
        dbCallback := func(tx kvdb.RTx, e *models.ChannelEdgeInfo, p1,
242✔
526
                p2 *models.ChannelEdgePolicy) error {
738✔
527

496✔
528
                var cachedInPolicy *models.CachedEdgePolicy
496✔
529
                if p2 != nil {
989✔
530
                        cachedInPolicy = models.NewCachedPolicy(p2)
493✔
531
                        cachedInPolicy.ToNodePubKey = toNodeCallback
493✔
532
                        cachedInPolicy.ToNodeFeatures = toNodeFeatures
493✔
533
                }
493✔
534

535
                var inboundFee lnwire.Fee
496✔
536
                if p1 != nil {
991✔
537
                        // Extract inbound fee. If there is a decoding error,
495✔
538
                        // skip this edge.
495✔
539
                        _, err := p1.ExtraOpaqueData.ExtractRecords(&inboundFee)
495✔
540
                        if err != nil {
496✔
541
                                return nil
1✔
542
                        }
1✔
543
                }
544

545
                directedChannel := &DirectedChannel{
495✔
546
                        ChannelID:    e.ChannelID,
495✔
547
                        IsNode1:      node == e.NodeKey1Bytes,
495✔
548
                        OtherNode:    e.NodeKey2Bytes,
495✔
549
                        Capacity:     e.Capacity,
495✔
550
                        OutPolicySet: p1 != nil,
495✔
551
                        InPolicy:     cachedInPolicy,
495✔
552
                        InboundFee:   inboundFee,
495✔
553
                }
495✔
554

495✔
555
                if node == e.NodeKey2Bytes {
743✔
556
                        directedChannel.OtherNode = e.NodeKey1Bytes
248✔
557
                }
248✔
558

559
                return cb(directedChannel)
495✔
560
        }
561
        return nodeTraversal(tx, node[:], c.db, dbCallback)
242✔
562
}
563

564
// FetchNodeFeatures returns the features of a given node. If no features are
565
// known for the node, an empty feature vector is returned.
566
func (c *ChannelGraph) FetchNodeFeatures(
567
        node route.Vertex) (*lnwire.FeatureVector, error) {
1,139✔
568

1,139✔
569
        if c.graphCache != nil {
1,592✔
570
                return c.graphCache.GetFeatures(node), nil
453✔
571
        }
453✔
572

573
        // Fallback that uses the database.
574
        targetNode, err := c.FetchLightningNode(node)
686✔
575
        switch err {
686✔
576
        // If the node exists and has features, return them directly.
577
        case nil:
675✔
578
                return targetNode.Features, nil
675✔
579

580
        // If we couldn't find a node announcement, populate a blank feature
581
        // vector.
582
        case ErrGraphNodeNotFound:
11✔
583
                return lnwire.EmptyFeatureVector(), nil
11✔
584

585
        // Otherwise, bubble the error up.
586
        default:
×
587
                return nil, err
×
588
        }
589
}
590

591
// ForEachNodeCached is similar to ForEachNode, but it utilizes the channel
592
// graph cache instead. Note that this doesn't return all the information the
593
// regular ForEachNode method does.
594
//
595
// NOTE: The callback contents MUST not be modified.
596
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
597
        chans map[uint64]*DirectedChannel) error) error {
1✔
598

1✔
599
        if c.graphCache != nil {
1✔
600
                return c.graphCache.ForEachNode(cb)
×
601
        }
×
602

603
        // Otherwise fall back to a version that uses the database directly.
604
        // We'll iterate over each node, then the set of channels for each
605
        // node, and construct a similar callback function signature as the
606
        // main function expects.
607
        return c.ForEachNode(func(tx kvdb.RTx,
1✔
608
                node *models.LightningNode) error {
21✔
609

20✔
610
                channels := make(map[uint64]*DirectedChannel)
20✔
611

20✔
612
                err := c.ForEachNodeChannelTx(tx, node.PubKeyBytes,
20✔
613
                        func(tx kvdb.RTx, e *models.ChannelEdgeInfo,
20✔
614
                                p1 *models.ChannelEdgePolicy,
20✔
615
                                p2 *models.ChannelEdgePolicy) error {
210✔
616

190✔
617
                                toNodeCallback := func() route.Vertex {
190✔
618
                                        return node.PubKeyBytes
×
619
                                }
×
620
                                toNodeFeatures, err := c.FetchNodeFeatures(
190✔
621
                                        node.PubKeyBytes,
190✔
622
                                )
190✔
623
                                if err != nil {
190✔
624
                                        return err
×
625
                                }
×
626

627
                                var cachedInPolicy *models.CachedEdgePolicy
190✔
628
                                if p2 != nil {
380✔
629
                                        cachedInPolicy =
190✔
630
                                                models.NewCachedPolicy(p2)
190✔
631
                                        cachedInPolicy.ToNodePubKey =
190✔
632
                                                toNodeCallback
190✔
633
                                        cachedInPolicy.ToNodeFeatures =
190✔
634
                                                toNodeFeatures
190✔
635
                                }
190✔
636

637
                                directedChannel := &DirectedChannel{
190✔
638
                                        ChannelID: e.ChannelID,
190✔
639
                                        IsNode1: node.PubKeyBytes ==
190✔
640
                                                e.NodeKey1Bytes,
190✔
641
                                        OtherNode:    e.NodeKey2Bytes,
190✔
642
                                        Capacity:     e.Capacity,
190✔
643
                                        OutPolicySet: p1 != nil,
190✔
644
                                        InPolicy:     cachedInPolicy,
190✔
645
                                }
190✔
646

190✔
647
                                if node.PubKeyBytes == e.NodeKey2Bytes {
285✔
648
                                        directedChannel.OtherNode =
95✔
649
                                                e.NodeKey1Bytes
95✔
650
                                }
95✔
651

652
                                channels[e.ChannelID] = directedChannel
190✔
653

190✔
654
                                return nil
190✔
655
                        })
656
                if err != nil {
20✔
657
                        return err
×
658
                }
×
659

660
                return cb(node.PubKeyBytes, channels)
20✔
661
        })
662
}
663

664
// DisabledChannelIDs returns the channel ids of disabled channels.
665
// A channel is disabled when two of the associated ChanelEdgePolicies
666
// have their disabled bit on.
667
func (c *ChannelGraph) DisabledChannelIDs() ([]uint64, error) {
6✔
668
        var disabledChanIDs []uint64
6✔
669
        var chanEdgeFound map[uint64]struct{}
6✔
670

6✔
671
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
12✔
672
                edges := tx.ReadBucket(edgeBucket)
6✔
673
                if edges == nil {
6✔
674
                        return ErrGraphNoEdgesFound
×
675
                }
×
676

677
                disabledEdgePolicyIndex := edges.NestedReadBucket(
6✔
678
                        disabledEdgePolicyBucket,
6✔
679
                )
6✔
680
                if disabledEdgePolicyIndex == nil {
7✔
681
                        return nil
1✔
682
                }
1✔
683

684
                // We iterate over all disabled policies and we add each channel
685
                // that has more than one disabled policy to disabledChanIDs
686
                // array.
687
                return disabledEdgePolicyIndex.ForEach(
5✔
688
                        func(k, v []byte) error {
16✔
689
                                chanID := byteOrder.Uint64(k[:8])
11✔
690
                                _, edgeFound := chanEdgeFound[chanID]
11✔
691
                                if edgeFound {
15✔
692
                                        delete(chanEdgeFound, chanID)
4✔
693
                                        disabledChanIDs = append(
4✔
694
                                                disabledChanIDs, chanID,
4✔
695
                                        )
4✔
696

4✔
697
                                        return nil
4✔
698
                                }
4✔
699

700
                                chanEdgeFound[chanID] = struct{}{}
7✔
701

7✔
702
                                return nil
7✔
703
                        },
704
                )
705
        }, func() {
6✔
706
                disabledChanIDs = nil
6✔
707
                chanEdgeFound = make(map[uint64]struct{})
6✔
708
        })
6✔
709
        if err != nil {
6✔
710
                return nil, err
×
711
        }
×
712

713
        return disabledChanIDs, nil
6✔
714
}
715

716
// ForEachNode iterates through all the stored vertices/nodes in the graph,
717
// executing the passed callback with each node encountered. If the callback
718
// returns an error, then the transaction is aborted and the iteration stops
719
// early.
720
//
721
// TODO(roasbeef): add iterator interface to allow for memory efficient graph
722
// traversal when graph gets mega
723
func (c *ChannelGraph) ForEachNode(
724
        cb func(kvdb.RTx, *models.LightningNode) error) error {
129✔
725

129✔
726
        traversal := func(tx kvdb.RTx) error {
258✔
727
                // First grab the nodes bucket which stores the mapping from
129✔
728
                // pubKey to node information.
129✔
729
                nodes := tx.ReadBucket(nodeBucket)
129✔
730
                if nodes == nil {
129✔
731
                        return ErrGraphNotFound
×
732
                }
×
733

734
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
1,568✔
735
                        // If this is the source key, then we skip this
1,439✔
736
                        // iteration as the value for this key is a pubKey
1,439✔
737
                        // rather than raw node information.
1,439✔
738
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
1,700✔
739
                                return nil
261✔
740
                        }
261✔
741

742
                        nodeReader := bytes.NewReader(nodeBytes)
1,178✔
743
                        node, err := deserializeLightningNode(nodeReader)
1,178✔
744
                        if err != nil {
1,178✔
745
                                return err
×
746
                        }
×
747

748
                        // Execute the callback, the transaction will abort if
749
                        // this returns an error.
750
                        return cb(tx, &node)
1,178✔
751
                })
752
        }
753

754
        return kvdb.View(c.db, traversal, func() {})
258✔
755
}
756

757
// ForEachNodeCacheable iterates through all the stored vertices/nodes in the
758
// graph, executing the passed callback with each node encountered. If the
759
// callback returns an error, then the transaction is aborted and the iteration
760
// stops early.
761
func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx,
762
        GraphCacheNode) error) error {
141✔
763

141✔
764
        traversal := func(tx kvdb.RTx) error {
282✔
765
                // First grab the nodes bucket which stores the mapping from
141✔
766
                // pubKey to node information.
141✔
767
                nodes := tx.ReadBucket(nodeBucket)
141✔
768
                if nodes == nil {
141✔
769
                        return ErrGraphNotFound
×
770
                }
×
771

772
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
543✔
773
                        // If this is the source key, then we skip this
402✔
774
                        // iteration as the value for this key is a pubKey
402✔
775
                        // rather than raw node information.
402✔
776
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
684✔
777
                                return nil
282✔
778
                        }
282✔
779

780
                        nodeReader := bytes.NewReader(nodeBytes)
120✔
781
                        cacheableNode, err := deserializeLightningNodeCacheable(
120✔
782
                                nodeReader,
120✔
783
                        )
120✔
784
                        if err != nil {
120✔
785
                                return err
×
786
                        }
×
787

788
                        // Execute the callback, the transaction will abort if
789
                        // this returns an error.
790
                        return cb(tx, cacheableNode)
120✔
791
                })
792
        }
793

794
        return kvdb.View(c.db, traversal, func() {})
282✔
795
}
796

797
// SourceNode returns the source node of the graph. The source node is treated
798
// as the center node within a star-graph. This method may be used to kick off
799
// a path finding algorithm in order to explore the reachability of another
800
// node based off the source node.
801
func (c *ChannelGraph) SourceNode() (*models.LightningNode, error) {
232✔
802
        var source *models.LightningNode
232✔
803
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
464✔
804
                // First grab the nodes bucket which stores the mapping from
232✔
805
                // pubKey to node information.
232✔
806
                nodes := tx.ReadBucket(nodeBucket)
232✔
807
                if nodes == nil {
232✔
808
                        return ErrGraphNotFound
×
809
                }
×
810

811
                node, err := c.sourceNode(nodes)
232✔
812
                if err != nil {
233✔
813
                        return err
1✔
814
                }
1✔
815
                source = node
231✔
816

231✔
817
                return nil
231✔
818
        }, func() {
232✔
819
                source = nil
232✔
820
        })
232✔
821
        if err != nil {
233✔
822
                return nil, err
1✔
823
        }
1✔
824

825
        return source, nil
231✔
826
}
827

828
// sourceNode uses an existing database transaction and returns the source node
829
// of the graph. The source node is treated as the center node within a
830
// star-graph. This method may be used to kick off a path finding algorithm in
831
// order to explore the reachability of another node based off the source node.
832
func (c *ChannelGraph) sourceNode(nodes kvdb.RBucket) (*models.LightningNode,
833
        error) {
498✔
834

498✔
835
        selfPub := nodes.Get(sourceKey)
498✔
836
        if selfPub == nil {
499✔
837
                return nil, ErrSourceNodeNotSet
1✔
838
        }
1✔
839

840
        // With the pubKey of the source node retrieved, we're able to
841
        // fetch the full node information.
842
        node, err := fetchLightningNode(nodes, selfPub)
497✔
843
        if err != nil {
497✔
844
                return nil, err
×
845
        }
×
846

847
        return &node, nil
497✔
848
}
849

850
// SetSourceNode sets the source node within the graph database. The source
851
// node is to be used as the center of a star-graph within path finding
852
// algorithms.
853
func (c *ChannelGraph) SetSourceNode(node *models.LightningNode) error {
118✔
854
        nodePubBytes := node.PubKeyBytes[:]
118✔
855

118✔
856
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
236✔
857
                // First grab the nodes bucket which stores the mapping from
118✔
858
                // pubKey to node information.
118✔
859
                nodes, err := tx.CreateTopLevelBucket(nodeBucket)
118✔
860
                if err != nil {
118✔
861
                        return err
×
862
                }
×
863

864
                // Next we create the mapping from source to the targeted
865
                // public key.
866
                if err := nodes.Put(sourceKey, nodePubBytes); err != nil {
118✔
867
                        return err
×
868
                }
×
869

870
                // Finally, we commit the information of the lightning node
871
                // itself.
872
                return addLightningNode(tx, node)
118✔
873
        }, func() {})
118✔
874
}
875

876
// AddLightningNode adds a vertex/node to the graph database. If the node is not
877
// in the database from before, this will add a new, unconnected one to the
878
// graph. If it is present from before, this will update that node's
879
// information. Note that this method is expected to only be called to update an
880
// already present node from a node announcement, or to insert a node found in a
881
// channel update.
882
//
883
// TODO(roasbeef): also need sig of announcement
884
func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
885
        op ...batch.SchedulerOption) error {
799✔
886

799✔
887
        r := &batch.Request{
799✔
888
                Update: func(tx kvdb.RwTx) error {
1,598✔
889
                        if c.graphCache != nil {
1,411✔
890
                                cNode := newGraphCacheNode(
612✔
891
                                        node.PubKeyBytes, node.Features,
612✔
892
                                )
612✔
893
                                err := c.graphCache.AddNode(tx, cNode)
612✔
894
                                if err != nil {
612✔
895
                                        return err
×
896
                                }
×
897
                        }
898

899
                        return addLightningNode(tx, node)
799✔
900
                },
901
        }
902

903
        for _, f := range op {
799✔
904
                f(r)
×
905
        }
×
906

907
        return c.nodeScheduler.Execute(r)
799✔
908
}
909

910
func addLightningNode(tx kvdb.RwTx, node *models.LightningNode) error {
994✔
911
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
994✔
912
        if err != nil {
994✔
913
                return err
×
914
        }
×
915

916
        aliases, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
994✔
917
        if err != nil {
994✔
918
                return err
×
919
        }
×
920

921
        updateIndex, err := nodes.CreateBucketIfNotExists(
994✔
922
                nodeUpdateIndexBucket,
994✔
923
        )
994✔
924
        if err != nil {
994✔
925
                return err
×
926
        }
×
927

928
        return putLightningNode(nodes, aliases, updateIndex, node)
994✔
929
}
930

931
// LookupAlias attempts to return the alias as advertised by the target node.
932
// TODO(roasbeef): currently assumes that aliases are unique...
933
func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) {
2✔
934
        var alias string
2✔
935

2✔
936
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
4✔
937
                nodes := tx.ReadBucket(nodeBucket)
2✔
938
                if nodes == nil {
2✔
939
                        return ErrGraphNodesNotFound
×
940
                }
×
941

942
                aliases := nodes.NestedReadBucket(aliasIndexBucket)
2✔
943
                if aliases == nil {
2✔
944
                        return ErrGraphNodesNotFound
×
945
                }
×
946

947
                nodePub := pub.SerializeCompressed()
2✔
948
                a := aliases.Get(nodePub)
2✔
949
                if a == nil {
3✔
950
                        return ErrNodeAliasNotFound
1✔
951
                }
1✔
952

953
                // TODO(roasbeef): should actually be using the utf-8
954
                // package...
955
                alias = string(a)
1✔
956
                return nil
1✔
957
        }, func() {
2✔
958
                alias = ""
2✔
959
        })
2✔
960
        if err != nil {
3✔
961
                return "", err
1✔
962
        }
1✔
963

964
        return alias, nil
1✔
965
}
966

967
// DeleteLightningNode starts a new database transaction to remove a vertex/node
968
// from the database according to the node's public key.
969
func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
3✔
970
        // TODO(roasbeef): ensure dangling edges are removed...
3✔
971
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
6✔
972
                nodes := tx.ReadWriteBucket(nodeBucket)
3✔
973
                if nodes == nil {
3✔
974
                        return ErrGraphNodeNotFound
×
975
                }
×
976

977
                if c.graphCache != nil {
6✔
978
                        c.graphCache.RemoveNode(nodePub)
3✔
979
                }
3✔
980

981
                return c.deleteLightningNode(nodes, nodePub[:])
3✔
982
        }, func() {})
3✔
983
}
984

985
// deleteLightningNode uses an existing database transaction to remove a
986
// vertex/node from the database according to the node's public key.
987
func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket,
988
        compressedPubKey []byte) error {
69✔
989

69✔
990
        aliases := nodes.NestedReadWriteBucket(aliasIndexBucket)
69✔
991
        if aliases == nil {
69✔
992
                return ErrGraphNodesNotFound
×
993
        }
×
994

995
        if err := aliases.Delete(compressedPubKey); err != nil {
69✔
996
                return err
×
997
        }
×
998

999
        // Before we delete the node, we'll fetch its current state so we can
1000
        // determine when its last update was to clear out the node update
1001
        // index.
1002
        node, err := fetchLightningNode(nodes, compressedPubKey)
69✔
1003
        if err != nil {
69✔
1004
                return err
×
1005
        }
×
1006

1007
        if err := nodes.Delete(compressedPubKey); err != nil {
69✔
1008
                return err
×
1009
        }
×
1010

1011
        // Finally, we'll delete the index entry for the node within the
1012
        // nodeUpdateIndexBucket as this node is no longer active, so we don't
1013
        // need to track its last update.
1014
        nodeUpdateIndex := nodes.NestedReadWriteBucket(nodeUpdateIndexBucket)
69✔
1015
        if nodeUpdateIndex == nil {
69✔
1016
                return ErrGraphNodesNotFound
×
1017
        }
×
1018

1019
        // In order to delete the entry, we'll need to reconstruct the key for
1020
        // its last update.
1021
        updateUnix := uint64(node.LastUpdate.Unix())
69✔
1022
        var indexKey [8 + 33]byte
69✔
1023
        byteOrder.PutUint64(indexKey[:8], updateUnix)
69✔
1024
        copy(indexKey[8:], compressedPubKey)
69✔
1025

69✔
1026
        return nodeUpdateIndex.Delete(indexKey[:])
69✔
1027
}
1028

1029
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
1030
// undirected edge from the two target nodes are created. The information stored
1031
// denotes the static attributes of the channel, such as the channelID, the keys
1032
// involved in creation of the channel, and the set of features that the channel
1033
// supports. The chanPoint and chanID are used to uniquely identify the edge
1034
// globally within the database.
1035
func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
1036
        op ...batch.SchedulerOption) error {
1,718✔
1037

1,718✔
1038
        var alreadyExists bool
1,718✔
1039
        r := &batch.Request{
1,718✔
1040
                Reset: func() {
3,436✔
1041
                        alreadyExists = false
1,718✔
1042
                },
1,718✔
1043
                Update: func(tx kvdb.RwTx) error {
1,718✔
1044
                        err := c.addChannelEdge(tx, edge)
1,718✔
1045

1,718✔
1046
                        // Silence ErrEdgeAlreadyExist so that the batch can
1,718✔
1047
                        // succeed, but propagate the error via local state.
1,718✔
1048
                        if err == ErrEdgeAlreadyExist {
1,952✔
1049
                                alreadyExists = true
234✔
1050
                                return nil
234✔
1051
                        }
234✔
1052

1053
                        return err
1,484✔
1054
                },
1055
                OnCommit: func(err error) error {
1,718✔
1056
                        switch {
1,718✔
1057
                        case err != nil:
×
1058
                                return err
×
1059
                        case alreadyExists:
234✔
1060
                                return ErrEdgeAlreadyExist
234✔
1061
                        default:
1,484✔
1062
                                c.rejectCache.remove(edge.ChannelID)
1,484✔
1063
                                c.chanCache.remove(edge.ChannelID)
1,484✔
1064
                                return nil
1,484✔
1065
                        }
1066
                },
1067
        }
1068

1069
        for _, f := range op {
1,718✔
1070
                if f == nil {
×
1071
                        return fmt.Errorf("nil scheduler option was used")
×
1072
                }
×
1073

1074
                f(r)
×
1075
        }
1076

1077
        return c.chanScheduler.Execute(r)
1,718✔
1078
}
1079

1080
// addChannelEdge is the private form of AddChannelEdge that allows callers to
1081
// utilize an existing db transaction.
1082
func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx,
1083
        edge *models.ChannelEdgeInfo) error {
1,718✔
1084

1,718✔
1085
        // Construct the channel's primary key which is the 8-byte channel ID.
1,718✔
1086
        var chanKey [8]byte
1,718✔
1087
        binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
1,718✔
1088

1,718✔
1089
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
1,718✔
1090
        if err != nil {
1,718✔
1091
                return err
×
1092
        }
×
1093
        edges, err := tx.CreateTopLevelBucket(edgeBucket)
1,718✔
1094
        if err != nil {
1,718✔
1095
                return err
×
1096
        }
×
1097
        edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
1,718✔
1098
        if err != nil {
1,718✔
1099
                return err
×
1100
        }
×
1101
        chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket)
1,718✔
1102
        if err != nil {
1,718✔
1103
                return err
×
1104
        }
×
1105

1106
        // First, attempt to check if this edge has already been created. If
1107
        // so, then we can exit early as this method is meant to be idempotent.
1108
        if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil {
1,952✔
1109
                return ErrEdgeAlreadyExist
234✔
1110
        }
234✔
1111

1112
        if c.graphCache != nil {
2,778✔
1113
                c.graphCache.AddChannel(edge, nil, nil)
1,294✔
1114
        }
1,294✔
1115

1116
        // Before we insert the channel into the database, we'll ensure that
1117
        // both nodes already exist in the channel graph. If either node
1118
        // doesn't, then we'll insert a "shell" node that just includes its
1119
        // public key, so subsequent validation and queries can work properly.
1120
        _, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:])
1,484✔
1121
        switch {
1,484✔
1122
        case node1Err == ErrGraphNodeNotFound:
18✔
1123
                node1Shell := models.LightningNode{
18✔
1124
                        PubKeyBytes:          edge.NodeKey1Bytes,
18✔
1125
                        HaveNodeAnnouncement: false,
18✔
1126
                }
18✔
1127
                err := addLightningNode(tx, &node1Shell)
18✔
1128
                if err != nil {
18✔
1129
                        return fmt.Errorf("unable to create shell node "+
×
1130
                                "for: %x", edge.NodeKey1Bytes)
×
1131
                }
×
1132
        case node1Err != nil:
×
1133
                return err
×
1134
        }
1135

1136
        _, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:])
1,484✔
1137
        switch {
1,484✔
1138
        case node2Err == ErrGraphNodeNotFound:
59✔
1139
                node2Shell := models.LightningNode{
59✔
1140
                        PubKeyBytes:          edge.NodeKey2Bytes,
59✔
1141
                        HaveNodeAnnouncement: false,
59✔
1142
                }
59✔
1143
                err := addLightningNode(tx, &node2Shell)
59✔
1144
                if err != nil {
59✔
1145
                        return fmt.Errorf("unable to create shell node "+
×
1146
                                "for: %x", edge.NodeKey2Bytes)
×
1147
                }
×
1148
        case node2Err != nil:
×
1149
                return err
×
1150
        }
1151

1152
        // If the edge hasn't been created yet, then we'll first add it to the
1153
        // edge index in order to associate the edge between two nodes and also
1154
        // store the static components of the channel.
1155
        if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil {
1,484✔
1156
                return err
×
1157
        }
×
1158

1159
        // Mark edge policies for both sides as unknown. This is to enable
1160
        // efficient incoming channel lookup for a node.
1161
        keys := []*[33]byte{
1,484✔
1162
                &edge.NodeKey1Bytes,
1,484✔
1163
                &edge.NodeKey2Bytes,
1,484✔
1164
        }
1,484✔
1165
        for _, key := range keys {
4,452✔
1166
                err := putChanEdgePolicyUnknown(edges, edge.ChannelID, key[:])
2,968✔
1167
                if err != nil {
2,968✔
1168
                        return err
×
1169
                }
×
1170
        }
1171

1172
        // Finally we add it to the channel index which maps channel points
1173
        // (outpoints) to the shorter channel ID's.
1174
        var b bytes.Buffer
1,484✔
1175
        if err := WriteOutpoint(&b, &edge.ChannelPoint); err != nil {
1,484✔
1176
                return err
×
1177
        }
×
1178
        return chanIndex.Put(b.Bytes(), chanKey[:])
1,484✔
1179
}
1180

1181
// HasChannelEdge returns true if the database knows of a channel edge with the
1182
// passed channel ID, and false otherwise. If an edge with that ID is found
1183
// within the graph, then two time stamps representing the last time the edge
1184
// was updated for both directed edges are returned along with the boolean. If
1185
// it is not found, then the zombie index is checked and its result is returned
1186
// as the second boolean.
1187
func (c *ChannelGraph) HasChannelEdge(
1188
        chanID uint64) (time.Time, time.Time, bool, bool, error) {
218✔
1189

218✔
1190
        var (
218✔
1191
                upd1Time time.Time
218✔
1192
                upd2Time time.Time
218✔
1193
                exists   bool
218✔
1194
                isZombie bool
218✔
1195
        )
218✔
1196

218✔
1197
        // We'll query the cache with the shared lock held to allow multiple
218✔
1198
        // readers to access values in the cache concurrently if they exist.
218✔
1199
        c.cacheMu.RLock()
218✔
1200
        if entry, ok := c.rejectCache.get(chanID); ok {
287✔
1201
                c.cacheMu.RUnlock()
69✔
1202
                upd1Time = time.Unix(entry.upd1Time, 0)
69✔
1203
                upd2Time = time.Unix(entry.upd2Time, 0)
69✔
1204
                exists, isZombie = entry.flags.unpack()
69✔
1205
                return upd1Time, upd2Time, exists, isZombie, nil
69✔
1206
        }
69✔
1207
        c.cacheMu.RUnlock()
149✔
1208

149✔
1209
        c.cacheMu.Lock()
149✔
1210
        defer c.cacheMu.Unlock()
149✔
1211

149✔
1212
        // The item was not found with the shared lock, so we'll acquire the
149✔
1213
        // exclusive lock and check the cache again in case another method added
149✔
1214
        // the entry to the cache while no lock was held.
149✔
1215
        if entry, ok := c.rejectCache.get(chanID); ok {
154✔
1216
                upd1Time = time.Unix(entry.upd1Time, 0)
5✔
1217
                upd2Time = time.Unix(entry.upd2Time, 0)
5✔
1218
                exists, isZombie = entry.flags.unpack()
5✔
1219
                return upd1Time, upd2Time, exists, isZombie, nil
5✔
1220
        }
5✔
1221

1222
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
288✔
1223
                edges := tx.ReadBucket(edgeBucket)
144✔
1224
                if edges == nil {
144✔
1225
                        return ErrGraphNoEdgesFound
×
1226
                }
×
1227
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
144✔
1228
                if edgeIndex == nil {
144✔
1229
                        return ErrGraphNoEdgesFound
×
1230
                }
×
1231

1232
                var channelID [8]byte
144✔
1233
                byteOrder.PutUint64(channelID[:], chanID)
144✔
1234

144✔
1235
                // If the edge doesn't exist, then we'll also check our zombie
144✔
1236
                // index.
144✔
1237
                if edgeIndex.Get(channelID[:]) == nil {
240✔
1238
                        exists = false
96✔
1239
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
96✔
1240
                        if zombieIndex != nil {
192✔
1241
                                isZombie, _, _ = isZombieEdge(
96✔
1242
                                        zombieIndex, chanID,
96✔
1243
                                )
96✔
1244
                        }
96✔
1245

1246
                        return nil
96✔
1247
                }
1248

1249
                exists = true
48✔
1250
                isZombie = false
48✔
1251

48✔
1252
                // If the channel has been found in the graph, then retrieve
48✔
1253
                // the edges itself so we can return the last updated
48✔
1254
                // timestamps.
48✔
1255
                nodes := tx.ReadBucket(nodeBucket)
48✔
1256
                if nodes == nil {
48✔
1257
                        return ErrGraphNodeNotFound
×
1258
                }
×
1259

1260
                e1, e2, err := fetchChanEdgePolicies(
48✔
1261
                        edgeIndex, edges, channelID[:],
48✔
1262
                )
48✔
1263
                if err != nil {
48✔
1264
                        return err
×
1265
                }
×
1266

1267
                // As we may have only one of the edges populated, only set the
1268
                // update time if the edge was found in the database.
1269
                if e1 != nil {
66✔
1270
                        upd1Time = e1.LastUpdate
18✔
1271
                }
18✔
1272
                if e2 != nil {
64✔
1273
                        upd2Time = e2.LastUpdate
16✔
1274
                }
16✔
1275

1276
                return nil
48✔
1277
        }, func() {}); err != nil {
144✔
1278
                return time.Time{}, time.Time{}, exists, isZombie, err
×
1279
        }
×
1280

1281
        c.rejectCache.insert(chanID, rejectCacheEntry{
144✔
1282
                upd1Time: upd1Time.Unix(),
144✔
1283
                upd2Time: upd2Time.Unix(),
144✔
1284
                flags:    packRejectFlags(exists, isZombie),
144✔
1285
        })
144✔
1286

144✔
1287
        return upd1Time, upd2Time, exists, isZombie, nil
144✔
1288
}
1289

1290
// UpdateChannelEdge retrieves and update edge of the graph database. Method
1291
// only reserved for updating an edge info after its already been created.
1292
// In order to maintain this constraints, we return an error in the scenario
1293
// that an edge info hasn't yet been created yet, but someone attempts to update
1294
// it.
1295
func (c *ChannelGraph) UpdateChannelEdge(edge *models.ChannelEdgeInfo) error {
1✔
1296
        // Construct the channel's primary key which is the 8-byte channel ID.
1✔
1297
        var chanKey [8]byte
1✔
1298
        binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
1✔
1299

1✔
1300
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
2✔
1301
                edges := tx.ReadWriteBucket(edgeBucket)
1✔
1302
                if edge == nil {
1✔
1303
                        return ErrEdgeNotFound
×
1304
                }
×
1305

1306
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
1✔
1307
                if edgeIndex == nil {
1✔
1308
                        return ErrEdgeNotFound
×
1309
                }
×
1310

1311
                if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo == nil {
1✔
1312
                        return ErrEdgeNotFound
×
1313
                }
×
1314

1315
                if c.graphCache != nil {
2✔
1316
                        c.graphCache.UpdateChannel(edge)
1✔
1317
                }
1✔
1318

1319
                return putChanEdgeInfo(edgeIndex, edge, chanKey)
1✔
1320
        }, func() {})
1✔
1321
}
1322

1323
// pruneTipBytes is the total size of the value which stores a prune entry of
// the graph in the prune log. The "prune tip" is the last entry in the prune
// log, and indicates if the channel graph is in sync with the current UTXO
// state. The value consists of a block hash, taking 32 bytes total.
const pruneTipBytes = 32
1331

1332
// PruneGraph prunes newly closed channels from the channel graph in response
1333
// to a new block being solved on the network. Any transactions which spend the
1334
// funding output of any known channels within the graph will be deleted.
1335
// Additionally, the "prune tip", or the last block which has been used to
1336
// prune the graph is stored so callers can ensure the graph is fully in sync
1337
// with the current UTXO state. A slice of channels that have been closed by
1338
// the target block are returned if the function succeeds without error.
1339
func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
1340
        blockHash *chainhash.Hash, blockHeight uint32) (
1341
        []*models.ChannelEdgeInfo, error) {
242✔
1342

242✔
1343
        c.cacheMu.Lock()
242✔
1344
        defer c.cacheMu.Unlock()
242✔
1345

242✔
1346
        var chansClosed []*models.ChannelEdgeInfo
242✔
1347

242✔
1348
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
484✔
1349
                // First grab the edges bucket which houses the information
242✔
1350
                // we'd like to delete
242✔
1351
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
242✔
1352
                if err != nil {
242✔
1353
                        return err
×
1354
                }
×
1355

1356
                // Next grab the two edge indexes which will also need to be
1357
                // updated.
1358
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
242✔
1359
                if err != nil {
242✔
1360
                        return err
×
1361
                }
×
1362
                chanIndex, err := edges.CreateBucketIfNotExists(
242✔
1363
                        channelPointBucket,
242✔
1364
                )
242✔
1365
                if err != nil {
242✔
1366
                        return err
×
1367
                }
×
1368
                nodes := tx.ReadWriteBucket(nodeBucket)
242✔
1369
                if nodes == nil {
242✔
1370
                        return ErrSourceNodeNotSet
×
1371
                }
×
1372
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
242✔
1373
                if err != nil {
242✔
1374
                        return err
×
1375
                }
×
1376

1377
                // For each of the outpoints that have been spent within the
1378
                // block, we attempt to delete them from the graph as if that
1379
                // outpoint was a channel, then it has now been closed.
1380
                for _, chanPoint := range spentOutputs {
383✔
1381
                        // TODO(roasbeef): load channel bloom filter, continue
141✔
1382
                        // if NOT if filter
141✔
1383

141✔
1384
                        var opBytes bytes.Buffer
141✔
1385
                        err := WriteOutpoint(&opBytes, chanPoint)
141✔
1386
                        if err != nil {
141✔
1387
                                return err
×
1388
                        }
×
1389

1390
                        // First attempt to see if the channel exists within
1391
                        // the database, if not, then we can exit early.
1392
                        chanID := chanIndex.Get(opBytes.Bytes())
141✔
1393
                        if chanID == nil {
258✔
1394
                                continue
117✔
1395
                        }
1396

1397
                        // However, if it does, then we'll read out the full
1398
                        // version so we can add it to the set of deleted
1399
                        // channels.
1400
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
24✔
1401
                        if err != nil {
24✔
1402
                                return err
×
1403
                        }
×
1404

1405
                        // Attempt to delete the channel, an ErrEdgeNotFound
1406
                        // will be returned if that outpoint isn't known to be
1407
                        // a channel. If no error is returned, then a channel
1408
                        // was successfully pruned.
1409
                        err = c.delChannelEdgeUnsafe(
24✔
1410
                                edges, edgeIndex, chanIndex, zombieIndex,
24✔
1411
                                chanID, false, false,
24✔
1412
                        )
24✔
1413
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
24✔
1414
                                return err
×
1415
                        }
×
1416

1417
                        chansClosed = append(chansClosed, &edgeInfo)
24✔
1418
                }
1419

1420
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
242✔
1421
                if err != nil {
242✔
1422
                        return err
×
1423
                }
×
1424

1425
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
242✔
1426
                        pruneLogBucket,
242✔
1427
                )
242✔
1428
                if err != nil {
242✔
1429
                        return err
×
1430
                }
×
1431

1432
                // With the graph pruned, add a new entry to the prune log,
1433
                // which can be used to check if the graph is fully synced with
1434
                // the current UTXO state.
1435
                var blockHeightBytes [4]byte
242✔
1436
                byteOrder.PutUint32(blockHeightBytes[:], blockHeight)
242✔
1437

242✔
1438
                var newTip [pruneTipBytes]byte
242✔
1439
                copy(newTip[:], blockHash[:])
242✔
1440

242✔
1441
                err = pruneBucket.Put(blockHeightBytes[:], newTip[:])
242✔
1442
                if err != nil {
242✔
1443
                        return err
×
1444
                }
×
1445

1446
                // Now that the graph has been pruned, we'll also attempt to
1447
                // prune any nodes that have had a channel closed within the
1448
                // latest block.
1449
                return c.pruneGraphNodes(nodes, edgeIndex)
242✔
1450
        }, func() {
242✔
1451
                chansClosed = nil
242✔
1452
        })
242✔
1453
        if err != nil {
242✔
1454
                return nil, err
×
1455
        }
×
1456

1457
        for _, channel := range chansClosed {
266✔
1458
                c.rejectCache.remove(channel.ChannelID)
24✔
1459
                c.chanCache.remove(channel.ChannelID)
24✔
1460
        }
24✔
1461

1462
        if c.graphCache != nil {
484✔
1463
                log.Debugf("Pruned graph, cache now has %s",
242✔
1464
                        c.graphCache.Stats())
242✔
1465
        }
242✔
1466

1467
        return chansClosed, nil
242✔
1468
}
1469

1470
// PruneGraphNodes is a garbage collection method which attempts to prune out
1471
// any nodes from the channel graph that are currently unconnected. This ensure
1472
// that we only maintain a graph of reachable nodes. In the event that a pruned
1473
// node gains more channels, it will be re-added back to the graph.
1474
func (c *ChannelGraph) PruneGraphNodes() error {
24✔
1475
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
48✔
1476
                nodes := tx.ReadWriteBucket(nodeBucket)
24✔
1477
                if nodes == nil {
24✔
1478
                        return ErrGraphNodesNotFound
×
1479
                }
×
1480
                edges := tx.ReadWriteBucket(edgeBucket)
24✔
1481
                if edges == nil {
24✔
1482
                        return ErrGraphNotFound
×
1483
                }
×
1484
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
24✔
1485
                if edgeIndex == nil {
24✔
1486
                        return ErrGraphNoEdgesFound
×
1487
                }
×
1488

1489
                return c.pruneGraphNodes(nodes, edgeIndex)
24✔
1490
        }, func() {})
24✔
1491
}
1492

1493
// pruneGraphNodes attempts to remove any nodes from the graph who have had a
1494
// channel closed within the current block. If the node still has existing
1495
// channels in the graph, this will act as a no-op.
1496
func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket,
1497
        edgeIndex kvdb.RwBucket) error {
266✔
1498

266✔
1499
        log.Trace("Pruning nodes from graph with no open channels")
266✔
1500

266✔
1501
        // We'll retrieve the graph's source node to ensure we don't remove it
266✔
1502
        // even if it no longer has any open channels.
266✔
1503
        sourceNode, err := c.sourceNode(nodes)
266✔
1504
        if err != nil {
266✔
1505
                return err
×
1506
        }
×
1507

1508
        // We'll use this map to keep count the number of references to a node
1509
        // in the graph. A node should only be removed once it has no more
1510
        // references in the graph.
1511
        nodeRefCounts := make(map[[33]byte]int)
266✔
1512
        err = nodes.ForEach(func(pubKey, nodeBytes []byte) error {
1,575✔
1513
                // If this is the source key, then we skip this
1,309✔
1514
                // iteration as the value for this key is a pubKey
1,309✔
1515
                // rather than raw node information.
1,309✔
1516
                if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
2,107✔
1517
                        return nil
798✔
1518
                }
798✔
1519

1520
                var nodePub [33]byte
511✔
1521
                copy(nodePub[:], pubKey)
511✔
1522
                nodeRefCounts[nodePub] = 0
511✔
1523

511✔
1524
                return nil
511✔
1525
        })
1526
        if err != nil {
266✔
1527
                return err
×
1528
        }
×
1529

1530
        // To ensure we never delete the source node, we'll start off by
1531
        // bumping its ref count to 1.
1532
        nodeRefCounts[sourceNode.PubKeyBytes] = 1
266✔
1533

266✔
1534
        // Next, we'll run through the edgeIndex which maps a channel ID to the
266✔
1535
        // edge info. We'll use this scan to populate our reference count map
266✔
1536
        // above.
266✔
1537
        err = edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
458✔
1538
                // The first 66 bytes of the edge info contain the pubkeys of
192✔
1539
                // the nodes that this edge attaches. We'll extract them, and
192✔
1540
                // add them to the ref count map.
192✔
1541
                var node1, node2 [33]byte
192✔
1542
                copy(node1[:], edgeInfoBytes[:33])
192✔
1543
                copy(node2[:], edgeInfoBytes[33:])
192✔
1544

192✔
1545
                // With the nodes extracted, we'll increase the ref count of
192✔
1546
                // each of the nodes.
192✔
1547
                nodeRefCounts[node1]++
192✔
1548
                nodeRefCounts[node2]++
192✔
1549

192✔
1550
                return nil
192✔
1551
        })
192✔
1552
        if err != nil {
266✔
1553
                return err
×
1554
        }
×
1555

1556
        // Finally, we'll make a second pass over the set of nodes, and delete
1557
        // any nodes that have a ref count of zero.
1558
        var numNodesPruned int
266✔
1559
        for nodePubKey, refCount := range nodeRefCounts {
777✔
1560
                // If the ref count of the node isn't zero, then we can safely
511✔
1561
                // skip it as it still has edges to or from it within the
511✔
1562
                // graph.
511✔
1563
                if refCount != 0 {
956✔
1564
                        continue
445✔
1565
                }
1566

1567
                if c.graphCache != nil {
132✔
1568
                        c.graphCache.RemoveNode(nodePubKey)
66✔
1569
                }
66✔
1570

1571
                // If we reach this point, then there are no longer any edges
1572
                // that connect this node, so we can delete it.
1573
                err := c.deleteLightningNode(nodes, nodePubKey[:])
66✔
1574
                if err != nil {
66✔
1575
                        if errors.Is(err, ErrGraphNodeNotFound) ||
×
1576
                                errors.Is(err, ErrGraphNodesNotFound) {
×
1577

×
1578
                                log.Warnf("Unable to prune node %x from the "+
×
1579
                                        "graph: %v", nodePubKey, err)
×
1580
                                continue
×
1581
                        }
1582

1583
                        return err
×
1584
                }
1585

1586
                log.Infof("Pruned unconnected node %x from channel graph",
66✔
1587
                        nodePubKey[:])
66✔
1588

66✔
1589
                numNodesPruned++
66✔
1590
        }
1591

1592
        if numNodesPruned > 0 {
316✔
1593
                log.Infof("Pruned %v unconnected nodes from the channel graph",
50✔
1594
                        numNodesPruned)
50✔
1595
        }
50✔
1596

1597
        return nil
266✔
1598
}
1599

1600
// DisconnectBlockAtHeight is used to indicate that the block specified
1601
// by the passed height has been disconnected from the main chain. This
1602
// will "rewind" the graph back to the height below, deleting channels
1603
// that are no longer confirmed from the graph. The prune log will be
1604
// set to the last prune height valid for the remaining chain.
1605
// Channels that were removed from the graph resulting from the
1606
// disconnected block are returned.
1607
func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) (
1608
        []*models.ChannelEdgeInfo, error) {
162✔
1609

162✔
1610
        // Every channel having a ShortChannelID starting at 'height'
162✔
1611
        // will no longer be confirmed.
162✔
1612
        startShortChanID := lnwire.ShortChannelID{
162✔
1613
                BlockHeight: height,
162✔
1614
        }
162✔
1615

162✔
1616
        // Delete everything after this height from the db up until the
162✔
1617
        // SCID alias range.
162✔
1618
        endShortChanID := aliasmgr.StartingAlias
162✔
1619

162✔
1620
        // The block height will be the 3 first bytes of the channel IDs.
162✔
1621
        var chanIDStart [8]byte
162✔
1622
        byteOrder.PutUint64(chanIDStart[:], startShortChanID.ToUint64())
162✔
1623
        var chanIDEnd [8]byte
162✔
1624
        byteOrder.PutUint64(chanIDEnd[:], endShortChanID.ToUint64())
162✔
1625

162✔
1626
        c.cacheMu.Lock()
162✔
1627
        defer c.cacheMu.Unlock()
162✔
1628

162✔
1629
        // Keep track of the channels that are removed from the graph.
162✔
1630
        var removedChans []*models.ChannelEdgeInfo
162✔
1631

162✔
1632
        if err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
324✔
1633
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
162✔
1634
                if err != nil {
162✔
1635
                        return err
×
1636
                }
×
1637
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
162✔
1638
                if err != nil {
162✔
1639
                        return err
×
1640
                }
×
1641
                chanIndex, err := edges.CreateBucketIfNotExists(
162✔
1642
                        channelPointBucket,
162✔
1643
                )
162✔
1644
                if err != nil {
162✔
1645
                        return err
×
1646
                }
×
1647
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
162✔
1648
                if err != nil {
162✔
1649
                        return err
×
1650
                }
×
1651

1652
                // Scan from chanIDStart to chanIDEnd, deleting every
1653
                // found edge.
1654
                // NOTE: we must delete the edges after the cursor loop, since
1655
                // modifying the bucket while traversing is not safe.
1656
                // NOTE: We use a < comparison in bytes.Compare instead of <=
1657
                // so that the StartingAlias itself isn't deleted.
1658
                var keys [][]byte
162✔
1659
                cursor := edgeIndex.ReadWriteCursor()
162✔
1660

162✔
1661
                //nolint:ll
162✔
1662
                for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
162✔
1663
                        bytes.Compare(k, chanIDEnd[:]) < 0; k, v = cursor.Next() {
253✔
1664
                        edgeInfoReader := bytes.NewReader(v)
91✔
1665
                        edgeInfo, err := deserializeChanEdgeInfo(edgeInfoReader)
91✔
1666
                        if err != nil {
91✔
1667
                                return err
×
1668
                        }
×
1669

1670
                        keys = append(keys, k)
91✔
1671
                        removedChans = append(removedChans, &edgeInfo)
91✔
1672
                }
1673

1674
                for _, k := range keys {
253✔
1675
                        err = c.delChannelEdgeUnsafe(
91✔
1676
                                edges, edgeIndex, chanIndex, zombieIndex,
91✔
1677
                                k, false, false,
91✔
1678
                        )
91✔
1679
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
91✔
1680
                                return err
×
1681
                        }
×
1682
                }
1683

1684
                // Delete all the entries in the prune log having a height
1685
                // greater or equal to the block disconnected.
1686
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
162✔
1687
                if err != nil {
162✔
1688
                        return err
×
1689
                }
×
1690

1691
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
162✔
1692
                        pruneLogBucket,
162✔
1693
                )
162✔
1694
                if err != nil {
162✔
1695
                        return err
×
1696
                }
×
1697

1698
                var pruneKeyStart [4]byte
162✔
1699
                byteOrder.PutUint32(pruneKeyStart[:], height)
162✔
1700

162✔
1701
                var pruneKeyEnd [4]byte
162✔
1702
                byteOrder.PutUint32(pruneKeyEnd[:], math.MaxUint32)
162✔
1703

162✔
1704
                // To avoid modifying the bucket while traversing, we delete
162✔
1705
                // the keys in a second loop.
162✔
1706
                var pruneKeys [][]byte
162✔
1707
                pruneCursor := pruneBucket.ReadWriteCursor()
162✔
1708
                //nolint:ll
162✔
1709
                for k, _ := pruneCursor.Seek(pruneKeyStart[:]); k != nil &&
162✔
1710
                        bytes.Compare(k, pruneKeyEnd[:]) <= 0; k, _ = pruneCursor.Next() {
256✔
1711
                        pruneKeys = append(pruneKeys, k)
94✔
1712
                }
94✔
1713

1714
                for _, k := range pruneKeys {
256✔
1715
                        if err := pruneBucket.Delete(k); err != nil {
94✔
1716
                                return err
×
1717
                        }
×
1718
                }
1719

1720
                return nil
162✔
1721
        }, func() {
162✔
1722
                removedChans = nil
162✔
1723
        }); err != nil {
162✔
1724
                return nil, err
×
1725
        }
×
1726

1727
        for _, channel := range removedChans {
253✔
1728
                c.rejectCache.remove(channel.ChannelID)
91✔
1729
                c.chanCache.remove(channel.ChannelID)
91✔
1730
        }
91✔
1731

1732
        return removedChans, nil
162✔
1733
}
1734

1735
// PruneTip returns the block height and hash of the latest block that has been
1736
// used to prune channels in the graph. Knowing the "prune tip" allows callers
1737
// to tell if the graph is currently in sync with the current best known UTXO
1738
// state.
1739
func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) {
55✔
1740
        var (
55✔
1741
                tipHash   chainhash.Hash
55✔
1742
                tipHeight uint32
55✔
1743
        )
55✔
1744

55✔
1745
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
110✔
1746
                graphMeta := tx.ReadBucket(graphMetaBucket)
55✔
1747
                if graphMeta == nil {
55✔
1748
                        return ErrGraphNotFound
×
1749
                }
×
1750
                pruneBucket := graphMeta.NestedReadBucket(pruneLogBucket)
55✔
1751
                if pruneBucket == nil {
55✔
1752
                        return ErrGraphNeverPruned
×
1753
                }
×
1754

1755
                pruneCursor := pruneBucket.ReadCursor()
55✔
1756

55✔
1757
                // The prune key with the largest block height will be our
55✔
1758
                // prune tip.
55✔
1759
                k, v := pruneCursor.Last()
55✔
1760
                if k == nil {
74✔
1761
                        return ErrGraphNeverPruned
19✔
1762
                }
19✔
1763

1764
                // Once we have the prune tip, the value will be the block hash,
1765
                // and the key the block height.
1766
                copy(tipHash[:], v[:])
36✔
1767
                tipHeight = byteOrder.Uint32(k[:])
36✔
1768

36✔
1769
                return nil
36✔
1770
        }, func() {})
55✔
1771
        if err != nil {
74✔
1772
                return nil, 0, err
19✔
1773
        }
19✔
1774

1775
        return &tipHash, tipHeight, nil
36✔
1776
}
1777

1778
// DeleteChannelEdges removes edges with the given channel IDs from the
1779
// database and marks them as zombies. This ensures that we're unable to re-add
1780
// it to our database once again. If an edge does not exist within the
1781
// database, then ErrEdgeNotFound will be returned. If strictZombiePruning is
1782
// true, then when we mark these edges as zombies, we'll set up the keys such
1783
// that we require the node that failed to send the fresh update to be the one
1784
// that resurrects the channel from its zombie state. The markZombie bool
1785
// denotes whether or not to mark the channel as a zombie.
1786
func (c *ChannelGraph) DeleteChannelEdges(strictZombiePruning, markZombie bool,
1787
        chanIDs ...uint64) error {
142✔
1788

142✔
1789
        // TODO(roasbeef): possibly delete from node bucket if node has no more
142✔
1790
        // channels
142✔
1791
        // TODO(roasbeef): don't delete both edges?
142✔
1792

142✔
1793
        c.cacheMu.Lock()
142✔
1794
        defer c.cacheMu.Unlock()
142✔
1795

142✔
1796
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
284✔
1797
                edges := tx.ReadWriteBucket(edgeBucket)
142✔
1798
                if edges == nil {
142✔
1799
                        return ErrEdgeNotFound
×
1800
                }
×
1801
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
142✔
1802
                if edgeIndex == nil {
142✔
1803
                        return ErrEdgeNotFound
×
1804
                }
×
1805
                chanIndex := edges.NestedReadWriteBucket(channelPointBucket)
142✔
1806
                if chanIndex == nil {
142✔
1807
                        return ErrEdgeNotFound
×
1808
                }
×
1809
                nodes := tx.ReadWriteBucket(nodeBucket)
142✔
1810
                if nodes == nil {
142✔
1811
                        return ErrGraphNodeNotFound
×
1812
                }
×
1813
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
142✔
1814
                if err != nil {
142✔
1815
                        return err
×
1816
                }
×
1817

1818
                var rawChanID [8]byte
142✔
1819
                for _, chanID := range chanIDs {
226✔
1820
                        byteOrder.PutUint64(rawChanID[:], chanID)
84✔
1821
                        err := c.delChannelEdgeUnsafe(
84✔
1822
                                edges, edgeIndex, chanIndex, zombieIndex,
84✔
1823
                                rawChanID[:], markZombie, strictZombiePruning,
84✔
1824
                        )
84✔
1825
                        if err != nil {
142✔
1826
                                return err
58✔
1827
                        }
58✔
1828
                }
1829

1830
                return nil
84✔
1831
        }, func() {})
142✔
1832
        if err != nil {
200✔
1833
                return err
58✔
1834
        }
58✔
1835

1836
        for _, chanID := range chanIDs {
110✔
1837
                c.rejectCache.remove(chanID)
26✔
1838
                c.chanCache.remove(chanID)
26✔
1839
        }
26✔
1840

1841
        return nil
84✔
1842
}
1843

1844
// ChannelID attempt to lookup the 8-byte compact channel ID which maps to the
1845
// passed channel point (outpoint). If the passed channel doesn't exist within
1846
// the database, then ErrEdgeNotFound is returned.
1847
func (c *ChannelGraph) ChannelID(chanPoint *wire.OutPoint) (uint64, error) {
1✔
1848
        var chanID uint64
1✔
1849
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
2✔
1850
                var err error
1✔
1851
                chanID, err = getChanID(tx, chanPoint)
1✔
1852
                return err
1✔
1853
        }, func() {
2✔
1854
                chanID = 0
1✔
1855
        }); err != nil {
1✔
1856
                return 0, err
×
1857
        }
×
1858

1859
        return chanID, nil
1✔
1860
}
1861

1862
// getChanID returns the assigned channel ID for a given channel point.
1863
func getChanID(tx kvdb.RTx, chanPoint *wire.OutPoint) (uint64, error) {
1✔
1864
        var b bytes.Buffer
1✔
1865
        if err := WriteOutpoint(&b, chanPoint); err != nil {
1✔
1866
                return 0, err
×
1867
        }
×
1868

1869
        edges := tx.ReadBucket(edgeBucket)
1✔
1870
        if edges == nil {
1✔
1871
                return 0, ErrGraphNoEdgesFound
×
1872
        }
×
1873
        chanIndex := edges.NestedReadBucket(channelPointBucket)
1✔
1874
        if chanIndex == nil {
1✔
1875
                return 0, ErrGraphNoEdgesFound
×
1876
        }
×
1877

1878
        chanIDBytes := chanIndex.Get(b.Bytes())
1✔
1879
        if chanIDBytes == nil {
1✔
1880
                return 0, ErrEdgeNotFound
×
1881
        }
×
1882

1883
        chanID := byteOrder.Uint64(chanIDBytes)
1✔
1884

1✔
1885
        return chanID, nil
1✔
1886
}
1887

1888
// TODO(roasbeef): allow updates to use Batch?
1889

1890
// HighestChanID returns the "highest" known channel ID in the channel graph.
1891
// This represents the "newest" channel from the PoV of the chain. This method
1892
// can be used by peers to quickly determine if they're graphs are in sync.
1893
func (c *ChannelGraph) HighestChanID() (uint64, error) {
3✔
1894
        var cid uint64
3✔
1895

3✔
1896
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
1897
                edges := tx.ReadBucket(edgeBucket)
3✔
1898
                if edges == nil {
3✔
1899
                        return ErrGraphNoEdgesFound
×
1900
                }
×
1901
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
1902
                if edgeIndex == nil {
3✔
1903
                        return ErrGraphNoEdgesFound
×
1904
                }
×
1905

1906
                // In order to find the highest chan ID, we'll fetch a cursor
1907
                // and use that to seek to the "end" of our known rage.
1908
                cidCursor := edgeIndex.ReadCursor()
3✔
1909

3✔
1910
                lastChanID, _ := cidCursor.Last()
3✔
1911

3✔
1912
                // If there's no key, then this means that we don't actually
3✔
1913
                // know of any channels, so we'll return a predicable error.
3✔
1914
                if lastChanID == nil {
4✔
1915
                        return ErrGraphNoEdgesFound
1✔
1916
                }
1✔
1917

1918
                // Otherwise, we'll de serialize the channel ID and return it
1919
                // to the caller.
1920
                cid = byteOrder.Uint64(lastChanID)
2✔
1921
                return nil
2✔
1922
        }, func() {
3✔
1923
                cid = 0
3✔
1924
        })
3✔
1925
        if err != nil && err != ErrGraphNoEdgesFound {
3✔
1926
                return 0, err
×
1927
        }
×
1928

1929
        return cid, nil
3✔
1930
}
1931

1932
// ChannelEdge represents the complete set of information for a channel edge in
1933
// the known channel graph. This struct couples the core information of the
1934
// edge as well as each of the known advertised edge policies.
1935
type ChannelEdge struct {
1936
        // Info contains all the static information describing the channel.
1937
        Info *models.ChannelEdgeInfo
1938

1939
        // Policy1 points to the "first" edge policy of the channel containing
1940
        // the dynamic information required to properly route through the edge.
1941
        Policy1 *models.ChannelEdgePolicy
1942

1943
        // Policy2 points to the "second" edge policy of the channel containing
1944
        // the dynamic information required to properly route through the edge.
1945
        Policy2 *models.ChannelEdgePolicy
1946

1947
        // Node1 is "node 1" in the channel. This is the node that would have
1948
        // produced Policy1 if it exists.
1949
        Node1 *models.LightningNode
1950

1951
        // Node2 is "node 2" in the channel. This is the node that would have
1952
        // produced Policy2 if it exists.
1953
        Node2 *models.LightningNode
1954
}
1955

1956
// ChanUpdatesInHorizon returns all the known channel edges which have at least
1957
// one edge that has an update timestamp within the specified horizon.
1958
func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
1959
        endTime time.Time) ([]ChannelEdge, error) {
133✔
1960

133✔
1961
        // To ensure we don't return duplicate ChannelEdges, we'll use an
133✔
1962
        // additional map to keep track of the edges already seen to prevent
133✔
1963
        // re-adding it.
133✔
1964
        var edgesSeen map[uint64]struct{}
133✔
1965
        var edgesToCache map[uint64]ChannelEdge
133✔
1966
        var edgesInHorizon []ChannelEdge
133✔
1967

133✔
1968
        c.cacheMu.Lock()
133✔
1969
        defer c.cacheMu.Unlock()
133✔
1970

133✔
1971
        var hits int
133✔
1972
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
266✔
1973
                edges := tx.ReadBucket(edgeBucket)
133✔
1974
                if edges == nil {
133✔
1975
                        return ErrGraphNoEdgesFound
×
1976
                }
×
1977
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
133✔
1978
                if edgeIndex == nil {
133✔
1979
                        return ErrGraphNoEdgesFound
×
1980
                }
×
1981
                edgeUpdateIndex := edges.NestedReadBucket(edgeUpdateIndexBucket)
133✔
1982
                if edgeUpdateIndex == nil {
133✔
1983
                        return ErrGraphNoEdgesFound
×
1984
                }
×
1985

1986
                nodes := tx.ReadBucket(nodeBucket)
133✔
1987
                if nodes == nil {
133✔
1988
                        return ErrGraphNodesNotFound
×
1989
                }
×
1990

1991
                // We'll now obtain a cursor to perform a range query within
1992
                // the index to find all channels within the horizon.
1993
                updateCursor := edgeUpdateIndex.ReadCursor()
133✔
1994

133✔
1995
                var startTimeBytes, endTimeBytes [8 + 8]byte
133✔
1996
                byteOrder.PutUint64(
133✔
1997
                        startTimeBytes[:8], uint64(startTime.Unix()),
133✔
1998
                )
133✔
1999
                byteOrder.PutUint64(
133✔
2000
                        endTimeBytes[:8], uint64(endTime.Unix()),
133✔
2001
                )
133✔
2002

133✔
2003
                // With our start and end times constructed, we'll step through
133✔
2004
                // the index collecting the info and policy of each update of
133✔
2005
                // each channel that has a last update within the time range.
133✔
2006
                //
133✔
2007
                //nolint:ll
133✔
2008
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
133✔
2009
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
179✔
2010

46✔
2011
                        // We have a new eligible entry, so we'll slice of the
46✔
2012
                        // chan ID so we can query it in the DB.
46✔
2013
                        chanID := indexKey[8:]
46✔
2014

46✔
2015
                        // If we've already retrieved the info and policies for
46✔
2016
                        // this edge, then we can skip it as we don't need to do
46✔
2017
                        // so again.
46✔
2018
                        chanIDInt := byteOrder.Uint64(chanID)
46✔
2019
                        if _, ok := edgesSeen[chanIDInt]; ok {
65✔
2020
                                continue
19✔
2021
                        }
2022

2023
                        if channel, ok := c.chanCache.get(chanIDInt); ok {
36✔
2024
                                hits++
9✔
2025
                                edgesSeen[chanIDInt] = struct{}{}
9✔
2026
                                edgesInHorizon = append(edgesInHorizon, channel)
9✔
2027
                                continue
9✔
2028
                        }
2029

2030
                        // First, we'll fetch the static edge information.
2031
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
18✔
2032
                        if err != nil {
18✔
2033
                                chanID := byteOrder.Uint64(chanID)
×
2034
                                return fmt.Errorf("unable to fetch info for "+
×
2035
                                        "edge with chan_id=%v: %v", chanID, err)
×
2036
                        }
×
2037

2038
                        // With the static information obtained, we'll now
2039
                        // fetch the dynamic policy info.
2040
                        edge1, edge2, err := fetchChanEdgePolicies(
18✔
2041
                                edgeIndex, edges, chanID,
18✔
2042
                        )
18✔
2043
                        if err != nil {
18✔
2044
                                chanID := byteOrder.Uint64(chanID)
×
2045
                                return fmt.Errorf("unable to fetch policies "+
×
2046
                                        "for edge with chan_id=%v: %v", chanID,
×
2047
                                        err)
×
2048
                        }
×
2049

2050
                        node1, err := fetchLightningNode(
18✔
2051
                                nodes, edgeInfo.NodeKey1Bytes[:],
18✔
2052
                        )
18✔
2053
                        if err != nil {
18✔
2054
                                return err
×
2055
                        }
×
2056

2057
                        node2, err := fetchLightningNode(
18✔
2058
                                nodes, edgeInfo.NodeKey2Bytes[:],
18✔
2059
                        )
18✔
2060
                        if err != nil {
18✔
2061
                                return err
×
2062
                        }
×
2063

2064
                        // Finally, we'll collate this edge with the rest of
2065
                        // edges to be returned.
2066
                        edgesSeen[chanIDInt] = struct{}{}
18✔
2067
                        channel := ChannelEdge{
18✔
2068
                                Info:    &edgeInfo,
18✔
2069
                                Policy1: edge1,
18✔
2070
                                Policy2: edge2,
18✔
2071
                                Node1:   &node1,
18✔
2072
                                Node2:   &node2,
18✔
2073
                        }
18✔
2074
                        edgesInHorizon = append(edgesInHorizon, channel)
18✔
2075
                        edgesToCache[chanIDInt] = channel
18✔
2076
                }
2077

2078
                return nil
133✔
2079
        }, func() {
133✔
2080
                edgesSeen = make(map[uint64]struct{})
133✔
2081
                edgesToCache = make(map[uint64]ChannelEdge)
133✔
2082
                edgesInHorizon = nil
133✔
2083
        })
133✔
2084
        switch {
133✔
2085
        case err == ErrGraphNoEdgesFound:
×
2086
                fallthrough
×
2087
        case err == ErrGraphNodesNotFound:
×
2088
                break
×
2089

2090
        case err != nil:
×
2091
                return nil, err
×
2092
        }
2093

2094
        // Insert any edges loaded from disk into the cache.
2095
        for chanid, channel := range edgesToCache {
151✔
2096
                c.chanCache.insert(chanid, channel)
18✔
2097
        }
18✔
2098

2099
        log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
133✔
2100
                float64(hits)/float64(len(edgesInHorizon)), hits,
133✔
2101
                len(edgesInHorizon))
133✔
2102

133✔
2103
        return edgesInHorizon, nil
133✔
2104
}
2105

2106
// NodeUpdatesInHorizon returns all the known lightning node which have an
2107
// update timestamp within the passed range. This method can be used by two
2108
// nodes to quickly determine if they have the same set of up to date node
2109
// announcements.
2110
func (c *ChannelGraph) NodeUpdatesInHorizon(startTime,
2111
        endTime time.Time) ([]models.LightningNode, error) {
8✔
2112

8✔
2113
        var nodesInHorizon []models.LightningNode
8✔
2114

8✔
2115
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
16✔
2116
                nodes := tx.ReadBucket(nodeBucket)
8✔
2117
                if nodes == nil {
8✔
2118
                        return ErrGraphNodesNotFound
×
2119
                }
×
2120

2121
                nodeUpdateIndex := nodes.NestedReadBucket(nodeUpdateIndexBucket)
8✔
2122
                if nodeUpdateIndex == nil {
8✔
2123
                        return ErrGraphNodesNotFound
×
2124
                }
×
2125

2126
                // We'll now obtain a cursor to perform a range query within
2127
                // the index to find all node announcements within the horizon.
2128
                updateCursor := nodeUpdateIndex.ReadCursor()
8✔
2129

8✔
2130
                var startTimeBytes, endTimeBytes [8 + 33]byte
8✔
2131
                byteOrder.PutUint64(
8✔
2132
                        startTimeBytes[:8], uint64(startTime.Unix()),
8✔
2133
                )
8✔
2134
                byteOrder.PutUint64(
8✔
2135
                        endTimeBytes[:8], uint64(endTime.Unix()),
8✔
2136
                )
8✔
2137

8✔
2138
                // With our start and end times constructed, we'll step through
8✔
2139
                // the index collecting info for each node within the time
8✔
2140
                // range.
8✔
2141
                //
8✔
2142
                //nolint:ll
8✔
2143
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
8✔
2144
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
37✔
2145

29✔
2146
                        nodePub := indexKey[8:]
29✔
2147
                        node, err := fetchLightningNode(nodes, nodePub)
29✔
2148
                        if err != nil {
29✔
2149
                                return err
×
2150
                        }
×
2151

2152
                        nodesInHorizon = append(nodesInHorizon, node)
29✔
2153
                }
2154

2155
                return nil
8✔
2156
        }, func() {
8✔
2157
                nodesInHorizon = nil
8✔
2158
        })
8✔
2159
        switch {
8✔
2160
        case err == ErrGraphNoEdgesFound:
×
2161
                fallthrough
×
2162
        case err == ErrGraphNodesNotFound:
×
2163
                break
×
2164

2165
        case err != nil:
×
2166
                return nil, err
×
2167
        }
2168

2169
        return nodesInHorizon, nil
8✔
2170
}
2171

2172
// FilterKnownChanIDs takes a set of channel IDs and return the subset of chan
2173
// ID's that we don't know and are not known zombies of the passed set. In other
2174
// words, we perform a set difference of our set of chan ID's and the ones
2175
// passed in. This method can be used by callers to determine the set of
2176
// channels another peer knows of that we don't.
2177
func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
2178
        isZombieChan func(time.Time, time.Time) bool) ([]uint64, error) {
122✔
2179

122✔
2180
        var newChanIDs []uint64
122✔
2181

122✔
2182
        c.cacheMu.Lock()
122✔
2183
        defer c.cacheMu.Unlock()
122✔
2184

122✔
2185
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
244✔
2186
                edges := tx.ReadBucket(edgeBucket)
122✔
2187
                if edges == nil {
122✔
2188
                        return ErrGraphNoEdgesFound
×
2189
                }
×
2190
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
122✔
2191
                if edgeIndex == nil {
122✔
2192
                        return ErrGraphNoEdgesFound
×
2193
                }
×
2194

2195
                // Fetch the zombie index, it may not exist if no edges have
2196
                // ever been marked as zombies. If the index has been
2197
                // initialized, we will use it later to skip known zombie edges.
2198
                zombieIndex := edges.NestedReadBucket(zombieBucket)
122✔
2199

122✔
2200
                // We'll run through the set of chanIDs and collate only the
122✔
2201
                // set of channel that are unable to be found within our db.
122✔
2202
                var cidBytes [8]byte
122✔
2203
                for _, info := range chansInfo {
241✔
2204
                        scid := info.ShortChannelID.ToUint64()
119✔
2205
                        byteOrder.PutUint64(cidBytes[:], scid)
119✔
2206

119✔
2207
                        // If the edge is already known, skip it.
119✔
2208
                        if v := edgeIndex.Get(cidBytes[:]); v != nil {
136✔
2209
                                continue
17✔
2210
                        }
2211

2212
                        // If the edge is a known zombie, skip it.
2213
                        if zombieIndex != nil {
204✔
2214
                                isZombie, _, _ := isZombieEdge(
102✔
2215
                                        zombieIndex, scid,
102✔
2216
                                )
102✔
2217

102✔
2218
                                // TODO(ziggie): Make sure that for the strict
102✔
2219
                                // pruning case we compare the pubkeys and
102✔
2220
                                // whether the right timestamp is not older than
102✔
2221
                                // the `ChannelPruneExpiry`.
102✔
2222
                                //
102✔
2223
                                // NOTE: The timestamp data has no verification
102✔
2224
                                // attached to it in the `ReplyChannelRange` msg
102✔
2225
                                // so we are trusting this data at this point.
102✔
2226
                                // However it is not critical because we are
102✔
2227
                                // just removing the channel from the db when
102✔
2228
                                // the timestamps are more recent. During the
102✔
2229
                                // querying of the gossip msg verification
102✔
2230
                                // happens as usual.
102✔
2231
                                // However we should start punishing peers when
102✔
2232
                                // they don't provide us honest data ?
102✔
2233
                                isStillZombie := isZombieChan(
102✔
2234
                                        info.Node1UpdateTimestamp,
102✔
2235
                                        info.Node2UpdateTimestamp,
102✔
2236
                                )
102✔
2237

102✔
2238
                                switch {
102✔
2239
                                // If the edge is a known zombie and if we
2240
                                // would still consider it a zombie given the
2241
                                // latest update timestamps, then we skip this
2242
                                // channel.
2243
                                case isZombie && isStillZombie:
30✔
2244
                                        continue
30✔
2245

2246
                                // Otherwise, if we have marked it as a zombie
2247
                                // but the latest update timestamps could bring
2248
                                // it back from the dead, then we mark it alive,
2249
                                // and we let it be added to the set of IDs to
2250
                                // query our peer for.
2251
                                case isZombie && !isStillZombie:
20✔
2252
                                        err := c.markEdgeLiveUnsafe(tx, scid)
20✔
2253
                                        if err != nil {
20✔
2254
                                                return err
×
2255
                                        }
×
2256
                                }
2257
                        }
2258

2259
                        newChanIDs = append(newChanIDs, scid)
72✔
2260
                }
2261

2262
                return nil
122✔
2263
        }, func() {
122✔
2264
                newChanIDs = nil
122✔
2265
        })
122✔
2266
        switch {
122✔
2267
        // If we don't know of any edges yet, then we'll return the entire set
2268
        // of chan IDs specified.
2269
        case err == ErrGraphNoEdgesFound:
×
2270
                ogChanIDs := make([]uint64, len(chansInfo))
×
2271
                for i, info := range chansInfo {
×
2272
                        ogChanIDs[i] = info.ShortChannelID.ToUint64()
×
2273
                }
×
2274

2275
                return ogChanIDs, nil
×
2276

2277
        case err != nil:
×
2278
                return nil, err
×
2279
        }
2280

2281
        return newChanIDs, nil
122✔
2282
}
2283

2284
// ChannelUpdateInfo couples the SCID of a channel with the timestamps of the
2285
// latest received channel updates for the channel.
2286
type ChannelUpdateInfo struct {
2287
        // ShortChannelID is the SCID identifier of the channel.
2288
        ShortChannelID lnwire.ShortChannelID
2289

2290
        // Node1UpdateTimestamp is the timestamp of the latest received update
2291
        // from the node 1 channel peer. This will be set to zero time if no
2292
        // update has yet been received from this node.
2293
        Node1UpdateTimestamp time.Time
2294

2295
        // Node2UpdateTimestamp is the timestamp of the latest received update
2296
        // from the node 2 channel peer. This will be set to zero time if no
2297
        // update has yet been received from this node.
2298
        Node2UpdateTimestamp time.Time
2299
}
2300

2301
// NewChannelUpdateInfo is a constructor which makes sure we initialize the
2302
// timestamps with zero seconds unix timestamp which equals
2303
// `January 1, 1970, 00:00:00 UTC` in case the value is `time.Time{}`.
2304
func NewChannelUpdateInfo(scid lnwire.ShortChannelID, node1Timestamp,
2305
        node2Timestamp time.Time) ChannelUpdateInfo {
218✔
2306

218✔
2307
        chanInfo := ChannelUpdateInfo{
218✔
2308
                ShortChannelID:       scid,
218✔
2309
                Node1UpdateTimestamp: node1Timestamp,
218✔
2310
                Node2UpdateTimestamp: node2Timestamp,
218✔
2311
        }
218✔
2312

218✔
2313
        if node1Timestamp.IsZero() {
426✔
2314
                chanInfo.Node1UpdateTimestamp = time.Unix(0, 0)
208✔
2315
        }
208✔
2316

2317
        if node2Timestamp.IsZero() {
426✔
2318
                chanInfo.Node2UpdateTimestamp = time.Unix(0, 0)
208✔
2319
        }
208✔
2320

2321
        return chanInfo
218✔
2322
}
2323

2324
// BlockChannelRange represents a range of channels for a given block height.
2325
type BlockChannelRange struct {
2326
        // Height is the height of the block all of the channels below were
2327
        // included in.
2328
        Height uint32
2329

2330
        // Channels is the list of channels identified by their short ID
2331
        // representation known to us that were included in the block height
2332
        // above. The list may include channel update timestamp information if
2333
        // requested.
2334
        Channels []ChannelUpdateInfo
2335
}
2336

2337
// FilterChannelRange returns the channel ID's of all known channels which were
2338
// mined in a block height within the passed range. The channel IDs are grouped
2339
// by their common block height. This method can be used to quickly share with a
2340
// peer the set of channels we know of within a particular range to catch them
2341
// up after a period of time offline. If withTimestamps is true then the
2342
// timestamp info of the latest received channel update messages of the channel
2343
// will be included in the response.
2344
func (c *ChannelGraph) FilterChannelRange(startHeight,
2345
        endHeight uint32, withTimestamps bool) ([]BlockChannelRange, error) {
11✔
2346

11✔
2347
        startChanID := &lnwire.ShortChannelID{
11✔
2348
                BlockHeight: startHeight,
11✔
2349
        }
11✔
2350

11✔
2351
        endChanID := lnwire.ShortChannelID{
11✔
2352
                BlockHeight: endHeight,
11✔
2353
                TxIndex:     math.MaxUint32 & 0x00ffffff,
11✔
2354
                TxPosition:  math.MaxUint16,
11✔
2355
        }
11✔
2356

11✔
2357
        // As we need to perform a range scan, we'll convert the starting and
11✔
2358
        // ending height to their corresponding values when encoded using short
11✔
2359
        // channel ID's.
11✔
2360
        var chanIDStart, chanIDEnd [8]byte
11✔
2361
        byteOrder.PutUint64(chanIDStart[:], startChanID.ToUint64())
11✔
2362
        byteOrder.PutUint64(chanIDEnd[:], endChanID.ToUint64())
11✔
2363

11✔
2364
        var channelsPerBlock map[uint32][]ChannelUpdateInfo
11✔
2365
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
22✔
2366
                edges := tx.ReadBucket(edgeBucket)
11✔
2367
                if edges == nil {
11✔
2368
                        return ErrGraphNoEdgesFound
×
2369
                }
×
2370
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
11✔
2371
                if edgeIndex == nil {
11✔
2372
                        return ErrGraphNoEdgesFound
×
2373
                }
×
2374

2375
                cursor := edgeIndex.ReadCursor()
11✔
2376

11✔
2377
                // We'll now iterate through the database, and find each
11✔
2378
                // channel ID that resides within the specified range.
11✔
2379
                //
11✔
2380
                //nolint:ll
11✔
2381
                for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
11✔
2382
                        bytes.Compare(k, chanIDEnd[:]) <= 0; k, v = cursor.Next() {
55✔
2383
                        // Don't send alias SCIDs during gossip sync.
44✔
2384
                        edgeReader := bytes.NewReader(v)
44✔
2385
                        edgeInfo, err := deserializeChanEdgeInfo(edgeReader)
44✔
2386
                        if err != nil {
44✔
2387
                                return err
×
2388
                        }
×
2389

2390
                        if edgeInfo.AuthProof == nil {
44✔
2391
                                continue
×
2392
                        }
2393

2394
                        // This channel ID rests within the target range, so
2395
                        // we'll add it to our returned set.
2396
                        rawCid := byteOrder.Uint64(k)
44✔
2397
                        cid := lnwire.NewShortChanIDFromInt(rawCid)
44✔
2398

44✔
2399
                        chanInfo := NewChannelUpdateInfo(
44✔
2400
                                cid, time.Time{}, time.Time{},
44✔
2401
                        )
44✔
2402

44✔
2403
                        if !withTimestamps {
66✔
2404
                                channelsPerBlock[cid.BlockHeight] = append(
22✔
2405
                                        channelsPerBlock[cid.BlockHeight],
22✔
2406
                                        chanInfo,
22✔
2407
                                )
22✔
2408

22✔
2409
                                continue
22✔
2410
                        }
2411

2412
                        node1Key, node2Key := computeEdgePolicyKeys(&edgeInfo)
22✔
2413

22✔
2414
                        rawPolicy := edges.Get(node1Key)
22✔
2415
                        if len(rawPolicy) != 0 {
28✔
2416
                                r := bytes.NewReader(rawPolicy)
6✔
2417

6✔
2418
                                edge, err := deserializeChanEdgePolicyRaw(r)
6✔
2419
                                if err != nil && !errors.Is(
6✔
2420
                                        err, ErrEdgePolicyOptionalFieldNotFound,
6✔
2421
                                ) {
6✔
2422

×
2423
                                        return err
×
2424
                                }
×
2425

2426
                                chanInfo.Node1UpdateTimestamp = edge.LastUpdate
6✔
2427
                        }
2428

2429
                        rawPolicy = edges.Get(node2Key)
22✔
2430
                        if len(rawPolicy) != 0 {
33✔
2431
                                r := bytes.NewReader(rawPolicy)
11✔
2432

11✔
2433
                                edge, err := deserializeChanEdgePolicyRaw(r)
11✔
2434
                                if err != nil && !errors.Is(
11✔
2435
                                        err, ErrEdgePolicyOptionalFieldNotFound,
11✔
2436
                                ) {
11✔
2437

×
2438
                                        return err
×
2439
                                }
×
2440

2441
                                chanInfo.Node2UpdateTimestamp = edge.LastUpdate
11✔
2442
                        }
2443

2444
                        channelsPerBlock[cid.BlockHeight] = append(
22✔
2445
                                channelsPerBlock[cid.BlockHeight], chanInfo,
22✔
2446
                        )
22✔
2447
                }
2448

2449
                return nil
11✔
2450
        }, func() {
11✔
2451
                channelsPerBlock = make(map[uint32][]ChannelUpdateInfo)
11✔
2452
        })
11✔
2453

2454
        switch {
11✔
2455
        // If we don't know of any channels yet, then there's nothing to
2456
        // filter, so we'll return an empty slice.
2457
        case err == ErrGraphNoEdgesFound || len(channelsPerBlock) == 0:
3✔
2458
                return nil, nil
3✔
2459

2460
        case err != nil:
×
2461
                return nil, err
×
2462
        }
2463

2464
        // Return the channel ranges in ascending block height order.
2465
        blocks := make([]uint32, 0, len(channelsPerBlock))
8✔
2466
        for block := range channelsPerBlock {
30✔
2467
                blocks = append(blocks, block)
22✔
2468
        }
22✔
2469
        sort.Slice(blocks, func(i, j int) bool {
37✔
2470
                return blocks[i] < blocks[j]
29✔
2471
        })
29✔
2472

2473
        channelRanges := make([]BlockChannelRange, 0, len(channelsPerBlock))
8✔
2474
        for _, block := range blocks {
30✔
2475
                channelRanges = append(channelRanges, BlockChannelRange{
22✔
2476
                        Height:   block,
22✔
2477
                        Channels: channelsPerBlock[block],
22✔
2478
                })
22✔
2479
        }
22✔
2480

2481
        return channelRanges, nil
8✔
2482
}
2483

2484
// FetchChanInfos returns the set of channel edges that correspond to the passed
2485
// channel ID's. If an edge is the query is unknown to the database, it will
2486
// skipped and the result will contain only those edges that exist at the time
2487
// of the query. This can be used to respond to peer queries that are seeking to
2488
// fill in gaps in their view of the channel graph.
2489
func (c *ChannelGraph) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) {
3✔
2490
        return c.fetchChanInfos(nil, chanIDs)
3✔
2491
}
3✔
2492

2493
// fetchChanInfos returns the set of channel edges that correspond to the passed
2494
// channel ID's. If an edge is the query is unknown to the database, it will
2495
// skipped and the result will contain only those edges that exist at the time
2496
// of the query. This can be used to respond to peer queries that are seeking to
2497
// fill in gaps in their view of the channel graph.
2498
//
2499
// NOTE: An optional transaction may be provided. If none is provided, then a
2500
// new one will be created.
2501
func (c *ChannelGraph) fetchChanInfos(tx kvdb.RTx, chanIDs []uint64) (
2502
        []ChannelEdge, error) {
24✔
2503
        // TODO(roasbeef): sort cids?
24✔
2504

24✔
2505
        var (
24✔
2506
                chanEdges []ChannelEdge
24✔
2507
                cidBytes  [8]byte
24✔
2508
        )
24✔
2509

24✔
2510
        fetchChanInfos := func(tx kvdb.RTx) error {
48✔
2511
                edges := tx.ReadBucket(edgeBucket)
24✔
2512
                if edges == nil {
24✔
2513
                        return ErrGraphNoEdgesFound
×
2514
                }
×
2515
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
24✔
2516
                if edgeIndex == nil {
24✔
2517
                        return ErrGraphNoEdgesFound
×
2518
                }
×
2519
                nodes := tx.ReadBucket(nodeBucket)
24✔
2520
                if nodes == nil {
24✔
2521
                        return ErrGraphNotFound
×
2522
                }
×
2523

2524
                for _, cid := range chanIDs {
55✔
2525
                        byteOrder.PutUint64(cidBytes[:], cid)
31✔
2526

31✔
2527
                        // First, we'll fetch the static edge information. If
31✔
2528
                        // the edge is unknown, we will skip the edge and
31✔
2529
                        // continue gathering all known edges.
31✔
2530
                        edgeInfo, err := fetchChanEdgeInfo(
31✔
2531
                                edgeIndex, cidBytes[:],
31✔
2532
                        )
31✔
2533
                        switch {
31✔
2534
                        case errors.Is(err, ErrEdgeNotFound):
23✔
2535
                                continue
23✔
2536
                        case err != nil:
×
2537
                                return err
×
2538
                        }
2539

2540
                        // With the static information obtained, we'll now
2541
                        // fetch the dynamic policy info.
2542
                        edge1, edge2, err := fetchChanEdgePolicies(
8✔
2543
                                edgeIndex, edges, cidBytes[:],
8✔
2544
                        )
8✔
2545
                        if err != nil {
8✔
2546
                                return err
×
2547
                        }
×
2548

2549
                        node1, err := fetchLightningNode(
8✔
2550
                                nodes, edgeInfo.NodeKey1Bytes[:],
8✔
2551
                        )
8✔
2552
                        if err != nil {
8✔
2553
                                return err
×
2554
                        }
×
2555

2556
                        node2, err := fetchLightningNode(
8✔
2557
                                nodes, edgeInfo.NodeKey2Bytes[:],
8✔
2558
                        )
8✔
2559
                        if err != nil {
8✔
2560
                                return err
×
2561
                        }
×
2562

2563
                        chanEdges = append(chanEdges, ChannelEdge{
8✔
2564
                                Info:    &edgeInfo,
8✔
2565
                                Policy1: edge1,
8✔
2566
                                Policy2: edge2,
8✔
2567
                                Node1:   &node1,
8✔
2568
                                Node2:   &node2,
8✔
2569
                        })
8✔
2570
                }
2571
                return nil
24✔
2572
        }
2573

2574
        if tx == nil {
28✔
2575
                err := kvdb.View(c.db, fetchChanInfos, func() {
8✔
2576
                        chanEdges = nil
4✔
2577
                })
4✔
2578
                if err != nil {
4✔
2579
                        return nil, err
×
2580
                }
×
2581

2582
                return chanEdges, nil
4✔
2583
        }
2584

2585
        err := fetchChanInfos(tx)
20✔
2586
        if err != nil {
20✔
2587
                return nil, err
×
2588
        }
×
2589

2590
        return chanEdges, nil
20✔
2591
}
2592

2593
func delEdgeUpdateIndexEntry(edgesBucket kvdb.RwBucket, chanID uint64,
2594
        edge1, edge2 *models.ChannelEdgePolicy) error {
141✔
2595

141✔
2596
        // First, we'll fetch the edge update index bucket which currently
141✔
2597
        // stores an entry for the channel we're about to delete.
141✔
2598
        updateIndex := edgesBucket.NestedReadWriteBucket(edgeUpdateIndexBucket)
141✔
2599
        if updateIndex == nil {
141✔
2600
                // No edges in bucket, return early.
×
2601
                return nil
×
2602
        }
×
2603

2604
        // Now that we have the bucket, we'll attempt to construct a template
2605
        // for the index key: updateTime || chanid.
2606
        var indexKey [8 + 8]byte
141✔
2607
        byteOrder.PutUint64(indexKey[8:], chanID)
141✔
2608

141✔
2609
        // With the template constructed, we'll attempt to delete an entry that
141✔
2610
        // would have been created by both edges: we'll alternate the update
141✔
2611
        // times, as one may had overridden the other.
141✔
2612
        if edge1 != nil {
151✔
2613
                byteOrder.PutUint64(
10✔
2614
                        indexKey[:8], uint64(edge1.LastUpdate.Unix()),
10✔
2615
                )
10✔
2616
                if err := updateIndex.Delete(indexKey[:]); err != nil {
10✔
2617
                        return err
×
2618
                }
×
2619
        }
2620

2621
        // We'll also attempt to delete the entry that may have been created by
2622
        // the second edge.
2623
        if edge2 != nil {
153✔
2624
                byteOrder.PutUint64(
12✔
2625
                        indexKey[:8], uint64(edge2.LastUpdate.Unix()),
12✔
2626
                )
12✔
2627
                if err := updateIndex.Delete(indexKey[:]); err != nil {
12✔
2628
                        return err
×
2629
                }
×
2630
        }
2631

2632
        return nil
141✔
2633
}
2634

2635
// delChannelEdgeUnsafe deletes the edge with the given chanID from the graph
2636
// cache. It then goes on to delete any policy info and edge info for this
2637
// channel from the DB and finally, if isZombie is true, it will add an entry
2638
// for this channel in the zombie index.
2639
//
2640
// NOTE: this method MUST only be called if the cacheMu has already been
2641
// acquired.
2642
func (c *ChannelGraph) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex,
2643
        zombieIndex kvdb.RwBucket, chanID []byte, isZombie,
2644
        strictZombie bool) error {
199✔
2645

199✔
2646
        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
199✔
2647
        if err != nil {
257✔
2648
                return err
58✔
2649
        }
58✔
2650

2651
        if c.graphCache != nil {
282✔
2652
                c.graphCache.RemoveChannel(
141✔
2653
                        edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes,
141✔
2654
                        edgeInfo.ChannelID,
141✔
2655
                )
141✔
2656
        }
141✔
2657

2658
        // We'll also remove the entry in the edge update index bucket before
2659
        // we delete the edges themselves so we can access their last update
2660
        // times.
2661
        cid := byteOrder.Uint64(chanID)
141✔
2662
        edge1, edge2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
141✔
2663
        if err != nil {
141✔
2664
                return err
×
2665
        }
×
2666
        err = delEdgeUpdateIndexEntry(edges, cid, edge1, edge2)
141✔
2667
        if err != nil {
141✔
2668
                return err
×
2669
        }
×
2670

2671
        // The edge key is of the format pubKey || chanID. First we construct
2672
        // the latter half, populating the channel ID.
2673
        var edgeKey [33 + 8]byte
141✔
2674
        copy(edgeKey[33:], chanID)
141✔
2675

141✔
2676
        // With the latter half constructed, copy over the first public key to
141✔
2677
        // delete the edge in this direction, then the second to delete the
141✔
2678
        // edge in the opposite direction.
141✔
2679
        copy(edgeKey[:33], edgeInfo.NodeKey1Bytes[:])
141✔
2680
        if edges.Get(edgeKey[:]) != nil {
282✔
2681
                if err := edges.Delete(edgeKey[:]); err != nil {
141✔
2682
                        return err
×
2683
                }
×
2684
        }
2685
        copy(edgeKey[:33], edgeInfo.NodeKey2Bytes[:])
141✔
2686
        if edges.Get(edgeKey[:]) != nil {
282✔
2687
                if err := edges.Delete(edgeKey[:]); err != nil {
141✔
2688
                        return err
×
2689
                }
×
2690
        }
2691

2692
        // As part of deleting the edge we also remove all disabled entries
2693
        // from the edgePolicyDisabledIndex bucket. We do that for both
2694
        // directions.
2695
        updateEdgePolicyDisabledIndex(edges, cid, false, false)
141✔
2696
        updateEdgePolicyDisabledIndex(edges, cid, true, false)
141✔
2697

141✔
2698
        // With the edge data deleted, we can purge the information from the two
141✔
2699
        // edge indexes.
141✔
2700
        if err := edgeIndex.Delete(chanID); err != nil {
141✔
2701
                return err
×
2702
        }
×
2703
        var b bytes.Buffer
141✔
2704
        if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
141✔
2705
                return err
×
2706
        }
×
2707
        if err := chanIndex.Delete(b.Bytes()); err != nil {
141✔
2708
                return err
×
2709
        }
×
2710

2711
        // Finally, we'll mark the edge as a zombie within our index if it's
2712
        // being removed due to the channel becoming a zombie. We do this to
2713
        // ensure we don't store unnecessary data for spent channels.
2714
        if !isZombie {
257✔
2715
                return nil
116✔
2716
        }
116✔
2717

2718
        nodeKey1, nodeKey2 := edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes
25✔
2719
        if strictZombie {
30✔
2720
                nodeKey1, nodeKey2 = makeZombiePubkeys(&edgeInfo, edge1, edge2)
5✔
2721
        }
5✔
2722

2723
        return markEdgeZombie(
25✔
2724
                zombieIndex, byteOrder.Uint64(chanID), nodeKey1, nodeKey2,
25✔
2725
        )
25✔
2726
}
2727

2728
// makeZombiePubkeys derives the node pubkeys to store in the zombie index for a
2729
// particular pair of channel policies. The return values are one of:
2730
//  1. (pubkey1, pubkey2)
2731
//  2. (pubkey1, blank)
2732
//  3. (blank, pubkey2)
2733
//
2734
// A blank pubkey means that corresponding node will be unable to resurrect a
2735
// channel on its own. For example, node1 may continue to publish recent
2736
// updates, but node2 has fallen way behind. After marking an edge as a zombie,
2737
// we don't want another fresh update from node1 to resurrect, as the edge can
2738
// only become live once node2 finally sends something recent.
2739
//
2740
// In the case where we have neither update, we allow either party to resurrect
2741
// the channel. If the channel were to be marked zombie again, it would be
2742
// marked with the correct lagging channel since we received an update from only
2743
// one side.
2744
func makeZombiePubkeys(info *models.ChannelEdgeInfo,
2745
        e1, e2 *models.ChannelEdgePolicy) ([33]byte, [33]byte) {
5✔
2746

5✔
2747
        switch {
5✔
2748
        // If we don't have either edge policy, we'll return both pubkeys so
2749
        // that the channel can be resurrected by either party.
2750
        case e1 == nil && e2 == nil:
2✔
2751
                return info.NodeKey1Bytes, info.NodeKey2Bytes
2✔
2752

2753
        // If we're missing edge1, or if both edges are present but edge1 is
2754
        // older, we'll return edge1's pubkey and a blank pubkey for edge2. This
2755
        // means that only an update from edge1 will be able to resurrect the
2756
        // channel.
2757
        case e1 == nil || (e2 != nil && e1.LastUpdate.Before(e2.LastUpdate)):
1✔
2758
                return info.NodeKey1Bytes, [33]byte{}
1✔
2759

2760
        // Otherwise, we're missing edge2 or edge2 is the older side, so we
2761
        // return a blank pubkey for edge1. In this case, only an update from
2762
        // edge2 can resurect the channel.
2763
        default:
2✔
2764
                return [33]byte{}, info.NodeKey2Bytes
2✔
2765
        }
2766
}
2767

2768
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
2769
// within the database for the referenced channel. The `flags` attribute within
2770
// the ChannelEdgePolicy determines which of the directed edges are being
2771
// updated. If the flag is 1, then the first node's information is being
2772
// updated, otherwise it's the second node's information. The node ordering is
2773
// determined by the lexicographical ordering of the identity public keys of the
2774
// nodes on either side of the channel.
2775
func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
2776
        op ...batch.SchedulerOption) error {
2,663✔
2777

2,663✔
2778
        var (
2,663✔
2779
                isUpdate1    bool
2,663✔
2780
                edgeNotFound bool
2,663✔
2781
        )
2,663✔
2782

2,663✔
2783
        r := &batch.Request{
2,663✔
2784
                Reset: func() {
5,326✔
2785
                        isUpdate1 = false
2,663✔
2786
                        edgeNotFound = false
2,663✔
2787
                },
2,663✔
2788
                Update: func(tx kvdb.RwTx) error {
2,663✔
2789
                        var err error
2,663✔
2790
                        isUpdate1, err = updateEdgePolicy(
2,663✔
2791
                                tx, edge, c.graphCache,
2,663✔
2792
                        )
2,663✔
2793

2,663✔
2794
                        // Silence ErrEdgeNotFound so that the batch can
2,663✔
2795
                        // succeed, but propagate the error via local state.
2,663✔
2796
                        if errors.Is(err, ErrEdgeNotFound) {
2,666✔
2797
                                edgeNotFound = true
3✔
2798
                                return nil
3✔
2799
                        }
3✔
2800

2801
                        return err
2,660✔
2802
                },
2803
                OnCommit: func(err error) error {
2,663✔
2804
                        switch {
2,663✔
2805
                        case err != nil:
×
2806
                                return err
×
2807
                        case edgeNotFound:
3✔
2808
                                return ErrEdgeNotFound
3✔
2809
                        default:
2,660✔
2810
                                c.updateEdgeCache(edge, isUpdate1)
2,660✔
2811
                                return nil
2,660✔
2812
                        }
2813
                },
2814
        }
2815

2816
        for _, f := range op {
2,663✔
2817
                f(r)
×
2818
        }
×
2819

2820
        return c.chanScheduler.Execute(r)
2,663✔
2821
}
2822

2823
func (c *ChannelGraph) updateEdgeCache(e *models.ChannelEdgePolicy,
2824
        isUpdate1 bool) {
2,660✔
2825

2,660✔
2826
        // If an entry for this channel is found in reject cache, we'll modify
2,660✔
2827
        // the entry with the updated timestamp for the direction that was just
2,660✔
2828
        // written. If the edge doesn't exist, we'll load the cache entry lazily
2,660✔
2829
        // during the next query for this edge.
2,660✔
2830
        if entry, ok := c.rejectCache.get(e.ChannelID); ok {
2,665✔
2831
                if isUpdate1 {
8✔
2832
                        entry.upd1Time = e.LastUpdate.Unix()
3✔
2833
                } else {
5✔
2834
                        entry.upd2Time = e.LastUpdate.Unix()
2✔
2835
                }
2✔
2836
                c.rejectCache.insert(e.ChannelID, entry)
5✔
2837
        }
2838

2839
        // If an entry for this channel is found in channel cache, we'll modify
2840
        // the entry with the updated policy for the direction that was just
2841
        // written. If the edge doesn't exist, we'll defer loading the info and
2842
        // policies and lazily read from disk during the next query.
2843
        if channel, ok := c.chanCache.get(e.ChannelID); ok {
2,660✔
2844
                if isUpdate1 {
×
2845
                        channel.Policy1 = e
×
2846
                } else {
×
2847
                        channel.Policy2 = e
×
2848
                }
×
2849
                c.chanCache.insert(e.ChannelID, channel)
×
2850
        }
2851
}
2852

2853
// updateEdgePolicy attempts to update an edge's policy within the relevant
2854
// buckets using an existing database transaction. The returned boolean will be
2855
// true if the updated policy belongs to node1, and false if the policy belonged
2856
// to node2.
2857
func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy,
2858
        graphCache *GraphCache) (bool, error) {
2,663✔
2859

2,663✔
2860
        edges := tx.ReadWriteBucket(edgeBucket)
2,663✔
2861
        if edges == nil {
2,663✔
2862
                return false, ErrEdgeNotFound
×
2863
        }
×
2864
        edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
2,663✔
2865
        if edgeIndex == nil {
2,663✔
2866
                return false, ErrEdgeNotFound
×
2867
        }
×
2868

2869
        // Create the channelID key be converting the channel ID
2870
        // integer into a byte slice.
2871
        var chanID [8]byte
2,663✔
2872
        byteOrder.PutUint64(chanID[:], edge.ChannelID)
2,663✔
2873

2,663✔
2874
        // With the channel ID, we then fetch the value storing the two
2,663✔
2875
        // nodes which connect this channel edge.
2,663✔
2876
        nodeInfo := edgeIndex.Get(chanID[:])
2,663✔
2877
        if nodeInfo == nil {
2,666✔
2878
                return false, ErrEdgeNotFound
3✔
2879
        }
3✔
2880

2881
        // Depending on the flags value passed above, either the first
2882
        // or second edge policy is being updated.
2883
        var fromNode, toNode []byte
2,660✔
2884
        var isUpdate1 bool
2,660✔
2885
        if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
3,994✔
2886
                fromNode = nodeInfo[:33]
1,334✔
2887
                toNode = nodeInfo[33:66]
1,334✔
2888
                isUpdate1 = true
1,334✔
2889
        } else {
2,660✔
2890
                fromNode = nodeInfo[33:66]
1,326✔
2891
                toNode = nodeInfo[:33]
1,326✔
2892
                isUpdate1 = false
1,326✔
2893
        }
1,326✔
2894

2895
        // Finally, with the direction of the edge being updated
2896
        // identified, we update the on-disk edge representation.
2897
        err := putChanEdgePolicy(edges, edge, fromNode, toNode)
2,660✔
2898
        if err != nil {
2,660✔
2899
                return false, err
×
2900
        }
×
2901

2902
        var (
2,660✔
2903
                fromNodePubKey route.Vertex
2,660✔
2904
                toNodePubKey   route.Vertex
2,660✔
2905
        )
2,660✔
2906
        copy(fromNodePubKey[:], fromNode)
2,660✔
2907
        copy(toNodePubKey[:], toNode)
2,660✔
2908

2,660✔
2909
        if graphCache != nil {
4,934✔
2910
                graphCache.UpdatePolicy(
2,274✔
2911
                        edge, fromNodePubKey, toNodePubKey, isUpdate1,
2,274✔
2912
                )
2,274✔
2913
        }
2,274✔
2914

2915
        return isUpdate1, nil
2,660✔
2916
}
2917

2918
// isPublic determines whether the node is seen as public within the graph from
2919
// the source node's point of view. An existing database transaction can also be
2920
// specified.
2921
func (c *ChannelGraph) isPublic(tx kvdb.RTx, nodePub route.Vertex,
2922
        sourcePubKey []byte) (bool, error) {
13✔
2923

13✔
2924
        // In order to determine whether this node is publicly advertised within
13✔
2925
        // the graph, we'll need to look at all of its edges and check whether
13✔
2926
        // they extend to any other node than the source node. errDone will be
13✔
2927
        // used to terminate the check early.
13✔
2928
        nodeIsPublic := false
13✔
2929
        errDone := errors.New("done")
13✔
2930
        err := c.ForEachNodeChannelTx(tx, nodePub, func(tx kvdb.RTx,
13✔
2931
                info *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy,
13✔
2932
                _ *models.ChannelEdgePolicy) error {
23✔
2933

10✔
2934
                // If this edge doesn't extend to the source node, we'll
10✔
2935
                // terminate our search as we can now conclude that the node is
10✔
2936
                // publicly advertised within the graph due to the local node
10✔
2937
                // knowing of the current edge.
10✔
2938
                if !bytes.Equal(info.NodeKey1Bytes[:], sourcePubKey) &&
10✔
2939
                        !bytes.Equal(info.NodeKey2Bytes[:], sourcePubKey) {
13✔
2940

3✔
2941
                        nodeIsPublic = true
3✔
2942
                        return errDone
3✔
2943
                }
3✔
2944

2945
                // Since the edge _does_ extend to the source node, we'll also
2946
                // need to ensure that this is a public edge.
2947
                if info.AuthProof != nil {
13✔
2948
                        nodeIsPublic = true
6✔
2949
                        return errDone
6✔
2950
                }
6✔
2951

2952
                // Otherwise, we'll continue our search.
2953
                return nil
1✔
2954
        })
2955
        if err != nil && err != errDone {
13✔
2956
                return false, err
×
2957
        }
×
2958

2959
        return nodeIsPublic, nil
13✔
2960
}
2961

2962
// FetchLightningNodeTx attempts to look up a target node by its identity
2963
// public key. If the node isn't found in the database, then
2964
// ErrGraphNodeNotFound is returned. An optional transaction may be provided.
2965
// If none is provided, then a new one will be created.
2966
func (c *ChannelGraph) FetchLightningNodeTx(tx kvdb.RTx, nodePub route.Vertex) (
2967
        *models.LightningNode, error) {
2,944✔
2968

2,944✔
2969
        return c.fetchLightningNode(tx, nodePub)
2,944✔
2970
}
2,944✔
2971

2972
// FetchLightningNode attempts to look up a target node by its identity public
2973
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
2974
// returned.
2975
func (c *ChannelGraph) FetchLightningNode(nodePub route.Vertex) (
2976
        *models.LightningNode, error) {
837✔
2977

837✔
2978
        return c.fetchLightningNode(nil, nodePub)
837✔
2979
}
837✔
2980

2981
// fetchLightningNode attempts to look up a target node by its identity public
2982
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
2983
// returned. An optional transaction may be provided. If none is provided, then
2984
// a new one will be created.
2985
func (c *ChannelGraph) fetchLightningNode(tx kvdb.RTx,
2986
        nodePub route.Vertex) (*models.LightningNode, error) {
3,781✔
2987

3,781✔
2988
        var node *models.LightningNode
3,781✔
2989
        fetch := func(tx kvdb.RTx) error {
7,562✔
2990
                // First grab the nodes bucket which stores the mapping from
3,781✔
2991
                // pubKey to node information.
3,781✔
2992
                nodes := tx.ReadBucket(nodeBucket)
3,781✔
2993
                if nodes == nil {
3,781✔
2994
                        return ErrGraphNotFound
×
2995
                }
×
2996

2997
                // If a key for this serialized public key isn't found, then
2998
                // the target node doesn't exist within the database.
2999
                nodeBytes := nodes.Get(nodePub[:])
3,781✔
3000
                if nodeBytes == nil {
3,795✔
3001
                        return ErrGraphNodeNotFound
14✔
3002
                }
14✔
3003

3004
                // If the node is found, then we can de deserialize the node
3005
                // information to return to the user.
3006
                nodeReader := bytes.NewReader(nodeBytes)
3,767✔
3007
                n, err := deserializeLightningNode(nodeReader)
3,767✔
3008
                if err != nil {
3,767✔
3009
                        return err
×
3010
                }
×
3011

3012
                node = &n
3,767✔
3013

3,767✔
3014
                return nil
3,767✔
3015
        }
3016

3017
        if tx == nil {
4,618✔
3018
                err := kvdb.View(
837✔
3019
                        c.db, fetch, func() {
1,674✔
3020
                                node = nil
837✔
3021
                        },
837✔
3022
                )
3023
                if err != nil {
851✔
3024
                        return nil, err
14✔
3025
                }
14✔
3026

3027
                return node, nil
823✔
3028
        }
3029

3030
        err := fetch(tx)
2,944✔
3031
        if err != nil {
2,944✔
3032
                return nil, err
×
3033
        }
×
3034

3035
        return node, nil
2,944✔
3036
}
3037

3038
// graphCacheNode is a struct that wraps a LightningNode in a way that it can be
3039
// cached in the graph cache.
3040
type graphCacheNode struct {
3041
        pubKeyBytes route.Vertex
3042
        features    *lnwire.FeatureVector
3043
}
3044

3045
// newGraphCacheNode returns a new cache optimized node.
3046
func newGraphCacheNode(pubKey route.Vertex,
3047
        features *lnwire.FeatureVector) *graphCacheNode {
732✔
3048

732✔
3049
        return &graphCacheNode{
732✔
3050
                pubKeyBytes: pubKey,
732✔
3051
                features:    features,
732✔
3052
        }
732✔
3053
}
732✔
3054

3055
// PubKey returns the node's public identity key.
3056
func (n *graphCacheNode) PubKey() route.Vertex {
732✔
3057
        return n.pubKeyBytes
732✔
3058
}
732✔
3059

3060
// Features returns the node's features.
3061
func (n *graphCacheNode) Features() *lnwire.FeatureVector {
712✔
3062
        return n.features
712✔
3063
}
712✔
3064

3065
// ForEachChannel iterates through all channels of this node, executing the
3066
// passed callback with an edge info structure and the policies of each end
3067
// of the channel. The first edge policy is the outgoing edge *to* the
3068
// connecting node, while the second is the incoming edge *from* the
3069
// connecting node. If the callback returns an error, then the iteration is
3070
// halted with the error propagated back up to the caller.
3071
//
3072
// Unknown policies are passed into the callback as nil values.
3073
func (n *graphCacheNode) ForEachChannel(tx kvdb.RTx,
3074
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3075
                *models.ChannelEdgePolicy) error) error {
632✔
3076

632✔
3077
        return nodeTraversal(tx, n.pubKeyBytes[:], nil, cb)
632✔
3078
}
632✔
3079

3080
// Compile-time check that *graphCacheNode satisfies GraphCacheNode.
var _ GraphCacheNode = (*graphCacheNode)(nil)
3081

3082
// HasLightningNode determines if the graph has a vertex identified by the
3083
// target node identity public key. If the node exists in the database, a
3084
// timestamp of when the data for the node was lasted updated is returned along
3085
// with a true boolean. Otherwise, an empty time.Time is returned with a false
3086
// boolean.
3087
func (c *ChannelGraph) HasLightningNode(nodePub [33]byte) (time.Time, bool,
3088
        error) {
16✔
3089

16✔
3090
        var (
16✔
3091
                updateTime time.Time
16✔
3092
                exists     bool
16✔
3093
        )
16✔
3094

16✔
3095
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
32✔
3096
                // First grab the nodes bucket which stores the mapping from
16✔
3097
                // pubKey to node information.
16✔
3098
                nodes := tx.ReadBucket(nodeBucket)
16✔
3099
                if nodes == nil {
16✔
3100
                        return ErrGraphNotFound
×
3101
                }
×
3102

3103
                // If a key for this serialized public key isn't found, we can
3104
                // exit early.
3105
                nodeBytes := nodes.Get(nodePub[:])
16✔
3106
                if nodeBytes == nil {
19✔
3107
                        exists = false
3✔
3108
                        return nil
3✔
3109
                }
3✔
3110

3111
                // Otherwise we continue on to obtain the time stamp
3112
                // representing the last time the data for this node was
3113
                // updated.
3114
                nodeReader := bytes.NewReader(nodeBytes)
13✔
3115
                node, err := deserializeLightningNode(nodeReader)
13✔
3116
                if err != nil {
13✔
3117
                        return err
×
3118
                }
×
3119

3120
                exists = true
13✔
3121
                updateTime = node.LastUpdate
13✔
3122
                return nil
13✔
3123
        }, func() {
16✔
3124
                updateTime = time.Time{}
16✔
3125
                exists = false
16✔
3126
        })
16✔
3127
        if err != nil {
16✔
3128
                return time.Time{}, exists, err
×
3129
        }
×
3130

3131
        return updateTime, exists, nil
16✔
3132
}
3133

3134
// nodeTraversal is used to traverse all channels of a node given by its
3135
// public key and passes channel information into the specified callback.
3136
func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
3137
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3138
                *models.ChannelEdgePolicy) error) error {
1,878✔
3139

1,878✔
3140
        traversal := func(tx kvdb.RTx) error {
3,756✔
3141
                edges := tx.ReadBucket(edgeBucket)
1,878✔
3142
                if edges == nil {
1,878✔
3143
                        return ErrGraphNotFound
×
3144
                }
×
3145
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
1,878✔
3146
                if edgeIndex == nil {
1,878✔
3147
                        return ErrGraphNoEdgesFound
×
3148
                }
×
3149

3150
                // In order to reach all the edges for this node, we take
3151
                // advantage of the construction of the key-space within the
3152
                // edge bucket. The keys are stored in the form: pubKey ||
3153
                // chanID. Therefore, starting from a chanID of zero, we can
3154
                // scan forward in the bucket, grabbing all the edges for the
3155
                // node. Once the prefix no longer matches, then we know we're
3156
                // done.
3157
                var nodeStart [33 + 8]byte
1,878✔
3158
                copy(nodeStart[:], nodePub)
1,878✔
3159
                copy(nodeStart[33:], chanStart[:])
1,878✔
3160

1,878✔
3161
                // Starting from the key pubKey || 0, we seek forward in the
1,878✔
3162
                // bucket until the retrieved key no longer has the public key
1,878✔
3163
                // as its prefix. This indicates that we've stepped over into
1,878✔
3164
                // another node's edges, so we can terminate our scan.
1,878✔
3165
                edgeCursor := edges.ReadCursor()
1,878✔
3166
                for nodeEdge, _ := edgeCursor.Seek(nodeStart[:]); bytes.HasPrefix(nodeEdge, nodePub); nodeEdge, _ = edgeCursor.Next() { //nolint:ll
5,727✔
3167
                        // If the prefix still matches, the channel id is
3,849✔
3168
                        // returned in nodeEdge. Channel id is used to lookup
3,849✔
3169
                        // the node at the other end of the channel and both
3,849✔
3170
                        // edge policies.
3,849✔
3171
                        chanID := nodeEdge[33:]
3,849✔
3172
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
3,849✔
3173
                        if err != nil {
3,849✔
3174
                                return err
×
3175
                        }
×
3176

3177
                        outgoingPolicy, err := fetchChanEdgePolicy(
3,849✔
3178
                                edges, chanID, nodePub,
3,849✔
3179
                        )
3,849✔
3180
                        if err != nil {
3,849✔
3181
                                return err
×
3182
                        }
×
3183

3184
                        otherNode, err := edgeInfo.OtherNodeKeyBytes(nodePub)
3,849✔
3185
                        if err != nil {
3,849✔
3186
                                return err
×
3187
                        }
×
3188

3189
                        incomingPolicy, err := fetchChanEdgePolicy(
3,849✔
3190
                                edges, chanID, otherNode[:],
3,849✔
3191
                        )
3,849✔
3192
                        if err != nil {
3,849✔
3193
                                return err
×
3194
                        }
×
3195

3196
                        // Finally, we execute the callback.
3197
                        err = cb(tx, &edgeInfo, outgoingPolicy, incomingPolicy)
3,849✔
3198
                        if err != nil {
3,858✔
3199
                                return err
9✔
3200
                        }
9✔
3201
                }
3202

3203
                return nil
1,869✔
3204
        }
3205

3206
        // If no transaction was provided, then we'll create a new transaction
3207
        // to execute the transaction within.
3208
        if tx == nil {
1,887✔
3209
                return kvdb.View(db, traversal, func() {})
18✔
3210
        }
3211

3212
        // Otherwise, we re-use the existing transaction to execute the graph
3213
        // traversal.
3214
        return traversal(tx)
1,869✔
3215
}
3216

3217
// ForEachNodeChannel iterates through all channels of the given node,
3218
// executing the passed callback with an edge info structure and the policies
3219
// of each end of the channel. The first edge policy is the outgoing edge *to*
3220
// the connecting node, while the second is the incoming edge *from* the
3221
// connecting node. If the callback returns an error, then the iteration is
3222
// halted with the error propagated back up to the caller.
3223
//
3224
// Unknown policies are passed into the callback as nil values.
3225
func (c *ChannelGraph) ForEachNodeChannel(nodePub route.Vertex,
3226
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3227
                *models.ChannelEdgePolicy) error) error {
6✔
3228

6✔
3229
        return nodeTraversal(nil, nodePub[:], c.db, cb)
6✔
3230
}
6✔
3231

3232
// ForEachNodeChannelTx iterates through all channels of the given node,
3233
// executing the passed callback with an edge info structure and the policies
3234
// of each end of the channel. The first edge policy is the outgoing edge *to*
3235
// the connecting node, while the second is the incoming edge *from* the
3236
// connecting node. If the callback returns an error, then the iteration is
3237
// halted with the error propagated back up to the caller.
3238
//
3239
// Unknown policies are passed into the callback as nil values.
3240
//
3241
// If the caller wishes to re-use an existing boltdb transaction, then it
3242
// should be passed as the first argument.  Otherwise, the first argument should
3243
// be nil and a fresh transaction will be created to execute the graph
3244
// traversal.
3245
func (c *ChannelGraph) ForEachNodeChannelTx(tx kvdb.RTx,
3246
        nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo,
3247
                *models.ChannelEdgePolicy,
3248
                *models.ChannelEdgePolicy) error) error {
998✔
3249

998✔
3250
        return nodeTraversal(tx, nodePub[:], c.db, cb)
998✔
3251
}
998✔
3252

3253
// FetchOtherNode attempts to fetch the full LightningNode that's opposite of
3254
// the target node in the channel. This is useful when one knows the pubkey of
3255
// one of the nodes, and wishes to obtain the full LightningNode for the other
3256
// end of the channel.
3257
func (c *ChannelGraph) FetchOtherNode(tx kvdb.RTx,
3258
        channel *models.ChannelEdgeInfo, thisNodeKey []byte) (
3259
        *models.LightningNode, error) {
×
3260

×
3261
        // Ensure that the node passed in is actually a member of the channel.
×
3262
        var targetNodeBytes [33]byte
×
3263
        switch {
×
3264
        case bytes.Equal(channel.NodeKey1Bytes[:], thisNodeKey):
×
3265
                targetNodeBytes = channel.NodeKey2Bytes
×
3266
        case bytes.Equal(channel.NodeKey2Bytes[:], thisNodeKey):
×
3267
                targetNodeBytes = channel.NodeKey1Bytes
×
3268
        default:
×
3269
                return nil, fmt.Errorf("node not participating in this channel")
×
3270
        }
3271

3272
        var targetNode *models.LightningNode
×
3273
        fetchNodeFunc := func(tx kvdb.RTx) error {
×
3274
                // First grab the nodes bucket which stores the mapping from
×
3275
                // pubKey to node information.
×
3276
                nodes := tx.ReadBucket(nodeBucket)
×
3277
                if nodes == nil {
×
3278
                        return ErrGraphNotFound
×
3279
                }
×
3280

3281
                node, err := fetchLightningNode(nodes, targetNodeBytes[:])
×
3282
                if err != nil {
×
3283
                        return err
×
3284
                }
×
3285

3286
                targetNode = &node
×
3287

×
3288
                return nil
×
3289
        }
3290

3291
        // If the transaction is nil, then we'll need to create a new one,
3292
        // otherwise we can use the existing db transaction.
3293
        var err error
×
3294
        if tx == nil {
×
3295
                err = kvdb.View(c.db, fetchNodeFunc, func() {
×
3296
                        targetNode = nil
×
3297
                })
×
3298
        } else {
×
3299
                err = fetchNodeFunc(tx)
×
3300
        }
×
3301

3302
        return targetNode, err
×
3303
}
3304

3305
// computeEdgePolicyKeys is a helper function that can be used to compute the
3306
// keys used to index the channel edge policy info for the two nodes of the
3307
// edge. The keys for node 1 and node 2 are returned respectively.
3308
func computeEdgePolicyKeys(info *models.ChannelEdgeInfo) ([]byte, []byte) {
22✔
3309
        var (
22✔
3310
                node1Key [33 + 8]byte
22✔
3311
                node2Key [33 + 8]byte
22✔
3312
        )
22✔
3313

22✔
3314
        copy(node1Key[:], info.NodeKey1Bytes[:])
22✔
3315
        copy(node2Key[:], info.NodeKey2Bytes[:])
22✔
3316

22✔
3317
        byteOrder.PutUint64(node1Key[33:], info.ChannelID)
22✔
3318
        byteOrder.PutUint64(node2Key[33:], info.ChannelID)
22✔
3319

22✔
3320
        return node1Key[:], node2Key[:]
22✔
3321
}
22✔
3322

3323
// FetchChannelEdgesByOutpoint attempts to lookup the two directed edges for
3324
// the channel identified by the funding outpoint. If the channel can't be
3325
// found, then ErrEdgeNotFound is returned. A struct which houses the general
3326
// information for the channel itself is returned as well as two structs that
3327
// contain the routing policies for the channel in either direction.
3328
func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (
3329
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3330
        *models.ChannelEdgePolicy, error) {
11✔
3331

11✔
3332
        var (
11✔
3333
                edgeInfo *models.ChannelEdgeInfo
11✔
3334
                policy1  *models.ChannelEdgePolicy
11✔
3335
                policy2  *models.ChannelEdgePolicy
11✔
3336
        )
11✔
3337

11✔
3338
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
22✔
3339
                // First, grab the node bucket. This will be used to populate
11✔
3340
                // the Node pointers in each edge read from disk.
11✔
3341
                nodes := tx.ReadBucket(nodeBucket)
11✔
3342
                if nodes == nil {
11✔
3343
                        return ErrGraphNotFound
×
3344
                }
×
3345

3346
                // Next, grab the edge bucket which stores the edges, and also
3347
                // the index itself so we can group the directed edges together
3348
                // logically.
3349
                edges := tx.ReadBucket(edgeBucket)
11✔
3350
                if edges == nil {
11✔
3351
                        return ErrGraphNoEdgesFound
×
3352
                }
×
3353
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
11✔
3354
                if edgeIndex == nil {
11✔
3355
                        return ErrGraphNoEdgesFound
×
3356
                }
×
3357

3358
                // If the channel's outpoint doesn't exist within the outpoint
3359
                // index, then the edge does not exist.
3360
                chanIndex := edges.NestedReadBucket(channelPointBucket)
11✔
3361
                if chanIndex == nil {
11✔
3362
                        return ErrGraphNoEdgesFound
×
3363
                }
×
3364
                var b bytes.Buffer
11✔
3365
                if err := WriteOutpoint(&b, op); err != nil {
11✔
3366
                        return err
×
3367
                }
×
3368
                chanID := chanIndex.Get(b.Bytes())
11✔
3369
                if chanID == nil {
21✔
3370
                        return fmt.Errorf("%w: op=%v", ErrEdgeNotFound, op)
10✔
3371
                }
10✔
3372

3373
                // If the channel is found to exist, then we'll first retrieve
3374
                // the general information for the channel.
3375
                edge, err := fetchChanEdgeInfo(edgeIndex, chanID)
1✔
3376
                if err != nil {
1✔
3377
                        return fmt.Errorf("%w: chanID=%x", err, chanID)
×
3378
                }
×
3379
                edgeInfo = &edge
1✔
3380

1✔
3381
                // Once we have the information about the channel's parameters,
1✔
3382
                // we'll fetch the routing policies for each of the directed
1✔
3383
                // edges.
1✔
3384
                e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
1✔
3385
                if err != nil {
1✔
3386
                        return fmt.Errorf("failed to find policy: %w", err)
×
3387
                }
×
3388

3389
                policy1 = e1
1✔
3390
                policy2 = e2
1✔
3391
                return nil
1✔
3392
        }, func() {
11✔
3393
                edgeInfo = nil
11✔
3394
                policy1 = nil
11✔
3395
                policy2 = nil
11✔
3396
        })
11✔
3397
        if err != nil {
21✔
3398
                return nil, nil, nil, err
10✔
3399
        }
10✔
3400

3401
        return edgeInfo, policy1, policy2, nil
1✔
3402
}
3403

3404
// FetchChannelEdgesByID attempts to lookup the two directed edges for the
3405
// channel identified by the channel ID. If the channel can't be found, then
3406
// ErrEdgeNotFound is returned. A struct which houses the general information
3407
// for the channel itself is returned as well as two structs that contain the
3408
// routing policies for the channel in either direction.
3409
//
3410
// ErrZombieEdge can be returned if the edge is currently marked as a zombie
3411
// within the database. In this case, the ChannelEdgePolicy's will be nil, and
3412
// the ChannelEdgeInfo will only include the public keys of each node.
3413
func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (
3414
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3415
        *models.ChannelEdgePolicy, error) {
25✔
3416

25✔
3417
        var (
25✔
3418
                edgeInfo  *models.ChannelEdgeInfo
25✔
3419
                policy1   *models.ChannelEdgePolicy
25✔
3420
                policy2   *models.ChannelEdgePolicy
25✔
3421
                channelID [8]byte
25✔
3422
        )
25✔
3423

25✔
3424
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
50✔
3425
                // First, grab the node bucket. This will be used to populate
25✔
3426
                // the Node pointers in each edge read from disk.
25✔
3427
                nodes := tx.ReadBucket(nodeBucket)
25✔
3428
                if nodes == nil {
25✔
3429
                        return ErrGraphNotFound
×
3430
                }
×
3431

3432
                // Next, grab the edge bucket which stores the edges, and also
3433
                // the index itself so we can group the directed edges together
3434
                // logically.
3435
                edges := tx.ReadBucket(edgeBucket)
25✔
3436
                if edges == nil {
25✔
3437
                        return ErrGraphNoEdgesFound
×
3438
                }
×
3439
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
25✔
3440
                if edgeIndex == nil {
25✔
3441
                        return ErrGraphNoEdgesFound
×
3442
                }
×
3443

3444
                byteOrder.PutUint64(channelID[:], chanID)
25✔
3445

25✔
3446
                // Now, attempt to fetch edge.
25✔
3447
                edge, err := fetchChanEdgeInfo(edgeIndex, channelID[:])
25✔
3448

25✔
3449
                // If it doesn't exist, we'll quickly check our zombie index to
25✔
3450
                // see if we've previously marked it as so.
25✔
3451
                if errors.Is(err, ErrEdgeNotFound) {
26✔
3452
                        // If the zombie index doesn't exist, or the edge is not
1✔
3453
                        // marked as a zombie within it, then we'll return the
1✔
3454
                        // original ErrEdgeNotFound error.
1✔
3455
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
1✔
3456
                        if zombieIndex == nil {
1✔
3457
                                return ErrEdgeNotFound
×
3458
                        }
×
3459

3460
                        isZombie, pubKey1, pubKey2 := isZombieEdge(
1✔
3461
                                zombieIndex, chanID,
1✔
3462
                        )
1✔
3463
                        if !isZombie {
1✔
3464
                                return ErrEdgeNotFound
×
3465
                        }
×
3466

3467
                        // Otherwise, the edge is marked as a zombie, so we'll
3468
                        // populate the edge info with the public keys of each
3469
                        // party as this is the only information we have about
3470
                        // it and return an error signaling so.
3471
                        edgeInfo = &models.ChannelEdgeInfo{
1✔
3472
                                NodeKey1Bytes: pubKey1,
1✔
3473
                                NodeKey2Bytes: pubKey2,
1✔
3474
                        }
1✔
3475
                        return ErrZombieEdge
1✔
3476
                }
3477

3478
                // Otherwise, we'll just return the error if any.
3479
                if err != nil {
24✔
3480
                        return err
×
3481
                }
×
3482

3483
                edgeInfo = &edge
24✔
3484

24✔
3485
                // Then we'll attempt to fetch the accompanying policies of this
24✔
3486
                // edge.
24✔
3487
                e1, e2, err := fetchChanEdgePolicies(
24✔
3488
                        edgeIndex, edges, channelID[:],
24✔
3489
                )
24✔
3490
                if err != nil {
24✔
3491
                        return err
×
3492
                }
×
3493

3494
                policy1 = e1
24✔
3495
                policy2 = e2
24✔
3496
                return nil
24✔
3497
        }, func() {
25✔
3498
                edgeInfo = nil
25✔
3499
                policy1 = nil
25✔
3500
                policy2 = nil
25✔
3501
        })
25✔
3502
        if err == ErrZombieEdge {
26✔
3503
                return edgeInfo, nil, nil, err
1✔
3504
        }
1✔
3505
        if err != nil {
24✔
3506
                return nil, nil, nil, err
×
3507
        }
×
3508

3509
        return edgeInfo, policy1, policy2, nil
24✔
3510
}
3511

3512
// IsPublicNode is a helper method that determines whether the node with the
3513
// given public key is seen as a public node in the graph from the graph's
3514
// source node's point of view.
3515
func (c *ChannelGraph) IsPublicNode(pubKey [33]byte) (bool, error) {
13✔
3516
        var nodeIsPublic bool
13✔
3517
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
26✔
3518
                nodes := tx.ReadBucket(nodeBucket)
13✔
3519
                if nodes == nil {
13✔
3520
                        return ErrGraphNodesNotFound
×
3521
                }
×
3522
                ourPubKey := nodes.Get(sourceKey)
13✔
3523
                if ourPubKey == nil {
13✔
3524
                        return ErrSourceNodeNotSet
×
3525
                }
×
3526
                node, err := fetchLightningNode(nodes, pubKey[:])
13✔
3527
                if err != nil {
13✔
3528
                        return err
×
3529
                }
×
3530

3531
                nodeIsPublic, err = c.isPublic(tx, node.PubKeyBytes, ourPubKey)
13✔
3532
                return err
13✔
3533
        }, func() {
13✔
3534
                nodeIsPublic = false
13✔
3535
        })
13✔
3536
        if err != nil {
13✔
3537
                return false, err
×
3538
        }
×
3539

3540
        return nodeIsPublic, nil
13✔
3541
}
3542

3543
// genMultiSigP2WSH generates the p2wsh'd multisig script for 2 of 2 pubkeys.
3544
func genMultiSigP2WSH(aPub, bPub []byte) ([]byte, error) {
46✔
3545
        witnessScript, err := input.GenMultiSigScript(aPub, bPub)
46✔
3546
        if err != nil {
46✔
3547
                return nil, err
×
3548
        }
×
3549

3550
        // With the witness script generated, we'll now turn it into a p2wsh
3551
        // script:
3552
        //  * OP_0 <sha256(script)>
3553
        bldr := txscript.NewScriptBuilder(
46✔
3554
                txscript.WithScriptAllocSize(input.P2WSHSize),
46✔
3555
        )
46✔
3556
        bldr.AddOp(txscript.OP_0)
46✔
3557
        scriptHash := sha256.Sum256(witnessScript)
46✔
3558
        bldr.AddData(scriptHash[:])
46✔
3559

46✔
3560
        return bldr.Script()
46✔
3561
}
3562

3563
// EdgePoint couples the outpoint of a channel with the funding script that it
3564
// creates. The FilteredChainView will use this to watch for spends of this
3565
// edge point on chain. We require both of these values as depending on the
3566
// concrete implementation, either the pkScript, or the out point will be used.
3567
type EdgePoint struct {
3568
        // FundingPkScript is the p2wsh multi-sig script of the target channel.
3569
        FundingPkScript []byte
3570

3571
        // OutPoint is the outpoint of the target channel.
3572
        OutPoint wire.OutPoint
3573
}
3574

3575
// String returns a human readable version of the target EdgePoint. We return
3576
// the outpoint directly as it is enough to uniquely identify the edge point.
3577
func (e *EdgePoint) String() string {
×
3578
        return e.OutPoint.String()
×
3579
}
×
3580

3581
// ChannelView returns the verifiable edge information for each active channel
3582
// within the known channel graph. The set of UTXO's (along with their scripts)
3583
// returned are the ones that need to be watched on chain to detect channel
3584
// closes on the resident blockchain.
3585
func (c *ChannelGraph) ChannelView() ([]EdgePoint, error) {
23✔
3586
        var edgePoints []EdgePoint
23✔
3587
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
46✔
3588
                // We're going to iterate over the entire channel index, so
23✔
3589
                // we'll need to fetch the edgeBucket to get to the index as
23✔
3590
                // it's a sub-bucket.
23✔
3591
                edges := tx.ReadBucket(edgeBucket)
23✔
3592
                if edges == nil {
23✔
3593
                        return ErrGraphNoEdgesFound
×
3594
                }
×
3595
                chanIndex := edges.NestedReadBucket(channelPointBucket)
23✔
3596
                if chanIndex == nil {
23✔
3597
                        return ErrGraphNoEdgesFound
×
3598
                }
×
3599
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
23✔
3600
                if edgeIndex == nil {
23✔
3601
                        return ErrGraphNoEdgesFound
×
3602
                }
×
3603

3604
                // Once we have the proper bucket, we'll range over each key
3605
                // (which is the channel point for the channel) and decode it,
3606
                // accumulating each entry.
3607
                return chanIndex.ForEach(
23✔
3608
                        func(chanPointBytes, chanID []byte) error {
65✔
3609
                                chanPointReader := bytes.NewReader(
42✔
3610
                                        chanPointBytes,
42✔
3611
                                )
42✔
3612

42✔
3613
                                var chanPoint wire.OutPoint
42✔
3614
                                err := ReadOutpoint(chanPointReader, &chanPoint)
42✔
3615
                                if err != nil {
42✔
3616
                                        return err
×
3617
                                }
×
3618

3619
                                edgeInfo, err := fetchChanEdgeInfo(
42✔
3620
                                        edgeIndex, chanID,
42✔
3621
                                )
42✔
3622
                                if err != nil {
42✔
3623
                                        return err
×
3624
                                }
×
3625

3626
                                pkScript, err := genMultiSigP2WSH(
42✔
3627
                                        edgeInfo.BitcoinKey1Bytes[:],
42✔
3628
                                        edgeInfo.BitcoinKey2Bytes[:],
42✔
3629
                                )
42✔
3630
                                if err != nil {
42✔
3631
                                        return err
×
3632
                                }
×
3633

3634
                                edgePoints = append(edgePoints, EdgePoint{
42✔
3635
                                        FundingPkScript: pkScript,
42✔
3636
                                        OutPoint:        chanPoint,
42✔
3637
                                })
42✔
3638

42✔
3639
                                return nil
42✔
3640
                        },
3641
                )
3642
        }, func() {
23✔
3643
                edgePoints = nil
23✔
3644
        }); err != nil {
23✔
3645
                return nil, err
×
3646
        }
×
3647

3648
        return edgePoints, nil
23✔
3649
}
3650

3651
// MarkEdgeZombie attempts to mark a channel identified by its channel ID as a
3652
// zombie. This method is used on an ad-hoc basis, when channels need to be
3653
// marked as zombies outside the normal pruning cycle.
3654
func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
3655
        pubKey1, pubKey2 [33]byte) error {
126✔
3656

126✔
3657
        c.cacheMu.Lock()
126✔
3658
        defer c.cacheMu.Unlock()
126✔
3659

126✔
3660
        err := kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
252✔
3661
                edges := tx.ReadWriteBucket(edgeBucket)
126✔
3662
                if edges == nil {
126✔
3663
                        return ErrGraphNoEdgesFound
×
3664
                }
×
3665
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
126✔
3666
                if err != nil {
126✔
3667
                        return fmt.Errorf("unable to create zombie "+
×
3668
                                "bucket: %w", err)
×
3669
                }
×
3670

3671
                if c.graphCache != nil {
252✔
3672
                        c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
126✔
3673
                }
126✔
3674

3675
                return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
126✔
3676
        })
3677
        if err != nil {
126✔
3678
                return err
×
3679
        }
×
3680

3681
        c.rejectCache.remove(chanID)
126✔
3682
        c.chanCache.remove(chanID)
126✔
3683

126✔
3684
        return nil
126✔
3685
}
3686

3687
// markEdgeZombie marks an edge as a zombie within our zombie index. The public
3688
// keys should represent the node public keys of the two parties involved in the
3689
// edge.
3690
func markEdgeZombie(zombieIndex kvdb.RwBucket, chanID uint64, pubKey1,
3691
        pubKey2 [33]byte) error {
151✔
3692

151✔
3693
        var k [8]byte
151✔
3694
        byteOrder.PutUint64(k[:], chanID)
151✔
3695

151✔
3696
        var v [66]byte
151✔
3697
        copy(v[:33], pubKey1[:])
151✔
3698
        copy(v[33:], pubKey2[:])
151✔
3699

151✔
3700
        return zombieIndex.Put(k[:], v[:])
151✔
3701
}
151✔
3702

3703
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
3704
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
2✔
3705
        c.cacheMu.Lock()
2✔
3706
        defer c.cacheMu.Unlock()
2✔
3707

2✔
3708
        return c.markEdgeLiveUnsafe(nil, chanID)
2✔
3709
}
2✔
3710

3711
// markEdgeLiveUnsafe clears an edge from the zombie index. This method can be
3712
// called with an existing kvdb.RwTx or the argument can be set to nil in which
3713
// case a new transaction will be created.
3714
//
3715
// NOTE: this method MUST only be called if the cacheMu has already been
3716
// acquired.
3717
func (c *ChannelGraph) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
22✔
3718
        dbFn := func(tx kvdb.RwTx) error {
44✔
3719
                edges := tx.ReadWriteBucket(edgeBucket)
22✔
3720
                if edges == nil {
22✔
3721
                        return ErrGraphNoEdgesFound
×
3722
                }
×
3723
                zombieIndex := edges.NestedReadWriteBucket(zombieBucket)
22✔
3724
                if zombieIndex == nil {
22✔
3725
                        return nil
×
3726
                }
×
3727

3728
                var k [8]byte
22✔
3729
                byteOrder.PutUint64(k[:], chanID)
22✔
3730

22✔
3731
                if len(zombieIndex.Get(k[:])) == 0 {
23✔
3732
                        return ErrZombieEdgeNotFound
1✔
3733
                }
1✔
3734

3735
                return zombieIndex.Delete(k[:])
21✔
3736
        }
3737

3738
        // If the transaction is nil, we'll create a new one. Otherwise, we use
3739
        // the existing transaction.
3740
        var err error
22✔
3741
        if tx == nil {
24✔
3742
                err = kvdb.Update(c.db, dbFn, func() {})
4✔
3743
        } else {
20✔
3744
                err = dbFn(tx)
20✔
3745
        }
20✔
3746
        if err != nil {
23✔
3747
                return err
1✔
3748
        }
1✔
3749

3750
        c.rejectCache.remove(chanID)
21✔
3751
        c.chanCache.remove(chanID)
21✔
3752

21✔
3753
        // We need to add the channel back into our graph cache, otherwise we
21✔
3754
        // won't use it for path finding.
21✔
3755
        if c.graphCache != nil {
42✔
3756
                edgeInfos, err := c.fetchChanInfos(tx, []uint64{chanID})
21✔
3757
                if err != nil {
21✔
3758
                        return err
×
3759
                }
×
3760

3761
                for _, edgeInfo := range edgeInfos {
21✔
3762
                        c.graphCache.AddChannel(
×
3763
                                edgeInfo.Info, edgeInfo.Policy1,
×
3764
                                edgeInfo.Policy2,
×
3765
                        )
×
3766
                }
×
3767
        }
3768

3769
        return nil
21✔
3770
}
3771

3772
// IsZombieEdge returns whether the edge is considered zombie. If it is a
3773
// zombie, then the two node public keys corresponding to this edge are also
3774
// returned.
3775
func (c *ChannelGraph) IsZombieEdge(chanID uint64) (bool, [33]byte, [33]byte) {
5✔
3776
        var (
5✔
3777
                isZombie         bool
5✔
3778
                pubKey1, pubKey2 [33]byte
5✔
3779
        )
5✔
3780

5✔
3781
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
10✔
3782
                edges := tx.ReadBucket(edgeBucket)
5✔
3783
                if edges == nil {
5✔
3784
                        return ErrGraphNoEdgesFound
×
3785
                }
×
3786
                zombieIndex := edges.NestedReadBucket(zombieBucket)
5✔
3787
                if zombieIndex == nil {
5✔
3788
                        return nil
×
3789
                }
×
3790

3791
                isZombie, pubKey1, pubKey2 = isZombieEdge(zombieIndex, chanID)
5✔
3792
                return nil
5✔
3793
        }, func() {
5✔
3794
                isZombie = false
5✔
3795
                pubKey1 = [33]byte{}
5✔
3796
                pubKey2 = [33]byte{}
5✔
3797
        })
5✔
3798
        if err != nil {
5✔
3799
                return false, [33]byte{}, [33]byte{}
×
3800
        }
×
3801

3802
        return isZombie, pubKey1, pubKey2
5✔
3803
}
3804

3805
// isZombieEdge returns whether an entry exists for the given channel in the
3806
// zombie index. If an entry exists, then the two node public keys corresponding
3807
// to this edge are also returned.
3808
func isZombieEdge(zombieIndex kvdb.RBucket,
3809
        chanID uint64) (bool, [33]byte, [33]byte) {
204✔
3810

204✔
3811
        var k [8]byte
204✔
3812
        byteOrder.PutUint64(k[:], chanID)
204✔
3813

204✔
3814
        v := zombieIndex.Get(k[:])
204✔
3815
        if v == nil {
319✔
3816
                return false, [33]byte{}, [33]byte{}
115✔
3817
        }
115✔
3818

3819
        var pubKey1, pubKey2 [33]byte
89✔
3820
        copy(pubKey1[:], v[:33])
89✔
3821
        copy(pubKey2[:], v[33:])
89✔
3822

89✔
3823
        return true, pubKey1, pubKey2
89✔
3824
}
3825

3826
// NumZombies returns the current number of zombie channels in the graph.
3827
func (c *ChannelGraph) NumZombies() (uint64, error) {
4✔
3828
        var numZombies uint64
4✔
3829
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
3830
                edges := tx.ReadBucket(edgeBucket)
4✔
3831
                if edges == nil {
4✔
3832
                        return nil
×
3833
                }
×
3834
                zombieIndex := edges.NestedReadBucket(zombieBucket)
4✔
3835
                if zombieIndex == nil {
4✔
3836
                        return nil
×
3837
                }
×
3838

3839
                return zombieIndex.ForEach(func(_, _ []byte) error {
6✔
3840
                        numZombies++
2✔
3841
                        return nil
2✔
3842
                })
2✔
3843
        }, func() {
4✔
3844
                numZombies = 0
4✔
3845
        })
4✔
3846
        if err != nil {
4✔
3847
                return 0, err
×
3848
        }
×
3849

3850
        return numZombies, nil
4✔
3851
}
3852

3853
// PutClosedScid stores a SCID for a closed channel in the database. This is so
3854
// that we can ignore channel announcements that we know to be closed without
3855
// having to validate them and fetch a block.
3856
func (c *ChannelGraph) PutClosedScid(scid lnwire.ShortChannelID) error {
1✔
3857
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
2✔
3858
                closedScids, err := tx.CreateTopLevelBucket(closedScidBucket)
1✔
3859
                if err != nil {
1✔
3860
                        return err
×
3861
                }
×
3862

3863
                var k [8]byte
1✔
3864
                byteOrder.PutUint64(k[:], scid.ToUint64())
1✔
3865

1✔
3866
                return closedScids.Put(k[:], []byte{})
1✔
3867
        }, func() {})
1✔
3868
}
3869

3870
// IsClosedScid checks whether a channel identified by the passed in scid is
3871
// closed. This helps avoid having to perform expensive validation checks.
3872
// TODO: Add an LRU cache to cut down on disk reads.
3873
func (c *ChannelGraph) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
2✔
3874
        var isClosed bool
2✔
3875
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
4✔
3876
                closedScids := tx.ReadBucket(closedScidBucket)
2✔
3877
                if closedScids == nil {
2✔
3878
                        return ErrClosedScidsNotFound
×
3879
                }
×
3880

3881
                var k [8]byte
2✔
3882
                byteOrder.PutUint64(k[:], scid.ToUint64())
2✔
3883

2✔
3884
                if closedScids.Get(k[:]) != nil {
3✔
3885
                        isClosed = true
1✔
3886
                        return nil
1✔
3887
                }
1✔
3888

3889
                return nil
1✔
3890
        }, func() {
2✔
3891
                isClosed = false
2✔
3892
        })
2✔
3893
        if err != nil {
2✔
3894
                return false, err
×
3895
        }
×
3896

3897
        return isClosed, nil
2✔
3898
}
3899

3900
func putLightningNode(nodeBucket kvdb.RwBucket, aliasBucket kvdb.RwBucket, // nolint:dupl
3901
        updateIndex kvdb.RwBucket, node *models.LightningNode) error {
994✔
3902

994✔
3903
        var (
994✔
3904
                scratch [16]byte
994✔
3905
                b       bytes.Buffer
994✔
3906
        )
994✔
3907

994✔
3908
        pub, err := node.PubKey()
994✔
3909
        if err != nil {
994✔
3910
                return err
×
3911
        }
×
3912
        nodePub := pub.SerializeCompressed()
994✔
3913

994✔
3914
        // If the node has the update time set, write it, else write 0.
994✔
3915
        updateUnix := uint64(0)
994✔
3916
        if node.LastUpdate.Unix() > 0 {
1,857✔
3917
                updateUnix = uint64(node.LastUpdate.Unix())
863✔
3918
        }
863✔
3919

3920
        byteOrder.PutUint64(scratch[:8], updateUnix)
994✔
3921
        if _, err := b.Write(scratch[:8]); err != nil {
994✔
3922
                return err
×
3923
        }
×
3924

3925
        if _, err := b.Write(nodePub); err != nil {
994✔
3926
                return err
×
3927
        }
×
3928

3929
        // If we got a node announcement for this node, we will have the rest
3930
        // of the data available. If not we don't have more data to write.
3931
        if !node.HaveNodeAnnouncement {
1,072✔
3932
                // Write HaveNodeAnnouncement=0.
78✔
3933
                byteOrder.PutUint16(scratch[:2], 0)
78✔
3934
                if _, err := b.Write(scratch[:2]); err != nil {
78✔
3935
                        return err
×
3936
                }
×
3937

3938
                return nodeBucket.Put(nodePub, b.Bytes())
78✔
3939
        }
3940

3941
        // Write HaveNodeAnnouncement=1.
3942
        byteOrder.PutUint16(scratch[:2], 1)
916✔
3943
        if _, err := b.Write(scratch[:2]); err != nil {
916✔
3944
                return err
×
3945
        }
×
3946

3947
        if err := binary.Write(&b, byteOrder, node.Color.R); err != nil {
916✔
3948
                return err
×
3949
        }
×
3950
        if err := binary.Write(&b, byteOrder, node.Color.G); err != nil {
916✔
3951
                return err
×
3952
        }
×
3953
        if err := binary.Write(&b, byteOrder, node.Color.B); err != nil {
916✔
3954
                return err
×
3955
        }
×
3956

3957
        if err := wire.WriteVarString(&b, 0, node.Alias); err != nil {
916✔
3958
                return err
×
3959
        }
×
3960

3961
        if err := node.Features.Encode(&b); err != nil {
916✔
3962
                return err
×
3963
        }
×
3964

3965
        numAddresses := uint16(len(node.Addresses))
916✔
3966
        byteOrder.PutUint16(scratch[:2], numAddresses)
916✔
3967
        if _, err := b.Write(scratch[:2]); err != nil {
916✔
3968
                return err
×
3969
        }
×
3970

3971
        for _, address := range node.Addresses {
2,058✔
3972
                if err := SerializeAddr(&b, address); err != nil {
1,142✔
3973
                        return err
×
3974
                }
×
3975
        }
3976

3977
        sigLen := len(node.AuthSigBytes)
916✔
3978
        if sigLen > 80 {
916✔
3979
                return fmt.Errorf("max sig len allowed is 80, had %v",
×
3980
                        sigLen)
×
3981
        }
×
3982

3983
        err = wire.WriteVarBytes(&b, 0, node.AuthSigBytes)
916✔
3984
        if err != nil {
916✔
3985
                return err
×
3986
        }
×
3987

3988
        if len(node.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
916✔
3989
                return ErrTooManyExtraOpaqueBytes(len(node.ExtraOpaqueData))
×
3990
        }
×
3991
        err = wire.WriteVarBytes(&b, 0, node.ExtraOpaqueData)
916✔
3992
        if err != nil {
916✔
3993
                return err
×
3994
        }
×
3995

3996
        if err := aliasBucket.Put(nodePub, []byte(node.Alias)); err != nil {
916✔
3997
                return err
×
3998
        }
×
3999

4000
        // With the alias bucket updated, we'll now update the index that
4001
        // tracks the time series of node updates.
4002
        var indexKey [8 + 33]byte
916✔
4003
        byteOrder.PutUint64(indexKey[:8], updateUnix)
916✔
4004
        copy(indexKey[8:], nodePub)
916✔
4005

916✔
4006
        // If there was already an old index entry for this node, then we'll
916✔
4007
        // delete the old one before we write the new entry.
916✔
4008
        if nodeBytes := nodeBucket.Get(nodePub); nodeBytes != nil {
1,020✔
4009
                // Extract out the old update time so we can reconstruct the
104✔
4010
                // prior index key to delete it from the index.
104✔
4011
                oldUpdateTime := nodeBytes[:8]
104✔
4012

104✔
4013
                var oldIndexKey [8 + 33]byte
104✔
4014
                copy(oldIndexKey[:8], oldUpdateTime)
104✔
4015
                copy(oldIndexKey[8:], nodePub)
104✔
4016

104✔
4017
                if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
104✔
4018
                        return err
×
4019
                }
×
4020
        }
4021

4022
        if err := updateIndex.Put(indexKey[:], nil); err != nil {
916✔
4023
                return err
×
4024
        }
×
4025

4026
        return nodeBucket.Put(nodePub, b.Bytes())
916✔
4027
}
4028

4029
func fetchLightningNode(nodeBucket kvdb.RBucket,
4030
        nodePub []byte) (models.LightningNode, error) {
3,628✔
4031

3,628✔
4032
        nodeBytes := nodeBucket.Get(nodePub)
3,628✔
4033
        if nodeBytes == nil {
3,705✔
4034
                return models.LightningNode{}, ErrGraphNodeNotFound
77✔
4035
        }
77✔
4036

4037
        nodeReader := bytes.NewReader(nodeBytes)
3,551✔
4038
        return deserializeLightningNode(nodeReader)
3,551✔
4039
}
4040

4041
func deserializeLightningNodeCacheable(r io.Reader) (*graphCacheNode, error) {
120✔
4042
        // Always populate a feature vector, even if we don't have a node
120✔
4043
        // announcement and short circuit below.
120✔
4044
        node := newGraphCacheNode(
120✔
4045
                route.Vertex{},
120✔
4046
                lnwire.EmptyFeatureVector(),
120✔
4047
        )
120✔
4048

120✔
4049
        var nodeScratch [8]byte
120✔
4050

120✔
4051
        // Skip ahead:
120✔
4052
        // - LastUpdate (8 bytes)
120✔
4053
        if _, err := r.Read(nodeScratch[:]); err != nil {
120✔
4054
                return nil, err
×
4055
        }
×
4056

4057
        if _, err := io.ReadFull(r, node.pubKeyBytes[:]); err != nil {
120✔
4058
                return nil, err
×
4059
        }
×
4060

4061
        // Read the node announcement flag.
4062
        if _, err := r.Read(nodeScratch[:2]); err != nil {
120✔
4063
                return nil, err
×
4064
        }
×
4065
        hasNodeAnn := byteOrder.Uint16(nodeScratch[:2])
120✔
4066

120✔
4067
        // The rest of the data is optional, and will only be there if we got a
120✔
4068
        // node announcement for this node.
120✔
4069
        if hasNodeAnn == 0 {
120✔
4070
                return node, nil
×
4071
        }
×
4072

4073
        // We did get a node announcement for this node, so we'll have the rest
4074
        // of the data available.
4075
        var rgb uint8
120✔
4076
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4077
                return nil, err
×
4078
        }
×
4079
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4080
                return nil, err
×
4081
        }
×
4082
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4083
                return nil, err
×
4084
        }
×
4085

4086
        if _, err := wire.ReadVarString(r, 0); err != nil {
120✔
4087
                return nil, err
×
4088
        }
×
4089

4090
        if err := node.features.Decode(r); err != nil {
120✔
4091
                return nil, err
×
4092
        }
×
4093

4094
        return node, nil
120✔
4095
}
4096

4097
func deserializeLightningNode(r io.Reader) (models.LightningNode, error) {
8,509✔
4098
        var (
8,509✔
4099
                node    models.LightningNode
8,509✔
4100
                scratch [8]byte
8,509✔
4101
                err     error
8,509✔
4102
        )
8,509✔
4103

8,509✔
4104
        // Always populate a feature vector, even if we don't have a node
8,509✔
4105
        // announcement and short circuit below.
8,509✔
4106
        node.Features = lnwire.EmptyFeatureVector()
8,509✔
4107

8,509✔
4108
        if _, err := r.Read(scratch[:]); err != nil {
8,509✔
4109
                return models.LightningNode{}, err
×
4110
        }
×
4111

4112
        unix := int64(byteOrder.Uint64(scratch[:]))
8,509✔
4113
        node.LastUpdate = time.Unix(unix, 0)
8,509✔
4114

8,509✔
4115
        if _, err := io.ReadFull(r, node.PubKeyBytes[:]); err != nil {
8,509✔
4116
                return models.LightningNode{}, err
×
4117
        }
×
4118

4119
        if _, err := r.Read(scratch[:2]); err != nil {
8,509✔
4120
                return models.LightningNode{}, err
×
4121
        }
×
4122

4123
        hasNodeAnn := byteOrder.Uint16(scratch[:2])
8,509✔
4124
        if hasNodeAnn == 1 {
16,875✔
4125
                node.HaveNodeAnnouncement = true
8,366✔
4126
        } else {
8,509✔
4127
                node.HaveNodeAnnouncement = false
143✔
4128
        }
143✔
4129

4130
        // The rest of the data is optional, and will only be there if we got a
4131
        // node announcement for this node.
4132
        if !node.HaveNodeAnnouncement {
8,652✔
4133
                return node, nil
143✔
4134
        }
143✔
4135

4136
        // We did get a node announcement for this node, so we'll have the rest
4137
        // of the data available.
4138
        if err := binary.Read(r, byteOrder, &node.Color.R); err != nil {
8,366✔
4139
                return models.LightningNode{}, err
×
4140
        }
×
4141
        if err := binary.Read(r, byteOrder, &node.Color.G); err != nil {
8,366✔
4142
                return models.LightningNode{}, err
×
4143
        }
×
4144
        if err := binary.Read(r, byteOrder, &node.Color.B); err != nil {
8,366✔
4145
                return models.LightningNode{}, err
×
4146
        }
×
4147

4148
        node.Alias, err = wire.ReadVarString(r, 0)
8,366✔
4149
        if err != nil {
8,366✔
4150
                return models.LightningNode{}, err
×
4151
        }
×
4152

4153
        err = node.Features.Decode(r)
8,366✔
4154
        if err != nil {
8,366✔
4155
                return models.LightningNode{}, err
×
4156
        }
×
4157

4158
        if _, err := r.Read(scratch[:2]); err != nil {
8,366✔
4159
                return models.LightningNode{}, err
×
4160
        }
×
4161
        numAddresses := int(byteOrder.Uint16(scratch[:2]))
8,366✔
4162

8,366✔
4163
        var addresses []net.Addr
8,366✔
4164
        for i := 0; i < numAddresses; i++ {
18,964✔
4165
                address, err := DeserializeAddr(r)
10,598✔
4166
                if err != nil {
10,598✔
4167
                        return models.LightningNode{}, err
×
4168
                }
×
4169
                addresses = append(addresses, address)
10,598✔
4170
        }
4171
        node.Addresses = addresses
8,366✔
4172

8,366✔
4173
        node.AuthSigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
8,366✔
4174
        if err != nil {
8,366✔
4175
                return models.LightningNode{}, err
×
4176
        }
×
4177

4178
        // We'll try and see if there are any opaque bytes left, if not, then
4179
        // we'll ignore the EOF error and return the node as is.
4180
        node.ExtraOpaqueData, err = wire.ReadVarBytes(
8,366✔
4181
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
8,366✔
4182
        )
8,366✔
4183
        switch {
8,366✔
4184
        case err == io.ErrUnexpectedEOF:
×
4185
        case err == io.EOF:
×
4186
        case err != nil:
×
4187
                return models.LightningNode{}, err
×
4188
        }
4189

4190
        return node, nil
8,366✔
4191
}
4192

4193
func putChanEdgeInfo(edgeIndex kvdb.RwBucket,
4194
        edgeInfo *models.ChannelEdgeInfo, chanID [8]byte) error {
1,485✔
4195

1,485✔
4196
        var b bytes.Buffer
1,485✔
4197

1,485✔
4198
        if _, err := b.Write(edgeInfo.NodeKey1Bytes[:]); err != nil {
1,485✔
4199
                return err
×
4200
        }
×
4201
        if _, err := b.Write(edgeInfo.NodeKey2Bytes[:]); err != nil {
1,485✔
4202
                return err
×
4203
        }
×
4204
        if _, err := b.Write(edgeInfo.BitcoinKey1Bytes[:]); err != nil {
1,485✔
4205
                return err
×
4206
        }
×
4207
        if _, err := b.Write(edgeInfo.BitcoinKey2Bytes[:]); err != nil {
1,485✔
4208
                return err
×
4209
        }
×
4210

4211
        if err := wire.WriteVarBytes(&b, 0, edgeInfo.Features); err != nil {
1,485✔
4212
                return err
×
4213
        }
×
4214

4215
        authProof := edgeInfo.AuthProof
1,485✔
4216
        var nodeSig1, nodeSig2, bitcoinSig1, bitcoinSig2 []byte
1,485✔
4217
        if authProof != nil {
2,887✔
4218
                nodeSig1 = authProof.NodeSig1Bytes
1,402✔
4219
                nodeSig2 = authProof.NodeSig2Bytes
1,402✔
4220
                bitcoinSig1 = authProof.BitcoinSig1Bytes
1,402✔
4221
                bitcoinSig2 = authProof.BitcoinSig2Bytes
1,402✔
4222
        }
1,402✔
4223

4224
        if err := wire.WriteVarBytes(&b, 0, nodeSig1); err != nil {
1,485✔
4225
                return err
×
4226
        }
×
4227
        if err := wire.WriteVarBytes(&b, 0, nodeSig2); err != nil {
1,485✔
4228
                return err
×
4229
        }
×
4230
        if err := wire.WriteVarBytes(&b, 0, bitcoinSig1); err != nil {
1,485✔
4231
                return err
×
4232
        }
×
4233
        if err := wire.WriteVarBytes(&b, 0, bitcoinSig2); err != nil {
1,485✔
4234
                return err
×
4235
        }
×
4236

4237
        if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
1,485✔
4238
                return err
×
4239
        }
×
4240
        err := binary.Write(&b, byteOrder, uint64(edgeInfo.Capacity))
1,485✔
4241
        if err != nil {
1,485✔
4242
                return err
×
4243
        }
×
4244
        if _, err := b.Write(chanID[:]); err != nil {
1,485✔
4245
                return err
×
4246
        }
×
4247
        if _, err := b.Write(edgeInfo.ChainHash[:]); err != nil {
1,485✔
4248
                return err
×
4249
        }
×
4250

4251
        if len(edgeInfo.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
1,485✔
4252
                return ErrTooManyExtraOpaqueBytes(len(edgeInfo.ExtraOpaqueData))
×
4253
        }
×
4254
        err = wire.WriteVarBytes(&b, 0, edgeInfo.ExtraOpaqueData)
1,485✔
4255
        if err != nil {
1,485✔
4256
                return err
×
4257
        }
×
4258

4259
        return edgeIndex.Put(chanID[:], b.Bytes())
1,485✔
4260
}
4261

4262
func fetchChanEdgeInfo(edgeIndex kvdb.RBucket,
4263
        chanID []byte) (models.ChannelEdgeInfo, error) {
4,189✔
4264

4,189✔
4265
        edgeInfoBytes := edgeIndex.Get(chanID)
4,189✔
4266
        if edgeInfoBytes == nil {
4,271✔
4267
                return models.ChannelEdgeInfo{}, ErrEdgeNotFound
82✔
4268
        }
82✔
4269

4270
        edgeInfoReader := bytes.NewReader(edgeInfoBytes)
4,107✔
4271
        return deserializeChanEdgeInfo(edgeInfoReader)
4,107✔
4272
}
4273

4274
func deserializeChanEdgeInfo(r io.Reader) (models.ChannelEdgeInfo, error) {
4,737✔
4275
        var (
4,737✔
4276
                err      error
4,737✔
4277
                edgeInfo models.ChannelEdgeInfo
4,737✔
4278
        )
4,737✔
4279

4,737✔
4280
        if _, err := io.ReadFull(r, edgeInfo.NodeKey1Bytes[:]); err != nil {
4,737✔
4281
                return models.ChannelEdgeInfo{}, err
×
4282
        }
×
4283
        if _, err := io.ReadFull(r, edgeInfo.NodeKey2Bytes[:]); err != nil {
4,737✔
4284
                return models.ChannelEdgeInfo{}, err
×
4285
        }
×
4286
        if _, err := io.ReadFull(r, edgeInfo.BitcoinKey1Bytes[:]); err != nil {
4,737✔
4287
                return models.ChannelEdgeInfo{}, err
×
4288
        }
×
4289
        if _, err := io.ReadFull(r, edgeInfo.BitcoinKey2Bytes[:]); err != nil {
4,737✔
4290
                return models.ChannelEdgeInfo{}, err
×
4291
        }
×
4292

4293
        edgeInfo.Features, err = wire.ReadVarBytes(r, 0, 900, "features")
4,737✔
4294
        if err != nil {
4,737✔
4295
                return models.ChannelEdgeInfo{}, err
×
4296
        }
×
4297

4298
        proof := &models.ChannelAuthProof{}
4,737✔
4299

4,737✔
4300
        proof.NodeSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,737✔
4301
        if err != nil {
4,737✔
4302
                return models.ChannelEdgeInfo{}, err
×
4303
        }
×
4304
        proof.NodeSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,737✔
4305
        if err != nil {
4,737✔
4306
                return models.ChannelEdgeInfo{}, err
×
4307
        }
×
4308
        proof.BitcoinSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,737✔
4309
        if err != nil {
4,737✔
4310
                return models.ChannelEdgeInfo{}, err
×
4311
        }
×
4312
        proof.BitcoinSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,737✔
4313
        if err != nil {
4,737✔
4314
                return models.ChannelEdgeInfo{}, err
×
4315
        }
×
4316

4317
        if !proof.IsEmpty() {
6,521✔
4318
                edgeInfo.AuthProof = proof
1,784✔
4319
        }
1,784✔
4320

4321
        edgeInfo.ChannelPoint = wire.OutPoint{}
4,737✔
4322
        if err := ReadOutpoint(r, &edgeInfo.ChannelPoint); err != nil {
4,737✔
4323
                return models.ChannelEdgeInfo{}, err
×
4324
        }
×
4325
        if err := binary.Read(r, byteOrder, &edgeInfo.Capacity); err != nil {
4,737✔
4326
                return models.ChannelEdgeInfo{}, err
×
4327
        }
×
4328
        if err := binary.Read(r, byteOrder, &edgeInfo.ChannelID); err != nil {
4,737✔
4329
                return models.ChannelEdgeInfo{}, err
×
4330
        }
×
4331

4332
        if _, err := io.ReadFull(r, edgeInfo.ChainHash[:]); err != nil {
4,737✔
4333
                return models.ChannelEdgeInfo{}, err
×
4334
        }
×
4335

4336
        // We'll try and see if there are any opaque bytes left, if not, then
4337
        // we'll ignore the EOF error and return the edge as is.
4338
        edgeInfo.ExtraOpaqueData, err = wire.ReadVarBytes(
4,737✔
4339
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
4,737✔
4340
        )
4,737✔
4341
        switch {
4,737✔
4342
        case err == io.ErrUnexpectedEOF:
×
4343
        case err == io.EOF:
×
4344
        case err != nil:
×
4345
                return models.ChannelEdgeInfo{}, err
×
4346
        }
4347

4348
        return edgeInfo, nil
4,737✔
4349
}
4350

4351
func putChanEdgePolicy(edges kvdb.RwBucket, edge *models.ChannelEdgePolicy,
4352
        from, to []byte) error {
2,660✔
4353

2,660✔
4354
        var edgeKey [33 + 8]byte
2,660✔
4355
        copy(edgeKey[:], from)
2,660✔
4356
        byteOrder.PutUint64(edgeKey[33:], edge.ChannelID)
2,660✔
4357

2,660✔
4358
        var b bytes.Buffer
2,660✔
4359
        if err := serializeChanEdgePolicy(&b, edge, to); err != nil {
2,660✔
4360
                return err
×
4361
        }
×
4362

4363
        // Before we write out the new edge, we'll create a new entry in the
4364
        // update index in order to keep it fresh.
4365
        updateUnix := uint64(edge.LastUpdate.Unix())
2,660✔
4366
        var indexKey [8 + 8]byte
2,660✔
4367
        byteOrder.PutUint64(indexKey[:8], updateUnix)
2,660✔
4368
        byteOrder.PutUint64(indexKey[8:], edge.ChannelID)
2,660✔
4369

2,660✔
4370
        updateIndex, err := edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
2,660✔
4371
        if err != nil {
2,660✔
4372
                return err
×
4373
        }
×
4374

4375
        // If there was already an entry for this edge, then we'll need to
4376
        // delete the old one to ensure we don't leave around any after-images.
4377
        // An unknown policy value does not have a update time recorded, so
4378
        // it also does not need to be removed.
4379
        if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil &&
2,660✔
4380
                !bytes.Equal(edgeBytes[:], unknownPolicy) {
2,684✔
4381

24✔
4382
                // In order to delete the old entry, we'll need to obtain the
24✔
4383
                // *prior* update time in order to delete it. To do this, we'll
24✔
4384
                // need to deserialize the existing policy within the database
24✔
4385
                // (now outdated by the new one), and delete its corresponding
24✔
4386
                // entry within the update index. We'll ignore any
24✔
4387
                // ErrEdgePolicyOptionalFieldNotFound error, as we only need
24✔
4388
                // the channel ID and update time to delete the entry.
24✔
4389
                // TODO(halseth): get rid of these invalid policies in a
24✔
4390
                // migration.
24✔
4391
                oldEdgePolicy, err := deserializeChanEdgePolicy(
24✔
4392
                        bytes.NewReader(edgeBytes),
24✔
4393
                )
24✔
4394
                if err != nil && err != ErrEdgePolicyOptionalFieldNotFound {
24✔
4395
                        return err
×
4396
                }
×
4397

4398
                oldUpdateTime := uint64(oldEdgePolicy.LastUpdate.Unix())
24✔
4399

24✔
4400
                var oldIndexKey [8 + 8]byte
24✔
4401
                byteOrder.PutUint64(oldIndexKey[:8], oldUpdateTime)
24✔
4402
                byteOrder.PutUint64(oldIndexKey[8:], edge.ChannelID)
24✔
4403

24✔
4404
                if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
24✔
4405
                        return err
×
4406
                }
×
4407
        }
4408

4409
        if err := updateIndex.Put(indexKey[:], nil); err != nil {
2,660✔
4410
                return err
×
4411
        }
×
4412

4413
        updateEdgePolicyDisabledIndex(
2,660✔
4414
                edges, edge.ChannelID,
2,660✔
4415
                edge.ChannelFlags&lnwire.ChanUpdateDirection > 0,
2,660✔
4416
                edge.IsDisabled(),
2,660✔
4417
        )
2,660✔
4418

2,660✔
4419
        return edges.Put(edgeKey[:], b.Bytes()[:])
2,660✔
4420
}
4421

4422
// updateEdgePolicyDisabledIndex is used to update the disabledEdgePolicyIndex
4423
// bucket by either add a new disabled ChannelEdgePolicy or remove an existing
4424
// one.
4425
// The direction represents the direction of the edge and disabled is used for
4426
// deciding whether to remove or add an entry to the bucket.
4427
// In general a channel is disabled if two entries for the same chanID exist
4428
// in this bucket.
4429
// Maintaining the bucket this way allows a fast retrieval of disabled
4430
// channels, for example when prune is needed.
4431
func updateEdgePolicyDisabledIndex(edges kvdb.RwBucket, chanID uint64,
4432
        direction bool, disabled bool) error {
2,942✔
4433

2,942✔
4434
        var disabledEdgeKey [8 + 1]byte
2,942✔
4435
        byteOrder.PutUint64(disabledEdgeKey[0:], chanID)
2,942✔
4436
        if direction {
4,409✔
4437
                disabledEdgeKey[8] = 1
1,467✔
4438
        }
1,467✔
4439

4440
        disabledEdgePolicyIndex, err := edges.CreateBucketIfNotExists(
2,942✔
4441
                disabledEdgePolicyBucket,
2,942✔
4442
        )
2,942✔
4443
        if err != nil {
2,942✔
4444
                return err
×
4445
        }
×
4446

4447
        if disabled {
2,968✔
4448
                return disabledEdgePolicyIndex.Put(disabledEdgeKey[:], []byte{})
26✔
4449
        }
26✔
4450

4451
        return disabledEdgePolicyIndex.Delete(disabledEdgeKey[:])
2,916✔
4452
}
4453

4454
// putChanEdgePolicyUnknown marks the edge policy as unknown
4455
// in the edges bucket.
4456
func putChanEdgePolicyUnknown(edges kvdb.RwBucket, channelID uint64,
4457
        from []byte) error {
2,968✔
4458

2,968✔
4459
        var edgeKey [33 + 8]byte
2,968✔
4460
        copy(edgeKey[:], from)
2,968✔
4461
        byteOrder.PutUint64(edgeKey[33:], channelID)
2,968✔
4462

2,968✔
4463
        if edges.Get(edgeKey[:]) != nil {
2,968✔
4464
                return fmt.Errorf("cannot write unknown policy for channel %v "+
×
4465
                        " when there is already a policy present", channelID)
×
4466
        }
×
4467

4468
        return edges.Put(edgeKey[:], unknownPolicy)
2,968✔
4469
}
4470

4471
func fetchChanEdgePolicy(edges kvdb.RBucket, chanID []byte,
4472
        nodePub []byte) (*models.ChannelEdgePolicy, error) {
8,178✔
4473

8,178✔
4474
        var edgeKey [33 + 8]byte
8,178✔
4475
        copy(edgeKey[:], nodePub)
8,178✔
4476
        copy(edgeKey[33:], chanID[:])
8,178✔
4477

8,178✔
4478
        edgeBytes := edges.Get(edgeKey[:])
8,178✔
4479
        if edgeBytes == nil {
8,178✔
4480
                return nil, ErrEdgeNotFound
×
4481
        }
×
4482

4483
        // No need to deserialize unknown policy.
4484
        if bytes.Equal(edgeBytes[:], unknownPolicy) {
8,553✔
4485
                return nil, nil
375✔
4486
        }
375✔
4487

4488
        edgeReader := bytes.NewReader(edgeBytes)
7,803✔
4489

7,803✔
4490
        ep, err := deserializeChanEdgePolicy(edgeReader)
7,803✔
4491
        switch {
7,803✔
4492
        // If the db policy was missing an expected optional field, we return
4493
        // nil as if the policy was unknown.
4494
        case err == ErrEdgePolicyOptionalFieldNotFound:
1✔
4495
                return nil, nil
1✔
4496

4497
        case err != nil:
×
4498
                return nil, err
×
4499
        }
4500

4501
        return ep, nil
7,802✔
4502
}
4503

4504
func fetchChanEdgePolicies(edgeIndex kvdb.RBucket, edges kvdb.RBucket,
4505
        chanID []byte) (*models.ChannelEdgePolicy, *models.ChannelEdgePolicy,
4506
        error) {
240✔
4507

240✔
4508
        edgeInfo := edgeIndex.Get(chanID)
240✔
4509
        if edgeInfo == nil {
240✔
4510
                return nil, nil, fmt.Errorf("%w: chanID=%x", ErrEdgeNotFound,
×
4511
                        chanID)
×
4512
        }
×
4513

4514
        // The first node is contained within the first half of the edge
4515
        // information. We only propagate the error here and below if it's
4516
        // something other than edge non-existence.
4517
        node1Pub := edgeInfo[:33]
240✔
4518
        edge1, err := fetchChanEdgePolicy(edges, chanID, node1Pub)
240✔
4519
        if err != nil {
240✔
4520
                return nil, nil, fmt.Errorf("%w: node1Pub=%x", ErrEdgeNotFound,
×
4521
                        node1Pub)
×
4522
        }
×
4523

4524
        // Similarly, the second node is contained within the latter
4525
        // half of the edge information.
4526
        node2Pub := edgeInfo[33:66]
240✔
4527
        edge2, err := fetchChanEdgePolicy(edges, chanID, node2Pub)
240✔
4528
        if err != nil {
240✔
4529
                return nil, nil, fmt.Errorf("%w: node2Pub=%x", ErrEdgeNotFound,
×
4530
                        node2Pub)
×
4531
        }
×
4532

4533
        return edge1, edge2, nil
240✔
4534
}
4535

4536
func serializeChanEdgePolicy(w io.Writer, edge *models.ChannelEdgePolicy,
4537
        to []byte) error {
2,662✔
4538

2,662✔
4539
        err := wire.WriteVarBytes(w, 0, edge.SigBytes)
2,662✔
4540
        if err != nil {
2,662✔
4541
                return err
×
4542
        }
×
4543

4544
        if err := binary.Write(w, byteOrder, edge.ChannelID); err != nil {
2,662✔
4545
                return err
×
4546
        }
×
4547

4548
        var scratch [8]byte
2,662✔
4549
        updateUnix := uint64(edge.LastUpdate.Unix())
2,662✔
4550
        byteOrder.PutUint64(scratch[:], updateUnix)
2,662✔
4551
        if _, err := w.Write(scratch[:]); err != nil {
2,662✔
4552
                return err
×
4553
        }
×
4554

4555
        if err := binary.Write(w, byteOrder, edge.MessageFlags); err != nil {
2,662✔
4556
                return err
×
4557
        }
×
4558
        if err := binary.Write(w, byteOrder, edge.ChannelFlags); err != nil {
2,662✔
4559
                return err
×
4560
        }
×
4561
        if err := binary.Write(w, byteOrder, edge.TimeLockDelta); err != nil {
2,662✔
4562
                return err
×
4563
        }
×
4564
        if err := binary.Write(w, byteOrder, uint64(edge.MinHTLC)); err != nil {
2,662✔
4565
                return err
×
4566
        }
×
4567
        err = binary.Write(w, byteOrder, uint64(edge.FeeBaseMSat))
2,662✔
4568
        if err != nil {
2,662✔
4569
                return err
×
4570
        }
×
4571
        err = binary.Write(
2,662✔
4572
                w, byteOrder, uint64(edge.FeeProportionalMillionths),
2,662✔
4573
        )
2,662✔
4574
        if err != nil {
2,662✔
4575
                return err
×
4576
        }
×
4577

4578
        if _, err := w.Write(to); err != nil {
2,662✔
4579
                return err
×
4580
        }
×
4581

4582
        // If the max_htlc field is present, we write it. To be compatible with
4583
        // older versions that wasn't aware of this field, we write it as part
4584
        // of the opaque data.
4585
        // TODO(halseth): clean up when moving to TLV.
4586
        var opaqueBuf bytes.Buffer
2,662✔
4587
        if edge.MessageFlags.HasMaxHtlc() {
4,940✔
4588
                err := binary.Write(&opaqueBuf, byteOrder, uint64(edge.MaxHTLC))
2,278✔
4589
                if err != nil {
2,278✔
4590
                        return err
×
4591
                }
×
4592
        }
4593

4594
        if len(edge.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
2,662✔
4595
                return ErrTooManyExtraOpaqueBytes(len(edge.ExtraOpaqueData))
×
4596
        }
×
4597
        if _, err := opaqueBuf.Write(edge.ExtraOpaqueData); err != nil {
2,662✔
4598
                return err
×
4599
        }
×
4600

4601
        if err := wire.WriteVarBytes(w, 0, opaqueBuf.Bytes()); err != nil {
2,662✔
4602
                return err
×
4603
        }
×
4604
        return nil
2,662✔
4605
}
4606

4607
func deserializeChanEdgePolicy(r io.Reader) (*models.ChannelEdgePolicy, error) {
7,828✔
4608
        // Deserialize the policy. Note that in case an optional field is not
7,828✔
4609
        // found, both an error and a populated policy object are returned.
7,828✔
4610
        edge, deserializeErr := deserializeChanEdgePolicyRaw(r)
7,828✔
4611
        if deserializeErr != nil &&
7,828✔
4612
                deserializeErr != ErrEdgePolicyOptionalFieldNotFound {
7,828✔
4613

×
4614
                return nil, deserializeErr
×
4615
        }
×
4616

4617
        return edge, deserializeErr
7,828✔
4618
}
4619

4620
func deserializeChanEdgePolicyRaw(r io.Reader) (*models.ChannelEdgePolicy,
4621
        error) {
8,835✔
4622

8,835✔
4623
        edge := &models.ChannelEdgePolicy{}
8,835✔
4624

8,835✔
4625
        var err error
8,835✔
4626
        edge.SigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
8,835✔
4627
        if err != nil {
8,835✔
4628
                return nil, err
×
4629
        }
×
4630

4631
        if err := binary.Read(r, byteOrder, &edge.ChannelID); err != nil {
8,835✔
4632
                return nil, err
×
4633
        }
×
4634

4635
        var scratch [8]byte
8,835✔
4636
        if _, err := r.Read(scratch[:]); err != nil {
8,835✔
4637
                return nil, err
×
4638
        }
×
4639
        unix := int64(byteOrder.Uint64(scratch[:]))
8,835✔
4640
        edge.LastUpdate = time.Unix(unix, 0)
8,835✔
4641

8,835✔
4642
        if err := binary.Read(r, byteOrder, &edge.MessageFlags); err != nil {
8,835✔
4643
                return nil, err
×
4644
        }
×
4645
        if err := binary.Read(r, byteOrder, &edge.ChannelFlags); err != nil {
8,835✔
4646
                return nil, err
×
4647
        }
×
4648
        if err := binary.Read(r, byteOrder, &edge.TimeLockDelta); err != nil {
8,835✔
4649
                return nil, err
×
4650
        }
×
4651

4652
        var n uint64
8,835✔
4653
        if err := binary.Read(r, byteOrder, &n); err != nil {
8,835✔
4654
                return nil, err
×
4655
        }
×
4656
        edge.MinHTLC = lnwire.MilliSatoshi(n)
8,835✔
4657

8,835✔
4658
        if err := binary.Read(r, byteOrder, &n); err != nil {
8,835✔
4659
                return nil, err
×
4660
        }
×
4661
        edge.FeeBaseMSat = lnwire.MilliSatoshi(n)
8,835✔
4662

8,835✔
4663
        if err := binary.Read(r, byteOrder, &n); err != nil {
8,835✔
4664
                return nil, err
×
4665
        }
×
4666
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(n)
8,835✔
4667

8,835✔
4668
        if _, err := r.Read(edge.ToNode[:]); err != nil {
8,835✔
4669
                return nil, err
×
4670
        }
×
4671

4672
        // We'll try and see if there are any opaque bytes left, if not, then
4673
        // we'll ignore the EOF error and return the edge as is.
4674
        edge.ExtraOpaqueData, err = wire.ReadVarBytes(
8,835✔
4675
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
8,835✔
4676
        )
8,835✔
4677
        switch {
8,835✔
4678
        case err == io.ErrUnexpectedEOF:
×
4679
        case err == io.EOF:
3✔
4680
        case err != nil:
×
4681
                return nil, err
×
4682
        }
4683

4684
        // See if optional fields are present.
4685
        if edge.MessageFlags.HasMaxHtlc() {
17,290✔
4686
                // The max_htlc field should be at the beginning of the opaque
8,455✔
4687
                // bytes.
8,455✔
4688
                opq := edge.ExtraOpaqueData
8,455✔
4689

8,455✔
4690
                // If the max_htlc field is not present, it might be old data
8,455✔
4691
                // stored before this field was validated. We'll return the
8,455✔
4692
                // edge along with an error.
8,455✔
4693
                if len(opq) < 8 {
8,458✔
4694
                        return edge, ErrEdgePolicyOptionalFieldNotFound
3✔
4695
                }
3✔
4696

4697
                maxHtlc := byteOrder.Uint64(opq[:8])
8,452✔
4698
                edge.MaxHTLC = lnwire.MilliSatoshi(maxHtlc)
8,452✔
4699

8,452✔
4700
                // Exclude the parsed field from the rest of the opaque data.
8,452✔
4701
                edge.ExtraOpaqueData = opq[8:]
8,452✔
4702
        }
4703

4704
        return edge, nil
8,832✔
4705
}
4706

4707
// MakeTestGraph creates a new instance of the ChannelGraph for testing
4708
// purposes.
4709
func MakeTestGraph(t testing.TB, modifiers ...OptionModifier) (*ChannelGraph,
4710
        error) {
39✔
4711

39✔
4712
        opts := DefaultOptions()
39✔
4713
        for _, modifier := range modifiers {
39✔
4714
                modifier(opts)
×
4715
        }
×
4716

4717
        // Next, create channelgraph for the first time.
4718
        backend, backendCleanup, err := kvdb.GetTestBackend(t.TempDir(), "cgr")
39✔
4719
        if err != nil {
39✔
4720
                backendCleanup()
×
4721
                return nil, err
×
4722
        }
×
4723

4724
        graph, err := NewChannelGraph(backend)
39✔
4725
        if err != nil {
39✔
4726
                backendCleanup()
×
4727
                return nil, err
×
4728
        }
×
4729

4730
        t.Cleanup(func() {
78✔
4731
                _ = backend.Close()
39✔
4732
                backendCleanup()
39✔
4733
        })
39✔
4734

4735
        return graph, nil
39✔
4736
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc