• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12583319996

02 Jan 2025 01:38PM UTC coverage: 57.522% (-1.1%) from 58.598%
12583319996

Pull #9361

github

starius
fn/ContextGuard: use context.AfterFunc to wait

Simplifies context cancellation handling by using context.AfterFunc instead of a
goroutine to wait for context cancellation. This approach avoids the overhead of
a goroutine during the waiting period.

For ctxQuitUnsafe, since g.quit is closed only in the Quit method (which also
cancels all associated contexts), waiting on context cancellation ensures the
same behavior without unnecessary dependency on g.quit.

Added a test to ensure that the Create method does not launch any goroutines.
Pull Request #9361: fn: optimize context guard

102587 of 178344 relevant lines covered (57.52%)

24734.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.5
/graph/db/graph.go
1
package graphdb
2

3
import (
4
        "bytes"
5
        "crypto/sha256"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "math"
11
        "net"
12
        "sort"
13
        "sync"
14
        "testing"
15
        "time"
16

17
        "github.com/btcsuite/btcd/btcec/v2"
18
        "github.com/btcsuite/btcd/chaincfg/chainhash"
19
        "github.com/btcsuite/btcd/txscript"
20
        "github.com/btcsuite/btcd/wire"
21
        "github.com/lightningnetwork/lnd/aliasmgr"
22
        "github.com/lightningnetwork/lnd/batch"
23
        "github.com/lightningnetwork/lnd/graph/db/models"
24
        "github.com/lightningnetwork/lnd/input"
25
        "github.com/lightningnetwork/lnd/kvdb"
26
        "github.com/lightningnetwork/lnd/lnwire"
27
        "github.com/lightningnetwork/lnd/routing/route"
28
)
29

30
var (
31
        // nodeBucket is a bucket which houses all the vertices or nodes within
32
        // the channel graph. This bucket has a single-sub bucket which adds an
33
        // additional index from pubkey -> alias. Within the top-level of this
34
        // bucket, the key space maps a node's compressed public key to the
35
        // serialized information for that node. Additionally, there's a
36
        // special key "source" which stores the pubkey of the source node. The
37
        // source node is used as the starting point for all graph/queries and
38
        // traversals. The graph is formed as a star-graph with the source node
39
        // at the center.
40
        //
41
        // maps: pubKey -> nodeInfo
42
        // maps: source -> selfPubKey
43
        nodeBucket = []byte("graph-node")
44

45
        // nodeUpdateIndexBucket is a sub-bucket of the nodeBucket. This bucket
46
        // will be used to quickly look up the "freshness" of a node's last
47
        // update to the network. The bucket only contains keys, and no values,
48
        // it's mapping:
49
        //
50
        // maps: updateTime || nodeID -> nil
51
        nodeUpdateIndexBucket = []byte("graph-node-update-index")
52

53
        // sourceKey is a special key that resides within the nodeBucket. The
54
        // sourceKey maps a key to the public key of the "self node".
55
        sourceKey = []byte("source")
56

57
        // aliasIndexBucket is a sub-bucket that's nested within the main
58
        // nodeBucket. This bucket maps the public key of a node to its
59
        // current alias. This bucket is provided as it can be used within a
60
        // future UI layer to add an additional degree of confirmation.
61
        aliasIndexBucket = []byte("alias")
62

63
        // edgeBucket is a bucket which houses all of the edge or channel
64
        // information within the channel graph. This bucket essentially acts
65
        // as an adjacency list, which in conjunction with a range scan, can be
66
        // used to iterate over all the incoming and outgoing edges for a
67
        // particular node. Key in the bucket use a prefix scheme which leads
68
        // with the node's public key and sends with the compact edge ID.
69
        // For each chanID, there will be two entries within the bucket, as the
70
        // graph is directed: nodes may have different policies w.r.t to fees
71
        // for their respective directions.
72
        //
73
        // maps: pubKey || chanID -> channel edge policy for node
74
        edgeBucket = []byte("graph-edge")
75

76
        // unknownPolicy is represented as an empty slice. It is
77
        // used as the value in edgeBucket for unknown channel edge policies.
78
        // Unknown policies are still stored in the database to enable efficient
79
        // lookup of incoming channel edges.
80
        unknownPolicy = []byte{}
81

82
        // chanStart is an array of all zero bytes which is used to perform
83
        // range scans within the edgeBucket to obtain all of the outgoing
84
        // edges for a particular node.
85
        chanStart [8]byte
86

87
        // edgeIndexBucket is an index which can be used to iterate all edges
88
        // in the bucket, grouping them according to their in/out nodes.
89
        // Additionally, the items in this bucket also contain the complete
90
        // edge information for a channel. The edge information includes the
91
        // capacity of the channel, the nodes that made the channel, etc. This
92
        // bucket resides within the edgeBucket above. Creation of an edge
93
        // proceeds in two phases: first the edge is added to the edge index,
94
        // afterwards the edgeBucket can be updated with the latest details of
95
        // the edge as they are announced on the network.
96
        //
97
        // maps: chanID -> pubKey1 || pubKey2 || restofEdgeInfo
98
        edgeIndexBucket = []byte("edge-index")
99

100
        // edgeUpdateIndexBucket is a sub-bucket of the main edgeBucket. This
101
        // bucket contains an index which allows us to gauge the "freshness" of
102
        // a channel's last updates.
103
        //
104
        // maps: updateTime || chanID -> nil
105
        edgeUpdateIndexBucket = []byte("edge-update-index")
106

107
        // channelPointBucket maps a channel's full outpoint (txid:index) to
108
        // its short 8-byte channel ID. This bucket resides within the
109
        // edgeBucket above, and can be used to quickly remove an edge due to
110
        // the outpoint being spent, or to query for existence of a channel.
111
        //
112
        // maps: outPoint -> chanID
113
        channelPointBucket = []byte("chan-index")
114

115
        // zombieBucket is a sub-bucket of the main edgeBucket bucket
116
        // responsible for maintaining an index of zombie channels. Each entry
117
        // exists within the bucket as follows:
118
        //
119
        // maps: chanID -> pubKey1 || pubKey2
120
        //
121
        // The chanID represents the channel ID of the edge that is marked as a
122
        // zombie and is used as the key, which maps to the public keys of the
123
        // edge's participants.
124
        zombieBucket = []byte("zombie-index")
125

126
        // disabledEdgePolicyBucket is a sub-bucket of the main edgeBucket
127
        // bucket responsible for maintaining an index of disabled edge
128
        // policies. Each entry exists within the bucket as follows:
129
        //
130
        // maps: <chanID><direction> -> []byte{}
131
        //
132
        // The chanID represents the channel ID of the edge and the direction is
133
        // one byte representing the direction of the edge. The main purpose of
134
        // this index is to allow pruning disabled channels in a fast way without
135
        // the need to iterate all over the graph.
136
        disabledEdgePolicyBucket = []byte("disabled-edge-policy-index")
137

138
        // graphMetaBucket is a top-level bucket which stores various meta-deta
139
        // related to the on-disk channel graph. Data stored in this bucket
140
        // includes the block to which the graph has been synced to, the total
141
        // number of channels, etc.
142
        graphMetaBucket = []byte("graph-meta")
143

144
        // pruneLogBucket is a bucket within the graphMetaBucket that stores
145
        // a mapping from the block height to the hash for the blocks used to
146
        // prune the graph.
147
        // Once a new block is discovered, any channels that have been closed
148
        // (by spending the outpoint) can safely be removed from the graph, and
149
        // the block is added to the prune log. We need to keep such a log for
150
        // the case where a reorg happens, and we must "rewind" the state of the
151
        // graph by removing channels that were previously confirmed. In such a
152
        // case we'll remove all entries from the prune log with a block height
153
        // that no longer exists.
154
        pruneLogBucket = []byte("prune-log")
155

156
        // closedScidBucket is a top-level bucket that stores scids for
157
        // channels that we know to be closed. This is used so that we don't
158
        // need to perform expensive validation checks if we receive a channel
159
        // announcement for the channel again.
160
        //
161
        // maps: scid -> []byte{}
162
        closedScidBucket = []byte("closed-scid")
163
)
164

165
const (
166
        // MaxAllowedExtraOpaqueBytes is the largest amount of opaque bytes that
167
        // we'll permit to be written to disk. We limit this as otherwise, it
168
        // would be possible for a node to create a ton of updates and slowly
169
        // fill our disk, and also waste bandwidth due to relaying.
170
        MaxAllowedExtraOpaqueBytes = 10000
171
)
172

173
// ChannelGraph is a persistent, on-disk graph representation of the Lightning
174
// Network. This struct can be used to implement path finding algorithms on top
175
// of, and also to update a node's view based on information received from the
176
// p2p network. Internally, the graph is stored using a modified adjacency list
177
// representation with some added object interaction possible with each
178
// serialized edge/node. The graph is stored is directed, meaning that are two
179
// edges stored for each channel: an inbound/outbound edge for each node pair.
180
// Nodes, edges, and edge information can all be added to the graph
181
// independently. Edge removal results in the deletion of all edge information
182
// for that edge.
183
type ChannelGraph struct {
184
        db kvdb.Backend
185

186
        // cacheMu guards all caches (rejectCache, chanCache, graphCache). If
187
        // this mutex will be acquired at the same time as the DB mutex then
188
        // the cacheMu MUST be acquired first to prevent deadlock.
189
        cacheMu     sync.RWMutex
190
        rejectCache *rejectCache
191
        chanCache   *channelCache
192
        graphCache  *GraphCache
193

194
        chanScheduler batch.Scheduler
195
        nodeScheduler batch.Scheduler
196
}
197

198
// NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The
199
// returned instance has its own unique reject cache and channel cache.
200
func NewChannelGraph(db kvdb.Backend, options ...OptionModifier) (*ChannelGraph,
201
        error) {
173✔
202

173✔
203
        opts := DefaultOptions()
173✔
204
        for _, o := range options {
276✔
205
                o(opts)
103✔
206
        }
103✔
207

208
        if !opts.NoMigration {
346✔
209
                if err := initChannelGraph(db); err != nil {
173✔
210
                        return nil, err
×
211
                }
×
212
        }
213

214
        g := &ChannelGraph{
173✔
215
                db:          db,
173✔
216
                rejectCache: newRejectCache(opts.RejectCacheSize),
173✔
217
                chanCache:   newChannelCache(opts.ChannelCacheSize),
173✔
218
        }
173✔
219
        g.chanScheduler = batch.NewTimeScheduler(
173✔
220
                db, &g.cacheMu, opts.BatchCommitInterval,
173✔
221
        )
173✔
222
        g.nodeScheduler = batch.NewTimeScheduler(
173✔
223
                db, nil, opts.BatchCommitInterval,
173✔
224
        )
173✔
225

173✔
226
        // The graph cache can be turned off (e.g. for mobile users) for a
173✔
227
        // speed/memory usage tradeoff.
173✔
228
        if opts.UseGraphCache {
313✔
229
                g.graphCache = NewGraphCache(opts.PreAllocCacheNumNodes)
140✔
230
                startTime := time.Now()
140✔
231
                log.Debugf("Populating in-memory channel graph, this might " +
140✔
232
                        "take a while...")
140✔
233

140✔
234
                err := g.ForEachNodeCacheable(
140✔
235
                        func(tx kvdb.RTx, node GraphCacheNode) error {
240✔
236
                                g.graphCache.AddNodeFeatures(node)
100✔
237

100✔
238
                                return nil
100✔
239
                        },
100✔
240
                )
241
                if err != nil {
140✔
242
                        return nil, err
×
243
                }
×
244

245
                err = g.ForEachChannel(func(info *models.ChannelEdgeInfo,
140✔
246
                        policy1, policy2 *models.ChannelEdgePolicy) error {
536✔
247

396✔
248
                        g.graphCache.AddChannel(info, policy1, policy2)
396✔
249

396✔
250
                        return nil
396✔
251
                })
396✔
252
                if err != nil {
140✔
253
                        return nil, err
×
254
                }
×
255

256
                log.Debugf("Finished populating in-memory channel graph (took "+
140✔
257
                        "%v, %s)", time.Since(startTime), g.graphCache.Stats())
140✔
258
        }
259

260
        return g, nil
173✔
261
}
262

263
// channelMapKey is the key structure used for storing channel edge policies.
264
type channelMapKey struct {
265
        nodeKey route.Vertex
266
        chanID  [8]byte
267
}
268

269
// getChannelMap loads all channel edge policies from the database and stores
270
// them in a map.
271
func (c *ChannelGraph) getChannelMap(edges kvdb.RBucket) (
272
        map[channelMapKey]*models.ChannelEdgePolicy, error) {
144✔
273

144✔
274
        // Create a map to store all channel edge policies.
144✔
275
        channelMap := make(map[channelMapKey]*models.ChannelEdgePolicy)
144✔
276

144✔
277
        err := kvdb.ForAll(edges, func(k, edgeBytes []byte) error {
1,715✔
278
                // Skip embedded buckets.
1,571✔
279
                if bytes.Equal(k, edgeIndexBucket) ||
1,571✔
280
                        bytes.Equal(k, edgeUpdateIndexBucket) ||
1,571✔
281
                        bytes.Equal(k, zombieBucket) ||
1,571✔
282
                        bytes.Equal(k, disabledEdgePolicyBucket) ||
1,571✔
283
                        bytes.Equal(k, channelPointBucket) {
2,152✔
284

581✔
285
                        return nil
581✔
286
                }
581✔
287

288
                // Validate key length.
289
                if len(k) != 33+8 {
990✔
290
                        return fmt.Errorf("invalid edge key %x encountered", k)
×
291
                }
×
292

293
                var key channelMapKey
990✔
294
                copy(key.nodeKey[:], k[:33])
990✔
295
                copy(key.chanID[:], k[33:])
990✔
296

990✔
297
                // No need to deserialize unknown policy.
990✔
298
                if bytes.Equal(edgeBytes, unknownPolicy) {
990✔
299
                        return nil
×
300
                }
×
301

302
                edgeReader := bytes.NewReader(edgeBytes)
990✔
303
                edge, err := deserializeChanEdgePolicyRaw(
990✔
304
                        edgeReader,
990✔
305
                )
990✔
306

990✔
307
                switch {
990✔
308
                // If the db policy was missing an expected optional field, we
309
                // return nil as if the policy was unknown.
310
                case err == ErrEdgePolicyOptionalFieldNotFound:
×
311
                        return nil
×
312

313
                case err != nil:
×
314
                        return err
×
315
                }
316

317
                channelMap[key] = edge
990✔
318

990✔
319
                return nil
990✔
320
        })
321
        if err != nil {
144✔
322
                return nil, err
×
323
        }
×
324

325
        return channelMap, nil
144✔
326
}
327

328
var graphTopLevelBuckets = [][]byte{
329
        nodeBucket,
330
        edgeBucket,
331
        graphMetaBucket,
332
        closedScidBucket,
333
}
334

335
// Wipe completely deletes all saved state within all used buckets within the
336
// database. The deletion is done in a single transaction, therefore this
337
// operation is fully atomic.
338
func (c *ChannelGraph) Wipe() error {
×
339
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
×
340
                for _, tlb := range graphTopLevelBuckets {
×
341
                        err := tx.DeleteTopLevelBucket(tlb)
×
342
                        if err != nil && err != kvdb.ErrBucketNotFound {
×
343
                                return err
×
344
                        }
×
345
                }
346
                return nil
×
347
        }, func() {})
×
348
        if err != nil {
×
349
                return err
×
350
        }
×
351

352
        return initChannelGraph(c.db)
×
353
}
354

355
// createChannelDB creates and initializes a fresh version of  In
356
// the case that the target path has not yet been created or doesn't yet exist,
357
// then the path is created. Additionally, all required top-level buckets used
358
// within the database are created.
359
func initChannelGraph(db kvdb.Backend) error {
173✔
360
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
346✔
361
                for _, tlb := range graphTopLevelBuckets {
865✔
362
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
692✔
363
                                return err
×
364
                        }
×
365
                }
366

367
                nodes := tx.ReadWriteBucket(nodeBucket)
173✔
368
                _, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
173✔
369
                if err != nil {
173✔
370
                        return err
×
371
                }
×
372
                _, err = nodes.CreateBucketIfNotExists(nodeUpdateIndexBucket)
173✔
373
                if err != nil {
173✔
374
                        return err
×
375
                }
×
376

377
                edges := tx.ReadWriteBucket(edgeBucket)
173✔
378
                _, err = edges.CreateBucketIfNotExists(edgeIndexBucket)
173✔
379
                if err != nil {
173✔
380
                        return err
×
381
                }
×
382
                _, err = edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
173✔
383
                if err != nil {
173✔
384
                        return err
×
385
                }
×
386
                _, err = edges.CreateBucketIfNotExists(channelPointBucket)
173✔
387
                if err != nil {
173✔
388
                        return err
×
389
                }
×
390
                _, err = edges.CreateBucketIfNotExists(zombieBucket)
173✔
391
                if err != nil {
173✔
392
                        return err
×
393
                }
×
394

395
                graphMeta := tx.ReadWriteBucket(graphMetaBucket)
173✔
396
                _, err = graphMeta.CreateBucketIfNotExists(pruneLogBucket)
173✔
397
                return err
173✔
398
        }, func() {})
173✔
399
        if err != nil {
173✔
400
                return fmt.Errorf("unable to create new channel graph: %w", err)
×
401
        }
×
402

403
        return nil
173✔
404
}
405

406
// NewPathFindTx returns a new read transaction that can be used for a single
407
// path finding session. Will return nil if the graph cache is enabled.
408
func (c *ChannelGraph) NewPathFindTx() (kvdb.RTx, error) {
133✔
409
        if c.graphCache != nil {
212✔
410
                return nil, nil
79✔
411
        }
79✔
412

413
        return c.db.BeginReadTx()
54✔
414
}
415

416
// AddrsForNode returns all known addresses for the target node public key that
417
// the graph DB is aware of. The returned boolean indicates if the given node is
418
// unknown to the graph DB or not.
419
//
420
// NOTE: this is part of the channeldb.AddrSource interface.
421
func (c *ChannelGraph) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr,
422
        error) {
1✔
423

1✔
424
        pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed())
1✔
425
        if err != nil {
1✔
426
                return false, nil, err
×
427
        }
×
428

429
        node, err := c.FetchLightningNode(pubKey)
1✔
430
        // We don't consider it an error if the graph is unaware of the node.
1✔
431
        switch {
1✔
432
        case err != nil && !errors.Is(err, ErrGraphNodeNotFound):
×
433
                return false, nil, err
×
434

435
        case errors.Is(err, ErrGraphNodeNotFound):
×
436
                return false, nil, nil
×
437
        }
438

439
        return true, node.Addresses, nil
1✔
440
}
441

442
// ForEachChannel iterates through all the channel edges stored within the
443
// graph and invokes the passed callback for each edge. The callback takes two
444
// edges as since this is a directed graph, both the in/out edges are visited.
445
// If the callback returns an error, then the transaction is aborted and the
446
// iteration stops early.
447
//
448
// NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer
449
// for that particular channel edge routing policy will be passed into the
450
// callback.
451
func (c *ChannelGraph) ForEachChannel(cb func(*models.ChannelEdgeInfo,
452
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
144✔
453

144✔
454
        return c.db.View(func(tx kvdb.RTx) error {
288✔
455
                edges := tx.ReadBucket(edgeBucket)
144✔
456
                if edges == nil {
144✔
457
                        return ErrGraphNoEdgesFound
×
458
                }
×
459

460
                // First, load all edges in memory indexed by node and channel
461
                // id.
462
                channelMap, err := c.getChannelMap(edges)
144✔
463
                if err != nil {
144✔
464
                        return err
×
465
                }
×
466

467
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
144✔
468
                if edgeIndex == nil {
144✔
469
                        return ErrGraphNoEdgesFound
×
470
                }
×
471

472
                // Load edge index, recombine each channel with the policies
473
                // loaded above and invoke the callback.
474
                return kvdb.ForAll(
144✔
475
                        edgeIndex, func(k, edgeInfoBytes []byte) error {
639✔
476
                                var chanID [8]byte
495✔
477
                                copy(chanID[:], k)
495✔
478

495✔
479
                                edgeInfoReader := bytes.NewReader(edgeInfoBytes)
495✔
480
                                info, err := deserializeChanEdgeInfo(
495✔
481
                                        edgeInfoReader,
495✔
482
                                )
495✔
483
                                if err != nil {
495✔
484
                                        return err
×
485
                                }
×
486

487
                                policy1 := channelMap[channelMapKey{
495✔
488
                                        nodeKey: info.NodeKey1Bytes,
495✔
489
                                        chanID:  chanID,
495✔
490
                                }]
495✔
491

495✔
492
                                policy2 := channelMap[channelMapKey{
495✔
493
                                        nodeKey: info.NodeKey2Bytes,
495✔
494
                                        chanID:  chanID,
495✔
495
                                }]
495✔
496

495✔
497
                                return cb(&info, policy1, policy2)
495✔
498
                        },
499
                )
500
        }, func() {})
144✔
501
}
502

503
// ForEachNodeDirectedChannel iterates through all channels of a given node,
504
// executing the passed callback on the directed edge representing the channel
505
// and its incoming policy. If the callback returns an error, then the iteration
506
// is halted with the error propagated back up to the caller.
507
//
508
// Unknown policies are passed into the callback as nil values.
509
func (c *ChannelGraph) ForEachNodeDirectedChannel(tx kvdb.RTx,
510
        node route.Vertex, cb func(channel *DirectedChannel) error) error {
704✔
511

704✔
512
        if c.graphCache != nil {
1,166✔
513
                return c.graphCache.ForEachChannel(node, cb)
462✔
514
        }
462✔
515

516
        // Fallback that uses the database.
517
        toNodeCallback := func() route.Vertex {
374✔
518
                return node
132✔
519
        }
132✔
520
        toNodeFeatures, err := c.FetchNodeFeatures(node)
242✔
521
        if err != nil {
242✔
522
                return err
×
523
        }
×
524

525
        dbCallback := func(tx kvdb.RTx, e *models.ChannelEdgeInfo, p1,
242✔
526
                p2 *models.ChannelEdgePolicy) error {
738✔
527

496✔
528
                var cachedInPolicy *models.CachedEdgePolicy
496✔
529
                if p2 != nil {
989✔
530
                        cachedInPolicy = models.NewCachedPolicy(p2)
493✔
531
                        cachedInPolicy.ToNodePubKey = toNodeCallback
493✔
532
                        cachedInPolicy.ToNodeFeatures = toNodeFeatures
493✔
533
                }
493✔
534

535
                var inboundFee lnwire.Fee
496✔
536
                if p1 != nil {
991✔
537
                        // Extract inbound fee. If there is a decoding error,
495✔
538
                        // skip this edge.
495✔
539
                        _, err := p1.ExtraOpaqueData.ExtractRecords(&inboundFee)
495✔
540
                        if err != nil {
496✔
541
                                return nil
1✔
542
                        }
1✔
543
                }
544

545
                directedChannel := &DirectedChannel{
495✔
546
                        ChannelID:    e.ChannelID,
495✔
547
                        IsNode1:      node == e.NodeKey1Bytes,
495✔
548
                        OtherNode:    e.NodeKey2Bytes,
495✔
549
                        Capacity:     e.Capacity,
495✔
550
                        OutPolicySet: p1 != nil,
495✔
551
                        InPolicy:     cachedInPolicy,
495✔
552
                        InboundFee:   inboundFee,
495✔
553
                }
495✔
554

495✔
555
                if node == e.NodeKey2Bytes {
743✔
556
                        directedChannel.OtherNode = e.NodeKey1Bytes
248✔
557
                }
248✔
558

559
                return cb(directedChannel)
495✔
560
        }
561
        return nodeTraversal(tx, node[:], c.db, dbCallback)
242✔
562
}
563

564
// FetchNodeFeatures returns the features of a given node. If no features are
565
// known for the node, an empty feature vector is returned.
566
func (c *ChannelGraph) FetchNodeFeatures(
567
        node route.Vertex) (*lnwire.FeatureVector, error) {
1,139✔
568

1,139✔
569
        if c.graphCache != nil {
1,592✔
570
                return c.graphCache.GetFeatures(node), nil
453✔
571
        }
453✔
572

573
        // Fallback that uses the database.
574
        targetNode, err := c.FetchLightningNode(node)
686✔
575
        switch err {
686✔
576
        // If the node exists and has features, return them directly.
577
        case nil:
675✔
578
                return targetNode.Features, nil
675✔
579

580
        // If we couldn't find a node announcement, populate a blank feature
581
        // vector.
582
        case ErrGraphNodeNotFound:
11✔
583
                return lnwire.EmptyFeatureVector(), nil
11✔
584

585
        // Otherwise, bubble the error up.
586
        default:
×
587
                return nil, err
×
588
        }
589
}
590

591
// ForEachNodeCached is similar to ForEachNode, but it utilizes the channel
592
// graph cache instead. Note that this doesn't return all the information the
593
// regular ForEachNode method does.
594
//
595
// NOTE: The callback contents MUST not be modified.
596
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
597
        chans map[uint64]*DirectedChannel) error) error {
1✔
598

1✔
599
        if c.graphCache != nil {
1✔
600
                return c.graphCache.ForEachNode(cb)
×
601
        }
×
602

603
        // Otherwise call back to a version that uses the database directly.
604
        // We'll iterate over each node, then the set of channels for each
605
        // node, and construct a similar callback functiopn signature as the
606
        // main funcotin expects.
607
        return c.ForEachNode(func(tx kvdb.RTx,
1✔
608
                node *models.LightningNode) error {
21✔
609

20✔
610
                channels := make(map[uint64]*DirectedChannel)
20✔
611

20✔
612
                err := c.ForEachNodeChannelTx(tx, node.PubKeyBytes,
20✔
613
                        func(tx kvdb.RTx, e *models.ChannelEdgeInfo,
20✔
614
                                p1 *models.ChannelEdgePolicy,
20✔
615
                                p2 *models.ChannelEdgePolicy) error {
210✔
616

190✔
617
                                toNodeCallback := func() route.Vertex {
190✔
618
                                        return node.PubKeyBytes
×
619
                                }
×
620
                                toNodeFeatures, err := c.FetchNodeFeatures(
190✔
621
                                        node.PubKeyBytes,
190✔
622
                                )
190✔
623
                                if err != nil {
190✔
624
                                        return err
×
625
                                }
×
626

627
                                var cachedInPolicy *models.CachedEdgePolicy
190✔
628
                                if p2 != nil {
380✔
629
                                        cachedInPolicy =
190✔
630
                                                models.NewCachedPolicy(p2)
190✔
631
                                        cachedInPolicy.ToNodePubKey =
190✔
632
                                                toNodeCallback
190✔
633
                                        cachedInPolicy.ToNodeFeatures =
190✔
634
                                                toNodeFeatures
190✔
635
                                }
190✔
636

637
                                directedChannel := &DirectedChannel{
190✔
638
                                        ChannelID: e.ChannelID,
190✔
639
                                        IsNode1: node.PubKeyBytes ==
190✔
640
                                                e.NodeKey1Bytes,
190✔
641
                                        OtherNode:    e.NodeKey2Bytes,
190✔
642
                                        Capacity:     e.Capacity,
190✔
643
                                        OutPolicySet: p1 != nil,
190✔
644
                                        InPolicy:     cachedInPolicy,
190✔
645
                                }
190✔
646

190✔
647
                                if node.PubKeyBytes == e.NodeKey2Bytes {
285✔
648
                                        directedChannel.OtherNode =
95✔
649
                                                e.NodeKey1Bytes
95✔
650
                                }
95✔
651

652
                                channels[e.ChannelID] = directedChannel
190✔
653

190✔
654
                                return nil
190✔
655
                        })
656
                if err != nil {
20✔
657
                        return err
×
658
                }
×
659

660
                return cb(node.PubKeyBytes, channels)
20✔
661
        })
662
}
663

664
// DisabledChannelIDs returns the channel ids of disabled channels.
665
// A channel is disabled when two of the associated ChanelEdgePolicies
666
// have their disabled bit on.
667
func (c *ChannelGraph) DisabledChannelIDs() ([]uint64, error) {
6✔
668
        var disabledChanIDs []uint64
6✔
669
        var chanEdgeFound map[uint64]struct{}
6✔
670

6✔
671
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
12✔
672
                edges := tx.ReadBucket(edgeBucket)
6✔
673
                if edges == nil {
6✔
674
                        return ErrGraphNoEdgesFound
×
675
                }
×
676

677
                disabledEdgePolicyIndex := edges.NestedReadBucket(
6✔
678
                        disabledEdgePolicyBucket,
6✔
679
                )
6✔
680
                if disabledEdgePolicyIndex == nil {
7✔
681
                        return nil
1✔
682
                }
1✔
683

684
                // We iterate over all disabled policies and we add each channel
685
                // that has more than one disabled policy to disabledChanIDs
686
                // array.
687
                return disabledEdgePolicyIndex.ForEach(
5✔
688
                        func(k, v []byte) error {
16✔
689
                                chanID := byteOrder.Uint64(k[:8])
11✔
690
                                _, edgeFound := chanEdgeFound[chanID]
11✔
691
                                if edgeFound {
15✔
692
                                        delete(chanEdgeFound, chanID)
4✔
693
                                        disabledChanIDs = append(
4✔
694
                                                disabledChanIDs, chanID,
4✔
695
                                        )
4✔
696

4✔
697
                                        return nil
4✔
698
                                }
4✔
699

700
                                chanEdgeFound[chanID] = struct{}{}
7✔
701

7✔
702
                                return nil
7✔
703
                        },
704
                )
705
        }, func() {
6✔
706
                disabledChanIDs = nil
6✔
707
                chanEdgeFound = make(map[uint64]struct{})
6✔
708
        })
6✔
709
        if err != nil {
6✔
710
                return nil, err
×
711
        }
×
712

713
        return disabledChanIDs, nil
6✔
714
}
715

716
// ForEachNode iterates through all the stored vertices/nodes in the graph,
717
// executing the passed callback with each node encountered. If the callback
718
// returns an error, then the transaction is aborted and the iteration stops
719
// early.
720
//
721
// TODO(roasbeef): add iterator interface to allow for memory efficient graph
722
// traversal when graph gets mega
723
func (c *ChannelGraph) ForEachNode(
724
        cb func(kvdb.RTx, *models.LightningNode) error) error {
129✔
725

129✔
726
        traversal := func(tx kvdb.RTx) error {
258✔
727
                // First grab the nodes bucket which stores the mapping from
129✔
728
                // pubKey to node information.
129✔
729
                nodes := tx.ReadBucket(nodeBucket)
129✔
730
                if nodes == nil {
129✔
731
                        return ErrGraphNotFound
×
732
                }
×
733

734
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
1,568✔
735
                        // If this is the source key, then we skip this
1,439✔
736
                        // iteration as the value for this key is a pubKey
1,439✔
737
                        // rather than raw node information.
1,439✔
738
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
1,700✔
739
                                return nil
261✔
740
                        }
261✔
741

742
                        nodeReader := bytes.NewReader(nodeBytes)
1,178✔
743
                        node, err := deserializeLightningNode(nodeReader)
1,178✔
744
                        if err != nil {
1,178✔
745
                                return err
×
746
                        }
×
747

748
                        // Execute the callback, the transaction will abort if
749
                        // this returns an error.
750
                        return cb(tx, &node)
1,178✔
751
                })
752
        }
753

754
        return kvdb.View(c.db, traversal, func() {})
258✔
755
}
756

757
// ForEachNodeCacheable iterates through all the stored vertices/nodes in the
758
// graph, executing the passed callback with each node encountered. If the
759
// callback returns an error, then the transaction is aborted and the iteration
760
// stops early.
761
func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx,
762
        GraphCacheNode) error) error {
141✔
763

141✔
764
        traversal := func(tx kvdb.RTx) error {
282✔
765
                // First grab the nodes bucket which stores the mapping from
141✔
766
                // pubKey to node information.
141✔
767
                nodes := tx.ReadBucket(nodeBucket)
141✔
768
                if nodes == nil {
141✔
769
                        return ErrGraphNotFound
×
770
                }
×
771

772
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
543✔
773
                        // If this is the source key, then we skip this
402✔
774
                        // iteration as the value for this key is a pubKey
402✔
775
                        // rather than raw node information.
402✔
776
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
684✔
777
                                return nil
282✔
778
                        }
282✔
779

780
                        nodeReader := bytes.NewReader(nodeBytes)
120✔
781
                        cacheableNode, err := deserializeLightningNodeCacheable(
120✔
782
                                nodeReader,
120✔
783
                        )
120✔
784
                        if err != nil {
120✔
785
                                return err
×
786
                        }
×
787

788
                        // Execute the callback, the transaction will abort if
789
                        // this returns an error.
790
                        return cb(tx, cacheableNode)
120✔
791
                })
792
        }
793

794
        return kvdb.View(c.db, traversal, func() {})
282✔
795
}
796

797
// SourceNode returns the source node of the graph. The source node is treated
798
// as the center node within a star-graph. This method may be used to kick off
799
// a path finding algorithm in order to explore the reachability of another
800
// node based off the source node.
801
func (c *ChannelGraph) SourceNode() (*models.LightningNode, error) {
232✔
802
        var source *models.LightningNode
232✔
803
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
464✔
804
                // First grab the nodes bucket which stores the mapping from
232✔
805
                // pubKey to node information.
232✔
806
                nodes := tx.ReadBucket(nodeBucket)
232✔
807
                if nodes == nil {
232✔
808
                        return ErrGraphNotFound
×
809
                }
×
810

811
                node, err := c.sourceNode(nodes)
232✔
812
                if err != nil {
233✔
813
                        return err
1✔
814
                }
1✔
815
                source = node
231✔
816

231✔
817
                return nil
231✔
818
        }, func() {
232✔
819
                source = nil
232✔
820
        })
232✔
821
        if err != nil {
233✔
822
                return nil, err
1✔
823
        }
1✔
824

825
        return source, nil
231✔
826
}
827

828
// sourceNode uses an existing database transaction and returns the source node
829
// of the graph. The source node is treated as the center node within a
830
// star-graph. This method may be used to kick off a path finding algorithm in
831
// order to explore the reachability of another node based off the source node.
832
func (c *ChannelGraph) sourceNode(nodes kvdb.RBucket) (*models.LightningNode,
833
        error) {
504✔
834

504✔
835
        selfPub := nodes.Get(sourceKey)
504✔
836
        if selfPub == nil {
505✔
837
                return nil, ErrSourceNodeNotSet
1✔
838
        }
1✔
839

840
        // With the pubKey of the source node retrieved, we're able to
841
        // fetch the full node information.
842
        node, err := fetchLightningNode(nodes, selfPub)
503✔
843
        if err != nil {
503✔
844
                return nil, err
×
845
        }
×
846

847
        return &node, nil
503✔
848
}
849

850
// SetSourceNode sets the source node within the graph database. The source
851
// node is to be used as the center of a star-graph within path finding
852
// algorithms.
853
func (c *ChannelGraph) SetSourceNode(node *models.LightningNode) error {
118✔
854
        nodePubBytes := node.PubKeyBytes[:]
118✔
855

118✔
856
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
236✔
857
                // First grab the nodes bucket which stores the mapping from
118✔
858
                // pubKey to node information.
118✔
859
                nodes, err := tx.CreateTopLevelBucket(nodeBucket)
118✔
860
                if err != nil {
118✔
861
                        return err
×
862
                }
×
863

864
                // Next we create the mapping from source to the targeted
865
                // public key.
866
                if err := nodes.Put(sourceKey, nodePubBytes); err != nil {
118✔
867
                        return err
×
868
                }
×
869

870
                // Finally, we commit the information of the lightning node
871
                // itself.
872
                return addLightningNode(tx, node)
118✔
873
        }, func() {})
118✔
874
}
875

876
// AddLightningNode adds a vertex/node to the graph database. If the node is not
877
// in the database from before, this will add a new, unconnected one to the
878
// graph. If it is present from before, this will update that node's
879
// information. Note that this method is expected to only be called to update an
880
// already present node from a node announcement, or to insert a node found in a
881
// channel update.
882
//
883
// TODO(roasbeef): also need sig of announcement
884
func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
885
        op ...batch.SchedulerOption) error {
799✔
886

799✔
887
        r := &batch.Request{
799✔
888
                Update: func(tx kvdb.RwTx) error {
1,598✔
889
                        if c.graphCache != nil {
1,411✔
890
                                cNode := newGraphCacheNode(
612✔
891
                                        node.PubKeyBytes, node.Features,
612✔
892
                                )
612✔
893
                                err := c.graphCache.AddNode(tx, cNode)
612✔
894
                                if err != nil {
612✔
895
                                        return err
×
896
                                }
×
897
                        }
898

899
                        return addLightningNode(tx, node)
799✔
900
                },
901
        }
902

903
        for _, f := range op {
799✔
904
                f(r)
×
905
        }
×
906

907
        return c.nodeScheduler.Execute(r)
799✔
908
}
909

910
func addLightningNode(tx kvdb.RwTx, node *models.LightningNode) error {
991✔
911
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
991✔
912
        if err != nil {
991✔
913
                return err
×
914
        }
×
915

916
        aliases, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
991✔
917
        if err != nil {
991✔
918
                return err
×
919
        }
×
920

921
        updateIndex, err := nodes.CreateBucketIfNotExists(
991✔
922
                nodeUpdateIndexBucket,
991✔
923
        )
991✔
924
        if err != nil {
991✔
925
                return err
×
926
        }
×
927

928
        return putLightningNode(nodes, aliases, updateIndex, node)
991✔
929
}
930

931
// LookupAlias attempts to return the alias as advertised by the target node.
932
// TODO(roasbeef): currently assumes that aliases are unique...
933
func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) {
2✔
934
        var alias string
2✔
935

2✔
936
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
4✔
937
                nodes := tx.ReadBucket(nodeBucket)
2✔
938
                if nodes == nil {
2✔
939
                        return ErrGraphNodesNotFound
×
940
                }
×
941

942
                aliases := nodes.NestedReadBucket(aliasIndexBucket)
2✔
943
                if aliases == nil {
2✔
944
                        return ErrGraphNodesNotFound
×
945
                }
×
946

947
                nodePub := pub.SerializeCompressed()
2✔
948
                a := aliases.Get(nodePub)
2✔
949
                if a == nil {
3✔
950
                        return ErrNodeAliasNotFound
1✔
951
                }
1✔
952

953
                // TODO(roasbeef): should actually be using the utf-8
954
                // package...
955
                alias = string(a)
1✔
956
                return nil
1✔
957
        }, func() {
2✔
958
                alias = ""
2✔
959
        })
2✔
960
        if err != nil {
3✔
961
                return "", err
1✔
962
        }
1✔
963

964
        return alias, nil
1✔
965
}
966

967
// DeleteLightningNode starts a new database transaction to remove a vertex/node
968
// from the database according to the node's public key.
969
func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
3✔
970
        // TODO(roasbeef): ensure dangling edges are removed...
3✔
971
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
6✔
972
                nodes := tx.ReadWriteBucket(nodeBucket)
3✔
973
                if nodes == nil {
3✔
974
                        return ErrGraphNodeNotFound
×
975
                }
×
976

977
                if c.graphCache != nil {
6✔
978
                        c.graphCache.RemoveNode(nodePub)
3✔
979
                }
3✔
980

981
                return c.deleteLightningNode(nodes, nodePub[:])
3✔
982
        }, func() {})
3✔
983
}
984

985
// deleteLightningNode uses an existing database transaction to remove a
986
// vertex/node from the database according to the node's public key.
987
func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket,
988
        compressedPubKey []byte) error {
66✔
989

66✔
990
        aliases := nodes.NestedReadWriteBucket(aliasIndexBucket)
66✔
991
        if aliases == nil {
66✔
992
                return ErrGraphNodesNotFound
×
993
        }
×
994

995
        if err := aliases.Delete(compressedPubKey); err != nil {
66✔
996
                return err
×
997
        }
×
998

999
        // Before we delete the node, we'll fetch its current state so we can
1000
        // determine when its last update was to clear out the node update
1001
        // index.
1002
        node, err := fetchLightningNode(nodes, compressedPubKey)
66✔
1003
        if err != nil {
66✔
1004
                return err
×
1005
        }
×
1006

1007
        if err := nodes.Delete(compressedPubKey); err != nil {
66✔
1008
                return err
×
1009
        }
×
1010

1011
        // Finally, we'll delete the index entry for the node within the
1012
        // nodeUpdateIndexBucket as this node is no longer active, so we don't
1013
        // need to track its last update.
1014
        nodeUpdateIndex := nodes.NestedReadWriteBucket(nodeUpdateIndexBucket)
66✔
1015
        if nodeUpdateIndex == nil {
66✔
1016
                return ErrGraphNodesNotFound
×
1017
        }
×
1018

1019
        // In order to delete the entry, we'll need to reconstruct the key for
1020
        // its last update.
1021
        updateUnix := uint64(node.LastUpdate.Unix())
66✔
1022
        var indexKey [8 + 33]byte
66✔
1023
        byteOrder.PutUint64(indexKey[:8], updateUnix)
66✔
1024
        copy(indexKey[8:], compressedPubKey)
66✔
1025

66✔
1026
        return nodeUpdateIndex.Delete(indexKey[:])
66✔
1027
}
1028

1029
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
1030
// undirected edge from the two target nodes are created. The information stored
1031
// denotes the static attributes of the channel, such as the channelID, the keys
1032
// involved in creation of the channel, and the set of features that the channel
1033
// supports. The chanPoint and chanID are used to uniquely identify the edge
1034
// globally within the database.
1035
func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
1036
        op ...batch.SchedulerOption) error {
1,719✔
1037

1,719✔
1038
        var alreadyExists bool
1,719✔
1039
        r := &batch.Request{
1,719✔
1040
                Reset: func() {
3,438✔
1041
                        alreadyExists = false
1,719✔
1042
                },
1,719✔
1043
                Update: func(tx kvdb.RwTx) error {
1,719✔
1044
                        err := c.addChannelEdge(tx, edge)
1,719✔
1045

1,719✔
1046
                        // Silence ErrEdgeAlreadyExist so that the batch can
1,719✔
1047
                        // succeed, but propagate the error via local state.
1,719✔
1048
                        if err == ErrEdgeAlreadyExist {
1,953✔
1049
                                alreadyExists = true
234✔
1050
                                return nil
234✔
1051
                        }
234✔
1052

1053
                        return err
1,485✔
1054
                },
1055
                OnCommit: func(err error) error {
1,719✔
1056
                        switch {
1,719✔
1057
                        case err != nil:
×
1058
                                return err
×
1059
                        case alreadyExists:
234✔
1060
                                return ErrEdgeAlreadyExist
234✔
1061
                        default:
1,485✔
1062
                                c.rejectCache.remove(edge.ChannelID)
1,485✔
1063
                                c.chanCache.remove(edge.ChannelID)
1,485✔
1064
                                return nil
1,485✔
1065
                        }
1066
                },
1067
        }
1068

1069
        for _, f := range op {
1,719✔
1070
                if f == nil {
×
1071
                        return fmt.Errorf("nil scheduler option was used")
×
1072
                }
×
1073

1074
                f(r)
×
1075
        }
1076

1077
        return c.chanScheduler.Execute(r)
1,719✔
1078
}
1079

1080
// addChannelEdge is the private form of AddChannelEdge that allows callers to
1081
// utilize an existing db transaction.
1082
func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx,
1083
        edge *models.ChannelEdgeInfo) error {
1,719✔
1084

1,719✔
1085
        // Construct the channel's primary key which is the 8-byte channel ID.
1,719✔
1086
        var chanKey [8]byte
1,719✔
1087
        binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
1,719✔
1088

1,719✔
1089
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
1,719✔
1090
        if err != nil {
1,719✔
1091
                return err
×
1092
        }
×
1093
        edges, err := tx.CreateTopLevelBucket(edgeBucket)
1,719✔
1094
        if err != nil {
1,719✔
1095
                return err
×
1096
        }
×
1097
        edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
1,719✔
1098
        if err != nil {
1,719✔
1099
                return err
×
1100
        }
×
1101
        chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket)
1,719✔
1102
        if err != nil {
1,719✔
1103
                return err
×
1104
        }
×
1105

1106
        // First, attempt to check if this edge has already been created. If
1107
        // so, then we can exit early as this method is meant to be idempotent.
1108
        if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil {
1,953✔
1109
                return ErrEdgeAlreadyExist
234✔
1110
        }
234✔
1111

1112
        if c.graphCache != nil {
2,780✔
1113
                c.graphCache.AddChannel(edge, nil, nil)
1,295✔
1114
        }
1,295✔
1115

1116
        // Before we insert the channel into the database, we'll ensure that
1117
        // both nodes already exist in the channel graph. If either node
1118
        // doesn't, then we'll insert a "shell" node that just includes its
1119
        // public key, so subsequent validation and queries can work properly.
1120
        _, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:])
1,485✔
1121
        switch {
1,485✔
1122
        case node1Err == ErrGraphNodeNotFound:
18✔
1123
                node1Shell := models.LightningNode{
18✔
1124
                        PubKeyBytes:          edge.NodeKey1Bytes,
18✔
1125
                        HaveNodeAnnouncement: false,
18✔
1126
                }
18✔
1127
                err := addLightningNode(tx, &node1Shell)
18✔
1128
                if err != nil {
18✔
1129
                        return fmt.Errorf("unable to create shell node "+
×
1130
                                "for: %x: %w", edge.NodeKey1Bytes, err)
×
1131
                }
×
1132
        case node1Err != nil:
×
1133
                return node1Err
×
1134
        }
1135

1136
        _, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:])
1,485✔
1137
        switch {
1,485✔
1138
        case node2Err == ErrGraphNodeNotFound:
56✔
1139
                node2Shell := models.LightningNode{
56✔
1140
                        PubKeyBytes:          edge.NodeKey2Bytes,
56✔
1141
                        HaveNodeAnnouncement: false,
56✔
1142
                }
56✔
1143
                err := addLightningNode(tx, &node2Shell)
56✔
1144
                if err != nil {
56✔
1145
                        return fmt.Errorf("unable to create shell node "+
×
1146
                                "for: %x: %w", edge.NodeKey2Bytes, err)
×
1147
                }
×
1148
        case node2Err != nil:
×
1149
                return node2Err
×
1150
        }
1151

1152
        // If the edge hasn't been created yet, then we'll first add it to the
1153
        // edge index in order to associate the edge between two nodes and also
1154
        // store the static components of the channel.
1155
        if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil {
1,485✔
1156
                return err
×
1157
        }
×
1158

1159
        // Mark edge policies for both sides as unknown. This is to enable
1160
        // efficient incoming channel lookup for a node.
1161
        keys := []*[33]byte{
1,485✔
1162
                &edge.NodeKey1Bytes,
1,485✔
1163
                &edge.NodeKey2Bytes,
1,485✔
1164
        }
1,485✔
1165
        for _, key := range keys {
4,455✔
1166
                err := putChanEdgePolicyUnknown(edges, edge.ChannelID, key[:])
2,970✔
1167
                if err != nil {
2,970✔
1168
                        return err
×
1169
                }
×
1170
        }
1171

1172
        // Finally we add it to the channel index which maps channel points
1173
        // (outpoints) to the shorter channel ID's.
1174
        var b bytes.Buffer
1,485✔
1175
        if err := WriteOutpoint(&b, &edge.ChannelPoint); err != nil {
1,485✔
1176
                return err
×
1177
        }
×
1178
        return chanIndex.Put(b.Bytes(), chanKey[:])
1,485✔
1179
}
1180

1181
// HasChannelEdge returns true if the database knows of a channel edge with the
1182
// passed channel ID, and false otherwise. If an edge with that ID is found
1183
// within the graph, then two time stamps representing the last time the edge
1184
// was updated for both directed edges are returned along with the boolean. If
1185
// it is not found, then the zombie index is checked and its result is returned
1186
// as the second boolean.
1187
func (c *ChannelGraph) HasChannelEdge(
1188
        chanID uint64) (time.Time, time.Time, bool, bool, error) {
216✔
1189

216✔
1190
        var (
216✔
1191
                upd1Time time.Time
216✔
1192
                upd2Time time.Time
216✔
1193
                exists   bool
216✔
1194
                isZombie bool
216✔
1195
        )
216✔
1196

216✔
1197
        // We'll query the cache with the shared lock held to allow multiple
216✔
1198
        // readers to access values in the cache concurrently if they exist.
216✔
1199
        c.cacheMu.RLock()
216✔
1200
        if entry, ok := c.rejectCache.get(chanID); ok {
278✔
1201
                c.cacheMu.RUnlock()
62✔
1202
                upd1Time = time.Unix(entry.upd1Time, 0)
62✔
1203
                upd2Time = time.Unix(entry.upd2Time, 0)
62✔
1204
                exists, isZombie = entry.flags.unpack()
62✔
1205
                return upd1Time, upd2Time, exists, isZombie, nil
62✔
1206
        }
62✔
1207
        c.cacheMu.RUnlock()
154✔
1208

154✔
1209
        c.cacheMu.Lock()
154✔
1210
        defer c.cacheMu.Unlock()
154✔
1211

154✔
1212
        // The item was not found with the shared lock, so we'll acquire the
154✔
1213
        // exclusive lock and check the cache again in case another method added
154✔
1214
        // the entry to the cache while no lock was held.
154✔
1215
        if entry, ok := c.rejectCache.get(chanID); ok {
160✔
1216
                upd1Time = time.Unix(entry.upd1Time, 0)
6✔
1217
                upd2Time = time.Unix(entry.upd2Time, 0)
6✔
1218
                exists, isZombie = entry.flags.unpack()
6✔
1219
                return upd1Time, upd2Time, exists, isZombie, nil
6✔
1220
        }
6✔
1221

1222
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
296✔
1223
                edges := tx.ReadBucket(edgeBucket)
148✔
1224
                if edges == nil {
148✔
1225
                        return ErrGraphNoEdgesFound
×
1226
                }
×
1227
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
148✔
1228
                if edgeIndex == nil {
148✔
1229
                        return ErrGraphNoEdgesFound
×
1230
                }
×
1231

1232
                var channelID [8]byte
148✔
1233
                byteOrder.PutUint64(channelID[:], chanID)
148✔
1234

148✔
1235
                // If the edge doesn't exist, then we'll also check our zombie
148✔
1236
                // index.
148✔
1237
                if edgeIndex.Get(channelID[:]) == nil {
246✔
1238
                        exists = false
98✔
1239
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
98✔
1240
                        if zombieIndex != nil {
196✔
1241
                                isZombie, _, _ = isZombieEdge(
98✔
1242
                                        zombieIndex, chanID,
98✔
1243
                                )
98✔
1244
                        }
98✔
1245

1246
                        return nil
98✔
1247
                }
1248

1249
                exists = true
50✔
1250
                isZombie = false
50✔
1251

50✔
1252
                // If the channel has been found in the graph, then retrieve
50✔
1253
                // the edges itself so we can return the last updated
50✔
1254
                // timestamps.
50✔
1255
                nodes := tx.ReadBucket(nodeBucket)
50✔
1256
                if nodes == nil {
50✔
1257
                        return ErrGraphNodeNotFound
×
1258
                }
×
1259

1260
                e1, e2, err := fetchChanEdgePolicies(
50✔
1261
                        edgeIndex, edges, channelID[:],
50✔
1262
                )
50✔
1263
                if err != nil {
50✔
1264
                        return err
×
1265
                }
×
1266

1267
                // As we may have only one of the edges populated, only set the
1268
                // update time if the edge was found in the database.
1269
                if e1 != nil {
68✔
1270
                        upd1Time = e1.LastUpdate
18✔
1271
                }
18✔
1272
                if e2 != nil {
66✔
1273
                        upd2Time = e2.LastUpdate
16✔
1274
                }
16✔
1275

1276
                return nil
50✔
1277
        }, func() {}); err != nil {
148✔
1278
                return time.Time{}, time.Time{}, exists, isZombie, err
×
1279
        }
×
1280

1281
        c.rejectCache.insert(chanID, rejectCacheEntry{
148✔
1282
                upd1Time: upd1Time.Unix(),
148✔
1283
                upd2Time: upd2Time.Unix(),
148✔
1284
                flags:    packRejectFlags(exists, isZombie),
148✔
1285
        })
148✔
1286

148✔
1287
        return upd1Time, upd2Time, exists, isZombie, nil
148✔
1288
}
1289

1290
// UpdateChannelEdge retrieves and update edge of the graph database. Method
1291
// only reserved for updating an edge info after its already been created.
1292
// In order to maintain this constraints, we return an error in the scenario
1293
// that an edge info hasn't yet been created yet, but someone attempts to update
1294
// it.
1295
func (c *ChannelGraph) UpdateChannelEdge(edge *models.ChannelEdgeInfo) error {
1✔
1296
        // Construct the channel's primary key which is the 8-byte channel ID.
1✔
1297
        var chanKey [8]byte
1✔
1298
        binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
1✔
1299

1✔
1300
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
2✔
1301
                edges := tx.ReadWriteBucket(edgeBucket)
1✔
1302
                if edge == nil {
1✔
1303
                        return ErrEdgeNotFound
×
1304
                }
×
1305

1306
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
1✔
1307
                if edgeIndex == nil {
1✔
1308
                        return ErrEdgeNotFound
×
1309
                }
×
1310

1311
                if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo == nil {
1✔
1312
                        return ErrEdgeNotFound
×
1313
                }
×
1314

1315
                if c.graphCache != nil {
2✔
1316
                        c.graphCache.UpdateChannel(edge)
1✔
1317
                }
1✔
1318

1319
                return putChanEdgeInfo(edgeIndex, edge, chanKey)
1✔
1320
        }, func() {})
1✔
1321
}
1322

1323
const (
1324
        // pruneTipBytes is the total size of the value which stores a prune
1325
        // entry of the graph in the prune log. The "prune tip" is the last
1326
        // entry in the prune log, and indicates if the channel graph is in
1327
        // sync with the current UTXO state. The structure of the value
1328
        // is: blockHash, taking 32 bytes total.
1329
        pruneTipBytes = 32
1330
)
1331

1332
// PruneGraph prunes newly closed channels from the channel graph in response
1333
// to a new block being solved on the network. Any transactions which spend the
1334
// funding output of any known channels within he graph will be deleted.
1335
// Additionally, the "prune tip", or the last block which has been used to
1336
// prune the graph is stored so callers can ensure the graph is fully in sync
1337
// with the current UTXO state. A slice of channels that have been closed by
1338
// the target block are returned if the function succeeds without error.
1339
func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
1340
        blockHash *chainhash.Hash, blockHeight uint32) (
1341
        []*models.ChannelEdgeInfo, error) {
248✔
1342

248✔
1343
        c.cacheMu.Lock()
248✔
1344
        defer c.cacheMu.Unlock()
248✔
1345

248✔
1346
        var chansClosed []*models.ChannelEdgeInfo
248✔
1347

248✔
1348
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
496✔
1349
                // First grab the edges bucket which houses the information
248✔
1350
                // we'd like to delete
248✔
1351
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
248✔
1352
                if err != nil {
248✔
1353
                        return err
×
1354
                }
×
1355

1356
                // Next grab the two edge indexes which will also need to be
1357
                // updated.
1358
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
248✔
1359
                if err != nil {
248✔
1360
                        return err
×
1361
                }
×
1362
                chanIndex, err := edges.CreateBucketIfNotExists(
248✔
1363
                        channelPointBucket,
248✔
1364
                )
248✔
1365
                if err != nil {
248✔
1366
                        return err
×
1367
                }
×
1368
                nodes := tx.ReadWriteBucket(nodeBucket)
248✔
1369
                if nodes == nil {
248✔
1370
                        return ErrSourceNodeNotSet
×
1371
                }
×
1372
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
248✔
1373
                if err != nil {
248✔
1374
                        return err
×
1375
                }
×
1376

1377
                // For each of the outpoints that have been spent within the
1378
                // block, we attempt to delete them from the graph as if that
1379
                // outpoint was a channel, then it has now been closed.
1380
                for _, chanPoint := range spentOutputs {
387✔
1381
                        // TODO(roasbeef): load channel bloom filter, continue
139✔
1382
                        // if NOT if filter
139✔
1383

139✔
1384
                        var opBytes bytes.Buffer
139✔
1385
                        err := WriteOutpoint(&opBytes, chanPoint)
139✔
1386
                        if err != nil {
139✔
1387
                                return err
×
1388
                        }
×
1389

1390
                        // First attempt to see if the channel exists within
1391
                        // the database, if not, then we can exit early.
1392
                        chanID := chanIndex.Get(opBytes.Bytes())
139✔
1393
                        if chanID == nil {
254✔
1394
                                continue
115✔
1395
                        }
1396

1397
                        // However, if it does, then we'll read out the full
1398
                        // version so we can add it to the set of deleted
1399
                        // channels.
1400
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
24✔
1401
                        if err != nil {
24✔
1402
                                return err
×
1403
                        }
×
1404

1405
                        // Attempt to delete the channel, an ErrEdgeNotFound
1406
                        // will be returned if that outpoint isn't known to be
1407
                        // a channel. If no error is returned, then a channel
1408
                        // was successfully pruned.
1409
                        err = c.delChannelEdgeUnsafe(
24✔
1410
                                edges, edgeIndex, chanIndex, zombieIndex,
24✔
1411
                                chanID, false, false,
24✔
1412
                        )
24✔
1413
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
24✔
1414
                                return err
×
1415
                        }
×
1416

1417
                        chansClosed = append(chansClosed, &edgeInfo)
24✔
1418
                }
1419

1420
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
248✔
1421
                if err != nil {
248✔
1422
                        return err
×
1423
                }
×
1424

1425
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
248✔
1426
                        pruneLogBucket,
248✔
1427
                )
248✔
1428
                if err != nil {
248✔
1429
                        return err
×
1430
                }
×
1431

1432
                // With the graph pruned, add a new entry to the prune log,
1433
                // which can be used to check if the graph is fully synced with
1434
                // the current UTXO state.
1435
                var blockHeightBytes [4]byte
248✔
1436
                byteOrder.PutUint32(blockHeightBytes[:], blockHeight)
248✔
1437

248✔
1438
                var newTip [pruneTipBytes]byte
248✔
1439
                copy(newTip[:], blockHash[:])
248✔
1440

248✔
1441
                err = pruneBucket.Put(blockHeightBytes[:], newTip[:])
248✔
1442
                if err != nil {
248✔
1443
                        return err
×
1444
                }
×
1445

1446
                // Now that the graph has been pruned, we'll also attempt to
1447
                // prune any nodes that have had a channel closed within the
1448
                // latest block.
1449
                return c.pruneGraphNodes(nodes, edgeIndex)
248✔
1450
        }, func() {
248✔
1451
                chansClosed = nil
248✔
1452
        })
248✔
1453
        if err != nil {
248✔
1454
                return nil, err
×
1455
        }
×
1456

1457
        for _, channel := range chansClosed {
272✔
1458
                c.rejectCache.remove(channel.ChannelID)
24✔
1459
                c.chanCache.remove(channel.ChannelID)
24✔
1460
        }
24✔
1461

1462
        if c.graphCache != nil {
496✔
1463
                log.Debugf("Pruned graph, cache now has %s",
248✔
1464
                        c.graphCache.Stats())
248✔
1465
        }
248✔
1466

1467
        return chansClosed, nil
248✔
1468
}
1469

1470
// PruneGraphNodes is a garbage collection method which attempts to prune out
1471
// any nodes from the channel graph that are currently unconnected. This ensure
1472
// that we only maintain a graph of reachable nodes. In the event that a pruned
1473
// node gains more channels, it will be re-added back to the graph.
1474
func (c *ChannelGraph) PruneGraphNodes() error {
24✔
1475
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
48✔
1476
                nodes := tx.ReadWriteBucket(nodeBucket)
24✔
1477
                if nodes == nil {
24✔
1478
                        return ErrGraphNodesNotFound
×
1479
                }
×
1480
                edges := tx.ReadWriteBucket(edgeBucket)
24✔
1481
                if edges == nil {
24✔
1482
                        return ErrGraphNotFound
×
1483
                }
×
1484
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
24✔
1485
                if edgeIndex == nil {
24✔
1486
                        return ErrGraphNoEdgesFound
×
1487
                }
×
1488

1489
                return c.pruneGraphNodes(nodes, edgeIndex)
24✔
1490
        }, func() {})
24✔
1491
}
1492

1493
// pruneGraphNodes attempts to remove any nodes from the graph who have had a
1494
// channel closed within the current block. If the node still has existing
1495
// channels in the graph, this will act as a no-op.
1496
func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket,
1497
        edgeIndex kvdb.RwBucket) error {
272✔
1498

272✔
1499
        log.Trace("Pruning nodes from graph with no open channels")
272✔
1500

272✔
1501
        // We'll retrieve the graph's source node to ensure we don't remove it
272✔
1502
        // even if it no longer has any open channels.
272✔
1503
        sourceNode, err := c.sourceNode(nodes)
272✔
1504
        if err != nil {
272✔
1505
                return err
×
1506
        }
×
1507

1508
        // We'll use this map to keep count the number of references to a node
1509
        // in the graph. A node should only be removed once it has no more
1510
        // references in the graph.
1511
        nodeRefCounts := make(map[[33]byte]int)
272✔
1512
        err = nodes.ForEach(func(pubKey, nodeBytes []byte) error {
1,607✔
1513
                // If this is the source key, then we skip this
1,335✔
1514
                // iteration as the value for this key is a pubKey
1,335✔
1515
                // rather than raw node information.
1,335✔
1516
                if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
2,151✔
1517
                        return nil
816✔
1518
                }
816✔
1519

1520
                var nodePub [33]byte
519✔
1521
                copy(nodePub[:], pubKey)
519✔
1522
                nodeRefCounts[nodePub] = 0
519✔
1523

519✔
1524
                return nil
519✔
1525
        })
1526
        if err != nil {
272✔
1527
                return err
×
1528
        }
×
1529

1530
        // To ensure we never delete the source node, we'll start off by
1531
        // bumping its ref count to 1.
1532
        nodeRefCounts[sourceNode.PubKeyBytes] = 1
272✔
1533

272✔
1534
        // Next, we'll run through the edgeIndex which maps a channel ID to the
272✔
1535
        // edge info. We'll use this scan to populate our reference count map
272✔
1536
        // above.
272✔
1537
        err = edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
467✔
1538
                // The first 66 bytes of the edge info contain the pubkeys of
195✔
1539
                // the nodes that this edge attaches. We'll extract them, and
195✔
1540
                // add them to the ref count map.
195✔
1541
                var node1, node2 [33]byte
195✔
1542
                copy(node1[:], edgeInfoBytes[:33])
195✔
1543
                copy(node2[:], edgeInfoBytes[33:])
195✔
1544

195✔
1545
                // With the nodes extracted, we'll increase the ref count of
195✔
1546
                // each of the nodes.
195✔
1547
                nodeRefCounts[node1]++
195✔
1548
                nodeRefCounts[node2]++
195✔
1549

195✔
1550
                return nil
195✔
1551
        })
195✔
1552
        if err != nil {
272✔
1553
                return err
×
1554
        }
×
1555

1556
        // Finally, we'll make a second pass over the set of nodes, and delete
1557
        // any nodes that have a ref count of zero.
1558
        var numNodesPruned int
272✔
1559
        for nodePubKey, refCount := range nodeRefCounts {
791✔
1560
                // If the ref count of the node isn't zero, then we can safely
519✔
1561
                // skip it as it still has edges to or from it within the
519✔
1562
                // graph.
519✔
1563
                if refCount != 0 {
975✔
1564
                        continue
456✔
1565
                }
1566

1567
                if c.graphCache != nil {
126✔
1568
                        c.graphCache.RemoveNode(nodePubKey)
63✔
1569
                }
63✔
1570

1571
                // If we reach this point, then there are no longer any edges
1572
                // that connect this node, so we can delete it.
1573
                err := c.deleteLightningNode(nodes, nodePubKey[:])
63✔
1574
                if err != nil {
63✔
1575
                        if errors.Is(err, ErrGraphNodeNotFound) ||
×
1576
                                errors.Is(err, ErrGraphNodesNotFound) {
×
1577

×
1578
                                log.Warnf("Unable to prune node %x from the "+
×
1579
                                        "graph: %v", nodePubKey, err)
×
1580
                                continue
×
1581
                        }
1582

1583
                        return err
×
1584
                }
1585

1586
                log.Infof("Pruned unconnected node %x from channel graph",
63✔
1587
                        nodePubKey[:])
63✔
1588

63✔
1589
                numNodesPruned++
63✔
1590
        }
1591

1592
        if numNodesPruned > 0 {
319✔
1593
                log.Infof("Pruned %v unconnected nodes from the channel graph",
47✔
1594
                        numNodesPruned)
47✔
1595
        }
47✔
1596

1597
        return nil
272✔
1598
}
1599

1600
// DisconnectBlockAtHeight is used to indicate that the block specified
1601
// by the passed height has been disconnected from the main chain. This
1602
// will "rewind" the graph back to the height below, deleting channels
1603
// that are no longer confirmed from the graph. The prune log will be
1604
// set to the last prune height valid for the remaining chain.
1605
// Channels that were removed from the graph resulting from the
1606
// disconnected block are returned.
1607
func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) (
1608
        []*models.ChannelEdgeInfo, error) {
161✔
1609

161✔
1610
        // Every channel having a ShortChannelID starting at 'height'
161✔
1611
        // will no longer be confirmed.
161✔
1612
        startShortChanID := lnwire.ShortChannelID{
161✔
1613
                BlockHeight: height,
161✔
1614
        }
161✔
1615

161✔
1616
        // Delete everything after this height from the db up until the
161✔
1617
        // SCID alias range.
161✔
1618
        endShortChanID := aliasmgr.StartingAlias
161✔
1619

161✔
1620
        // The block height will be the 3 first bytes of the channel IDs.
161✔
1621
        var chanIDStart [8]byte
161✔
1622
        byteOrder.PutUint64(chanIDStart[:], startShortChanID.ToUint64())
161✔
1623
        var chanIDEnd [8]byte
161✔
1624
        byteOrder.PutUint64(chanIDEnd[:], endShortChanID.ToUint64())
161✔
1625

161✔
1626
        c.cacheMu.Lock()
161✔
1627
        defer c.cacheMu.Unlock()
161✔
1628

161✔
1629
        // Keep track of the channels that are removed from the graph.
161✔
1630
        var removedChans []*models.ChannelEdgeInfo
161✔
1631

161✔
1632
        if err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
322✔
1633
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
161✔
1634
                if err != nil {
161✔
1635
                        return err
×
1636
                }
×
1637
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
161✔
1638
                if err != nil {
161✔
1639
                        return err
×
1640
                }
×
1641
                chanIndex, err := edges.CreateBucketIfNotExists(
161✔
1642
                        channelPointBucket,
161✔
1643
                )
161✔
1644
                if err != nil {
161✔
1645
                        return err
×
1646
                }
×
1647
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
161✔
1648
                if err != nil {
161✔
1649
                        return err
×
1650
                }
×
1651

1652
                // Scan from chanIDStart to chanIDEnd, deleting every
1653
                // found edge.
1654
                // NOTE: we must delete the edges after the cursor loop, since
1655
                // modifying the bucket while traversing is not safe.
1656
                // NOTE: We use a < comparison in bytes.Compare instead of <=
1657
                // so that the StartingAlias itself isn't deleted.
1658
                var keys [][]byte
161✔
1659
                cursor := edgeIndex.ReadWriteCursor()
161✔
1660

161✔
1661
                //nolint:ll
161✔
1662
                for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
161✔
1663
                        bytes.Compare(k, chanIDEnd[:]) < 0; k, v = cursor.Next() {
257✔
1664
                        edgeInfoReader := bytes.NewReader(v)
96✔
1665
                        edgeInfo, err := deserializeChanEdgeInfo(edgeInfoReader)
96✔
1666
                        if err != nil {
96✔
1667
                                return err
×
1668
                        }
×
1669

1670
                        keys = append(keys, k)
96✔
1671
                        removedChans = append(removedChans, &edgeInfo)
96✔
1672
                }
1673

1674
                for _, k := range keys {
257✔
1675
                        err = c.delChannelEdgeUnsafe(
96✔
1676
                                edges, edgeIndex, chanIndex, zombieIndex,
96✔
1677
                                k, false, false,
96✔
1678
                        )
96✔
1679
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
96✔
1680
                                return err
×
1681
                        }
×
1682
                }
1683

1684
                // Delete all the entries in the prune log having a height
1685
                // greater or equal to the block disconnected.
1686
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
161✔
1687
                if err != nil {
161✔
1688
                        return err
×
1689
                }
×
1690

1691
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
161✔
1692
                        pruneLogBucket,
161✔
1693
                )
161✔
1694
                if err != nil {
161✔
1695
                        return err
×
1696
                }
×
1697

1698
                var pruneKeyStart [4]byte
161✔
1699
                byteOrder.PutUint32(pruneKeyStart[:], height)
161✔
1700

161✔
1701
                var pruneKeyEnd [4]byte
161✔
1702
                byteOrder.PutUint32(pruneKeyEnd[:], math.MaxUint32)
161✔
1703

161✔
1704
                // To avoid modifying the bucket while traversing, we delete
161✔
1705
                // the keys in a second loop.
161✔
1706
                var pruneKeys [][]byte
161✔
1707
                pruneCursor := pruneBucket.ReadWriteCursor()
161✔
1708
                //nolint:ll
161✔
1709
                for k, _ := pruneCursor.Seek(pruneKeyStart[:]); k != nil &&
161✔
1710
                        bytes.Compare(k, pruneKeyEnd[:]) <= 0; k, _ = pruneCursor.Next() {
260✔
1711
                        pruneKeys = append(pruneKeys, k)
99✔
1712
                }
99✔
1713

1714
                for _, k := range pruneKeys {
260✔
1715
                        if err := pruneBucket.Delete(k); err != nil {
99✔
1716
                                return err
×
1717
                        }
×
1718
                }
1719

1720
                return nil
161✔
1721
        }, func() {
161✔
1722
                removedChans = nil
161✔
1723
        }); err != nil {
161✔
1724
                return nil, err
×
1725
        }
×
1726

1727
        for _, channel := range removedChans {
257✔
1728
                c.rejectCache.remove(channel.ChannelID)
96✔
1729
                c.chanCache.remove(channel.ChannelID)
96✔
1730
        }
96✔
1731

1732
        return removedChans, nil
161✔
1733
}
1734

1735
// PruneTip returns the block height and hash of the latest block that has been
1736
// used to prune channels in the graph. Knowing the "prune tip" allows callers
1737
// to tell if the graph is currently in sync with the current best known UTXO
1738
// state.
1739
func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) {
55✔
1740
        var (
55✔
1741
                tipHash   chainhash.Hash
55✔
1742
                tipHeight uint32
55✔
1743
        )
55✔
1744

55✔
1745
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
110✔
1746
                graphMeta := tx.ReadBucket(graphMetaBucket)
55✔
1747
                if graphMeta == nil {
55✔
1748
                        return ErrGraphNotFound
×
1749
                }
×
1750
                pruneBucket := graphMeta.NestedReadBucket(pruneLogBucket)
55✔
1751
                if pruneBucket == nil {
55✔
1752
                        return ErrGraphNeverPruned
×
1753
                }
×
1754

1755
                pruneCursor := pruneBucket.ReadCursor()
55✔
1756

55✔
1757
                // The prune key with the largest block height will be our
55✔
1758
                // prune tip.
55✔
1759
                k, v := pruneCursor.Last()
55✔
1760
                if k == nil {
74✔
1761
                        return ErrGraphNeverPruned
19✔
1762
                }
19✔
1763

1764
                // Once we have the prune tip, the value will be the block hash,
1765
                // and the key the block height.
1766
                copy(tipHash[:], v[:])
36✔
1767
                tipHeight = byteOrder.Uint32(k[:])
36✔
1768

36✔
1769
                return nil
36✔
1770
        }, func() {})
55✔
1771
        if err != nil {
74✔
1772
                return nil, 0, err
19✔
1773
        }
19✔
1774

1775
        return &tipHash, tipHeight, nil
36✔
1776
}
1777

1778
// DeleteChannelEdges removes edges with the given channel IDs from the
1779
// database and marks them as zombies. This ensures that we're unable to re-add
1780
// it to our database once again. If an edge does not exist within the
1781
// database, then ErrEdgeNotFound will be returned. If strictZombiePruning is
1782
// true, then when we mark these edges as zombies, we'll set up the keys such
1783
// that we require the node that failed to send the fresh update to be the one
1784
// that resurrects the channel from its zombie state. The markZombie bool
1785
// denotes whether or not to mark the channel as a zombie.
1786
func (c *ChannelGraph) DeleteChannelEdges(strictZombiePruning, markZombie bool,
1787
        chanIDs ...uint64) error {
141✔
1788

141✔
1789
        // TODO(roasbeef): possibly delete from node bucket if node has no more
141✔
1790
        // channels
141✔
1791
        // TODO(roasbeef): don't delete both edges?
141✔
1792

141✔
1793
        c.cacheMu.Lock()
141✔
1794
        defer c.cacheMu.Unlock()
141✔
1795

141✔
1796
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
282✔
1797
                edges := tx.ReadWriteBucket(edgeBucket)
141✔
1798
                if edges == nil {
141✔
1799
                        return ErrEdgeNotFound
×
1800
                }
×
1801
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
141✔
1802
                if edgeIndex == nil {
141✔
1803
                        return ErrEdgeNotFound
×
1804
                }
×
1805
                chanIndex := edges.NestedReadWriteBucket(channelPointBucket)
141✔
1806
                if chanIndex == nil {
141✔
1807
                        return ErrEdgeNotFound
×
1808
                }
×
1809
                nodes := tx.ReadWriteBucket(nodeBucket)
141✔
1810
                if nodes == nil {
141✔
1811
                        return ErrGraphNodeNotFound
×
1812
                }
×
1813
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
141✔
1814
                if err != nil {
141✔
1815
                        return err
×
1816
                }
×
1817

1818
                var rawChanID [8]byte
141✔
1819
                for _, chanID := range chanIDs {
225✔
1820
                        byteOrder.PutUint64(rawChanID[:], chanID)
84✔
1821
                        err := c.delChannelEdgeUnsafe(
84✔
1822
                                edges, edgeIndex, chanIndex, zombieIndex,
84✔
1823
                                rawChanID[:], markZombie, strictZombiePruning,
84✔
1824
                        )
84✔
1825
                        if err != nil {
144✔
1826
                                return err
60✔
1827
                        }
60✔
1828
                }
1829

1830
                return nil
81✔
1831
        }, func() {})
141✔
1832
        if err != nil {
201✔
1833
                return err
60✔
1834
        }
60✔
1835

1836
        for _, chanID := range chanIDs {
105✔
1837
                c.rejectCache.remove(chanID)
24✔
1838
                c.chanCache.remove(chanID)
24✔
1839
        }
24✔
1840

1841
        return nil
81✔
1842
}
1843

1844
// ChannelID attempt to lookup the 8-byte compact channel ID which maps to the
1845
// passed channel point (outpoint). If the passed channel doesn't exist within
1846
// the database, then ErrEdgeNotFound is returned.
1847
func (c *ChannelGraph) ChannelID(chanPoint *wire.OutPoint) (uint64, error) {
1✔
1848
        var chanID uint64
1✔
1849
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
2✔
1850
                var err error
1✔
1851
                chanID, err = getChanID(tx, chanPoint)
1✔
1852
                return err
1✔
1853
        }, func() {
2✔
1854
                chanID = 0
1✔
1855
        }); err != nil {
1✔
1856
                return 0, err
×
1857
        }
×
1858

1859
        return chanID, nil
1✔
1860
}
1861

1862
// getChanID returns the assigned channel ID for a given channel point.
1863
func getChanID(tx kvdb.RTx, chanPoint *wire.OutPoint) (uint64, error) {
1✔
1864
        var b bytes.Buffer
1✔
1865
        if err := WriteOutpoint(&b, chanPoint); err != nil {
1✔
1866
                return 0, err
×
1867
        }
×
1868

1869
        edges := tx.ReadBucket(edgeBucket)
1✔
1870
        if edges == nil {
1✔
1871
                return 0, ErrGraphNoEdgesFound
×
1872
        }
×
1873
        chanIndex := edges.NestedReadBucket(channelPointBucket)
1✔
1874
        if chanIndex == nil {
1✔
1875
                return 0, ErrGraphNoEdgesFound
×
1876
        }
×
1877

1878
        chanIDBytes := chanIndex.Get(b.Bytes())
1✔
1879
        if chanIDBytes == nil {
1✔
1880
                return 0, ErrEdgeNotFound
×
1881
        }
×
1882

1883
        chanID := byteOrder.Uint64(chanIDBytes)
1✔
1884

1✔
1885
        return chanID, nil
1✔
1886
}
1887

1888
// TODO(roasbeef): allow updates to use Batch?
1889

1890
// HighestChanID returns the "highest" known channel ID in the channel graph.
1891
// This represents the "newest" channel from the PoV of the chain. This method
1892
// can be used by peers to quickly determine if they're graphs are in sync.
1893
func (c *ChannelGraph) HighestChanID() (uint64, error) {
3✔
1894
        var cid uint64
3✔
1895

3✔
1896
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
1897
                edges := tx.ReadBucket(edgeBucket)
3✔
1898
                if edges == nil {
3✔
1899
                        return ErrGraphNoEdgesFound
×
1900
                }
×
1901
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
1902
                if edgeIndex == nil {
3✔
1903
                        return ErrGraphNoEdgesFound
×
1904
                }
×
1905

1906
                // In order to find the highest chan ID, we'll fetch a cursor
1907
                // and use that to seek to the "end" of our known rage.
1908
                cidCursor := edgeIndex.ReadCursor()
3✔
1909

3✔
1910
                lastChanID, _ := cidCursor.Last()
3✔
1911

3✔
1912
                // If there's no key, then this means that we don't actually
3✔
1913
                // know of any channels, so we'll return a predicable error.
3✔
1914
                if lastChanID == nil {
4✔
1915
                        return ErrGraphNoEdgesFound
1✔
1916
                }
1✔
1917

1918
                // Otherwise, we'll de serialize the channel ID and return it
1919
                // to the caller.
1920
                cid = byteOrder.Uint64(lastChanID)
2✔
1921
                return nil
2✔
1922
        }, func() {
3✔
1923
                cid = 0
3✔
1924
        })
3✔
1925
        if err != nil && err != ErrGraphNoEdgesFound {
3✔
1926
                return 0, err
×
1927
        }
×
1928

1929
        return cid, nil
3✔
1930
}
1931

1932
// ChannelEdge represents the complete set of information for a channel edge in
1933
// the known channel graph. This struct couples the core information of the
1934
// edge as well as each of the known advertised edge policies.
1935
type ChannelEdge struct {
1936
        // Info contains all the static information describing the channel.
1937
        Info *models.ChannelEdgeInfo
1938

1939
        // Policy1 points to the "first" edge policy of the channel containing
1940
        // the dynamic information required to properly route through the edge.
1941
        Policy1 *models.ChannelEdgePolicy
1942

1943
        // Policy2 points to the "second" edge policy of the channel containing
1944
        // the dynamic information required to properly route through the edge.
1945
        Policy2 *models.ChannelEdgePolicy
1946

1947
        // Node1 is "node 1" in the channel. This is the node that would have
1948
        // produced Policy1 if it exists.
1949
        Node1 *models.LightningNode
1950

1951
        // Node2 is "node 2" in the channel. This is the node that would have
1952
        // produced Policy2 if it exists.
1953
        Node2 *models.LightningNode
1954
}
1955

1956
// ChanUpdatesInHorizon returns all the known channel edges which have at least
1957
// one edge that has an update timestamp within the specified horizon.
1958
func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
1959
        endTime time.Time) ([]ChannelEdge, error) {
139✔
1960

139✔
1961
        // To ensure we don't return duplicate ChannelEdges, we'll use an
139✔
1962
        // additional map to keep track of the edges already seen to prevent
139✔
1963
        // re-adding it.
139✔
1964
        var edgesSeen map[uint64]struct{}
139✔
1965
        var edgesToCache map[uint64]ChannelEdge
139✔
1966
        var edgesInHorizon []ChannelEdge
139✔
1967

139✔
1968
        c.cacheMu.Lock()
139✔
1969
        defer c.cacheMu.Unlock()
139✔
1970

139✔
1971
        var hits int
139✔
1972
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
278✔
1973
                edges := tx.ReadBucket(edgeBucket)
139✔
1974
                if edges == nil {
139✔
1975
                        return ErrGraphNoEdgesFound
×
1976
                }
×
1977
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
139✔
1978
                if edgeIndex == nil {
139✔
1979
                        return ErrGraphNoEdgesFound
×
1980
                }
×
1981
                edgeUpdateIndex := edges.NestedReadBucket(edgeUpdateIndexBucket)
139✔
1982
                if edgeUpdateIndex == nil {
139✔
1983
                        return ErrGraphNoEdgesFound
×
1984
                }
×
1985

1986
                nodes := tx.ReadBucket(nodeBucket)
139✔
1987
                if nodes == nil {
139✔
1988
                        return ErrGraphNodesNotFound
×
1989
                }
×
1990

1991
                // We'll now obtain a cursor to perform a range query within
1992
                // the index to find all channels within the horizon.
1993
                updateCursor := edgeUpdateIndex.ReadCursor()
139✔
1994

139✔
1995
                var startTimeBytes, endTimeBytes [8 + 8]byte
139✔
1996
                byteOrder.PutUint64(
139✔
1997
                        startTimeBytes[:8], uint64(startTime.Unix()),
139✔
1998
                )
139✔
1999
                byteOrder.PutUint64(
139✔
2000
                        endTimeBytes[:8], uint64(endTime.Unix()),
139✔
2001
                )
139✔
2002

139✔
2003
                // With our start and end times constructed, we'll step through
139✔
2004
                // the index collecting the info and policy of each update of
139✔
2005
                // each channel that has a last update within the time range.
139✔
2006
                //
139✔
2007
                //nolint:ll
139✔
2008
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
139✔
2009
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
185✔
2010

46✔
2011
                        // We have a new eligible entry, so we'll slice of the
46✔
2012
                        // chan ID so we can query it in the DB.
46✔
2013
                        chanID := indexKey[8:]
46✔
2014

46✔
2015
                        // If we've already retrieved the info and policies for
46✔
2016
                        // this edge, then we can skip it as we don't need to do
46✔
2017
                        // so again.
46✔
2018
                        chanIDInt := byteOrder.Uint64(chanID)
46✔
2019
                        if _, ok := edgesSeen[chanIDInt]; ok {
65✔
2020
                                continue
19✔
2021
                        }
2022

2023
                        if channel, ok := c.chanCache.get(chanIDInt); ok {
36✔
2024
                                hits++
9✔
2025
                                edgesSeen[chanIDInt] = struct{}{}
9✔
2026
                                edgesInHorizon = append(edgesInHorizon, channel)
9✔
2027
                                continue
9✔
2028
                        }
2029

2030
                        // First, we'll fetch the static edge information.
2031
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
18✔
2032
                        if err != nil {
18✔
2033
                                chanID := byteOrder.Uint64(chanID)
×
2034
                                return fmt.Errorf("unable to fetch info for "+
×
2035
                                        "edge with chan_id=%v: %v", chanID, err)
×
2036
                        }
×
2037

2038
                        // With the static information obtained, we'll now
2039
                        // fetch the dynamic policy info.
2040
                        edge1, edge2, err := fetchChanEdgePolicies(
18✔
2041
                                edgeIndex, edges, chanID,
18✔
2042
                        )
18✔
2043
                        if err != nil {
18✔
2044
                                chanID := byteOrder.Uint64(chanID)
×
2045
                                return fmt.Errorf("unable to fetch policies "+
×
2046
                                        "for edge with chan_id=%v: %v", chanID,
×
2047
                                        err)
×
2048
                        }
×
2049

2050
                        node1, err := fetchLightningNode(
18✔
2051
                                nodes, edgeInfo.NodeKey1Bytes[:],
18✔
2052
                        )
18✔
2053
                        if err != nil {
18✔
2054
                                return err
×
2055
                        }
×
2056

2057
                        node2, err := fetchLightningNode(
18✔
2058
                                nodes, edgeInfo.NodeKey2Bytes[:],
18✔
2059
                        )
18✔
2060
                        if err != nil {
18✔
2061
                                return err
×
2062
                        }
×
2063

2064
                        // Finally, we'll collate this edge with the rest of
2065
                        // edges to be returned.
2066
                        edgesSeen[chanIDInt] = struct{}{}
18✔
2067
                        channel := ChannelEdge{
18✔
2068
                                Info:    &edgeInfo,
18✔
2069
                                Policy1: edge1,
18✔
2070
                                Policy2: edge2,
18✔
2071
                                Node1:   &node1,
18✔
2072
                                Node2:   &node2,
18✔
2073
                        }
18✔
2074
                        edgesInHorizon = append(edgesInHorizon, channel)
18✔
2075
                        edgesToCache[chanIDInt] = channel
18✔
2076
                }
2077

2078
                return nil
139✔
2079
        }, func() {
139✔
2080
                edgesSeen = make(map[uint64]struct{})
139✔
2081
                edgesToCache = make(map[uint64]ChannelEdge)
139✔
2082
                edgesInHorizon = nil
139✔
2083
        })
139✔
2084
        switch {
139✔
2085
        case err == ErrGraphNoEdgesFound:
×
2086
                fallthrough
×
2087
        case err == ErrGraphNodesNotFound:
×
2088
                break
×
2089

2090
        case err != nil:
×
2091
                return nil, err
×
2092
        }
2093

2094
        // Insert any edges loaded from disk into the cache.
2095
        for chanid, channel := range edgesToCache {
157✔
2096
                c.chanCache.insert(chanid, channel)
18✔
2097
        }
18✔
2098

2099
        log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
139✔
2100
                float64(hits)/float64(len(edgesInHorizon)), hits,
139✔
2101
                len(edgesInHorizon))
139✔
2102

139✔
2103
        return edgesInHorizon, nil
139✔
2104
}
2105

2106
// NodeUpdatesInHorizon returns all the known lightning node which have an
2107
// update timestamp within the passed range. This method can be used by two
2108
// nodes to quickly determine if they have the same set of up to date node
2109
// announcements.
2110
func (c *ChannelGraph) NodeUpdatesInHorizon(startTime,
2111
        endTime time.Time) ([]models.LightningNode, error) {
8✔
2112

8✔
2113
        var nodesInHorizon []models.LightningNode
8✔
2114

8✔
2115
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
16✔
2116
                nodes := tx.ReadBucket(nodeBucket)
8✔
2117
                if nodes == nil {
8✔
2118
                        return ErrGraphNodesNotFound
×
2119
                }
×
2120

2121
                nodeUpdateIndex := nodes.NestedReadBucket(nodeUpdateIndexBucket)
8✔
2122
                if nodeUpdateIndex == nil {
8✔
2123
                        return ErrGraphNodesNotFound
×
2124
                }
×
2125

2126
                // We'll now obtain a cursor to perform a range query within
2127
                // the index to find all node announcements within the horizon.
2128
                updateCursor := nodeUpdateIndex.ReadCursor()
8✔
2129

8✔
2130
                var startTimeBytes, endTimeBytes [8 + 33]byte
8✔
2131
                byteOrder.PutUint64(
8✔
2132
                        startTimeBytes[:8], uint64(startTime.Unix()),
8✔
2133
                )
8✔
2134
                byteOrder.PutUint64(
8✔
2135
                        endTimeBytes[:8], uint64(endTime.Unix()),
8✔
2136
                )
8✔
2137

8✔
2138
                // With our start and end times constructed, we'll step through
8✔
2139
                // the index collecting info for each node within the time
8✔
2140
                // range.
8✔
2141
                //
8✔
2142
                //nolint:ll
8✔
2143
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
8✔
2144
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
37✔
2145

29✔
2146
                        nodePub := indexKey[8:]
29✔
2147
                        node, err := fetchLightningNode(nodes, nodePub)
29✔
2148
                        if err != nil {
29✔
2149
                                return err
×
2150
                        }
×
2151

2152
                        nodesInHorizon = append(nodesInHorizon, node)
29✔
2153
                }
2154

2155
                return nil
8✔
2156
        }, func() {
8✔
2157
                nodesInHorizon = nil
8✔
2158
        })
8✔
2159
        switch {
8✔
2160
        case err == ErrGraphNoEdgesFound:
×
2161
                fallthrough
×
2162
        case err == ErrGraphNodesNotFound:
×
2163
                break
×
2164

2165
        case err != nil:
×
2166
                return nil, err
×
2167
        }
2168

2169
        return nodesInHorizon, nil
8✔
2170
}
2171

2172
// FilterKnownChanIDs takes a set of channel IDs and return the subset of chan
2173
// ID's that we don't know and are not known zombies of the passed set. In other
2174
// words, we perform a set difference of our set of chan ID's and the ones
2175
// passed in. This method can be used by callers to determine the set of
2176
// channels another peer knows of that we don't.
2177
func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
2178
        isZombieChan func(time.Time, time.Time) bool) ([]uint64, error) {
123✔
2179

123✔
2180
        var newChanIDs []uint64
123✔
2181

123✔
2182
        c.cacheMu.Lock()
123✔
2183
        defer c.cacheMu.Unlock()
123✔
2184

123✔
2185
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
246✔
2186
                edges := tx.ReadBucket(edgeBucket)
123✔
2187
                if edges == nil {
123✔
2188
                        return ErrGraphNoEdgesFound
×
2189
                }
×
2190
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
123✔
2191
                if edgeIndex == nil {
123✔
2192
                        return ErrGraphNoEdgesFound
×
2193
                }
×
2194

2195
                // Fetch the zombie index, it may not exist if no edges have
2196
                // ever been marked as zombies. If the index has been
2197
                // initialized, we will use it later to skip known zombie edges.
2198
                zombieIndex := edges.NestedReadBucket(zombieBucket)
123✔
2199

123✔
2200
                // We'll run through the set of chanIDs and collate only the
123✔
2201
                // set of channel that are unable to be found within our db.
123✔
2202
                var cidBytes [8]byte
123✔
2203
                for _, info := range chansInfo {
250✔
2204
                        scid := info.ShortChannelID.ToUint64()
127✔
2205
                        byteOrder.PutUint64(cidBytes[:], scid)
127✔
2206

127✔
2207
                        // If the edge is already known, skip it.
127✔
2208
                        if v := edgeIndex.Get(cidBytes[:]); v != nil {
150✔
2209
                                continue
23✔
2210
                        }
2211

2212
                        // If the edge is a known zombie, skip it.
2213
                        if zombieIndex != nil {
208✔
2214
                                isZombie, _, _ := isZombieEdge(
104✔
2215
                                        zombieIndex, scid,
104✔
2216
                                )
104✔
2217

104✔
2218
                                // TODO(ziggie): Make sure that for the strict
104✔
2219
                                // pruning case we compare the pubkeys and
104✔
2220
                                // whether the right timestamp is not older than
104✔
2221
                                // the `ChannelPruneExpiry`.
104✔
2222
                                //
104✔
2223
                                // NOTE: The timestamp data has no verification
104✔
2224
                                // attached to it in the `ReplyChannelRange` msg
104✔
2225
                                // so we are trusting this data at this point.
104✔
2226
                                // However it is not critical because we are
104✔
2227
                                // just removing the channel from the db when
104✔
2228
                                // the timestamps are more recent. During the
104✔
2229
                                // querying of the gossip msg verification
104✔
2230
                                // happens as usual.
104✔
2231
                                // However we should start punishing peers when
104✔
2232
                                // they don't provide us honest data ?
104✔
2233
                                isStillZombie := isZombieChan(
104✔
2234
                                        info.Node1UpdateTimestamp,
104✔
2235
                                        info.Node2UpdateTimestamp,
104✔
2236
                                )
104✔
2237

104✔
2238
                                switch {
104✔
2239
                                // If the edge is a known zombie and if we
2240
                                // would still consider it a zombie given the
2241
                                // latest update timestamps, then we skip this
2242
                                // channel.
2243
                                case isZombie && isStillZombie:
29✔
2244
                                        continue
29✔
2245

2246
                                // Otherwise, if we have marked it as a zombie
2247
                                // but the latest update timestamps could bring
2248
                                // it back from the dead, then we mark it alive,
2249
                                // and we let it be added to the set of IDs to
2250
                                // query our peer for.
2251
                                case isZombie && !isStillZombie:
19✔
2252
                                        err := c.markEdgeLiveUnsafe(tx, scid)
19✔
2253
                                        if err != nil {
19✔
2254
                                                return err
×
2255
                                        }
×
2256
                                }
2257
                        }
2258

2259
                        newChanIDs = append(newChanIDs, scid)
75✔
2260
                }
2261

2262
                return nil
123✔
2263
        }, func() {
123✔
2264
                newChanIDs = nil
123✔
2265
        })
123✔
2266
        switch {
123✔
2267
        // If we don't know of any edges yet, then we'll return the entire set
2268
        // of chan IDs specified.
2269
        case err == ErrGraphNoEdgesFound:
×
2270
                ogChanIDs := make([]uint64, len(chansInfo))
×
2271
                for i, info := range chansInfo {
×
2272
                        ogChanIDs[i] = info.ShortChannelID.ToUint64()
×
2273
                }
×
2274

2275
                return ogChanIDs, nil
×
2276

2277
        case err != nil:
×
2278
                return nil, err
×
2279
        }
2280

2281
        return newChanIDs, nil
123✔
2282
}
2283

2284
// ChannelUpdateInfo couples the SCID of a channel with the timestamps of the
2285
// latest received channel updates for the channel.
2286
type ChannelUpdateInfo struct {
2287
        // ShortChannelID is the SCID identifier of the channel.
2288
        ShortChannelID lnwire.ShortChannelID
2289

2290
        // Node1UpdateTimestamp is the timestamp of the latest received update
2291
        // from the node 1 channel peer. This will be set to zero time if no
2292
        // update has yet been received from this node.
2293
        Node1UpdateTimestamp time.Time
2294

2295
        // Node2UpdateTimestamp is the timestamp of the latest received update
2296
        // from the node 2 channel peer. This will be set to zero time if no
2297
        // update has yet been received from this node.
2298
        Node2UpdateTimestamp time.Time
2299
}
2300

2301
// NewChannelUpdateInfo is a constructor which makes sure we initialize the
2302
// timestamps with zero seconds unix timestamp which equals
2303
// `January 1, 1970, 00:00:00 UTC` in case the value is `time.Time{}`.
2304
func NewChannelUpdateInfo(scid lnwire.ShortChannelID, node1Timestamp,
2305
        node2Timestamp time.Time) ChannelUpdateInfo {
218✔
2306

218✔
2307
        chanInfo := ChannelUpdateInfo{
218✔
2308
                ShortChannelID:       scid,
218✔
2309
                Node1UpdateTimestamp: node1Timestamp,
218✔
2310
                Node2UpdateTimestamp: node2Timestamp,
218✔
2311
        }
218✔
2312

218✔
2313
        if node1Timestamp.IsZero() {
426✔
2314
                chanInfo.Node1UpdateTimestamp = time.Unix(0, 0)
208✔
2315
        }
208✔
2316

2317
        if node2Timestamp.IsZero() {
426✔
2318
                chanInfo.Node2UpdateTimestamp = time.Unix(0, 0)
208✔
2319
        }
208✔
2320

2321
        return chanInfo
218✔
2322
}
2323

2324
// BlockChannelRange represents a range of channels for a given block height.
2325
type BlockChannelRange struct {
2326
        // Height is the height of the block all of the channels below were
2327
        // included in.
2328
        Height uint32
2329

2330
        // Channels is the list of channels identified by their short ID
2331
        // representation known to us that were included in the block height
2332
        // above. The list may include channel update timestamp information if
2333
        // requested.
2334
        Channels []ChannelUpdateInfo
2335
}
2336

2337
// FilterChannelRange returns the channel ID's of all known channels which were
2338
// mined in a block height within the passed range. The channel IDs are grouped
2339
// by their common block height. This method can be used to quickly share with a
2340
// peer the set of channels we know of within a particular range to catch them
2341
// up after a period of time offline. If withTimestamps is true then the
2342
// timestamp info of the latest received channel update messages of the channel
2343
// will be included in the response.
2344
func (c *ChannelGraph) FilterChannelRange(startHeight,
2345
        endHeight uint32, withTimestamps bool) ([]BlockChannelRange, error) {
11✔
2346

11✔
2347
        startChanID := &lnwire.ShortChannelID{
11✔
2348
                BlockHeight: startHeight,
11✔
2349
        }
11✔
2350

11✔
2351
        endChanID := lnwire.ShortChannelID{
11✔
2352
                BlockHeight: endHeight,
11✔
2353
                TxIndex:     math.MaxUint32 & 0x00ffffff,
11✔
2354
                TxPosition:  math.MaxUint16,
11✔
2355
        }
11✔
2356

11✔
2357
        // As we need to perform a range scan, we'll convert the starting and
11✔
2358
        // ending height to their corresponding values when encoded using short
11✔
2359
        // channel ID's.
11✔
2360
        var chanIDStart, chanIDEnd [8]byte
11✔
2361
        byteOrder.PutUint64(chanIDStart[:], startChanID.ToUint64())
11✔
2362
        byteOrder.PutUint64(chanIDEnd[:], endChanID.ToUint64())
11✔
2363

11✔
2364
        var channelsPerBlock map[uint32][]ChannelUpdateInfo
11✔
2365
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
22✔
2366
                edges := tx.ReadBucket(edgeBucket)
11✔
2367
                if edges == nil {
11✔
2368
                        return ErrGraphNoEdgesFound
×
2369
                }
×
2370
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
11✔
2371
                if edgeIndex == nil {
11✔
2372
                        return ErrGraphNoEdgesFound
×
2373
                }
×
2374

2375
                cursor := edgeIndex.ReadCursor()
11✔
2376

11✔
2377
                // We'll now iterate through the database, and find each
11✔
2378
                // channel ID that resides within the specified range.
11✔
2379
                //
11✔
2380
                //nolint:ll
11✔
2381
                for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
11✔
2382
                        bytes.Compare(k, chanIDEnd[:]) <= 0; k, v = cursor.Next() {
55✔
2383
                        // Don't send alias SCIDs during gossip sync.
44✔
2384
                        edgeReader := bytes.NewReader(v)
44✔
2385
                        edgeInfo, err := deserializeChanEdgeInfo(edgeReader)
44✔
2386
                        if err != nil {
44✔
2387
                                return err
×
2388
                        }
×
2389

2390
                        if edgeInfo.AuthProof == nil {
44✔
2391
                                continue
×
2392
                        }
2393

2394
                        // This channel ID rests within the target range, so
2395
                        // we'll add it to our returned set.
2396
                        rawCid := byteOrder.Uint64(k)
44✔
2397
                        cid := lnwire.NewShortChanIDFromInt(rawCid)
44✔
2398

44✔
2399
                        chanInfo := NewChannelUpdateInfo(
44✔
2400
                                cid, time.Time{}, time.Time{},
44✔
2401
                        )
44✔
2402

44✔
2403
                        if !withTimestamps {
66✔
2404
                                channelsPerBlock[cid.BlockHeight] = append(
22✔
2405
                                        channelsPerBlock[cid.BlockHeight],
22✔
2406
                                        chanInfo,
22✔
2407
                                )
22✔
2408

22✔
2409
                                continue
22✔
2410
                        }
2411

2412
                        node1Key, node2Key := computeEdgePolicyKeys(&edgeInfo)
22✔
2413

22✔
2414
                        rawPolicy := edges.Get(node1Key)
22✔
2415
                        if len(rawPolicy) != 0 {
36✔
2416
                                r := bytes.NewReader(rawPolicy)
14✔
2417

14✔
2418
                                edge, err := deserializeChanEdgePolicyRaw(r)
14✔
2419
                                if err != nil && !errors.Is(
14✔
2420
                                        err, ErrEdgePolicyOptionalFieldNotFound,
14✔
2421
                                ) {
14✔
2422

×
2423
                                        return err
×
2424
                                }
×
2425

2426
                                chanInfo.Node1UpdateTimestamp = edge.LastUpdate
14✔
2427
                        }
2428

2429
                        rawPolicy = edges.Get(node2Key)
22✔
2430
                        if len(rawPolicy) != 0 {
26✔
2431
                                r := bytes.NewReader(rawPolicy)
4✔
2432

4✔
2433
                                edge, err := deserializeChanEdgePolicyRaw(r)
4✔
2434
                                if err != nil && !errors.Is(
4✔
2435
                                        err, ErrEdgePolicyOptionalFieldNotFound,
4✔
2436
                                ) {
4✔
2437

×
2438
                                        return err
×
2439
                                }
×
2440

2441
                                chanInfo.Node2UpdateTimestamp = edge.LastUpdate
4✔
2442
                        }
2443

2444
                        channelsPerBlock[cid.BlockHeight] = append(
22✔
2445
                                channelsPerBlock[cid.BlockHeight], chanInfo,
22✔
2446
                        )
22✔
2447
                }
2448

2449
                return nil
11✔
2450
        }, func() {
11✔
2451
                channelsPerBlock = make(map[uint32][]ChannelUpdateInfo)
11✔
2452
        })
11✔
2453

2454
        switch {
11✔
2455
        // If we don't know of any channels yet, then there's nothing to
2456
        // filter, so we'll return an empty slice.
2457
        case err == ErrGraphNoEdgesFound || len(channelsPerBlock) == 0:
3✔
2458
                return nil, nil
3✔
2459

2460
        case err != nil:
×
2461
                return nil, err
×
2462
        }
2463

2464
        // Return the channel ranges in ascending block height order.
2465
        blocks := make([]uint32, 0, len(channelsPerBlock))
8✔
2466
        for block := range channelsPerBlock {
30✔
2467
                blocks = append(blocks, block)
22✔
2468
        }
22✔
2469
        sort.Slice(blocks, func(i, j int) bool {
26✔
2470
                return blocks[i] < blocks[j]
18✔
2471
        })
18✔
2472

2473
        channelRanges := make([]BlockChannelRange, 0, len(channelsPerBlock))
8✔
2474
        for _, block := range blocks {
30✔
2475
                channelRanges = append(channelRanges, BlockChannelRange{
22✔
2476
                        Height:   block,
22✔
2477
                        Channels: channelsPerBlock[block],
22✔
2478
                })
22✔
2479
        }
22✔
2480

2481
        return channelRanges, nil
8✔
2482
}
2483

2484
// FetchChanInfos returns the set of channel edges that correspond to the passed
2485
// channel ID's. If an edge is the query is unknown to the database, it will
2486
// skipped and the result will contain only those edges that exist at the time
2487
// of the query. This can be used to respond to peer queries that are seeking to
2488
// fill in gaps in their view of the channel graph.
2489
func (c *ChannelGraph) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) {
3✔
2490
        return c.fetchChanInfos(nil, chanIDs)
3✔
2491
}
3✔
2492

2493
// fetchChanInfos returns the set of channel edges that correspond to the passed
2494
// channel ID's. If an edge is the query is unknown to the database, it will
2495
// skipped and the result will contain only those edges that exist at the time
2496
// of the query. This can be used to respond to peer queries that are seeking to
2497
// fill in gaps in their view of the channel graph.
2498
//
2499
// NOTE: An optional transaction may be provided. If none is provided, then a
2500
// new one will be created.
2501
func (c *ChannelGraph) fetchChanInfos(tx kvdb.RTx, chanIDs []uint64) (
2502
        []ChannelEdge, error) {
23✔
2503
        // TODO(roasbeef): sort cids?
23✔
2504

23✔
2505
        var (
23✔
2506
                chanEdges []ChannelEdge
23✔
2507
                cidBytes  [8]byte
23✔
2508
        )
23✔
2509

23✔
2510
        fetchChanInfos := func(tx kvdb.RTx) error {
46✔
2511
                edges := tx.ReadBucket(edgeBucket)
23✔
2512
                if edges == nil {
23✔
2513
                        return ErrGraphNoEdgesFound
×
2514
                }
×
2515
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
23✔
2516
                if edgeIndex == nil {
23✔
2517
                        return ErrGraphNoEdgesFound
×
2518
                }
×
2519
                nodes := tx.ReadBucket(nodeBucket)
23✔
2520
                if nodes == nil {
23✔
2521
                        return ErrGraphNotFound
×
2522
                }
×
2523

2524
                for _, cid := range chanIDs {
53✔
2525
                        byteOrder.PutUint64(cidBytes[:], cid)
30✔
2526

30✔
2527
                        // First, we'll fetch the static edge information. If
30✔
2528
                        // the edge is unknown, we will skip the edge and
30✔
2529
                        // continue gathering all known edges.
30✔
2530
                        edgeInfo, err := fetchChanEdgeInfo(
30✔
2531
                                edgeIndex, cidBytes[:],
30✔
2532
                        )
30✔
2533
                        switch {
30✔
2534
                        case errors.Is(err, ErrEdgeNotFound):
22✔
2535
                                continue
22✔
2536
                        case err != nil:
×
2537
                                return err
×
2538
                        }
2539

2540
                        // With the static information obtained, we'll now
2541
                        // fetch the dynamic policy info.
2542
                        edge1, edge2, err := fetchChanEdgePolicies(
8✔
2543
                                edgeIndex, edges, cidBytes[:],
8✔
2544
                        )
8✔
2545
                        if err != nil {
8✔
2546
                                return err
×
2547
                        }
×
2548

2549
                        node1, err := fetchLightningNode(
8✔
2550
                                nodes, edgeInfo.NodeKey1Bytes[:],
8✔
2551
                        )
8✔
2552
                        if err != nil {
8✔
2553
                                return err
×
2554
                        }
×
2555

2556
                        node2, err := fetchLightningNode(
8✔
2557
                                nodes, edgeInfo.NodeKey2Bytes[:],
8✔
2558
                        )
8✔
2559
                        if err != nil {
8✔
2560
                                return err
×
2561
                        }
×
2562

2563
                        chanEdges = append(chanEdges, ChannelEdge{
8✔
2564
                                Info:    &edgeInfo,
8✔
2565
                                Policy1: edge1,
8✔
2566
                                Policy2: edge2,
8✔
2567
                                Node1:   &node1,
8✔
2568
                                Node2:   &node2,
8✔
2569
                        })
8✔
2570
                }
2571
                return nil
23✔
2572
        }
2573

2574
        if tx == nil {
27✔
2575
                err := kvdb.View(c.db, fetchChanInfos, func() {
8✔
2576
                        chanEdges = nil
4✔
2577
                })
4✔
2578
                if err != nil {
4✔
2579
                        return nil, err
×
2580
                }
×
2581

2582
                return chanEdges, nil
4✔
2583
        }
2584

2585
        err := fetchChanInfos(tx)
19✔
2586
        if err != nil {
19✔
2587
                return nil, err
×
2588
        }
×
2589

2590
        return chanEdges, nil
19✔
2591
}
2592

2593
func delEdgeUpdateIndexEntry(edgesBucket kvdb.RwBucket, chanID uint64,
2594
        edge1, edge2 *models.ChannelEdgePolicy) error {
144✔
2595

144✔
2596
        // First, we'll fetch the edge update index bucket which currently
144✔
2597
        // stores an entry for the channel we're about to delete.
144✔
2598
        updateIndex := edgesBucket.NestedReadWriteBucket(edgeUpdateIndexBucket)
144✔
2599
        if updateIndex == nil {
144✔
2600
                // No edges in bucket, return early.
×
2601
                return nil
×
2602
        }
×
2603

2604
        // Now that we have the bucket, we'll attempt to construct a template
2605
        // for the index key: updateTime || chanid.
2606
        var indexKey [8 + 8]byte
144✔
2607
        byteOrder.PutUint64(indexKey[8:], chanID)
144✔
2608

144✔
2609
        // With the template constructed, we'll attempt to delete an entry that
144✔
2610
        // would have been created by both edges: we'll alternate the update
144✔
2611
        // times, as one may had overridden the other.
144✔
2612
        if edge1 != nil {
154✔
2613
                byteOrder.PutUint64(
10✔
2614
                        indexKey[:8], uint64(edge1.LastUpdate.Unix()),
10✔
2615
                )
10✔
2616
                if err := updateIndex.Delete(indexKey[:]); err != nil {
10✔
2617
                        return err
×
2618
                }
×
2619
        }
2620

2621
        // We'll also attempt to delete the entry that may have been created by
2622
        // the second edge.
2623
        if edge2 != nil {
156✔
2624
                byteOrder.PutUint64(
12✔
2625
                        indexKey[:8], uint64(edge2.LastUpdate.Unix()),
12✔
2626
                )
12✔
2627
                if err := updateIndex.Delete(indexKey[:]); err != nil {
12✔
2628
                        return err
×
2629
                }
×
2630
        }
2631

2632
        return nil
144✔
2633
}
2634

2635
// delChannelEdgeUnsafe deletes the edge with the given chanID from the graph
2636
// cache. It then goes on to delete any policy info and edge info for this
2637
// channel from the DB and finally, if isZombie is true, it will add an entry
2638
// for this channel in the zombie index.
2639
//
2640
// NOTE: this method MUST only be called if the cacheMu has already been
2641
// acquired.
2642
func (c *ChannelGraph) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex,
2643
        zombieIndex kvdb.RwBucket, chanID []byte, isZombie,
2644
        strictZombie bool) error {
204✔
2645

204✔
2646
        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
204✔
2647
        if err != nil {
264✔
2648
                return err
60✔
2649
        }
60✔
2650

2651
        if c.graphCache != nil {
288✔
2652
                c.graphCache.RemoveChannel(
144✔
2653
                        edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes,
144✔
2654
                        edgeInfo.ChannelID,
144✔
2655
                )
144✔
2656
        }
144✔
2657

2658
        // We'll also remove the entry in the edge update index bucket before
2659
        // we delete the edges themselves so we can access their last update
2660
        // times.
2661
        cid := byteOrder.Uint64(chanID)
144✔
2662
        edge1, edge2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
144✔
2663
        if err != nil {
144✔
2664
                return err
×
2665
        }
×
2666
        err = delEdgeUpdateIndexEntry(edges, cid, edge1, edge2)
144✔
2667
        if err != nil {
144✔
2668
                return err
×
2669
        }
×
2670

2671
        // The edge key is of the format pubKey || chanID. First we construct
2672
        // the latter half, populating the channel ID.
2673
        var edgeKey [33 + 8]byte
144✔
2674
        copy(edgeKey[33:], chanID)
144✔
2675

144✔
2676
        // With the latter half constructed, copy over the first public key to
144✔
2677
        // delete the edge in this direction, then the second to delete the
144✔
2678
        // edge in the opposite direction.
144✔
2679
        copy(edgeKey[:33], edgeInfo.NodeKey1Bytes[:])
144✔
2680
        if edges.Get(edgeKey[:]) != nil {
288✔
2681
                if err := edges.Delete(edgeKey[:]); err != nil {
144✔
2682
                        return err
×
2683
                }
×
2684
        }
2685
        copy(edgeKey[:33], edgeInfo.NodeKey2Bytes[:])
144✔
2686
        if edges.Get(edgeKey[:]) != nil {
288✔
2687
                if err := edges.Delete(edgeKey[:]); err != nil {
144✔
2688
                        return err
×
2689
                }
×
2690
        }
2691

2692
        // As part of deleting the edge we also remove all disabled entries
2693
        // from the edgePolicyDisabledIndex bucket. We do that for both
2694
        // directions.
2695
        err = updateEdgePolicyDisabledIndex(edges, cid, false, false)
144✔
2696
        if err != nil {
144✔
2697
                return err
×
2698
        }
×
2699
        err = updateEdgePolicyDisabledIndex(edges, cid, true, false)
144✔
2700
        if err != nil {
144✔
2701
                return err
×
2702
        }
×
2703

2704
        // With the edge data deleted, we can purge the information from the two
2705
        // edge indexes.
2706
        if err := edgeIndex.Delete(chanID); err != nil {
144✔
2707
                return err
×
2708
        }
×
2709
        var b bytes.Buffer
144✔
2710
        if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
144✔
2711
                return err
×
2712
        }
×
2713
        if err := chanIndex.Delete(b.Bytes()); err != nil {
144✔
2714
                return err
×
2715
        }
×
2716

2717
        // Finally, we'll mark the edge as a zombie within our index if it's
2718
        // being removed due to the channel becoming a zombie. We do this to
2719
        // ensure we don't store unnecessary data for spent channels.
2720
        if !isZombie {
265✔
2721
                return nil
121✔
2722
        }
121✔
2723

2724
        nodeKey1, nodeKey2 := edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes
23✔
2725
        if strictZombie {
26✔
2726
                nodeKey1, nodeKey2 = makeZombiePubkeys(&edgeInfo, edge1, edge2)
3✔
2727
        }
3✔
2728

2729
        return markEdgeZombie(
23✔
2730
                zombieIndex, byteOrder.Uint64(chanID), nodeKey1, nodeKey2,
23✔
2731
        )
23✔
2732
}
2733

2734
// makeZombiePubkeys derives the node pubkeys to store in the zombie index for a
2735
// particular pair of channel policies. The return values are one of:
2736
//  1. (pubkey1, pubkey2)
2737
//  2. (pubkey1, blank)
2738
//  3. (blank, pubkey2)
2739
//
2740
// A blank pubkey means that corresponding node will be unable to resurrect a
2741
// channel on its own. For example, node1 may continue to publish recent
2742
// updates, but node2 has fallen way behind. After marking an edge as a zombie,
2743
// we don't want another fresh update from node1 to resurrect, as the edge can
2744
// only become live once node2 finally sends something recent.
2745
//
2746
// In the case where we have neither update, we allow either party to resurrect
2747
// the channel. If the channel were to be marked zombie again, it would be
2748
// marked with the correct lagging channel since we received an update from only
2749
// one side.
2750
func makeZombiePubkeys(info *models.ChannelEdgeInfo,
2751
        e1, e2 *models.ChannelEdgePolicy) ([33]byte, [33]byte) {
3✔
2752

3✔
2753
        switch {
3✔
2754
        // If we don't have either edge policy, we'll return both pubkeys so
2755
        // that the channel can be resurrected by either party.
2756
        case e1 == nil && e2 == nil:
×
2757
                return info.NodeKey1Bytes, info.NodeKey2Bytes
×
2758

2759
        // If we're missing edge1, or if both edges are present but edge1 is
2760
        // older, we'll return edge1's pubkey and a blank pubkey for edge2. This
2761
        // means that only an update from edge1 will be able to resurrect the
2762
        // channel.
2763
        case e1 == nil || (e2 != nil && e1.LastUpdate.Before(e2.LastUpdate)):
1✔
2764
                return info.NodeKey1Bytes, [33]byte{}
1✔
2765

2766
        // Otherwise, we're missing edge2 or edge2 is the older side, so we
2767
        // return a blank pubkey for edge1. In this case, only an update from
2768
        // edge2 can resurect the channel.
2769
        default:
2✔
2770
                return [33]byte{}, info.NodeKey2Bytes
2✔
2771
        }
2772
}
2773

2774
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
2775
// within the database for the referenced channel. The `flags` attribute within
2776
// the ChannelEdgePolicy determines which of the directed edges are being
2777
// updated. If the flag is 1, then the first node's information is being
2778
// updated, otherwise it's the second node's information. The node ordering is
2779
// determined by the lexicographical ordering of the identity public keys of the
2780
// nodes on either side of the channel.
2781
func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
2782
        op ...batch.SchedulerOption) error {
2,663✔
2783

2,663✔
2784
        var (
2,663✔
2785
                isUpdate1    bool
2,663✔
2786
                edgeNotFound bool
2,663✔
2787
        )
2,663✔
2788

2,663✔
2789
        r := &batch.Request{
2,663✔
2790
                Reset: func() {
5,326✔
2791
                        isUpdate1 = false
2,663✔
2792
                        edgeNotFound = false
2,663✔
2793
                },
2,663✔
2794
                Update: func(tx kvdb.RwTx) error {
2,663✔
2795
                        var err error
2,663✔
2796
                        isUpdate1, err = updateEdgePolicy(
2,663✔
2797
                                tx, edge, c.graphCache,
2,663✔
2798
                        )
2,663✔
2799

2,663✔
2800
                        // Silence ErrEdgeNotFound so that the batch can
2,663✔
2801
                        // succeed, but propagate the error via local state.
2,663✔
2802
                        if errors.Is(err, ErrEdgeNotFound) {
2,666✔
2803
                                edgeNotFound = true
3✔
2804
                                return nil
3✔
2805
                        }
3✔
2806

2807
                        return err
2,660✔
2808
                },
2809
                OnCommit: func(err error) error {
2,663✔
2810
                        switch {
2,663✔
2811
                        case err != nil:
×
2812
                                return err
×
2813
                        case edgeNotFound:
3✔
2814
                                return ErrEdgeNotFound
3✔
2815
                        default:
2,660✔
2816
                                c.updateEdgeCache(edge, isUpdate1)
2,660✔
2817
                                return nil
2,660✔
2818
                        }
2819
                },
2820
        }
2821

2822
        for _, f := range op {
2,663✔
2823
                f(r)
×
2824
        }
×
2825

2826
        return c.chanScheduler.Execute(r)
2,663✔
2827
}
2828

2829
func (c *ChannelGraph) updateEdgeCache(e *models.ChannelEdgePolicy,
2830
        isUpdate1 bool) {
2,660✔
2831

2,660✔
2832
        // If an entry for this channel is found in reject cache, we'll modify
2,660✔
2833
        // the entry with the updated timestamp for the direction that was just
2,660✔
2834
        // written. If the edge doesn't exist, we'll load the cache entry lazily
2,660✔
2835
        // during the next query for this edge.
2,660✔
2836
        if entry, ok := c.rejectCache.get(e.ChannelID); ok {
2,665✔
2837
                if isUpdate1 {
8✔
2838
                        entry.upd1Time = e.LastUpdate.Unix()
3✔
2839
                } else {
5✔
2840
                        entry.upd2Time = e.LastUpdate.Unix()
2✔
2841
                }
2✔
2842
                c.rejectCache.insert(e.ChannelID, entry)
5✔
2843
        }
2844

2845
        // If an entry for this channel is found in channel cache, we'll modify
2846
        // the entry with the updated policy for the direction that was just
2847
        // written. If the edge doesn't exist, we'll defer loading the info and
2848
        // policies and lazily read from disk during the next query.
2849
        if channel, ok := c.chanCache.get(e.ChannelID); ok {
2,660✔
2850
                if isUpdate1 {
×
2851
                        channel.Policy1 = e
×
2852
                } else {
×
2853
                        channel.Policy2 = e
×
2854
                }
×
2855
                c.chanCache.insert(e.ChannelID, channel)
×
2856
        }
2857
}
2858

2859
// updateEdgePolicy attempts to update an edge's policy within the relevant
2860
// buckets using an existing database transaction. The returned boolean will be
2861
// true if the updated policy belongs to node1, and false if the policy belonged
2862
// to node2.
2863
func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy,
2864
        graphCache *GraphCache) (bool, error) {
2,663✔
2865

2,663✔
2866
        edges := tx.ReadWriteBucket(edgeBucket)
2,663✔
2867
        if edges == nil {
2,663✔
2868
                return false, ErrEdgeNotFound
×
2869
        }
×
2870
        edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
2,663✔
2871
        if edgeIndex == nil {
2,663✔
2872
                return false, ErrEdgeNotFound
×
2873
        }
×
2874

2875
        // Create the channelID key be converting the channel ID
2876
        // integer into a byte slice.
2877
        var chanID [8]byte
2,663✔
2878
        byteOrder.PutUint64(chanID[:], edge.ChannelID)
2,663✔
2879

2,663✔
2880
        // With the channel ID, we then fetch the value storing the two
2,663✔
2881
        // nodes which connect this channel edge.
2,663✔
2882
        nodeInfo := edgeIndex.Get(chanID[:])
2,663✔
2883
        if nodeInfo == nil {
2,666✔
2884
                return false, ErrEdgeNotFound
3✔
2885
        }
3✔
2886

2887
        // Depending on the flags value passed above, either the first
2888
        // or second edge policy is being updated.
2889
        var fromNode, toNode []byte
2,660✔
2890
        var isUpdate1 bool
2,660✔
2891
        if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
3,997✔
2892
                fromNode = nodeInfo[:33]
1,337✔
2893
                toNode = nodeInfo[33:66]
1,337✔
2894
                isUpdate1 = true
1,337✔
2895
        } else {
2,660✔
2896
                fromNode = nodeInfo[33:66]
1,323✔
2897
                toNode = nodeInfo[:33]
1,323✔
2898
                isUpdate1 = false
1,323✔
2899
        }
1,323✔
2900

2901
        // Finally, with the direction of the edge being updated
2902
        // identified, we update the on-disk edge representation.
2903
        err := putChanEdgePolicy(edges, edge, fromNode, toNode)
2,660✔
2904
        if err != nil {
2,660✔
2905
                return false, err
×
2906
        }
×
2907

2908
        var (
2,660✔
2909
                fromNodePubKey route.Vertex
2,660✔
2910
                toNodePubKey   route.Vertex
2,660✔
2911
        )
2,660✔
2912
        copy(fromNodePubKey[:], fromNode)
2,660✔
2913
        copy(toNodePubKey[:], toNode)
2,660✔
2914

2,660✔
2915
        if graphCache != nil {
4,934✔
2916
                graphCache.UpdatePolicy(
2,274✔
2917
                        edge, fromNodePubKey, toNodePubKey, isUpdate1,
2,274✔
2918
                )
2,274✔
2919
        }
2,274✔
2920

2921
        return isUpdate1, nil
2,660✔
2922
}
2923

2924
// isPublic determines whether the node is seen as public within the graph from
2925
// the source node's point of view. An existing database transaction can also be
2926
// specified.
2927
func (c *ChannelGraph) isPublic(tx kvdb.RTx, nodePub route.Vertex,
2928
        sourcePubKey []byte) (bool, error) {
13✔
2929

13✔
2930
        // In order to determine whether this node is publicly advertised within
13✔
2931
        // the graph, we'll need to look at all of its edges and check whether
13✔
2932
        // they extend to any other node than the source node. errDone will be
13✔
2933
        // used to terminate the check early.
13✔
2934
        nodeIsPublic := false
13✔
2935
        errDone := errors.New("done")
13✔
2936
        err := c.ForEachNodeChannelTx(tx, nodePub, func(tx kvdb.RTx,
13✔
2937
                info *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy,
13✔
2938
                _ *models.ChannelEdgePolicy) error {
23✔
2939

10✔
2940
                // If this edge doesn't extend to the source node, we'll
10✔
2941
                // terminate our search as we can now conclude that the node is
10✔
2942
                // publicly advertised within the graph due to the local node
10✔
2943
                // knowing of the current edge.
10✔
2944
                if !bytes.Equal(info.NodeKey1Bytes[:], sourcePubKey) &&
10✔
2945
                        !bytes.Equal(info.NodeKey2Bytes[:], sourcePubKey) {
13✔
2946

3✔
2947
                        nodeIsPublic = true
3✔
2948
                        return errDone
3✔
2949
                }
3✔
2950

2951
                // Since the edge _does_ extend to the source node, we'll also
2952
                // need to ensure that this is a public edge.
2953
                if info.AuthProof != nil {
13✔
2954
                        nodeIsPublic = true
6✔
2955
                        return errDone
6✔
2956
                }
6✔
2957

2958
                // Otherwise, we'll continue our search.
2959
                return nil
1✔
2960
        })
2961
        if err != nil && err != errDone {
13✔
2962
                return false, err
×
2963
        }
×
2964

2965
        return nodeIsPublic, nil
13✔
2966
}
2967

2968
// FetchLightningNodeTx attempts to look up a target node by its identity
2969
// public key. If the node isn't found in the database, then
2970
// ErrGraphNodeNotFound is returned. An optional transaction may be provided.
2971
// If none is provided, then a new one will be created.
2972
func (c *ChannelGraph) FetchLightningNodeTx(tx kvdb.RTx, nodePub route.Vertex) (
2973
        *models.LightningNode, error) {
2,944✔
2974

2,944✔
2975
        return c.fetchLightningNode(tx, nodePub)
2,944✔
2976
}
2,944✔
2977

2978
// FetchLightningNode attempts to look up a target node by its identity public
2979
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
2980
// returned.
2981
func (c *ChannelGraph) FetchLightningNode(nodePub route.Vertex) (
2982
        *models.LightningNode, error) {
837✔
2983

837✔
2984
        return c.fetchLightningNode(nil, nodePub)
837✔
2985
}
837✔
2986

2987
// fetchLightningNode attempts to look up a target node by its identity public
2988
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
2989
// returned. An optional transaction may be provided. If none is provided, then
2990
// a new one will be created.
2991
func (c *ChannelGraph) fetchLightningNode(tx kvdb.RTx,
2992
        nodePub route.Vertex) (*models.LightningNode, error) {
3,781✔
2993

3,781✔
2994
        var node *models.LightningNode
3,781✔
2995
        fetch := func(tx kvdb.RTx) error {
7,562✔
2996
                // First grab the nodes bucket which stores the mapping from
3,781✔
2997
                // pubKey to node information.
3,781✔
2998
                nodes := tx.ReadBucket(nodeBucket)
3,781✔
2999
                if nodes == nil {
3,781✔
3000
                        return ErrGraphNotFound
×
3001
                }
×
3002

3003
                // If a key for this serialized public key isn't found, then
3004
                // the target node doesn't exist within the database.
3005
                nodeBytes := nodes.Get(nodePub[:])
3,781✔
3006
                if nodeBytes == nil {
3,795✔
3007
                        return ErrGraphNodeNotFound
14✔
3008
                }
14✔
3009

3010
                // If the node is found, then we can de deserialize the node
3011
                // information to return to the user.
3012
                nodeReader := bytes.NewReader(nodeBytes)
3,767✔
3013
                n, err := deserializeLightningNode(nodeReader)
3,767✔
3014
                if err != nil {
3,767✔
3015
                        return err
×
3016
                }
×
3017

3018
                node = &n
3,767✔
3019

3,767✔
3020
                return nil
3,767✔
3021
        }
3022

3023
        if tx == nil {
4,618✔
3024
                err := kvdb.View(
837✔
3025
                        c.db, fetch, func() {
1,674✔
3026
                                node = nil
837✔
3027
                        },
837✔
3028
                )
3029
                if err != nil {
851✔
3030
                        return nil, err
14✔
3031
                }
14✔
3032

3033
                return node, nil
823✔
3034
        }
3035

3036
        err := fetch(tx)
2,944✔
3037
        if err != nil {
2,944✔
3038
                return nil, err
×
3039
        }
×
3040

3041
        return node, nil
2,944✔
3042
}
3043

3044
// graphCacheNode is a struct that wraps a LightningNode in a way that it can be
3045
// cached in the graph cache.
3046
type graphCacheNode struct {
3047
        pubKeyBytes route.Vertex
3048
        features    *lnwire.FeatureVector
3049
}
3050

3051
// newGraphCacheNode returns a new cache optimized node.
3052
func newGraphCacheNode(pubKey route.Vertex,
3053
        features *lnwire.FeatureVector) *graphCacheNode {
732✔
3054

732✔
3055
        return &graphCacheNode{
732✔
3056
                pubKeyBytes: pubKey,
732✔
3057
                features:    features,
732✔
3058
        }
732✔
3059
}
732✔
3060

3061
// PubKey returns the node's public identity key.
3062
func (n *graphCacheNode) PubKey() route.Vertex {
732✔
3063
        return n.pubKeyBytes
732✔
3064
}
732✔
3065

3066
// Features returns the node's features.
3067
func (n *graphCacheNode) Features() *lnwire.FeatureVector {
712✔
3068
        return n.features
712✔
3069
}
712✔
3070

3071
// ForEachChannel iterates through all channels of this node, executing the
3072
// passed callback with an edge info structure and the policies of each end
3073
// of the channel. The first edge policy is the outgoing edge *to* the
3074
// connecting node, while the second is the incoming edge *from* the
3075
// connecting node. If the callback returns an error, then the iteration is
3076
// halted with the error propagated back up to the caller.
3077
//
3078
// Unknown policies are passed into the callback as nil values.
3079
func (n *graphCacheNode) ForEachChannel(tx kvdb.RTx,
3080
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3081
                *models.ChannelEdgePolicy) error) error {
632✔
3082

632✔
3083
        return nodeTraversal(tx, n.pubKeyBytes[:], nil, cb)
632✔
3084
}
632✔
3085

3086
var _ GraphCacheNode = (*graphCacheNode)(nil)
3087

3088
// HasLightningNode determines if the graph has a vertex identified by the
3089
// target node identity public key. If the node exists in the database, a
3090
// timestamp of when the data for the node was lasted updated is returned along
3091
// with a true boolean. Otherwise, an empty time.Time is returned with a false
3092
// boolean.
3093
func (c *ChannelGraph) HasLightningNode(nodePub [33]byte) (time.Time, bool,
3094
        error) {
16✔
3095

16✔
3096
        var (
16✔
3097
                updateTime time.Time
16✔
3098
                exists     bool
16✔
3099
        )
16✔
3100

16✔
3101
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
32✔
3102
                // First grab the nodes bucket which stores the mapping from
16✔
3103
                // pubKey to node information.
16✔
3104
                nodes := tx.ReadBucket(nodeBucket)
16✔
3105
                if nodes == nil {
16✔
3106
                        return ErrGraphNotFound
×
3107
                }
×
3108

3109
                // If a key for this serialized public key isn't found, we can
3110
                // exit early.
3111
                nodeBytes := nodes.Get(nodePub[:])
16✔
3112
                if nodeBytes == nil {
19✔
3113
                        exists = false
3✔
3114
                        return nil
3✔
3115
                }
3✔
3116

3117
                // Otherwise we continue on to obtain the time stamp
3118
                // representing the last time the data for this node was
3119
                // updated.
3120
                nodeReader := bytes.NewReader(nodeBytes)
13✔
3121
                node, err := deserializeLightningNode(nodeReader)
13✔
3122
                if err != nil {
13✔
3123
                        return err
×
3124
                }
×
3125

3126
                exists = true
13✔
3127
                updateTime = node.LastUpdate
13✔
3128
                return nil
13✔
3129
        }, func() {
16✔
3130
                updateTime = time.Time{}
16✔
3131
                exists = false
16✔
3132
        })
16✔
3133
        if err != nil {
16✔
3134
                return time.Time{}, exists, err
×
3135
        }
×
3136

3137
        return updateTime, exists, nil
16✔
3138
}
3139

3140
// nodeTraversal is used to traverse all channels of a node given by its
3141
// public key and passes channel information into the specified callback.
3142
func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
3143
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3144
                *models.ChannelEdgePolicy) error) error {
1,878✔
3145

1,878✔
3146
        traversal := func(tx kvdb.RTx) error {
3,756✔
3147
                edges := tx.ReadBucket(edgeBucket)
1,878✔
3148
                if edges == nil {
1,878✔
3149
                        return ErrGraphNotFound
×
3150
                }
×
3151
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
1,878✔
3152
                if edgeIndex == nil {
1,878✔
3153
                        return ErrGraphNoEdgesFound
×
3154
                }
×
3155

3156
                // In order to reach all the edges for this node, we take
3157
                // advantage of the construction of the key-space within the
3158
                // edge bucket. The keys are stored in the form: pubKey ||
3159
                // chanID. Therefore, starting from a chanID of zero, we can
3160
                // scan forward in the bucket, grabbing all the edges for the
3161
                // node. Once the prefix no longer matches, then we know we're
3162
                // done.
3163
                var nodeStart [33 + 8]byte
1,878✔
3164
                copy(nodeStart[:], nodePub)
1,878✔
3165
                copy(nodeStart[33:], chanStart[:])
1,878✔
3166

1,878✔
3167
                // Starting from the key pubKey || 0, we seek forward in the
1,878✔
3168
                // bucket until the retrieved key no longer has the public key
1,878✔
3169
                // as its prefix. This indicates that we've stepped over into
1,878✔
3170
                // another node's edges, so we can terminate our scan.
1,878✔
3171
                edgeCursor := edges.ReadCursor()
1,878✔
3172
                for nodeEdge, _ := edgeCursor.Seek(nodeStart[:]); bytes.HasPrefix(nodeEdge, nodePub); nodeEdge, _ = edgeCursor.Next() { //nolint:ll
5,727✔
3173
                        // If the prefix still matches, the channel id is
3,849✔
3174
                        // returned in nodeEdge. Channel id is used to lookup
3,849✔
3175
                        // the node at the other end of the channel and both
3,849✔
3176
                        // edge policies.
3,849✔
3177
                        chanID := nodeEdge[33:]
3,849✔
3178
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
3,849✔
3179
                        if err != nil {
3,849✔
3180
                                return err
×
3181
                        }
×
3182

3183
                        outgoingPolicy, err := fetchChanEdgePolicy(
3,849✔
3184
                                edges, chanID, nodePub,
3,849✔
3185
                        )
3,849✔
3186
                        if err != nil {
3,849✔
3187
                                return err
×
3188
                        }
×
3189

3190
                        otherNode, err := edgeInfo.OtherNodeKeyBytes(nodePub)
3,849✔
3191
                        if err != nil {
3,849✔
3192
                                return err
×
3193
                        }
×
3194

3195
                        incomingPolicy, err := fetchChanEdgePolicy(
3,849✔
3196
                                edges, chanID, otherNode[:],
3,849✔
3197
                        )
3,849✔
3198
                        if err != nil {
3,849✔
3199
                                return err
×
3200
                        }
×
3201

3202
                        // Finally, we execute the callback.
3203
                        err = cb(tx, &edgeInfo, outgoingPolicy, incomingPolicy)
3,849✔
3204
                        if err != nil {
3,858✔
3205
                                return err
9✔
3206
                        }
9✔
3207
                }
3208

3209
                return nil
1,869✔
3210
        }
3211

3212
        // If no transaction was provided, then we'll create a new transaction
3213
        // to execute the transaction within.
3214
        if tx == nil {
1,887✔
3215
                return kvdb.View(db, traversal, func() {})
18✔
3216
        }
3217

3218
        // Otherwise, we re-use the existing transaction to execute the graph
3219
        // traversal.
3220
        return traversal(tx)
1,869✔
3221
}
3222

3223
// ForEachNodeChannel iterates through all channels of the given node,
3224
// executing the passed callback with an edge info structure and the policies
3225
// of each end of the channel. The first edge policy is the outgoing edge *to*
3226
// the connecting node, while the second is the incoming edge *from* the
3227
// connecting node. If the callback returns an error, then the iteration is
3228
// halted with the error propagated back up to the caller.
3229
//
3230
// Unknown policies are passed into the callback as nil values.
3231
func (c *ChannelGraph) ForEachNodeChannel(nodePub route.Vertex,
3232
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3233
                *models.ChannelEdgePolicy) error) error {
6✔
3234

6✔
3235
        return nodeTraversal(nil, nodePub[:], c.db, cb)
6✔
3236
}
6✔
3237

3238
// ForEachNodeChannelTx iterates through all channels of the given node,
3239
// executing the passed callback with an edge info structure and the policies
3240
// of each end of the channel. The first edge policy is the outgoing edge *to*
3241
// the connecting node, while the second is the incoming edge *from* the
3242
// connecting node. If the callback returns an error, then the iteration is
3243
// halted with the error propagated back up to the caller.
3244
//
3245
// Unknown policies are passed into the callback as nil values.
3246
//
3247
// If the caller wishes to re-use an existing boltdb transaction, then it
3248
// should be passed as the first argument.  Otherwise, the first argument should
3249
// be nil and a fresh transaction will be created to execute the graph
3250
// traversal.
3251
func (c *ChannelGraph) ForEachNodeChannelTx(tx kvdb.RTx,
3252
        nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo,
3253
                *models.ChannelEdgePolicy,
3254
                *models.ChannelEdgePolicy) error) error {
998✔
3255

998✔
3256
        return nodeTraversal(tx, nodePub[:], c.db, cb)
998✔
3257
}
998✔
3258

3259
// FetchOtherNode attempts to fetch the full LightningNode that's opposite of
3260
// the target node in the channel. This is useful when one knows the pubkey of
3261
// one of the nodes, and wishes to obtain the full LightningNode for the other
3262
// end of the channel.
3263
func (c *ChannelGraph) FetchOtherNode(tx kvdb.RTx,
3264
        channel *models.ChannelEdgeInfo, thisNodeKey []byte) (
3265
        *models.LightningNode, error) {
×
3266

×
3267
        // Ensure that the node passed in is actually a member of the channel.
×
3268
        var targetNodeBytes [33]byte
×
3269
        switch {
×
3270
        case bytes.Equal(channel.NodeKey1Bytes[:], thisNodeKey):
×
3271
                targetNodeBytes = channel.NodeKey2Bytes
×
3272
        case bytes.Equal(channel.NodeKey2Bytes[:], thisNodeKey):
×
3273
                targetNodeBytes = channel.NodeKey1Bytes
×
3274
        default:
×
3275
                return nil, fmt.Errorf("node not participating in this channel")
×
3276
        }
3277

3278
        var targetNode *models.LightningNode
×
3279
        fetchNodeFunc := func(tx kvdb.RTx) error {
×
3280
                // First grab the nodes bucket which stores the mapping from
×
3281
                // pubKey to node information.
×
3282
                nodes := tx.ReadBucket(nodeBucket)
×
3283
                if nodes == nil {
×
3284
                        return ErrGraphNotFound
×
3285
                }
×
3286

3287
                node, err := fetchLightningNode(nodes, targetNodeBytes[:])
×
3288
                if err != nil {
×
3289
                        return err
×
3290
                }
×
3291

3292
                targetNode = &node
×
3293

×
3294
                return nil
×
3295
        }
3296

3297
        // If the transaction is nil, then we'll need to create a new one,
3298
        // otherwise we can use the existing db transaction.
3299
        var err error
×
3300
        if tx == nil {
×
3301
                err = kvdb.View(c.db, fetchNodeFunc, func() {
×
3302
                        targetNode = nil
×
3303
                })
×
3304
        } else {
×
3305
                err = fetchNodeFunc(tx)
×
3306
        }
×
3307

3308
        return targetNode, err
×
3309
}
3310

3311
// computeEdgePolicyKeys is a helper function that can be used to compute the
3312
// keys used to index the channel edge policy info for the two nodes of the
3313
// edge. The keys for node 1 and node 2 are returned respectively.
3314
func computeEdgePolicyKeys(info *models.ChannelEdgeInfo) ([]byte, []byte) {
22✔
3315
        var (
22✔
3316
                node1Key [33 + 8]byte
22✔
3317
                node2Key [33 + 8]byte
22✔
3318
        )
22✔
3319

22✔
3320
        copy(node1Key[:], info.NodeKey1Bytes[:])
22✔
3321
        copy(node2Key[:], info.NodeKey2Bytes[:])
22✔
3322

22✔
3323
        byteOrder.PutUint64(node1Key[33:], info.ChannelID)
22✔
3324
        byteOrder.PutUint64(node2Key[33:], info.ChannelID)
22✔
3325

22✔
3326
        return node1Key[:], node2Key[:]
22✔
3327
}
22✔
3328

3329
// FetchChannelEdgesByOutpoint attempts to lookup the two directed edges for
3330
// the channel identified by the funding outpoint. If the channel can't be
3331
// found, then ErrEdgeNotFound is returned. A struct which houses the general
3332
// information for the channel itself is returned as well as two structs that
3333
// contain the routing policies for the channel in either direction.
3334
func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (
3335
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3336
        *models.ChannelEdgePolicy, error) {
11✔
3337

11✔
3338
        var (
11✔
3339
                edgeInfo *models.ChannelEdgeInfo
11✔
3340
                policy1  *models.ChannelEdgePolicy
11✔
3341
                policy2  *models.ChannelEdgePolicy
11✔
3342
        )
11✔
3343

11✔
3344
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
22✔
3345
                // First, grab the node bucket. This will be used to populate
11✔
3346
                // the Node pointers in each edge read from disk.
11✔
3347
                nodes := tx.ReadBucket(nodeBucket)
11✔
3348
                if nodes == nil {
11✔
3349
                        return ErrGraphNotFound
×
3350
                }
×
3351

3352
                // Next, grab the edge bucket which stores the edges, and also
3353
                // the index itself so we can group the directed edges together
3354
                // logically.
3355
                edges := tx.ReadBucket(edgeBucket)
11✔
3356
                if edges == nil {
11✔
3357
                        return ErrGraphNoEdgesFound
×
3358
                }
×
3359
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
11✔
3360
                if edgeIndex == nil {
11✔
3361
                        return ErrGraphNoEdgesFound
×
3362
                }
×
3363

3364
                // If the channel's outpoint doesn't exist within the outpoint
3365
                // index, then the edge does not exist.
3366
                chanIndex := edges.NestedReadBucket(channelPointBucket)
11✔
3367
                if chanIndex == nil {
11✔
3368
                        return ErrGraphNoEdgesFound
×
3369
                }
×
3370
                var b bytes.Buffer
11✔
3371
                if err := WriteOutpoint(&b, op); err != nil {
11✔
3372
                        return err
×
3373
                }
×
3374
                chanID := chanIndex.Get(b.Bytes())
11✔
3375
                if chanID == nil {
21✔
3376
                        return fmt.Errorf("%w: op=%v", ErrEdgeNotFound, op)
10✔
3377
                }
10✔
3378

3379
                // If the channel is found to exists, then we'll first retrieve
3380
                // the general information for the channel.
3381
                edge, err := fetchChanEdgeInfo(edgeIndex, chanID)
1✔
3382
                if err != nil {
1✔
3383
                        return fmt.Errorf("%w: chanID=%x", err, chanID)
×
3384
                }
×
3385
                edgeInfo = &edge
1✔
3386

1✔
3387
                // Once we have the information about the channels' parameters,
1✔
3388
                // we'll fetch the routing policies for each for the directed
1✔
3389
                // edges.
1✔
3390
                e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
1✔
3391
                if err != nil {
1✔
3392
                        return fmt.Errorf("failed to find policy: %w", err)
×
3393
                }
×
3394

3395
                policy1 = e1
1✔
3396
                policy2 = e2
1✔
3397
                return nil
1✔
3398
        }, func() {
11✔
3399
                edgeInfo = nil
11✔
3400
                policy1 = nil
11✔
3401
                policy2 = nil
11✔
3402
        })
11✔
3403
        if err != nil {
21✔
3404
                return nil, nil, nil, err
10✔
3405
        }
10✔
3406

3407
        return edgeInfo, policy1, policy2, nil
1✔
3408
}
3409

3410
// FetchChannelEdgesByID attempts to lookup the two directed edges for the
3411
// channel identified by the channel ID. If the channel can't be found, then
3412
// ErrEdgeNotFound is returned. A struct which houses the general information
3413
// for the channel itself is returned as well as two structs that contain the
3414
// routing policies for the channel in either direction.
3415
//
3416
// ErrZombieEdge an be returned if the edge is currently marked as a zombie
3417
// within the database. In this case, the ChannelEdgePolicy's will be nil, and
3418
// the ChannelEdgeInfo will only include the public keys of each node.
3419
func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (
3420
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3421
        *models.ChannelEdgePolicy, error) {
25✔
3422

25✔
3423
        var (
25✔
3424
                edgeInfo  *models.ChannelEdgeInfo
25✔
3425
                policy1   *models.ChannelEdgePolicy
25✔
3426
                policy2   *models.ChannelEdgePolicy
25✔
3427
                channelID [8]byte
25✔
3428
        )
25✔
3429

25✔
3430
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
50✔
3431
                // First, grab the node bucket. This will be used to populate
25✔
3432
                // the Node pointers in each edge read from disk.
25✔
3433
                nodes := tx.ReadBucket(nodeBucket)
25✔
3434
                if nodes == nil {
25✔
3435
                        return ErrGraphNotFound
×
3436
                }
×
3437

3438
                // Next, grab the edge bucket which stores the edges, and also
3439
                // the index itself so we can group the directed edges together
3440
                // logically.
3441
                edges := tx.ReadBucket(edgeBucket)
25✔
3442
                if edges == nil {
25✔
3443
                        return ErrGraphNoEdgesFound
×
3444
                }
×
3445
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
25✔
3446
                if edgeIndex == nil {
25✔
3447
                        return ErrGraphNoEdgesFound
×
3448
                }
×
3449

3450
                byteOrder.PutUint64(channelID[:], chanID)
25✔
3451

25✔
3452
                // Now, attempt to fetch edge.
25✔
3453
                edge, err := fetchChanEdgeInfo(edgeIndex, channelID[:])
25✔
3454

25✔
3455
                // If it doesn't exist, we'll quickly check our zombie index to
25✔
3456
                // see if we've previously marked it as so.
25✔
3457
                if errors.Is(err, ErrEdgeNotFound) {
26✔
3458
                        // If the zombie index doesn't exist, or the edge is not
1✔
3459
                        // marked as a zombie within it, then we'll return the
1✔
3460
                        // original ErrEdgeNotFound error.
1✔
3461
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
1✔
3462
                        if zombieIndex == nil {
1✔
3463
                                return ErrEdgeNotFound
×
3464
                        }
×
3465

3466
                        isZombie, pubKey1, pubKey2 := isZombieEdge(
1✔
3467
                                zombieIndex, chanID,
1✔
3468
                        )
1✔
3469
                        if !isZombie {
1✔
3470
                                return ErrEdgeNotFound
×
3471
                        }
×
3472

3473
                        // Otherwise, the edge is marked as a zombie, so we'll
3474
                        // populate the edge info with the public keys of each
3475
                        // party as this is the only information we have about
3476
                        // it and return an error signaling so.
3477
                        edgeInfo = &models.ChannelEdgeInfo{
1✔
3478
                                NodeKey1Bytes: pubKey1,
1✔
3479
                                NodeKey2Bytes: pubKey2,
1✔
3480
                        }
1✔
3481
                        return ErrZombieEdge
1✔
3482
                }
3483

3484
                // Otherwise, we'll just return the error if any.
3485
                if err != nil {
24✔
3486
                        return err
×
3487
                }
×
3488

3489
                edgeInfo = &edge
24✔
3490

24✔
3491
                // Then we'll attempt to fetch the accompanying policies of this
24✔
3492
                // edge.
24✔
3493
                e1, e2, err := fetchChanEdgePolicies(
24✔
3494
                        edgeIndex, edges, channelID[:],
24✔
3495
                )
24✔
3496
                if err != nil {
24✔
3497
                        return err
×
3498
                }
×
3499

3500
                policy1 = e1
24✔
3501
                policy2 = e2
24✔
3502
                return nil
24✔
3503
        }, func() {
25✔
3504
                edgeInfo = nil
25✔
3505
                policy1 = nil
25✔
3506
                policy2 = nil
25✔
3507
        })
25✔
3508
        if err == ErrZombieEdge {
26✔
3509
                return edgeInfo, nil, nil, err
1✔
3510
        }
1✔
3511
        if err != nil {
24✔
3512
                return nil, nil, nil, err
×
3513
        }
×
3514

3515
        return edgeInfo, policy1, policy2, nil
24✔
3516
}
3517

3518
// IsPublicNode is a helper method that determines whether the node with the
3519
// given public key is seen as a public node in the graph from the graph's
3520
// source node's point of view.
3521
func (c *ChannelGraph) IsPublicNode(pubKey [33]byte) (bool, error) {
13✔
3522
        var nodeIsPublic bool
13✔
3523
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
26✔
3524
                nodes := tx.ReadBucket(nodeBucket)
13✔
3525
                if nodes == nil {
13✔
3526
                        return ErrGraphNodesNotFound
×
3527
                }
×
3528
                ourPubKey := nodes.Get(sourceKey)
13✔
3529
                if ourPubKey == nil {
13✔
3530
                        return ErrSourceNodeNotSet
×
3531
                }
×
3532
                node, err := fetchLightningNode(nodes, pubKey[:])
13✔
3533
                if err != nil {
13✔
3534
                        return err
×
3535
                }
×
3536

3537
                nodeIsPublic, err = c.isPublic(tx, node.PubKeyBytes, ourPubKey)
13✔
3538
                return err
13✔
3539
        }, func() {
13✔
3540
                nodeIsPublic = false
13✔
3541
        })
13✔
3542
        if err != nil {
13✔
3543
                return false, err
×
3544
        }
×
3545

3546
        return nodeIsPublic, nil
13✔
3547
}
3548

3549
// genMultiSigP2WSH generates the p2wsh'd multisig script for 2 of 2 pubkeys.
3550
func genMultiSigP2WSH(aPub, bPub []byte) ([]byte, error) {
46✔
3551
        witnessScript, err := input.GenMultiSigScript(aPub, bPub)
46✔
3552
        if err != nil {
46✔
3553
                return nil, err
×
3554
        }
×
3555

3556
        // With the witness script generated, we'll now turn it into a p2wsh
3557
        // script:
3558
        //  * OP_0 <sha256(script)>
3559
        bldr := txscript.NewScriptBuilder(
46✔
3560
                txscript.WithScriptAllocSize(input.P2WSHSize),
46✔
3561
        )
46✔
3562
        bldr.AddOp(txscript.OP_0)
46✔
3563
        scriptHash := sha256.Sum256(witnessScript)
46✔
3564
        bldr.AddData(scriptHash[:])
46✔
3565

46✔
3566
        return bldr.Script()
46✔
3567
}
3568

3569
// EdgePoint couples the outpoint of a channel with the funding script that it
3570
// creates. The FilteredChainView will use this to watch for spends of this
3571
// edge point on chain. We require both of these values as depending on the
3572
// concrete implementation, either the pkScript, or the out point will be used.
3573
type EdgePoint struct {
3574
        // FundingPkScript is the p2wsh multi-sig script of the target channel.
3575
        FundingPkScript []byte
3576

3577
        // OutPoint is the outpoint of the target channel.
3578
        OutPoint wire.OutPoint
3579
}
3580

3581
// String returns a human readable version of the target EdgePoint. We return
3582
// the outpoint directly as it is enough to uniquely identify the edge point.
3583
func (e *EdgePoint) String() string {
×
3584
        return e.OutPoint.String()
×
3585
}
×
3586

3587
// ChannelView returns the verifiable edge information for each active channel
3588
// within the known channel graph. The set of UTXO's (along with their scripts)
3589
// returned are the ones that need to be watched on chain to detect channel
3590
// closes on the resident blockchain.
3591
func (c *ChannelGraph) ChannelView() ([]EdgePoint, error) {
23✔
3592
        var edgePoints []EdgePoint
23✔
3593
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
46✔
3594
                // We're going to iterate over the entire channel index, so
23✔
3595
                // we'll need to fetch the edgeBucket to get to the index as
23✔
3596
                // it's a sub-bucket.
23✔
3597
                edges := tx.ReadBucket(edgeBucket)
23✔
3598
                if edges == nil {
23✔
3599
                        return ErrGraphNoEdgesFound
×
3600
                }
×
3601
                chanIndex := edges.NestedReadBucket(channelPointBucket)
23✔
3602
                if chanIndex == nil {
23✔
3603
                        return ErrGraphNoEdgesFound
×
3604
                }
×
3605
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
23✔
3606
                if edgeIndex == nil {
23✔
3607
                        return ErrGraphNoEdgesFound
×
3608
                }
×
3609

3610
                // Once we have the proper bucket, we'll range over each key
3611
                // (which is the channel point for the channel) and decode it,
3612
                // accumulating each entry.
3613
                return chanIndex.ForEach(
23✔
3614
                        func(chanPointBytes, chanID []byte) error {
65✔
3615
                                chanPointReader := bytes.NewReader(
42✔
3616
                                        chanPointBytes,
42✔
3617
                                )
42✔
3618

42✔
3619
                                var chanPoint wire.OutPoint
42✔
3620
                                err := ReadOutpoint(chanPointReader, &chanPoint)
42✔
3621
                                if err != nil {
42✔
3622
                                        return err
×
3623
                                }
×
3624

3625
                                edgeInfo, err := fetchChanEdgeInfo(
42✔
3626
                                        edgeIndex, chanID,
42✔
3627
                                )
42✔
3628
                                if err != nil {
42✔
3629
                                        return err
×
3630
                                }
×
3631

3632
                                pkScript, err := genMultiSigP2WSH(
42✔
3633
                                        edgeInfo.BitcoinKey1Bytes[:],
42✔
3634
                                        edgeInfo.BitcoinKey2Bytes[:],
42✔
3635
                                )
42✔
3636
                                if err != nil {
42✔
3637
                                        return err
×
3638
                                }
×
3639

3640
                                edgePoints = append(edgePoints, EdgePoint{
42✔
3641
                                        FundingPkScript: pkScript,
42✔
3642
                                        OutPoint:        chanPoint,
42✔
3643
                                })
42✔
3644

42✔
3645
                                return nil
42✔
3646
                        },
3647
                )
3648
        }, func() {
23✔
3649
                edgePoints = nil
23✔
3650
        }); err != nil {
23✔
3651
                return nil, err
×
3652
        }
×
3653

3654
        return edgePoints, nil
23✔
3655
}
3656

3657
// MarkEdgeZombie attempts to mark a channel identified by its channel ID as a
3658
// zombie. This method is used on an ad-hoc basis, when channels need to be
3659
// marked as zombies outside the normal pruning cycle.
3660
func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
3661
        pubKey1, pubKey2 [33]byte) error {
116✔
3662

116✔
3663
        c.cacheMu.Lock()
116✔
3664
        defer c.cacheMu.Unlock()
116✔
3665

116✔
3666
        err := kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
232✔
3667
                edges := tx.ReadWriteBucket(edgeBucket)
116✔
3668
                if edges == nil {
116✔
3669
                        return ErrGraphNoEdgesFound
×
3670
                }
×
3671
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
116✔
3672
                if err != nil {
116✔
3673
                        return fmt.Errorf("unable to create zombie "+
×
3674
                                "bucket: %w", err)
×
3675
                }
×
3676

3677
                if c.graphCache != nil {
232✔
3678
                        c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
116✔
3679
                }
116✔
3680

3681
                return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
116✔
3682
        })
3683
        if err != nil {
116✔
3684
                return err
×
3685
        }
×
3686

3687
        c.rejectCache.remove(chanID)
116✔
3688
        c.chanCache.remove(chanID)
116✔
3689

116✔
3690
        return nil
116✔
3691
}
3692

3693
// markEdgeZombie marks an edge as a zombie within our zombie index. The public
3694
// keys should represent the node public keys of the two parties involved in the
3695
// edge.
3696
func markEdgeZombie(zombieIndex kvdb.RwBucket, chanID uint64, pubKey1,
3697
        pubKey2 [33]byte) error {
139✔
3698

139✔
3699
        var k [8]byte
139✔
3700
        byteOrder.PutUint64(k[:], chanID)
139✔
3701

139✔
3702
        var v [66]byte
139✔
3703
        copy(v[:33], pubKey1[:])
139✔
3704
        copy(v[33:], pubKey2[:])
139✔
3705

139✔
3706
        return zombieIndex.Put(k[:], v[:])
139✔
3707
}
139✔
3708

3709
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
3710
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
2✔
3711
        c.cacheMu.Lock()
2✔
3712
        defer c.cacheMu.Unlock()
2✔
3713

2✔
3714
        return c.markEdgeLiveUnsafe(nil, chanID)
2✔
3715
}
2✔
3716

3717
// markEdgeLiveUnsafe clears an edge from the zombie index. This method can be
3718
// called with an existing kvdb.RwTx or the argument can be set to nil in which
3719
// case a new transaction will be created.
3720
//
3721
// NOTE: this method MUST only be called if the cacheMu has already been
3722
// acquired.
3723
func (c *ChannelGraph) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
21✔
3724
        dbFn := func(tx kvdb.RwTx) error {
42✔
3725
                edges := tx.ReadWriteBucket(edgeBucket)
21✔
3726
                if edges == nil {
21✔
3727
                        return ErrGraphNoEdgesFound
×
3728
                }
×
3729
                zombieIndex := edges.NestedReadWriteBucket(zombieBucket)
21✔
3730
                if zombieIndex == nil {
21✔
3731
                        return nil
×
3732
                }
×
3733

3734
                var k [8]byte
21✔
3735
                byteOrder.PutUint64(k[:], chanID)
21✔
3736

21✔
3737
                if len(zombieIndex.Get(k[:])) == 0 {
22✔
3738
                        return ErrZombieEdgeNotFound
1✔
3739
                }
1✔
3740

3741
                return zombieIndex.Delete(k[:])
20✔
3742
        }
3743

3744
        // If the transaction is nil, we'll create a new one. Otherwise, we use
3745
        // the existing transaction
3746
        var err error
21✔
3747
        if tx == nil {
23✔
3748
                err = kvdb.Update(c.db, dbFn, func() {})
4✔
3749
        } else {
19✔
3750
                err = dbFn(tx)
19✔
3751
        }
19✔
3752
        if err != nil {
22✔
3753
                return err
1✔
3754
        }
1✔
3755

3756
        c.rejectCache.remove(chanID)
20✔
3757
        c.chanCache.remove(chanID)
20✔
3758

20✔
3759
        // We need to add the channel back into our graph cache, otherwise we
20✔
3760
        // won't use it for path finding.
20✔
3761
        if c.graphCache != nil {
40✔
3762
                edgeInfos, err := c.fetchChanInfos(tx, []uint64{chanID})
20✔
3763
                if err != nil {
20✔
3764
                        return err
×
3765
                }
×
3766

3767
                for _, edgeInfo := range edgeInfos {
20✔
3768
                        c.graphCache.AddChannel(
×
3769
                                edgeInfo.Info, edgeInfo.Policy1,
×
3770
                                edgeInfo.Policy2,
×
3771
                        )
×
3772
                }
×
3773
        }
3774

3775
        return nil
20✔
3776
}
3777

3778
// IsZombieEdge returns whether the edge is considered zombie. If it is a
3779
// zombie, then the two node public keys corresponding to this edge are also
3780
// returned.
3781
func (c *ChannelGraph) IsZombieEdge(chanID uint64) (bool, [33]byte, [33]byte) {
5✔
3782
        var (
5✔
3783
                isZombie         bool
5✔
3784
                pubKey1, pubKey2 [33]byte
5✔
3785
        )
5✔
3786

5✔
3787
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
10✔
3788
                edges := tx.ReadBucket(edgeBucket)
5✔
3789
                if edges == nil {
5✔
3790
                        return ErrGraphNoEdgesFound
×
3791
                }
×
3792
                zombieIndex := edges.NestedReadBucket(zombieBucket)
5✔
3793
                if zombieIndex == nil {
5✔
3794
                        return nil
×
3795
                }
×
3796

3797
                isZombie, pubKey1, pubKey2 = isZombieEdge(zombieIndex, chanID)
5✔
3798
                return nil
5✔
3799
        }, func() {
5✔
3800
                isZombie = false
5✔
3801
                pubKey1 = [33]byte{}
5✔
3802
                pubKey2 = [33]byte{}
5✔
3803
        })
5✔
3804
        if err != nil {
5✔
3805
                return false, [33]byte{}, [33]byte{}
×
3806
        }
×
3807

3808
        return isZombie, pubKey1, pubKey2
5✔
3809
}
3810

3811
// isZombieEdge returns whether an entry exists for the given channel in the
3812
// zombie index. If an entry exists, then the two node public keys corresponding
3813
// to this edge are also returned.
3814
func isZombieEdge(zombieIndex kvdb.RBucket,
3815
        chanID uint64) (bool, [33]byte, [33]byte) {
208✔
3816

208✔
3817
        var k [8]byte
208✔
3818
        byteOrder.PutUint64(k[:], chanID)
208✔
3819

208✔
3820
        v := zombieIndex.Get(k[:])
208✔
3821
        if v == nil {
329✔
3822
                return false, [33]byte{}, [33]byte{}
121✔
3823
        }
121✔
3824

3825
        var pubKey1, pubKey2 [33]byte
87✔
3826
        copy(pubKey1[:], v[:33])
87✔
3827
        copy(pubKey2[:], v[33:])
87✔
3828

87✔
3829
        return true, pubKey1, pubKey2
87✔
3830
}
3831

3832
// NumZombies returns the current number of zombie channels in the graph.
3833
func (c *ChannelGraph) NumZombies() (uint64, error) {
4✔
3834
        var numZombies uint64
4✔
3835
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
3836
                edges := tx.ReadBucket(edgeBucket)
4✔
3837
                if edges == nil {
4✔
3838
                        return nil
×
3839
                }
×
3840
                zombieIndex := edges.NestedReadBucket(zombieBucket)
4✔
3841
                if zombieIndex == nil {
4✔
3842
                        return nil
×
3843
                }
×
3844

3845
                return zombieIndex.ForEach(func(_, _ []byte) error {
6✔
3846
                        numZombies++
2✔
3847
                        return nil
2✔
3848
                })
2✔
3849
        }, func() {
4✔
3850
                numZombies = 0
4✔
3851
        })
4✔
3852
        if err != nil {
4✔
3853
                return 0, err
×
3854
        }
×
3855

3856
        return numZombies, nil
4✔
3857
}
3858

3859
// PutClosedScid stores a SCID for a closed channel in the database. This is so
3860
// that we can ignore channel announcements that we know to be closed without
3861
// having to validate them and fetch a block.
3862
func (c *ChannelGraph) PutClosedScid(scid lnwire.ShortChannelID) error {
1✔
3863
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
2✔
3864
                closedScids, err := tx.CreateTopLevelBucket(closedScidBucket)
1✔
3865
                if err != nil {
1✔
3866
                        return err
×
3867
                }
×
3868

3869
                var k [8]byte
1✔
3870
                byteOrder.PutUint64(k[:], scid.ToUint64())
1✔
3871

1✔
3872
                return closedScids.Put(k[:], []byte{})
1✔
3873
        }, func() {})
1✔
3874
}
3875

3876
// IsClosedScid checks whether a channel identified by the passed in scid is
3877
// closed. This helps avoid having to perform expensive validation checks.
3878
// TODO: Add an LRU cache to cut down on disc reads.
3879
func (c *ChannelGraph) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
2✔
3880
        var isClosed bool
2✔
3881
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
4✔
3882
                closedScids := tx.ReadBucket(closedScidBucket)
2✔
3883
                if closedScids == nil {
2✔
3884
                        return ErrClosedScidsNotFound
×
3885
                }
×
3886

3887
                var k [8]byte
2✔
3888
                byteOrder.PutUint64(k[:], scid.ToUint64())
2✔
3889

2✔
3890
                if closedScids.Get(k[:]) != nil {
3✔
3891
                        isClosed = true
1✔
3892
                        return nil
1✔
3893
                }
1✔
3894

3895
                return nil
1✔
3896
        }, func() {
2✔
3897
                isClosed = false
2✔
3898
        })
2✔
3899
        if err != nil {
2✔
3900
                return false, err
×
3901
        }
×
3902

3903
        return isClosed, nil
2✔
3904
}
3905

3906
func putLightningNode(nodeBucket kvdb.RwBucket, aliasBucket kvdb.RwBucket, // nolint:dupl
3907
        updateIndex kvdb.RwBucket, node *models.LightningNode) error {
991✔
3908

991✔
3909
        var (
991✔
3910
                scratch [16]byte
991✔
3911
                b       bytes.Buffer
991✔
3912
        )
991✔
3913

991✔
3914
        pub, err := node.PubKey()
991✔
3915
        if err != nil {
991✔
3916
                return err
×
3917
        }
×
3918
        nodePub := pub.SerializeCompressed()
991✔
3919

991✔
3920
        // If the node has the update time set, write it, else write 0.
991✔
3921
        updateUnix := uint64(0)
991✔
3922
        if node.LastUpdate.Unix() > 0 {
1,854✔
3923
                updateUnix = uint64(node.LastUpdate.Unix())
863✔
3924
        }
863✔
3925

3926
        byteOrder.PutUint64(scratch[:8], updateUnix)
991✔
3927
        if _, err := b.Write(scratch[:8]); err != nil {
991✔
3928
                return err
×
3929
        }
×
3930

3931
        if _, err := b.Write(nodePub); err != nil {
991✔
3932
                return err
×
3933
        }
×
3934

3935
        // If we got a node announcement for this node, we will have the rest
3936
        // of the data available. If not we don't have more data to write.
3937
        if !node.HaveNodeAnnouncement {
1,066✔
3938
                // Write HaveNodeAnnouncement=0.
75✔
3939
                byteOrder.PutUint16(scratch[:2], 0)
75✔
3940
                if _, err := b.Write(scratch[:2]); err != nil {
75✔
3941
                        return err
×
3942
                }
×
3943

3944
                return nodeBucket.Put(nodePub, b.Bytes())
75✔
3945
        }
3946

3947
        // Write HaveNodeAnnouncement=1.
3948
        byteOrder.PutUint16(scratch[:2], 1)
916✔
3949
        if _, err := b.Write(scratch[:2]); err != nil {
916✔
3950
                return err
×
3951
        }
×
3952

3953
        if err := binary.Write(&b, byteOrder, node.Color.R); err != nil {
916✔
3954
                return err
×
3955
        }
×
3956
        if err := binary.Write(&b, byteOrder, node.Color.G); err != nil {
916✔
3957
                return err
×
3958
        }
×
3959
        if err := binary.Write(&b, byteOrder, node.Color.B); err != nil {
916✔
3960
                return err
×
3961
        }
×
3962

3963
        if err := wire.WriteVarString(&b, 0, node.Alias); err != nil {
916✔
3964
                return err
×
3965
        }
×
3966

3967
        if err := node.Features.Encode(&b); err != nil {
916✔
3968
                return err
×
3969
        }
×
3970

3971
        numAddresses := uint16(len(node.Addresses))
916✔
3972
        byteOrder.PutUint16(scratch[:2], numAddresses)
916✔
3973
        if _, err := b.Write(scratch[:2]); err != nil {
916✔
3974
                return err
×
3975
        }
×
3976

3977
        for _, address := range node.Addresses {
2,058✔
3978
                if err := SerializeAddr(&b, address); err != nil {
1,142✔
3979
                        return err
×
3980
                }
×
3981
        }
3982

3983
        sigLen := len(node.AuthSigBytes)
916✔
3984
        if sigLen > 80 {
916✔
3985
                return fmt.Errorf("max sig len allowed is 80, had %v",
×
3986
                        sigLen)
×
3987
        }
×
3988

3989
        err = wire.WriteVarBytes(&b, 0, node.AuthSigBytes)
916✔
3990
        if err != nil {
916✔
3991
                return err
×
3992
        }
×
3993

3994
        if len(node.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
916✔
3995
                return ErrTooManyExtraOpaqueBytes(len(node.ExtraOpaqueData))
×
3996
        }
×
3997
        err = wire.WriteVarBytes(&b, 0, node.ExtraOpaqueData)
916✔
3998
        if err != nil {
916✔
3999
                return err
×
4000
        }
×
4001

4002
        if err := aliasBucket.Put(nodePub, []byte(node.Alias)); err != nil {
916✔
4003
                return err
×
4004
        }
×
4005

4006
        // With the alias bucket updated, we'll now update the index that
4007
        // tracks the time series of node updates.
4008
        var indexKey [8 + 33]byte
916✔
4009
        byteOrder.PutUint64(indexKey[:8], updateUnix)
916✔
4010
        copy(indexKey[8:], nodePub)
916✔
4011

916✔
4012
        // If there was already an old index entry for this node, then we'll
916✔
4013
        // delete the old one before we write the new entry.
916✔
4014
        if nodeBytes := nodeBucket.Get(nodePub); nodeBytes != nil {
1,020✔
4015
                // Extract out the old update time to we can reconstruct the
104✔
4016
                // prior index key to delete it from the index.
104✔
4017
                oldUpdateTime := nodeBytes[:8]
104✔
4018

104✔
4019
                var oldIndexKey [8 + 33]byte
104✔
4020
                copy(oldIndexKey[:8], oldUpdateTime)
104✔
4021
                copy(oldIndexKey[8:], nodePub)
104✔
4022

104✔
4023
                if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
104✔
4024
                        return err
×
4025
                }
×
4026
        }
4027

4028
        if err := updateIndex.Put(indexKey[:], nil); err != nil {
916✔
4029
                return err
×
4030
        }
×
4031

4032
        return nodeBucket.Put(nodePub, b.Bytes())
916✔
4033
}
4034

4035
func fetchLightningNode(nodeBucket kvdb.RBucket,
4036
        nodePub []byte) (models.LightningNode, error) {
3,633✔
4037

3,633✔
4038
        nodeBytes := nodeBucket.Get(nodePub)
3,633✔
4039
        if nodeBytes == nil {
3,707✔
4040
                return models.LightningNode{}, ErrGraphNodeNotFound
74✔
4041
        }
74✔
4042

4043
        nodeReader := bytes.NewReader(nodeBytes)
3,559✔
4044
        return deserializeLightningNode(nodeReader)
3,559✔
4045
}
4046

4047
func deserializeLightningNodeCacheable(r io.Reader) (*graphCacheNode, error) {
120✔
4048
        // Always populate a feature vector, even if we don't have a node
120✔
4049
        // announcement and short circuit below.
120✔
4050
        node := newGraphCacheNode(
120✔
4051
                route.Vertex{},
120✔
4052
                lnwire.EmptyFeatureVector(),
120✔
4053
        )
120✔
4054

120✔
4055
        var nodeScratch [8]byte
120✔
4056

120✔
4057
        // Skip ahead:
120✔
4058
        // - LastUpdate (8 bytes)
120✔
4059
        if _, err := r.Read(nodeScratch[:]); err != nil {
120✔
4060
                return nil, err
×
4061
        }
×
4062

4063
        if _, err := io.ReadFull(r, node.pubKeyBytes[:]); err != nil {
120✔
4064
                return nil, err
×
4065
        }
×
4066

4067
        // Read the node announcement flag.
4068
        if _, err := r.Read(nodeScratch[:2]); err != nil {
120✔
4069
                return nil, err
×
4070
        }
×
4071
        hasNodeAnn := byteOrder.Uint16(nodeScratch[:2])
120✔
4072

120✔
4073
        // The rest of the data is optional, and will only be there if we got a
120✔
4074
        // node announcement for this node.
120✔
4075
        if hasNodeAnn == 0 {
120✔
4076
                return node, nil
×
4077
        }
×
4078

4079
        // We did get a node announcement for this node, so we'll have the rest
4080
        // of the data available.
4081
        var rgb uint8
120✔
4082
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4083
                return nil, err
×
4084
        }
×
4085
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4086
                return nil, err
×
4087
        }
×
4088
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4089
                return nil, err
×
4090
        }
×
4091

4092
        if _, err := wire.ReadVarString(r, 0); err != nil {
120✔
4093
                return nil, err
×
4094
        }
×
4095

4096
        if err := node.features.Decode(r); err != nil {
120✔
4097
                return nil, err
×
4098
        }
×
4099

4100
        return node, nil
120✔
4101
}
4102

4103
func deserializeLightningNode(r io.Reader) (models.LightningNode, error) {
8,517✔
4104
        var (
8,517✔
4105
                node    models.LightningNode
8,517✔
4106
                scratch [8]byte
8,517✔
4107
                err     error
8,517✔
4108
        )
8,517✔
4109

8,517✔
4110
        // Always populate a feature vector, even if we don't have a node
8,517✔
4111
        // announcement and short circuit below.
8,517✔
4112
        node.Features = lnwire.EmptyFeatureVector()
8,517✔
4113

8,517✔
4114
        if _, err := r.Read(scratch[:]); err != nil {
8,517✔
4115
                return models.LightningNode{}, err
×
4116
        }
×
4117

4118
        unix := int64(byteOrder.Uint64(scratch[:]))
8,517✔
4119
        node.LastUpdate = time.Unix(unix, 0)
8,517✔
4120

8,517✔
4121
        if _, err := io.ReadFull(r, node.PubKeyBytes[:]); err != nil {
8,517✔
4122
                return models.LightningNode{}, err
×
4123
        }
×
4124

4125
        if _, err := r.Read(scratch[:2]); err != nil {
8,517✔
4126
                return models.LightningNode{}, err
×
4127
        }
×
4128

4129
        hasNodeAnn := byteOrder.Uint16(scratch[:2])
8,517✔
4130
        if hasNodeAnn == 1 {
16,894✔
4131
                node.HaveNodeAnnouncement = true
8,377✔
4132
        } else {
8,517✔
4133
                node.HaveNodeAnnouncement = false
140✔
4134
        }
140✔
4135

4136
        // The rest of the data is optional, and will only be there if we got a
4137
        // node announcement for this node.
4138
        if !node.HaveNodeAnnouncement {
8,657✔
4139
                return node, nil
140✔
4140
        }
140✔
4141

4142
        // We did get a node announcement for this node, so we'll have the rest
4143
        // of the data available.
4144
        if err := binary.Read(r, byteOrder, &node.Color.R); err != nil {
8,377✔
4145
                return models.LightningNode{}, err
×
4146
        }
×
4147
        if err := binary.Read(r, byteOrder, &node.Color.G); err != nil {
8,377✔
4148
                return models.LightningNode{}, err
×
4149
        }
×
4150
        if err := binary.Read(r, byteOrder, &node.Color.B); err != nil {
8,377✔
4151
                return models.LightningNode{}, err
×
4152
        }
×
4153

4154
        node.Alias, err = wire.ReadVarString(r, 0)
8,377✔
4155
        if err != nil {
8,377✔
4156
                return models.LightningNode{}, err
×
4157
        }
×
4158

4159
        err = node.Features.Decode(r)
8,377✔
4160
        if err != nil {
8,377✔
4161
                return models.LightningNode{}, err
×
4162
        }
×
4163

4164
        if _, err := r.Read(scratch[:2]); err != nil {
8,377✔
4165
                return models.LightningNode{}, err
×
4166
        }
×
4167
        numAddresses := int(byteOrder.Uint16(scratch[:2]))
8,377✔
4168

8,377✔
4169
        var addresses []net.Addr
8,377✔
4170
        for i := 0; i < numAddresses; i++ {
18,997✔
4171
                address, err := DeserializeAddr(r)
10,620✔
4172
                if err != nil {
10,620✔
4173
                        return models.LightningNode{}, err
×
4174
                }
×
4175
                addresses = append(addresses, address)
10,620✔
4176
        }
4177
        node.Addresses = addresses
8,377✔
4178

8,377✔
4179
        node.AuthSigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
8,377✔
4180
        if err != nil {
8,377✔
4181
                return models.LightningNode{}, err
×
4182
        }
×
4183

4184
        // We'll try and see if there are any opaque bytes left, if not, then
4185
        // we'll ignore the EOF error and return the node as is.
4186
        node.ExtraOpaqueData, err = wire.ReadVarBytes(
8,377✔
4187
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
8,377✔
4188
        )
8,377✔
4189
        switch {
8,377✔
4190
        case err == io.ErrUnexpectedEOF:
×
4191
        case err == io.EOF:
×
4192
        case err != nil:
×
4193
                return models.LightningNode{}, err
×
4194
        }
4195

4196
        return node, nil
8,377✔
4197
}
4198

4199
func putChanEdgeInfo(edgeIndex kvdb.RwBucket,
4200
        edgeInfo *models.ChannelEdgeInfo, chanID [8]byte) error {
1,486✔
4201

1,486✔
4202
        var b bytes.Buffer
1,486✔
4203

1,486✔
4204
        if _, err := b.Write(edgeInfo.NodeKey1Bytes[:]); err != nil {
1,486✔
4205
                return err
×
4206
        }
×
4207
        if _, err := b.Write(edgeInfo.NodeKey2Bytes[:]); err != nil {
1,486✔
4208
                return err
×
4209
        }
×
4210
        if _, err := b.Write(edgeInfo.BitcoinKey1Bytes[:]); err != nil {
1,486✔
4211
                return err
×
4212
        }
×
4213
        if _, err := b.Write(edgeInfo.BitcoinKey2Bytes[:]); err != nil {
1,486✔
4214
                return err
×
4215
        }
×
4216

4217
        if err := wire.WriteVarBytes(&b, 0, edgeInfo.Features); err != nil {
1,486✔
4218
                return err
×
4219
        }
×
4220

4221
        authProof := edgeInfo.AuthProof
1,486✔
4222
        var nodeSig1, nodeSig2, bitcoinSig1, bitcoinSig2 []byte
1,486✔
4223
        if authProof != nil {
2,889✔
4224
                nodeSig1 = authProof.NodeSig1Bytes
1,403✔
4225
                nodeSig2 = authProof.NodeSig2Bytes
1,403✔
4226
                bitcoinSig1 = authProof.BitcoinSig1Bytes
1,403✔
4227
                bitcoinSig2 = authProof.BitcoinSig2Bytes
1,403✔
4228
        }
1,403✔
4229

4230
        if err := wire.WriteVarBytes(&b, 0, nodeSig1); err != nil {
1,486✔
4231
                return err
×
4232
        }
×
4233
        if err := wire.WriteVarBytes(&b, 0, nodeSig2); err != nil {
1,486✔
4234
                return err
×
4235
        }
×
4236
        if err := wire.WriteVarBytes(&b, 0, bitcoinSig1); err != nil {
1,486✔
4237
                return err
×
4238
        }
×
4239
        if err := wire.WriteVarBytes(&b, 0, bitcoinSig2); err != nil {
1,486✔
4240
                return err
×
4241
        }
×
4242

4243
        if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
1,486✔
4244
                return err
×
4245
        }
×
4246
        err := binary.Write(&b, byteOrder, uint64(edgeInfo.Capacity))
1,486✔
4247
        if err != nil {
1,486✔
4248
                return err
×
4249
        }
×
4250
        if _, err := b.Write(chanID[:]); err != nil {
1,486✔
4251
                return err
×
4252
        }
×
4253
        if _, err := b.Write(edgeInfo.ChainHash[:]); err != nil {
1,486✔
4254
                return err
×
4255
        }
×
4256

4257
        if len(edgeInfo.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
1,486✔
4258
                return ErrTooManyExtraOpaqueBytes(len(edgeInfo.ExtraOpaqueData))
×
4259
        }
×
4260
        err = wire.WriteVarBytes(&b, 0, edgeInfo.ExtraOpaqueData)
1,486✔
4261
        if err != nil {
1,486✔
4262
                return err
×
4263
        }
×
4264

4265
        return edgeIndex.Put(chanID[:], b.Bytes())
1,486✔
4266
}
4267

4268
func fetchChanEdgeInfo(edgeIndex kvdb.RBucket,
4269
        chanID []byte) (models.ChannelEdgeInfo, error) {
4,193✔
4270

4,193✔
4271
        edgeInfoBytes := edgeIndex.Get(chanID)
4,193✔
4272
        if edgeInfoBytes == nil {
4,276✔
4273
                return models.ChannelEdgeInfo{}, ErrEdgeNotFound
83✔
4274
        }
83✔
4275

4276
        edgeInfoReader := bytes.NewReader(edgeInfoBytes)
4,110✔
4277
        return deserializeChanEdgeInfo(edgeInfoReader)
4,110✔
4278
}
4279

4280
func deserializeChanEdgeInfo(r io.Reader) (models.ChannelEdgeInfo, error) {
4,745✔
4281
        var (
4,745✔
4282
                err      error
4,745✔
4283
                edgeInfo models.ChannelEdgeInfo
4,745✔
4284
        )
4,745✔
4285

4,745✔
4286
        if _, err := io.ReadFull(r, edgeInfo.NodeKey1Bytes[:]); err != nil {
4,745✔
4287
                return models.ChannelEdgeInfo{}, err
×
4288
        }
×
4289
        if _, err := io.ReadFull(r, edgeInfo.NodeKey2Bytes[:]); err != nil {
4,745✔
4290
                return models.ChannelEdgeInfo{}, err
×
4291
        }
×
4292
        if _, err := io.ReadFull(r, edgeInfo.BitcoinKey1Bytes[:]); err != nil {
4,745✔
4293
                return models.ChannelEdgeInfo{}, err
×
4294
        }
×
4295
        if _, err := io.ReadFull(r, edgeInfo.BitcoinKey2Bytes[:]); err != nil {
4,745✔
4296
                return models.ChannelEdgeInfo{}, err
×
4297
        }
×
4298

4299
        edgeInfo.Features, err = wire.ReadVarBytes(r, 0, 900, "features")
4,745✔
4300
        if err != nil {
4,745✔
4301
                return models.ChannelEdgeInfo{}, err
×
4302
        }
×
4303

4304
        proof := &models.ChannelAuthProof{}
4,745✔
4305

4,745✔
4306
        proof.NodeSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,745✔
4307
        if err != nil {
4,745✔
4308
                return models.ChannelEdgeInfo{}, err
×
4309
        }
×
4310
        proof.NodeSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,745✔
4311
        if err != nil {
4,745✔
4312
                return models.ChannelEdgeInfo{}, err
×
4313
        }
×
4314
        proof.BitcoinSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,745✔
4315
        if err != nil {
4,745✔
4316
                return models.ChannelEdgeInfo{}, err
×
4317
        }
×
4318
        proof.BitcoinSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,745✔
4319
        if err != nil {
4,745✔
4320
                return models.ChannelEdgeInfo{}, err
×
4321
        }
×
4322

4323
        if !proof.IsEmpty() {
6,537✔
4324
                edgeInfo.AuthProof = proof
1,792✔
4325
        }
1,792✔
4326

4327
        edgeInfo.ChannelPoint = wire.OutPoint{}
4,745✔
4328
        if err := ReadOutpoint(r, &edgeInfo.ChannelPoint); err != nil {
4,745✔
4329
                return models.ChannelEdgeInfo{}, err
×
4330
        }
×
4331
        if err := binary.Read(r, byteOrder, &edgeInfo.Capacity); err != nil {
4,745✔
4332
                return models.ChannelEdgeInfo{}, err
×
4333
        }
×
4334
        if err := binary.Read(r, byteOrder, &edgeInfo.ChannelID); err != nil {
4,745✔
4335
                return models.ChannelEdgeInfo{}, err
×
4336
        }
×
4337

4338
        if _, err := io.ReadFull(r, edgeInfo.ChainHash[:]); err != nil {
4,745✔
4339
                return models.ChannelEdgeInfo{}, err
×
4340
        }
×
4341

4342
        // We'll try and see if there are any opaque bytes left, if not, then
4343
        // we'll ignore the EOF error and return the edge as is.
4344
        edgeInfo.ExtraOpaqueData, err = wire.ReadVarBytes(
4,745✔
4345
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
4,745✔
4346
        )
4,745✔
4347
        switch {
4,745✔
4348
        case err == io.ErrUnexpectedEOF:
×
4349
        case err == io.EOF:
×
4350
        case err != nil:
×
4351
                return models.ChannelEdgeInfo{}, err
×
4352
        }
4353

4354
        return edgeInfo, nil
4,745✔
4355
}
4356

4357
func putChanEdgePolicy(edges kvdb.RwBucket, edge *models.ChannelEdgePolicy,
4358
        from, to []byte) error {
2,660✔
4359

2,660✔
4360
        var edgeKey [33 + 8]byte
2,660✔
4361
        copy(edgeKey[:], from)
2,660✔
4362
        byteOrder.PutUint64(edgeKey[33:], edge.ChannelID)
2,660✔
4363

2,660✔
4364
        var b bytes.Buffer
2,660✔
4365
        if err := serializeChanEdgePolicy(&b, edge, to); err != nil {
2,660✔
4366
                return err
×
4367
        }
×
4368

4369
        // Before we write out the new edge, we'll create a new entry in the
4370
        // update index in order to keep it fresh.
4371
        updateUnix := uint64(edge.LastUpdate.Unix())
2,660✔
4372
        var indexKey [8 + 8]byte
2,660✔
4373
        byteOrder.PutUint64(indexKey[:8], updateUnix)
2,660✔
4374
        byteOrder.PutUint64(indexKey[8:], edge.ChannelID)
2,660✔
4375

2,660✔
4376
        updateIndex, err := edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
2,660✔
4377
        if err != nil {
2,660✔
4378
                return err
×
4379
        }
×
4380

4381
        // If there was already an entry for this edge, then we'll need to
4382
        // delete the old one to ensure we don't leave around any after-images.
4383
        // An unknown policy value does not have a update time recorded, so
4384
        // it also does not need to be removed.
4385
        if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil &&
2,660✔
4386
                !bytes.Equal(edgeBytes[:], unknownPolicy) {
2,684✔
4387

24✔
4388
                // In order to delete the old entry, we'll need to obtain the
24✔
4389
                // *prior* update time in order to delete it. To do this, we'll
24✔
4390
                // need to deserialize the existing policy within the database
24✔
4391
                // (now outdated by the new one), and delete its corresponding
24✔
4392
                // entry within the update index. We'll ignore any
24✔
4393
                // ErrEdgePolicyOptionalFieldNotFound error, as we only need
24✔
4394
                // the channel ID and update time to delete the entry.
24✔
4395
                // TODO(halseth): get rid of these invalid policies in a
24✔
4396
                // migration.
24✔
4397
                oldEdgePolicy, err := deserializeChanEdgePolicy(
24✔
4398
                        bytes.NewReader(edgeBytes),
24✔
4399
                )
24✔
4400
                if err != nil && err != ErrEdgePolicyOptionalFieldNotFound {
24✔
4401
                        return err
×
4402
                }
×
4403

4404
                oldUpdateTime := uint64(oldEdgePolicy.LastUpdate.Unix())
24✔
4405

24✔
4406
                var oldIndexKey [8 + 8]byte
24✔
4407
                byteOrder.PutUint64(oldIndexKey[:8], oldUpdateTime)
24✔
4408
                byteOrder.PutUint64(oldIndexKey[8:], edge.ChannelID)
24✔
4409

24✔
4410
                if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
24✔
4411
                        return err
×
4412
                }
×
4413
        }
4414

4415
        if err := updateIndex.Put(indexKey[:], nil); err != nil {
2,660✔
4416
                return err
×
4417
        }
×
4418

4419
        err = updateEdgePolicyDisabledIndex(
2,660✔
4420
                edges, edge.ChannelID,
2,660✔
4421
                edge.ChannelFlags&lnwire.ChanUpdateDirection > 0,
2,660✔
4422
                edge.IsDisabled(),
2,660✔
4423
        )
2,660✔
4424
        if err != nil {
2,660✔
4425
                return err
×
4426
        }
×
4427

4428
        return edges.Put(edgeKey[:], b.Bytes()[:])
2,660✔
4429
}
4430

4431
// updateEdgePolicyDisabledIndex is used to update the disabledEdgePolicyIndex
4432
// bucket by either add a new disabled ChannelEdgePolicy or remove an existing
4433
// one.
4434
// The direction represents the direction of the edge and disabled is used for
4435
// deciding whether to remove or add an entry to the bucket.
4436
// In general a channel is disabled if two entries for the same chanID exist
4437
// in this bucket.
4438
// Maintaining the bucket this way allows a fast retrieval of disabled
4439
// channels, for example when prune is needed.
4440
func updateEdgePolicyDisabledIndex(edges kvdb.RwBucket, chanID uint64,
4441
        direction bool, disabled bool) error {
2,948✔
4442

2,948✔
4443
        var disabledEdgeKey [8 + 1]byte
2,948✔
4444
        byteOrder.PutUint64(disabledEdgeKey[0:], chanID)
2,948✔
4445
        if direction {
4,415✔
4446
                disabledEdgeKey[8] = 1
1,467✔
4447
        }
1,467✔
4448

4449
        disabledEdgePolicyIndex, err := edges.CreateBucketIfNotExists(
2,948✔
4450
                disabledEdgePolicyBucket,
2,948✔
4451
        )
2,948✔
4452
        if err != nil {
2,948✔
4453
                return err
×
4454
        }
×
4455

4456
        if disabled {
2,974✔
4457
                return disabledEdgePolicyIndex.Put(disabledEdgeKey[:], []byte{})
26✔
4458
        }
26✔
4459

4460
        return disabledEdgePolicyIndex.Delete(disabledEdgeKey[:])
2,922✔
4461
}
4462

4463
// putChanEdgePolicyUnknown marks the edge policy as unknown
4464
// in the edges bucket.
4465
func putChanEdgePolicyUnknown(edges kvdb.RwBucket, channelID uint64,
4466
        from []byte) error {
2,970✔
4467

2,970✔
4468
        var edgeKey [33 + 8]byte
2,970✔
4469
        copy(edgeKey[:], from)
2,970✔
4470
        byteOrder.PutUint64(edgeKey[33:], channelID)
2,970✔
4471

2,970✔
4472
        if edges.Get(edgeKey[:]) != nil {
2,970✔
4473
                return fmt.Errorf("cannot write unknown policy for channel %v "+
×
4474
                        " when there is already a policy present", channelID)
×
4475
        }
×
4476

4477
        return edges.Put(edgeKey[:], unknownPolicy)
2,970✔
4478
}
4479

4480
func fetchChanEdgePolicy(edges kvdb.RBucket, chanID []byte,
4481
        nodePub []byte) (*models.ChannelEdgePolicy, error) {
8,188✔
4482

8,188✔
4483
        var edgeKey [33 + 8]byte
8,188✔
4484
        copy(edgeKey[:], nodePub)
8,188✔
4485
        copy(edgeKey[33:], chanID[:])
8,188✔
4486

8,188✔
4487
        edgeBytes := edges.Get(edgeKey[:])
8,188✔
4488
        if edgeBytes == nil {
8,188✔
4489
                return nil, ErrEdgeNotFound
×
4490
        }
×
4491

4492
        // No need to deserialize unknown policy.
4493
        if bytes.Equal(edgeBytes[:], unknownPolicy) {
8,573✔
4494
                return nil, nil
385✔
4495
        }
385✔
4496

4497
        edgeReader := bytes.NewReader(edgeBytes)
7,803✔
4498

7,803✔
4499
        ep, err := deserializeChanEdgePolicy(edgeReader)
7,803✔
4500
        switch {
7,803✔
4501
        // If the db policy was missing an expected optional field, we return
4502
        // nil as if the policy was unknown.
4503
        case err == ErrEdgePolicyOptionalFieldNotFound:
1✔
4504
                return nil, nil
1✔
4505

4506
        case err != nil:
×
4507
                return nil, err
×
4508
        }
4509

4510
        return ep, nil
7,802✔
4511
}
4512

4513
func fetchChanEdgePolicies(edgeIndex kvdb.RBucket, edges kvdb.RBucket,
4514
        chanID []byte) (*models.ChannelEdgePolicy, *models.ChannelEdgePolicy,
4515
        error) {
245✔
4516

245✔
4517
        edgeInfo := edgeIndex.Get(chanID)
245✔
4518
        if edgeInfo == nil {
245✔
4519
                return nil, nil, fmt.Errorf("%w: chanID=%x", ErrEdgeNotFound,
×
4520
                        chanID)
×
4521
        }
×
4522

4523
        // The first node is contained within the first half of the edge
4524
        // information. We only propagate the error here and below if it's
4525
        // something other than edge non-existence.
4526
        node1Pub := edgeInfo[:33]
245✔
4527
        edge1, err := fetchChanEdgePolicy(edges, chanID, node1Pub)
245✔
4528
        if err != nil {
245✔
4529
                return nil, nil, fmt.Errorf("%w: node1Pub=%x", ErrEdgeNotFound,
×
4530
                        node1Pub)
×
4531
        }
×
4532

4533
        // Similarly, the second node is contained within the latter
4534
        // half of the edge information.
4535
        node2Pub := edgeInfo[33:66]
245✔
4536
        edge2, err := fetchChanEdgePolicy(edges, chanID, node2Pub)
245✔
4537
        if err != nil {
245✔
4538
                return nil, nil, fmt.Errorf("%w: node2Pub=%x", ErrEdgeNotFound,
×
4539
                        node2Pub)
×
4540
        }
×
4541

4542
        return edge1, edge2, nil
245✔
4543
}
4544

4545
func serializeChanEdgePolicy(w io.Writer, edge *models.ChannelEdgePolicy,
4546
        to []byte) error {
2,662✔
4547

2,662✔
4548
        err := wire.WriteVarBytes(w, 0, edge.SigBytes)
2,662✔
4549
        if err != nil {
2,662✔
4550
                return err
×
4551
        }
×
4552

4553
        if err := binary.Write(w, byteOrder, edge.ChannelID); err != nil {
2,662✔
4554
                return err
×
4555
        }
×
4556

4557
        var scratch [8]byte
2,662✔
4558
        updateUnix := uint64(edge.LastUpdate.Unix())
2,662✔
4559
        byteOrder.PutUint64(scratch[:], updateUnix)
2,662✔
4560
        if _, err := w.Write(scratch[:]); err != nil {
2,662✔
4561
                return err
×
4562
        }
×
4563

4564
        if err := binary.Write(w, byteOrder, edge.MessageFlags); err != nil {
2,662✔
4565
                return err
×
4566
        }
×
4567
        if err := binary.Write(w, byteOrder, edge.ChannelFlags); err != nil {
2,662✔
4568
                return err
×
4569
        }
×
4570
        if err := binary.Write(w, byteOrder, edge.TimeLockDelta); err != nil {
2,662✔
4571
                return err
×
4572
        }
×
4573
        if err := binary.Write(w, byteOrder, uint64(edge.MinHTLC)); err != nil {
2,662✔
4574
                return err
×
4575
        }
×
4576
        err = binary.Write(w, byteOrder, uint64(edge.FeeBaseMSat))
2,662✔
4577
        if err != nil {
2,662✔
4578
                return err
×
4579
        }
×
4580
        err = binary.Write(
2,662✔
4581
                w, byteOrder, uint64(edge.FeeProportionalMillionths),
2,662✔
4582
        )
2,662✔
4583
        if err != nil {
2,662✔
4584
                return err
×
4585
        }
×
4586

4587
        if _, err := w.Write(to); err != nil {
2,662✔
4588
                return err
×
4589
        }
×
4590

4591
        // If the max_htlc field is present, we write it. To be compatible with
4592
        // older versions that wasn't aware of this field, we write it as part
4593
        // of the opaque data.
4594
        // TODO(halseth): clean up when moving to TLV.
4595
        var opaqueBuf bytes.Buffer
2,662✔
4596
        if edge.MessageFlags.HasMaxHtlc() {
4,940✔
4597
                err := binary.Write(&opaqueBuf, byteOrder, uint64(edge.MaxHTLC))
2,278✔
4598
                if err != nil {
2,278✔
4599
                        return err
×
4600
                }
×
4601
        }
4602

4603
        if len(edge.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
2,662✔
4604
                return ErrTooManyExtraOpaqueBytes(len(edge.ExtraOpaqueData))
×
4605
        }
×
4606
        if _, err := opaqueBuf.Write(edge.ExtraOpaqueData); err != nil {
2,662✔
4607
                return err
×
4608
        }
×
4609

4610
        if err := wire.WriteVarBytes(w, 0, opaqueBuf.Bytes()); err != nil {
2,662✔
4611
                return err
×
4612
        }
×
4613
        return nil
2,662✔
4614
}
4615

4616
func deserializeChanEdgePolicy(r io.Reader) (*models.ChannelEdgePolicy, error) {
7,828✔
4617
        // Deserialize the policy. Note that in case an optional field is not
7,828✔
4618
        // found, both an error and a populated policy object are returned.
7,828✔
4619
        edge, deserializeErr := deserializeChanEdgePolicyRaw(r)
7,828✔
4620
        if deserializeErr != nil &&
7,828✔
4621
                deserializeErr != ErrEdgePolicyOptionalFieldNotFound {
7,828✔
4622

×
4623
                return nil, deserializeErr
×
4624
        }
×
4625

4626
        return edge, deserializeErr
7,828✔
4627
}
4628

4629
func deserializeChanEdgePolicyRaw(r io.Reader) (*models.ChannelEdgePolicy,
4630
        error) {
8,836✔
4631

8,836✔
4632
        edge := &models.ChannelEdgePolicy{}
8,836✔
4633

8,836✔
4634
        var err error
8,836✔
4635
        edge.SigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
8,836✔
4636
        if err != nil {
8,836✔
4637
                return nil, err
×
4638
        }
×
4639

4640
        if err := binary.Read(r, byteOrder, &edge.ChannelID); err != nil {
8,836✔
4641
                return nil, err
×
4642
        }
×
4643

4644
        var scratch [8]byte
8,836✔
4645
        if _, err := r.Read(scratch[:]); err != nil {
8,836✔
4646
                return nil, err
×
4647
        }
×
4648
        unix := int64(byteOrder.Uint64(scratch[:]))
8,836✔
4649
        edge.LastUpdate = time.Unix(unix, 0)
8,836✔
4650

8,836✔
4651
        if err := binary.Read(r, byteOrder, &edge.MessageFlags); err != nil {
8,836✔
4652
                return nil, err
×
4653
        }
×
4654
        if err := binary.Read(r, byteOrder, &edge.ChannelFlags); err != nil {
8,836✔
4655
                return nil, err
×
4656
        }
×
4657
        if err := binary.Read(r, byteOrder, &edge.TimeLockDelta); err != nil {
8,836✔
4658
                return nil, err
×
4659
        }
×
4660

4661
        var n uint64
8,836✔
4662
        if err := binary.Read(r, byteOrder, &n); err != nil {
8,836✔
4663
                return nil, err
×
4664
        }
×
4665
        edge.MinHTLC = lnwire.MilliSatoshi(n)
8,836✔
4666

8,836✔
4667
        if err := binary.Read(r, byteOrder, &n); err != nil {
8,836✔
4668
                return nil, err
×
4669
        }
×
4670
        edge.FeeBaseMSat = lnwire.MilliSatoshi(n)
8,836✔
4671

8,836✔
4672
        if err := binary.Read(r, byteOrder, &n); err != nil {
8,836✔
4673
                return nil, err
×
4674
        }
×
4675
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(n)
8,836✔
4676

8,836✔
4677
        if _, err := r.Read(edge.ToNode[:]); err != nil {
8,836✔
4678
                return nil, err
×
4679
        }
×
4680

4681
        // We'll try and see if there are any opaque bytes left, if not, then
4682
        // we'll ignore the EOF error and return the edge as is.
4683
        edge.ExtraOpaqueData, err = wire.ReadVarBytes(
8,836✔
4684
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
8,836✔
4685
        )
8,836✔
4686
        switch {
8,836✔
4687
        case err == io.ErrUnexpectedEOF:
×
4688
        case err == io.EOF:
3✔
4689
        case err != nil:
×
4690
                return nil, err
×
4691
        }
4692

4693
        // See if optional fields are present.
4694
        if edge.MessageFlags.HasMaxHtlc() {
17,291✔
4695
                // The max_htlc field should be at the beginning of the opaque
8,455✔
4696
                // bytes.
8,455✔
4697
                opq := edge.ExtraOpaqueData
8,455✔
4698

8,455✔
4699
                // If the max_htlc field is not present, it might be old data
8,455✔
4700
                // stored before this field was validated. We'll return the
8,455✔
4701
                // edge along with an error.
8,455✔
4702
                if len(opq) < 8 {
8,458✔
4703
                        return edge, ErrEdgePolicyOptionalFieldNotFound
3✔
4704
                }
3✔
4705

4706
                maxHtlc := byteOrder.Uint64(opq[:8])
8,452✔
4707
                edge.MaxHTLC = lnwire.MilliSatoshi(maxHtlc)
8,452✔
4708

8,452✔
4709
                // Exclude the parsed field from the rest of the opaque data.
8,452✔
4710
                edge.ExtraOpaqueData = opq[8:]
8,452✔
4711
        }
4712

4713
        return edge, nil
8,833✔
4714
}
4715

4716
// MakeTestGraph creates a new instance of the ChannelGraph for testing
4717
// purposes.
4718
func MakeTestGraph(t testing.TB, modifiers ...OptionModifier) (*ChannelGraph,
4719
        error) {
39✔
4720

39✔
4721
        opts := DefaultOptions()
39✔
4722
        for _, modifier := range modifiers {
39✔
4723
                modifier(opts)
×
4724
        }
×
4725

4726
        // Next, create channelgraph for the first time.
4727
        backend, backendCleanup, err := kvdb.GetTestBackend(t.TempDir(), "cgr")
39✔
4728
        if err != nil {
39✔
4729
                backendCleanup()
×
4730
                return nil, err
×
4731
        }
×
4732

4733
        graph, err := NewChannelGraph(backend)
39✔
4734
        if err != nil {
39✔
4735
                backendCleanup()
×
4736
                return nil, err
×
4737
        }
×
4738

4739
        t.Cleanup(func() {
78✔
4740
                _ = backend.Close()
39✔
4741
                backendCleanup()
39✔
4742
        })
39✔
4743

4744
        return graph, nil
39✔
4745
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc