• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13440912774

20 Feb 2025 05:14PM UTC coverage: 57.697% (-1.1%) from 58.802%
13440912774

Pull #9535

github

guggero
GitHub: remove duplicate caching

Turns out that actions/setup-go starting with @v4 also adds caching.
With that, our cache size on disk has almost doubled, leading to the
GitHub runner running out of space in certain situation.
We fix that by disabling the automated caching since we already have our
own, custom-tailored version.
Pull Request #9535: GitHub: remove duplicate caching

103519 of 179417 relevant lines covered (57.7%)

24825.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.88
/graph/db/graph.go
1
package graphdb
2

3
import (
4
        "bytes"
5
        "crypto/sha256"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "math"
11
        "net"
12
        "sort"
13
        "sync"
14
        "testing"
15
        "time"
16

17
        "github.com/btcsuite/btcd/btcec/v2"
18
        "github.com/btcsuite/btcd/chaincfg/chainhash"
19
        "github.com/btcsuite/btcd/txscript"
20
        "github.com/btcsuite/btcd/wire"
21
        "github.com/btcsuite/btcwallet/walletdb"
22
        "github.com/lightningnetwork/lnd/aliasmgr"
23
        "github.com/lightningnetwork/lnd/batch"
24
        "github.com/lightningnetwork/lnd/graph/db/models"
25
        "github.com/lightningnetwork/lnd/input"
26
        "github.com/lightningnetwork/lnd/kvdb"
27
        "github.com/lightningnetwork/lnd/lnwire"
28
        "github.com/lightningnetwork/lnd/routing/route"
29
)
30

31
var (
32
        // nodeBucket is a bucket which houses all the vertices or nodes within
33
        // the channel graph. This bucket has a single-sub bucket which adds an
34
        // additional index from pubkey -> alias. Within the top-level of this
35
        // bucket, the key space maps a node's compressed public key to the
36
        // serialized information for that node. Additionally, there's a
37
        // special key "source" which stores the pubkey of the source node. The
38
        // source node is used as the starting point for all graph/queries and
39
        // traversals. The graph is formed as a star-graph with the source node
40
        // at the center.
41
        //
42
        // maps: pubKey -> nodeInfo
43
        // maps: source -> selfPubKey
44
        nodeBucket = []byte("graph-node")
45

46
        // nodeUpdateIndexBucket is a sub-bucket of the nodeBucket. This bucket
47
        // will be used to quickly look up the "freshness" of a node's last
48
        // update to the network. The bucket only contains keys, and no values,
49
        // it's mapping:
50
        //
51
        // maps: updateTime || nodeID -> nil
52
        nodeUpdateIndexBucket = []byte("graph-node-update-index")
53

54
        // sourceKey is a special key that resides within the nodeBucket. The
55
        // sourceKey maps a key to the public key of the "self node".
56
        sourceKey = []byte("source")
57

58
        // aliasIndexBucket is a sub-bucket that's nested within the main
59
        // nodeBucket. This bucket maps the public key of a node to its
60
        // current alias. This bucket is provided as it can be used within a
61
        // future UI layer to add an additional degree of confirmation.
62
        aliasIndexBucket = []byte("alias")
63

64
        // edgeBucket is a bucket which houses all of the edge or channel
65
        // information within the channel graph. This bucket essentially acts
66
        // as an adjacency list, which in conjunction with a range scan, can be
67
        // used to iterate over all the incoming and outgoing edges for a
68
        // particular node. Key in the bucket use a prefix scheme which leads
69
        // with the node's public key and sends with the compact edge ID.
70
        // For each chanID, there will be two entries within the bucket, as the
71
        // graph is directed: nodes may have different policies w.r.t to fees
72
        // for their respective directions.
73
        //
74
        // maps: pubKey || chanID -> channel edge policy for node
75
        edgeBucket = []byte("graph-edge")
76

77
        // unknownPolicy is represented as an empty slice. It is
78
        // used as the value in edgeBucket for unknown channel edge policies.
79
        // Unknown policies are still stored in the database to enable efficient
80
        // lookup of incoming channel edges.
81
        unknownPolicy = []byte{}
82

83
        // chanStart is an array of all zero bytes which is used to perform
84
        // range scans within the edgeBucket to obtain all of the outgoing
85
        // edges for a particular node.
86
        chanStart [8]byte
87

88
        // edgeIndexBucket is an index which can be used to iterate all edges
89
        // in the bucket, grouping them according to their in/out nodes.
90
        // Additionally, the items in this bucket also contain the complete
91
        // edge information for a channel. The edge information includes the
92
        // capacity of the channel, the nodes that made the channel, etc. This
93
        // bucket resides within the edgeBucket above. Creation of an edge
94
        // proceeds in two phases: first the edge is added to the edge index,
95
        // afterwards the edgeBucket can be updated with the latest details of
96
        // the edge as they are announced on the network.
97
        //
98
        // maps: chanID -> pubKey1 || pubKey2 || restofEdgeInfo
99
        edgeIndexBucket = []byte("edge-index")
100

101
        // edgeUpdateIndexBucket is a sub-bucket of the main edgeBucket. This
102
        // bucket contains an index which allows us to gauge the "freshness" of
103
        // a channel's last updates.
104
        //
105
        // maps: updateTime || chanID -> nil
106
        edgeUpdateIndexBucket = []byte("edge-update-index")
107

108
        // channelPointBucket maps a channel's full outpoint (txid:index) to
109
        // its short 8-byte channel ID. This bucket resides within the
110
        // edgeBucket above, and can be used to quickly remove an edge due to
111
        // the outpoint being spent, or to query for existence of a channel.
112
        //
113
        // maps: outPoint -> chanID
114
        channelPointBucket = []byte("chan-index")
115

116
        // zombieBucket is a sub-bucket of the main edgeBucket bucket
117
        // responsible for maintaining an index of zombie channels. Each entry
118
        // exists within the bucket as follows:
119
        //
120
        // maps: chanID -> pubKey1 || pubKey2
121
        //
122
        // The chanID represents the channel ID of the edge that is marked as a
123
        // zombie and is used as the key, which maps to the public keys of the
124
        // edge's participants.
125
        zombieBucket = []byte("zombie-index")
126

127
        // disabledEdgePolicyBucket is a sub-bucket of the main edgeBucket
128
        // bucket responsible for maintaining an index of disabled edge
129
        // policies. Each entry exists within the bucket as follows:
130
        //
131
        // maps: <chanID><direction> -> []byte{}
132
        //
133
        // The chanID represents the channel ID of the edge and the direction is
134
        // one byte representing the direction of the edge. The main purpose of
135
        // this index is to allow pruning disabled channels in a fast way without
136
        // the need to iterate all over the graph.
137
        disabledEdgePolicyBucket = []byte("disabled-edge-policy-index")
138

139
        // graphMetaBucket is a top-level bucket which stores various meta-deta
140
        // related to the on-disk channel graph. Data stored in this bucket
141
        // includes the block to which the graph has been synced to, the total
142
        // number of channels, etc.
143
        graphMetaBucket = []byte("graph-meta")
144

145
        // pruneLogBucket is a bucket within the graphMetaBucket that stores
146
        // a mapping from the block height to the hash for the blocks used to
147
        // prune the graph.
148
        // Once a new block is discovered, any channels that have been closed
149
        // (by spending the outpoint) can safely be removed from the graph, and
150
        // the block is added to the prune log. We need to keep such a log for
151
        // the case where a reorg happens, and we must "rewind" the state of the
152
        // graph by removing channels that were previously confirmed. In such a
153
        // case we'll remove all entries from the prune log with a block height
154
        // that no longer exists.
155
        pruneLogBucket = []byte("prune-log")
156

157
        // closedScidBucket is a top-level bucket that stores scids for
158
        // channels that we know to be closed. This is used so that we don't
159
        // need to perform expensive validation checks if we receive a channel
160
        // announcement for the channel again.
161
        //
162
        // maps: scid -> []byte{}
163
        closedScidBucket = []byte("closed-scid")
164
)
165

166
const (
167
        // MaxAllowedExtraOpaqueBytes is the largest amount of opaque bytes that
168
        // we'll permit to be written to disk. We limit this as otherwise, it
169
        // would be possible for a node to create a ton of updates and slowly
170
        // fill our disk, and also waste bandwidth due to relaying.
171
        MaxAllowedExtraOpaqueBytes = 10000
172
)
173

174
// ChannelGraph is a persistent, on-disk graph representation of the Lightning
175
// Network. This struct can be used to implement path finding algorithms on top
176
// of, and also to update a node's view based on information received from the
177
// p2p network. Internally, the graph is stored using a modified adjacency list
178
// representation with some added object interaction possible with each
179
// serialized edge/node. The graph is stored is directed, meaning that are two
180
// edges stored for each channel: an inbound/outbound edge for each node pair.
181
// Nodes, edges, and edge information can all be added to the graph
182
// independently. Edge removal results in the deletion of all edge information
183
// for that edge.
184
type ChannelGraph struct {
185
        db kvdb.Backend
186

187
        // cacheMu guards all caches (rejectCache, chanCache, graphCache). If
188
        // this mutex will be acquired at the same time as the DB mutex then
189
        // the cacheMu MUST be acquired first to prevent deadlock.
190
        cacheMu     sync.RWMutex
191
        rejectCache *rejectCache
192
        chanCache   *channelCache
193
        graphCache  *GraphCache
194

195
        chanScheduler batch.Scheduler
196
        nodeScheduler batch.Scheduler
197
}
198

199
// NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The
200
// returned instance has its own unique reject cache and channel cache.
201
func NewChannelGraph(db kvdb.Backend, options ...OptionModifier) (*ChannelGraph,
202
        error) {
173✔
203

173✔
204
        opts := DefaultOptions()
173✔
205
        for _, o := range options {
275✔
206
                o(opts)
102✔
207
        }
102✔
208

209
        if !opts.NoMigration {
346✔
210
                if err := initChannelGraph(db); err != nil {
173✔
211
                        return nil, err
×
212
                }
×
213
        }
214

215
        g := &ChannelGraph{
173✔
216
                db:          db,
173✔
217
                rejectCache: newRejectCache(opts.RejectCacheSize),
173✔
218
                chanCache:   newChannelCache(opts.ChannelCacheSize),
173✔
219
        }
173✔
220
        g.chanScheduler = batch.NewTimeScheduler(
173✔
221
                db, &g.cacheMu, opts.BatchCommitInterval,
173✔
222
        )
173✔
223
        g.nodeScheduler = batch.NewTimeScheduler(
173✔
224
                db, nil, opts.BatchCommitInterval,
173✔
225
        )
173✔
226

173✔
227
        // The graph cache can be turned off (e.g. for mobile users) for a
173✔
228
        // speed/memory usage tradeoff.
173✔
229
        if opts.UseGraphCache {
313✔
230
                g.graphCache = NewGraphCache(opts.PreAllocCacheNumNodes)
140✔
231
                startTime := time.Now()
140✔
232
                log.Debugf("Populating in-memory channel graph, this might " +
140✔
233
                        "take a while...")
140✔
234

140✔
235
                err := g.ForEachNodeCacheable(func(node route.Vertex,
140✔
236
                        features *lnwire.FeatureVector) error {
240✔
237

100✔
238
                        g.graphCache.AddNodeFeatures(node, features)
100✔
239

100✔
240
                        return nil
100✔
241
                })
100✔
242
                if err != nil {
140✔
243
                        return nil, err
×
244
                }
×
245

246
                err = g.ForEachChannel(func(info *models.ChannelEdgeInfo,
140✔
247
                        policy1, policy2 *models.ChannelEdgePolicy) error {
536✔
248

396✔
249
                        g.graphCache.AddChannel(info, policy1, policy2)
396✔
250

396✔
251
                        return nil
396✔
252
                })
396✔
253
                if err != nil {
140✔
254
                        return nil, err
×
255
                }
×
256

257
                log.Debugf("Finished populating in-memory channel graph (took "+
140✔
258
                        "%v, %s)", time.Since(startTime), g.graphCache.Stats())
140✔
259
        }
260

261
        return g, nil
173✔
262
}
263

264
// channelMapKey is the key structure used for storing channel edge policies.
265
type channelMapKey struct {
266
        nodeKey route.Vertex
267
        chanID  [8]byte
268
}
269

270
// getChannelMap loads all channel edge policies from the database and stores
271
// them in a map.
272
func (c *ChannelGraph) getChannelMap(edges kvdb.RBucket) (
273
        map[channelMapKey]*models.ChannelEdgePolicy, error) {
144✔
274

144✔
275
        // Create a map to store all channel edge policies.
144✔
276
        channelMap := make(map[channelMapKey]*models.ChannelEdgePolicy)
144✔
277

144✔
278
        err := kvdb.ForAll(edges, func(k, edgeBytes []byte) error {
1,715✔
279
                // Skip embedded buckets.
1,571✔
280
                if bytes.Equal(k, edgeIndexBucket) ||
1,571✔
281
                        bytes.Equal(k, edgeUpdateIndexBucket) ||
1,571✔
282
                        bytes.Equal(k, zombieBucket) ||
1,571✔
283
                        bytes.Equal(k, disabledEdgePolicyBucket) ||
1,571✔
284
                        bytes.Equal(k, channelPointBucket) {
2,152✔
285

581✔
286
                        return nil
581✔
287
                }
581✔
288

289
                // Validate key length.
290
                if len(k) != 33+8 {
990✔
291
                        return fmt.Errorf("invalid edge key %x encountered", k)
×
292
                }
×
293

294
                var key channelMapKey
990✔
295
                copy(key.nodeKey[:], k[:33])
990✔
296
                copy(key.chanID[:], k[33:])
990✔
297

990✔
298
                // No need to deserialize unknown policy.
990✔
299
                if bytes.Equal(edgeBytes, unknownPolicy) {
990✔
300
                        return nil
×
301
                }
×
302

303
                edgeReader := bytes.NewReader(edgeBytes)
990✔
304
                edge, err := deserializeChanEdgePolicyRaw(
990✔
305
                        edgeReader,
990✔
306
                )
990✔
307

990✔
308
                switch {
990✔
309
                // If the db policy was missing an expected optional field, we
310
                // return nil as if the policy was unknown.
311
                case err == ErrEdgePolicyOptionalFieldNotFound:
×
312
                        return nil
×
313

314
                case err != nil:
×
315
                        return err
×
316
                }
317

318
                channelMap[key] = edge
990✔
319

990✔
320
                return nil
990✔
321
        })
322
        if err != nil {
144✔
323
                return nil, err
×
324
        }
×
325

326
        return channelMap, nil
144✔
327
}
328

329
var graphTopLevelBuckets = [][]byte{
330
        nodeBucket,
331
        edgeBucket,
332
        graphMetaBucket,
333
        closedScidBucket,
334
}
335

336
// Wipe completely deletes all saved state within all used buckets within the
337
// database. The deletion is done in a single transaction, therefore this
338
// operation is fully atomic.
339
func (c *ChannelGraph) Wipe() error {
×
340
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
×
341
                for _, tlb := range graphTopLevelBuckets {
×
342
                        err := tx.DeleteTopLevelBucket(tlb)
×
343
                        if err != nil && err != kvdb.ErrBucketNotFound {
×
344
                                return err
×
345
                        }
×
346
                }
347
                return nil
×
348
        }, func() {})
×
349
        if err != nil {
×
350
                return err
×
351
        }
×
352

353
        return initChannelGraph(c.db)
×
354
}
355

356
// createChannelDB creates and initializes a fresh version of  In
357
// the case that the target path has not yet been created or doesn't yet exist,
358
// then the path is created. Additionally, all required top-level buckets used
359
// within the database are created.
360
func initChannelGraph(db kvdb.Backend) error {
173✔
361
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
346✔
362
                for _, tlb := range graphTopLevelBuckets {
865✔
363
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
692✔
364
                                return err
×
365
                        }
×
366
                }
367

368
                nodes := tx.ReadWriteBucket(nodeBucket)
173✔
369
                _, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
173✔
370
                if err != nil {
173✔
371
                        return err
×
372
                }
×
373
                _, err = nodes.CreateBucketIfNotExists(nodeUpdateIndexBucket)
173✔
374
                if err != nil {
173✔
375
                        return err
×
376
                }
×
377

378
                edges := tx.ReadWriteBucket(edgeBucket)
173✔
379
                _, err = edges.CreateBucketIfNotExists(edgeIndexBucket)
173✔
380
                if err != nil {
173✔
381
                        return err
×
382
                }
×
383
                _, err = edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
173✔
384
                if err != nil {
173✔
385
                        return err
×
386
                }
×
387
                _, err = edges.CreateBucketIfNotExists(channelPointBucket)
173✔
388
                if err != nil {
173✔
389
                        return err
×
390
                }
×
391
                _, err = edges.CreateBucketIfNotExists(zombieBucket)
173✔
392
                if err != nil {
173✔
393
                        return err
×
394
                }
×
395

396
                graphMeta := tx.ReadWriteBucket(graphMetaBucket)
173✔
397
                _, err = graphMeta.CreateBucketIfNotExists(pruneLogBucket)
173✔
398
                return err
173✔
399
        }, func() {})
173✔
400
        if err != nil {
173✔
401
                return fmt.Errorf("unable to create new channel graph: %w", err)
×
402
        }
×
403

404
        return nil
173✔
405
}
406

407
// AddrsForNode returns all known addresses for the target node public key that
408
// the graph DB is aware of. The returned boolean indicates if the given node is
409
// unknown to the graph DB or not.
410
//
411
// NOTE: this is part of the channeldb.AddrSource interface.
412
func (c *ChannelGraph) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr,
413
        error) {
1✔
414

1✔
415
        pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed())
1✔
416
        if err != nil {
1✔
417
                return false, nil, err
×
418
        }
×
419

420
        node, err := c.FetchLightningNode(pubKey)
1✔
421
        // We don't consider it an error if the graph is unaware of the node.
1✔
422
        switch {
1✔
423
        case err != nil && !errors.Is(err, ErrGraphNodeNotFound):
×
424
                return false, nil, err
×
425

426
        case errors.Is(err, ErrGraphNodeNotFound):
×
427
                return false, nil, nil
×
428
        }
429

430
        return true, node.Addresses, nil
1✔
431
}
432

433
// ForEachChannel iterates through all the channel edges stored within the
434
// graph and invokes the passed callback for each edge. The callback takes two
435
// edges as since this is a directed graph, both the in/out edges are visited.
436
// If the callback returns an error, then the transaction is aborted and the
437
// iteration stops early.
438
//
439
// NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer
440
// for that particular channel edge routing policy will be passed into the
441
// callback.
442
func (c *ChannelGraph) ForEachChannel(cb func(*models.ChannelEdgeInfo,
443
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
144✔
444

144✔
445
        return c.db.View(func(tx kvdb.RTx) error {
288✔
446
                edges := tx.ReadBucket(edgeBucket)
144✔
447
                if edges == nil {
144✔
448
                        return ErrGraphNoEdgesFound
×
449
                }
×
450

451
                // First, load all edges in memory indexed by node and channel
452
                // id.
453
                channelMap, err := c.getChannelMap(edges)
144✔
454
                if err != nil {
144✔
455
                        return err
×
456
                }
×
457

458
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
144✔
459
                if edgeIndex == nil {
144✔
460
                        return ErrGraphNoEdgesFound
×
461
                }
×
462

463
                // Load edge index, recombine each channel with the policies
464
                // loaded above and invoke the callback.
465
                return kvdb.ForAll(
144✔
466
                        edgeIndex, func(k, edgeInfoBytes []byte) error {
639✔
467
                                var chanID [8]byte
495✔
468
                                copy(chanID[:], k)
495✔
469

495✔
470
                                edgeInfoReader := bytes.NewReader(edgeInfoBytes)
495✔
471
                                info, err := deserializeChanEdgeInfo(
495✔
472
                                        edgeInfoReader,
495✔
473
                                )
495✔
474
                                if err != nil {
495✔
475
                                        return err
×
476
                                }
×
477

478
                                policy1 := channelMap[channelMapKey{
495✔
479
                                        nodeKey: info.NodeKey1Bytes,
495✔
480
                                        chanID:  chanID,
495✔
481
                                }]
495✔
482

495✔
483
                                policy2 := channelMap[channelMapKey{
495✔
484
                                        nodeKey: info.NodeKey2Bytes,
495✔
485
                                        chanID:  chanID,
495✔
486
                                }]
495✔
487

495✔
488
                                return cb(&info, policy1, policy2)
495✔
489
                        },
490
                )
491
        }, func() {})
144✔
492
}
493

494
// forEachNodeDirectedChannel iterates through all channels of a given node,
495
// executing the passed callback on the directed edge representing the channel
496
// and its incoming policy. If the callback returns an error, then the iteration
497
// is halted with the error propagated back up to the caller. An optional read
498
// transaction may be provided. If none is provided, a new one will be created.
499
//
500
// Unknown policies are passed into the callback as nil values.
501
func (c *ChannelGraph) forEachNodeDirectedChannel(tx kvdb.RTx,
502
        node route.Vertex, cb func(channel *DirectedChannel) error) error {
704✔
503

704✔
504
        if c.graphCache != nil {
1,166✔
505
                return c.graphCache.ForEachChannel(node, cb)
462✔
506
        }
462✔
507

508
        // Fallback that uses the database.
509
        toNodeCallback := func() route.Vertex {
374✔
510
                return node
132✔
511
        }
132✔
512
        toNodeFeatures, err := c.fetchNodeFeatures(tx, node)
242✔
513
        if err != nil {
242✔
514
                return err
×
515
        }
×
516

517
        dbCallback := func(tx kvdb.RTx, e *models.ChannelEdgeInfo, p1,
242✔
518
                p2 *models.ChannelEdgePolicy) error {
738✔
519

496✔
520
                var cachedInPolicy *models.CachedEdgePolicy
496✔
521
                if p2 != nil {
989✔
522
                        cachedInPolicy = models.NewCachedPolicy(p2)
493✔
523
                        cachedInPolicy.ToNodePubKey = toNodeCallback
493✔
524
                        cachedInPolicy.ToNodeFeatures = toNodeFeatures
493✔
525
                }
493✔
526

527
                var inboundFee lnwire.Fee
496✔
528
                if p1 != nil {
991✔
529
                        // Extract inbound fee. If there is a decoding error,
495✔
530
                        // skip this edge.
495✔
531
                        _, err := p1.ExtraOpaqueData.ExtractRecords(&inboundFee)
495✔
532
                        if err != nil {
496✔
533
                                return nil
1✔
534
                        }
1✔
535
                }
536

537
                directedChannel := &DirectedChannel{
495✔
538
                        ChannelID:    e.ChannelID,
495✔
539
                        IsNode1:      node == e.NodeKey1Bytes,
495✔
540
                        OtherNode:    e.NodeKey2Bytes,
495✔
541
                        Capacity:     e.Capacity,
495✔
542
                        OutPolicySet: p1 != nil,
495✔
543
                        InPolicy:     cachedInPolicy,
495✔
544
                        InboundFee:   inboundFee,
495✔
545
                }
495✔
546

495✔
547
                if node == e.NodeKey2Bytes {
743✔
548
                        directedChannel.OtherNode = e.NodeKey1Bytes
248✔
549
                }
248✔
550

551
                return cb(directedChannel)
495✔
552
        }
553
        return nodeTraversal(tx, node[:], c.db, dbCallback)
242✔
554
}
555

556
// fetchNodeFeatures returns the features of a given node. If no features are
557
// known for the node, an empty feature vector is returned. An optional read
558
// transaction may be provided. If none is provided, a new one will be created.
559
func (c *ChannelGraph) fetchNodeFeatures(tx kvdb.RTx,
560
        node route.Vertex) (*lnwire.FeatureVector, error) {
1,139✔
561

1,139✔
562
        if c.graphCache != nil {
1,592✔
563
                return c.graphCache.GetFeatures(node), nil
453✔
564
        }
453✔
565

566
        // Fallback that uses the database.
567
        targetNode, err := c.FetchLightningNodeTx(tx, node)
686✔
568
        switch err {
686✔
569
        // If the node exists and has features, return them directly.
570
        case nil:
675✔
571
                return targetNode.Features, nil
675✔
572

573
        // If we couldn't find a node announcement, populate a blank feature
574
        // vector.
575
        case ErrGraphNodeNotFound:
11✔
576
                return lnwire.EmptyFeatureVector(), nil
11✔
577

578
        // Otherwise, bubble the error up.
579
        default:
×
580
                return nil, err
×
581
        }
582
}
583

584
// ForEachNodeDirectedChannel iterates through all channels of a given node,
585
// executing the passed callback on the directed edge representing the channel
586
// and its incoming policy. If the callback returns an error, then the iteration
587
// is halted with the error propagated back up to the caller. If the graphCache
588
// is available, then it will be used to retrieve the node's channels instead
589
// of the database.
590
//
591
// Unknown policies are passed into the callback as nil values.
592
//
593
// NOTE: this is part of the graphdb.NodeTraverser interface.
594
func (c *ChannelGraph) ForEachNodeDirectedChannel(nodePub route.Vertex,
595
        cb func(channel *DirectedChannel) error) error {
111✔
596

111✔
597
        return c.forEachNodeDirectedChannel(nil, nodePub, cb)
111✔
598
}
111✔
599

600
// FetchNodeFeatures returns the features of the given node. If no features are
601
// known for the node, an empty feature vector is returned.
602
// If the graphCache is available, then it will be used to retrieve the node's
603
// features instead of the database.
604
//
605
// NOTE: this is part of the graphdb.NodeTraverser interface.
606
func (c *ChannelGraph) FetchNodeFeatures(nodePub route.Vertex) (
607
        *lnwire.FeatureVector, error) {
87✔
608

87✔
609
        return c.fetchNodeFeatures(nil, nodePub)
87✔
610
}
87✔
611

612
// ForEachNodeCached is similar to forEachNode, but it utilizes the channel
613
// graph cache instead. Note that this doesn't return all the information the
614
// regular forEachNode method does.
615
//
616
// NOTE: The callback contents MUST not be modified.
617
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
618
        chans map[uint64]*DirectedChannel) error) error {
1✔
619

1✔
620
        if c.graphCache != nil {
1✔
621
                return c.graphCache.ForEachNode(cb)
×
622
        }
×
623

624
        // Otherwise call back to a version that uses the database directly.
625
        // We'll iterate over each node, then the set of channels for each
626
        // node, and construct a similar callback functiopn signature as the
627
        // main funcotin expects.
628
        return c.forEachNode(func(tx kvdb.RTx,
1✔
629
                node *models.LightningNode) error {
21✔
630

20✔
631
                channels := make(map[uint64]*DirectedChannel)
20✔
632

20✔
633
                err := c.ForEachNodeChannelTx(tx, node.PubKeyBytes,
20✔
634
                        func(tx kvdb.RTx, e *models.ChannelEdgeInfo,
20✔
635
                                p1 *models.ChannelEdgePolicy,
20✔
636
                                p2 *models.ChannelEdgePolicy) error {
210✔
637

190✔
638
                                toNodeCallback := func() route.Vertex {
190✔
639
                                        return node.PubKeyBytes
×
640
                                }
×
641
                                toNodeFeatures, err := c.fetchNodeFeatures(
190✔
642
                                        tx, node.PubKeyBytes,
190✔
643
                                )
190✔
644
                                if err != nil {
190✔
645
                                        return err
×
646
                                }
×
647

648
                                var cachedInPolicy *models.CachedEdgePolicy
190✔
649
                                if p2 != nil {
380✔
650
                                        cachedInPolicy =
190✔
651
                                                models.NewCachedPolicy(p2)
190✔
652
                                        cachedInPolicy.ToNodePubKey =
190✔
653
                                                toNodeCallback
190✔
654
                                        cachedInPolicy.ToNodeFeatures =
190✔
655
                                                toNodeFeatures
190✔
656
                                }
190✔
657

658
                                directedChannel := &DirectedChannel{
190✔
659
                                        ChannelID: e.ChannelID,
190✔
660
                                        IsNode1: node.PubKeyBytes ==
190✔
661
                                                e.NodeKey1Bytes,
190✔
662
                                        OtherNode:    e.NodeKey2Bytes,
190✔
663
                                        Capacity:     e.Capacity,
190✔
664
                                        OutPolicySet: p1 != nil,
190✔
665
                                        InPolicy:     cachedInPolicy,
190✔
666
                                }
190✔
667

190✔
668
                                if node.PubKeyBytes == e.NodeKey2Bytes {
285✔
669
                                        directedChannel.OtherNode =
95✔
670
                                                e.NodeKey1Bytes
95✔
671
                                }
95✔
672

673
                                channels[e.ChannelID] = directedChannel
190✔
674

190✔
675
                                return nil
190✔
676
                        })
677
                if err != nil {
20✔
678
                        return err
×
679
                }
×
680

681
                return cb(node.PubKeyBytes, channels)
20✔
682
        })
683
}
684

685
// DisabledChannelIDs returns the channel ids of disabled channels.
686
// A channel is disabled when two of the associated ChanelEdgePolicies
687
// have their disabled bit on.
688
func (c *ChannelGraph) DisabledChannelIDs() ([]uint64, error) {
6✔
689
        var disabledChanIDs []uint64
6✔
690
        var chanEdgeFound map[uint64]struct{}
6✔
691

6✔
692
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
12✔
693
                edges := tx.ReadBucket(edgeBucket)
6✔
694
                if edges == nil {
6✔
695
                        return ErrGraphNoEdgesFound
×
696
                }
×
697

698
                disabledEdgePolicyIndex := edges.NestedReadBucket(
6✔
699
                        disabledEdgePolicyBucket,
6✔
700
                )
6✔
701
                if disabledEdgePolicyIndex == nil {
7✔
702
                        return nil
1✔
703
                }
1✔
704

705
                // We iterate over all disabled policies and we add each channel
706
                // that has more than one disabled policy to disabledChanIDs
707
                // array.
708
                return disabledEdgePolicyIndex.ForEach(
5✔
709
                        func(k, v []byte) error {
16✔
710
                                chanID := byteOrder.Uint64(k[:8])
11✔
711
                                _, edgeFound := chanEdgeFound[chanID]
11✔
712
                                if edgeFound {
15✔
713
                                        delete(chanEdgeFound, chanID)
4✔
714
                                        disabledChanIDs = append(
4✔
715
                                                disabledChanIDs, chanID,
4✔
716
                                        )
4✔
717

4✔
718
                                        return nil
4✔
719
                                }
4✔
720

721
                                chanEdgeFound[chanID] = struct{}{}
7✔
722

7✔
723
                                return nil
7✔
724
                        },
725
                )
726
        }, func() {
6✔
727
                disabledChanIDs = nil
6✔
728
                chanEdgeFound = make(map[uint64]struct{})
6✔
729
        })
6✔
730
        if err != nil {
6✔
731
                return nil, err
×
732
        }
×
733

734
        return disabledChanIDs, nil
6✔
735
}
736

737
// ForEachNode iterates through all the stored vertices/nodes in the graph,
738
// executing the passed callback with each node encountered. If the callback
739
// returns an error, then the transaction is aborted and the iteration stops
740
// early. Any operations performed on the NodeTx passed to the call-back are
741
// executed under the same read transaction and so, methods on the NodeTx object
742
// _MUST_ only be called from within the call-back.
743
func (c *ChannelGraph) ForEachNode(cb func(tx NodeRTx) error) error {
120✔
744
        return c.forEachNode(func(tx kvdb.RTx,
120✔
745
                node *models.LightningNode) error {
1,090✔
746

970✔
747
                return cb(newChanGraphNodeTx(tx, c, node))
970✔
748
        })
970✔
749
}
750

751
// forEachNode iterates through all the stored vertices/nodes in the graph,
752
// executing the passed callback with each node encountered. If the callback
753
// returns an error, then the transaction is aborted and the iteration stops
754
// early.
755
//
756
// TODO(roasbeef): add iterator interface to allow for memory efficient graph
757
// traversal when graph gets mega.
758
func (c *ChannelGraph) forEachNode(
759
        cb func(kvdb.RTx, *models.LightningNode) error) error {
129✔
760

129✔
761
        traversal := func(tx kvdb.RTx) error {
258✔
762
                // First grab the nodes bucket which stores the mapping from
129✔
763
                // pubKey to node information.
129✔
764
                nodes := tx.ReadBucket(nodeBucket)
129✔
765
                if nodes == nil {
129✔
766
                        return ErrGraphNotFound
×
767
                }
×
768

769
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
1,568✔
770
                        // If this is the source key, then we skip this
1,439✔
771
                        // iteration as the value for this key is a pubKey
1,439✔
772
                        // rather than raw node information.
1,439✔
773
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
1,700✔
774
                                return nil
261✔
775
                        }
261✔
776

777
                        nodeReader := bytes.NewReader(nodeBytes)
1,178✔
778
                        node, err := deserializeLightningNode(nodeReader)
1,178✔
779
                        if err != nil {
1,178✔
780
                                return err
×
781
                        }
×
782

783
                        // Execute the callback, the transaction will abort if
784
                        // this returns an error.
785
                        return cb(tx, &node)
1,178✔
786
                })
787
        }
788

789
        return kvdb.View(c.db, traversal, func() {})
258✔
790
}
791

792
// ForEachNodeCacheable iterates through all the stored vertices/nodes in the
793
// graph, executing the passed callback with each node encountered. If the
794
// callback returns an error, then the transaction is aborted and the iteration
795
// stops early.
796
func (c *ChannelGraph) ForEachNodeCacheable(cb func(route.Vertex,
797
        *lnwire.FeatureVector) error) error {
141✔
798

141✔
799
        traversal := func(tx kvdb.RTx) error {
282✔
800
                // First grab the nodes bucket which stores the mapping from
141✔
801
                // pubKey to node information.
141✔
802
                nodes := tx.ReadBucket(nodeBucket)
141✔
803
                if nodes == nil {
141✔
804
                        return ErrGraphNotFound
×
805
                }
×
806

807
                return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
543✔
808
                        // If this is the source key, then we skip this
402✔
809
                        // iteration as the value for this key is a pubKey
402✔
810
                        // rather than raw node information.
402✔
811
                        if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
684✔
812
                                return nil
282✔
813
                        }
282✔
814

815
                        nodeReader := bytes.NewReader(nodeBytes)
120✔
816
                        node, features, err := deserializeLightningNodeCacheable( //nolint:ll
120✔
817
                                nodeReader,
120✔
818
                        )
120✔
819
                        if err != nil {
120✔
820
                                return err
×
821
                        }
×
822

823
                        // Execute the callback, the transaction will abort if
824
                        // this returns an error.
825
                        return cb(node, features)
120✔
826
                })
827
        }
828

829
        return kvdb.View(c.db, traversal, func() {})
282✔
830
}
831

832
// SourceNode returns the source node of the graph. The source node is treated
833
// as the center node within a star-graph. This method may be used to kick off
834
// a path finding algorithm in order to explore the reachability of another
835
// node based off the source node.
836
func (c *ChannelGraph) SourceNode() (*models.LightningNode, error) {
231✔
837
        var source *models.LightningNode
231✔
838
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
462✔
839
                // First grab the nodes bucket which stores the mapping from
231✔
840
                // pubKey to node information.
231✔
841
                nodes := tx.ReadBucket(nodeBucket)
231✔
842
                if nodes == nil {
231✔
843
                        return ErrGraphNotFound
×
844
                }
×
845

846
                node, err := c.sourceNode(nodes)
231✔
847
                if err != nil {
232✔
848
                        return err
1✔
849
                }
1✔
850
                source = node
230✔
851

230✔
852
                return nil
230✔
853
        }, func() {
231✔
854
                source = nil
231✔
855
        })
231✔
856
        if err != nil {
232✔
857
                return nil, err
1✔
858
        }
1✔
859

860
        return source, nil
230✔
861
}
862

863
// sourceNode uses an existing database transaction and returns the source node
864
// of the graph. The source node is treated as the center node within a
865
// star-graph. This method may be used to kick off a path finding algorithm in
866
// order to explore the reachability of another node based off the source node.
867
func (c *ChannelGraph) sourceNode(nodes kvdb.RBucket) (*models.LightningNode,
868
        error) {
495✔
869

495✔
870
        selfPub := nodes.Get(sourceKey)
495✔
871
        if selfPub == nil {
496✔
872
                return nil, ErrSourceNodeNotSet
1✔
873
        }
1✔
874

875
        // With the pubKey of the source node retrieved, we're able to
876
        // fetch the full node information.
877
        node, err := fetchLightningNode(nodes, selfPub)
494✔
878
        if err != nil {
494✔
879
                return nil, err
×
880
        }
×
881

882
        return &node, nil
494✔
883
}
884

885
// SetSourceNode sets the source node within the graph database. The source
886
// node is to be used as the center of a star-graph within path finding
887
// algorithms.
888
func (c *ChannelGraph) SetSourceNode(node *models.LightningNode) error {
117✔
889
        nodePubBytes := node.PubKeyBytes[:]
117✔
890

117✔
891
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
234✔
892
                // First grab the nodes bucket which stores the mapping from
117✔
893
                // pubKey to node information.
117✔
894
                nodes, err := tx.CreateTopLevelBucket(nodeBucket)
117✔
895
                if err != nil {
117✔
896
                        return err
×
897
                }
×
898

899
                // Next we create the mapping from source to the targeted
900
                // public key.
901
                if err := nodes.Put(sourceKey, nodePubBytes); err != nil {
117✔
902
                        return err
×
903
                }
×
904

905
                // Finally, we commit the information of the lightning node
906
                // itself.
907
                return addLightningNode(tx, node)
117✔
908
        }, func() {})
117✔
909
}
910

911
// AddLightningNode adds a vertex/node to the graph database. If the node is not
912
// in the database from before, this will add a new, unconnected one to the
913
// graph. If it is present from before, this will update that node's
914
// information. Note that this method is expected to only be called to update an
915
// already present node from a node announcement, or to insert a node found in a
916
// channel update.
917
//
918
// TODO(roasbeef): also need sig of announcement
919
func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
920
        op ...batch.SchedulerOption) error {
800✔
921

800✔
922
        r := &batch.Request{
800✔
923
                Update: func(tx kvdb.RwTx) error {
1,600✔
924
                        if c.graphCache != nil {
1,413✔
925
                                c.graphCache.AddNodeFeatures(
613✔
926
                                        node.PubKeyBytes, node.Features,
613✔
927
                                )
613✔
928
                        }
613✔
929

930
                        return addLightningNode(tx, node)
800✔
931
                },
932
        }
933

934
        for _, f := range op {
800✔
935
                f(r)
×
936
        }
×
937

938
        return c.nodeScheduler.Execute(r)
800✔
939
}
940

941
func addLightningNode(tx kvdb.RwTx, node *models.LightningNode) error {
989✔
942
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
989✔
943
        if err != nil {
989✔
944
                return err
×
945
        }
×
946

947
        aliases, err := nodes.CreateBucketIfNotExists(aliasIndexBucket)
989✔
948
        if err != nil {
989✔
949
                return err
×
950
        }
×
951

952
        updateIndex, err := nodes.CreateBucketIfNotExists(
989✔
953
                nodeUpdateIndexBucket,
989✔
954
        )
989✔
955
        if err != nil {
989✔
956
                return err
×
957
        }
×
958

959
        return putLightningNode(nodes, aliases, updateIndex, node)
989✔
960
}
961

962
// LookupAlias attempts to return the alias as advertised by the target node.
963
// TODO(roasbeef): currently assumes that aliases are unique...
964
func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) {
2✔
965
        var alias string
2✔
966

2✔
967
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
4✔
968
                nodes := tx.ReadBucket(nodeBucket)
2✔
969
                if nodes == nil {
2✔
970
                        return ErrGraphNodesNotFound
×
971
                }
×
972

973
                aliases := nodes.NestedReadBucket(aliasIndexBucket)
2✔
974
                if aliases == nil {
2✔
975
                        return ErrGraphNodesNotFound
×
976
                }
×
977

978
                nodePub := pub.SerializeCompressed()
2✔
979
                a := aliases.Get(nodePub)
2✔
980
                if a == nil {
3✔
981
                        return ErrNodeAliasNotFound
1✔
982
                }
1✔
983

984
                // TODO(roasbeef): should actually be using the utf-8
985
                // package...
986
                alias = string(a)
1✔
987
                return nil
1✔
988
        }, func() {
2✔
989
                alias = ""
2✔
990
        })
2✔
991
        if err != nil {
3✔
992
                return "", err
1✔
993
        }
1✔
994

995
        return alias, nil
1✔
996
}
997

998
// DeleteLightningNode starts a new database transaction to remove a vertex/node
999
// from the database according to the node's public key.
1000
func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
3✔
1001
        // TODO(roasbeef): ensure dangling edges are removed...
3✔
1002
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
6✔
1003
                nodes := tx.ReadWriteBucket(nodeBucket)
3✔
1004
                if nodes == nil {
3✔
1005
                        return ErrGraphNodeNotFound
×
1006
                }
×
1007

1008
                if c.graphCache != nil {
6✔
1009
                        c.graphCache.RemoveNode(nodePub)
3✔
1010
                }
3✔
1011

1012
                return c.deleteLightningNode(nodes, nodePub[:])
3✔
1013
        }, func() {})
3✔
1014
}
1015

1016
// deleteLightningNode uses an existing database transaction to remove a
1017
// vertex/node from the database according to the node's public key.
1018
func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket,
1019
        compressedPubKey []byte) error {
65✔
1020

65✔
1021
        aliases := nodes.NestedReadWriteBucket(aliasIndexBucket)
65✔
1022
        if aliases == nil {
65✔
1023
                return ErrGraphNodesNotFound
×
1024
        }
×
1025

1026
        if err := aliases.Delete(compressedPubKey); err != nil {
65✔
1027
                return err
×
1028
        }
×
1029

1030
        // Before we delete the node, we'll fetch its current state so we can
1031
        // determine when its last update was to clear out the node update
1032
        // index.
1033
        node, err := fetchLightningNode(nodes, compressedPubKey)
65✔
1034
        if err != nil {
65✔
1035
                return err
×
1036
        }
×
1037

1038
        if err := nodes.Delete(compressedPubKey); err != nil {
65✔
1039
                return err
×
1040
        }
×
1041

1042
        // Finally, we'll delete the index entry for the node within the
1043
        // nodeUpdateIndexBucket as this node is no longer active, so we don't
1044
        // need to track its last update.
1045
        nodeUpdateIndex := nodes.NestedReadWriteBucket(nodeUpdateIndexBucket)
65✔
1046
        if nodeUpdateIndex == nil {
65✔
1047
                return ErrGraphNodesNotFound
×
1048
        }
×
1049

1050
        // In order to delete the entry, we'll need to reconstruct the key for
1051
        // its last update.
1052
        updateUnix := uint64(node.LastUpdate.Unix())
65✔
1053
        var indexKey [8 + 33]byte
65✔
1054
        byteOrder.PutUint64(indexKey[:8], updateUnix)
65✔
1055
        copy(indexKey[8:], compressedPubKey)
65✔
1056

65✔
1057
        return nodeUpdateIndex.Delete(indexKey[:])
65✔
1058
}
1059

1060
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
1061
// undirected edge from the two target nodes are created. The information stored
1062
// denotes the static attributes of the channel, such as the channelID, the keys
1063
// involved in creation of the channel, and the set of features that the channel
1064
// supports. The chanPoint and chanID are used to uniquely identify the edge
1065
// globally within the database.
1066
func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
1067
        op ...batch.SchedulerOption) error {
1,722✔
1068

1,722✔
1069
        var alreadyExists bool
1,722✔
1070
        r := &batch.Request{
1,722✔
1071
                Reset: func() {
3,444✔
1072
                        alreadyExists = false
1,722✔
1073
                },
1,722✔
1074
                Update: func(tx kvdb.RwTx) error {
1,722✔
1075
                        err := c.addChannelEdge(tx, edge)
1,722✔
1076

1,722✔
1077
                        // Silence ErrEdgeAlreadyExist so that the batch can
1,722✔
1078
                        // succeed, but propagate the error via local state.
1,722✔
1079
                        if err == ErrEdgeAlreadyExist {
1,956✔
1080
                                alreadyExists = true
234✔
1081
                                return nil
234✔
1082
                        }
234✔
1083

1084
                        return err
1,488✔
1085
                },
1086
                OnCommit: func(err error) error {
1,722✔
1087
                        switch {
1,722✔
1088
                        case err != nil:
×
1089
                                return err
×
1090
                        case alreadyExists:
234✔
1091
                                return ErrEdgeAlreadyExist
234✔
1092
                        default:
1,488✔
1093
                                c.rejectCache.remove(edge.ChannelID)
1,488✔
1094
                                c.chanCache.remove(edge.ChannelID)
1,488✔
1095
                                return nil
1,488✔
1096
                        }
1097
                },
1098
        }
1099

1100
        for _, f := range op {
1,722✔
1101
                if f == nil {
×
1102
                        return fmt.Errorf("nil scheduler option was used")
×
1103
                }
×
1104

1105
                f(r)
×
1106
        }
1107

1108
        return c.chanScheduler.Execute(r)
1,722✔
1109
}
1110

1111
// addChannelEdge is the private form of AddChannelEdge that allows callers to
1112
// utilize an existing db transaction.
1113
func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx,
1114
        edge *models.ChannelEdgeInfo) error {
1,722✔
1115

1,722✔
1116
        // Construct the channel's primary key which is the 8-byte channel ID.
1,722✔
1117
        var chanKey [8]byte
1,722✔
1118
        binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
1,722✔
1119

1,722✔
1120
        nodes, err := tx.CreateTopLevelBucket(nodeBucket)
1,722✔
1121
        if err != nil {
1,722✔
1122
                return err
×
1123
        }
×
1124
        edges, err := tx.CreateTopLevelBucket(edgeBucket)
1,722✔
1125
        if err != nil {
1,722✔
1126
                return err
×
1127
        }
×
1128
        edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
1,722✔
1129
        if err != nil {
1,722✔
1130
                return err
×
1131
        }
×
1132
        chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket)
1,722✔
1133
        if err != nil {
1,722✔
1134
                return err
×
1135
        }
×
1136

1137
        // First, attempt to check if this edge has already been created. If
1138
        // so, then we can exit early as this method is meant to be idempotent.
1139
        if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil {
1,956✔
1140
                return ErrEdgeAlreadyExist
234✔
1141
        }
234✔
1142

1143
        if c.graphCache != nil {
2,786✔
1144
                c.graphCache.AddChannel(edge, nil, nil)
1,298✔
1145
        }
1,298✔
1146

1147
        // Before we insert the channel into the database, we'll ensure that
1148
        // both nodes already exist in the channel graph. If either node
1149
        // doesn't, then we'll insert a "shell" node that just includes its
1150
        // public key, so subsequent validation and queries can work properly.
1151
        _, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:])
1,488✔
1152
        switch {
1,488✔
1153
        case node1Err == ErrGraphNodeNotFound:
18✔
1154
                node1Shell := models.LightningNode{
18✔
1155
                        PubKeyBytes:          edge.NodeKey1Bytes,
18✔
1156
                        HaveNodeAnnouncement: false,
18✔
1157
                }
18✔
1158
                err := addLightningNode(tx, &node1Shell)
18✔
1159
                if err != nil {
18✔
1160
                        return fmt.Errorf("unable to create shell node "+
×
1161
                                "for: %x: %w", edge.NodeKey1Bytes, err)
×
1162
                }
×
1163
        case node1Err != nil:
×
1164
                return node1Err
×
1165
        }
1166

1167
        _, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:])
1,488✔
1168
        switch {
1,488✔
1169
        case node2Err == ErrGraphNodeNotFound:
54✔
1170
                node2Shell := models.LightningNode{
54✔
1171
                        PubKeyBytes:          edge.NodeKey2Bytes,
54✔
1172
                        HaveNodeAnnouncement: false,
54✔
1173
                }
54✔
1174
                err := addLightningNode(tx, &node2Shell)
54✔
1175
                if err != nil {
54✔
1176
                        return fmt.Errorf("unable to create shell node "+
×
1177
                                "for: %x: %w", edge.NodeKey2Bytes, err)
×
1178
                }
×
1179
        case node2Err != nil:
×
1180
                return node2Err
×
1181
        }
1182

1183
        // If the edge hasn't been created yet, then we'll first add it to the
1184
        // edge index in order to associate the edge between two nodes and also
1185
        // store the static components of the channel.
1186
        if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil {
1,488✔
1187
                return err
×
1188
        }
×
1189

1190
        // Mark edge policies for both sides as unknown. This is to enable
1191
        // efficient incoming channel lookup for a node.
1192
        keys := []*[33]byte{
1,488✔
1193
                &edge.NodeKey1Bytes,
1,488✔
1194
                &edge.NodeKey2Bytes,
1,488✔
1195
        }
1,488✔
1196
        for _, key := range keys {
4,464✔
1197
                err := putChanEdgePolicyUnknown(edges, edge.ChannelID, key[:])
2,976✔
1198
                if err != nil {
2,976✔
1199
                        return err
×
1200
                }
×
1201
        }
1202

1203
        // Finally we add it to the channel index which maps channel points
1204
        // (outpoints) to the shorter channel ID's.
1205
        var b bytes.Buffer
1,488✔
1206
        if err := WriteOutpoint(&b, &edge.ChannelPoint); err != nil {
1,488✔
1207
                return err
×
1208
        }
×
1209
        return chanIndex.Put(b.Bytes(), chanKey[:])
1,488✔
1210
}
1211

1212
// HasChannelEdge returns true if the database knows of a channel edge with the
1213
// passed channel ID, and false otherwise. If an edge with that ID is found
1214
// within the graph, then two time stamps representing the last time the edge
1215
// was updated for both directed edges are returned along with the boolean. If
1216
// it is not found, then the zombie index is checked and its result is returned
1217
// as the second boolean.
1218
func (c *ChannelGraph) HasChannelEdge(
1219
        chanID uint64) (time.Time, time.Time, bool, bool, error) {
213✔
1220

213✔
1221
        var (
213✔
1222
                upd1Time time.Time
213✔
1223
                upd2Time time.Time
213✔
1224
                exists   bool
213✔
1225
                isZombie bool
213✔
1226
        )
213✔
1227

213✔
1228
        // We'll query the cache with the shared lock held to allow multiple
213✔
1229
        // readers to access values in the cache concurrently if they exist.
213✔
1230
        c.cacheMu.RLock()
213✔
1231
        if entry, ok := c.rejectCache.get(chanID); ok {
279✔
1232
                c.cacheMu.RUnlock()
66✔
1233
                upd1Time = time.Unix(entry.upd1Time, 0)
66✔
1234
                upd2Time = time.Unix(entry.upd2Time, 0)
66✔
1235
                exists, isZombie = entry.flags.unpack()
66✔
1236
                return upd1Time, upd2Time, exists, isZombie, nil
66✔
1237
        }
66✔
1238
        c.cacheMu.RUnlock()
147✔
1239

147✔
1240
        c.cacheMu.Lock()
147✔
1241
        defer c.cacheMu.Unlock()
147✔
1242

147✔
1243
        // The item was not found with the shared lock, so we'll acquire the
147✔
1244
        // exclusive lock and check the cache again in case another method added
147✔
1245
        // the entry to the cache while no lock was held.
147✔
1246
        if entry, ok := c.rejectCache.get(chanID); ok {
157✔
1247
                upd1Time = time.Unix(entry.upd1Time, 0)
10✔
1248
                upd2Time = time.Unix(entry.upd2Time, 0)
10✔
1249
                exists, isZombie = entry.flags.unpack()
10✔
1250
                return upd1Time, upd2Time, exists, isZombie, nil
10✔
1251
        }
10✔
1252

1253
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
274✔
1254
                edges := tx.ReadBucket(edgeBucket)
137✔
1255
                if edges == nil {
137✔
1256
                        return ErrGraphNoEdgesFound
×
1257
                }
×
1258
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
137✔
1259
                if edgeIndex == nil {
137✔
1260
                        return ErrGraphNoEdgesFound
×
1261
                }
×
1262

1263
                var channelID [8]byte
137✔
1264
                byteOrder.PutUint64(channelID[:], chanID)
137✔
1265

137✔
1266
                // If the edge doesn't exist, then we'll also check our zombie
137✔
1267
                // index.
137✔
1268
                if edgeIndex.Get(channelID[:]) == nil {
222✔
1269
                        exists = false
85✔
1270
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
85✔
1271
                        if zombieIndex != nil {
170✔
1272
                                isZombie, _, _ = isZombieEdge(
85✔
1273
                                        zombieIndex, chanID,
85✔
1274
                                )
85✔
1275
                        }
85✔
1276

1277
                        return nil
85✔
1278
                }
1279

1280
                exists = true
52✔
1281
                isZombie = false
52✔
1282

52✔
1283
                // If the channel has been found in the graph, then retrieve
52✔
1284
                // the edges itself so we can return the last updated
52✔
1285
                // timestamps.
52✔
1286
                nodes := tx.ReadBucket(nodeBucket)
52✔
1287
                if nodes == nil {
52✔
1288
                        return ErrGraphNodeNotFound
×
1289
                }
×
1290

1291
                e1, e2, err := fetchChanEdgePolicies(
52✔
1292
                        edgeIndex, edges, channelID[:],
52✔
1293
                )
52✔
1294
                if err != nil {
52✔
1295
                        return err
×
1296
                }
×
1297

1298
                // As we may have only one of the edges populated, only set the
1299
                // update time if the edge was found in the database.
1300
                if e1 != nil {
70✔
1301
                        upd1Time = e1.LastUpdate
18✔
1302
                }
18✔
1303
                if e2 != nil {
68✔
1304
                        upd2Time = e2.LastUpdate
16✔
1305
                }
16✔
1306

1307
                return nil
52✔
1308
        }, func() {}); err != nil {
137✔
1309
                return time.Time{}, time.Time{}, exists, isZombie, err
×
1310
        }
×
1311

1312
        c.rejectCache.insert(chanID, rejectCacheEntry{
137✔
1313
                upd1Time: upd1Time.Unix(),
137✔
1314
                upd2Time: upd2Time.Unix(),
137✔
1315
                flags:    packRejectFlags(exists, isZombie),
137✔
1316
        })
137✔
1317

137✔
1318
        return upd1Time, upd2Time, exists, isZombie, nil
137✔
1319
}
1320

1321
// AddEdgeProof sets the proof of an existing edge in the graph database.
1322
func (c *ChannelGraph) AddEdgeProof(chanID lnwire.ShortChannelID,
1323
        proof *models.ChannelAuthProof) error {
1✔
1324

1✔
1325
        // Construct the channel's primary key which is the 8-byte channel ID.
1✔
1326
        var chanKey [8]byte
1✔
1327
        binary.BigEndian.PutUint64(chanKey[:], chanID.ToUint64())
1✔
1328

1✔
1329
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
2✔
1330
                edges := tx.ReadWriteBucket(edgeBucket)
1✔
1331
                if edges == nil {
1✔
1332
                        return ErrEdgeNotFound
×
1333
                }
×
1334

1335
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
1✔
1336
                if edgeIndex == nil {
1✔
1337
                        return ErrEdgeNotFound
×
1338
                }
×
1339

1340
                edge, err := fetchChanEdgeInfo(edgeIndex, chanKey[:])
1✔
1341
                if err != nil {
1✔
1342
                        return err
×
1343
                }
×
1344

1345
                edge.AuthProof = proof
1✔
1346

1✔
1347
                return putChanEdgeInfo(edgeIndex, &edge, chanKey)
1✔
1348
        }, func() {})
1✔
1349
}
1350

1351
const (
1352
        // pruneTipBytes is the total size of the value which stores a prune
1353
        // entry of the graph in the prune log. The "prune tip" is the last
1354
        // entry in the prune log, and indicates if the channel graph is in
1355
        // sync with the current UTXO state. The structure of the value
1356
        // is: blockHash, taking 32 bytes total.
1357
        pruneTipBytes = 32
1358
)
1359

1360
// PruneGraph prunes newly closed channels from the channel graph in response
1361
// to a new block being solved on the network. Any transactions which spend the
1362
// funding output of any known channels within he graph will be deleted.
1363
// Additionally, the "prune tip", or the last block which has been used to
1364
// prune the graph is stored so callers can ensure the graph is fully in sync
1365
// with the current UTXO state. A slice of channels that have been closed by
1366
// the target block are returned if the function succeeds without error.
1367
func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
1368
        blockHash *chainhash.Hash, blockHeight uint32) (
1369
        []*models.ChannelEdgeInfo, error) {
241✔
1370

241✔
1371
        c.cacheMu.Lock()
241✔
1372
        defer c.cacheMu.Unlock()
241✔
1373

241✔
1374
        var chansClosed []*models.ChannelEdgeInfo
241✔
1375

241✔
1376
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
482✔
1377
                // First grab the edges bucket which houses the information
241✔
1378
                // we'd like to delete
241✔
1379
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
241✔
1380
                if err != nil {
241✔
1381
                        return err
×
1382
                }
×
1383

1384
                // Next grab the two edge indexes which will also need to be
1385
                // updated.
1386
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
241✔
1387
                if err != nil {
241✔
1388
                        return err
×
1389
                }
×
1390
                chanIndex, err := edges.CreateBucketIfNotExists(
241✔
1391
                        channelPointBucket,
241✔
1392
                )
241✔
1393
                if err != nil {
241✔
1394
                        return err
×
1395
                }
×
1396
                nodes := tx.ReadWriteBucket(nodeBucket)
241✔
1397
                if nodes == nil {
241✔
1398
                        return ErrSourceNodeNotSet
×
1399
                }
×
1400
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
241✔
1401
                if err != nil {
241✔
1402
                        return err
×
1403
                }
×
1404

1405
                // For each of the outpoints that have been spent within the
1406
                // block, we attempt to delete them from the graph as if that
1407
                // outpoint was a channel, then it has now been closed.
1408
                for _, chanPoint := range spentOutputs {
397✔
1409
                        // TODO(roasbeef): load channel bloom filter, continue
156✔
1410
                        // if NOT if filter
156✔
1411

156✔
1412
                        var opBytes bytes.Buffer
156✔
1413
                        err := WriteOutpoint(&opBytes, chanPoint)
156✔
1414
                        if err != nil {
156✔
1415
                                return err
×
1416
                        }
×
1417

1418
                        // First attempt to see if the channel exists within
1419
                        // the database, if not, then we can exit early.
1420
                        chanID := chanIndex.Get(opBytes.Bytes())
156✔
1421
                        if chanID == nil {
282✔
1422
                                continue
126✔
1423
                        }
1424

1425
                        // However, if it does, then we'll read out the full
1426
                        // version so we can add it to the set of deleted
1427
                        // channels.
1428
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
30✔
1429
                        if err != nil {
30✔
1430
                                return err
×
1431
                        }
×
1432

1433
                        // Attempt to delete the channel, an ErrEdgeNotFound
1434
                        // will be returned if that outpoint isn't known to be
1435
                        // a channel. If no error is returned, then a channel
1436
                        // was successfully pruned.
1437
                        err = c.delChannelEdgeUnsafe(
30✔
1438
                                edges, edgeIndex, chanIndex, zombieIndex,
30✔
1439
                                chanID, false, false,
30✔
1440
                        )
30✔
1441
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
30✔
1442
                                return err
×
1443
                        }
×
1444

1445
                        chansClosed = append(chansClosed, &edgeInfo)
30✔
1446
                }
1447

1448
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
241✔
1449
                if err != nil {
241✔
1450
                        return err
×
1451
                }
×
1452

1453
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
241✔
1454
                        pruneLogBucket,
241✔
1455
                )
241✔
1456
                if err != nil {
241✔
1457
                        return err
×
1458
                }
×
1459

1460
                // With the graph pruned, add a new entry to the prune log,
1461
                // which can be used to check if the graph is fully synced with
1462
                // the current UTXO state.
1463
                var blockHeightBytes [4]byte
241✔
1464
                byteOrder.PutUint32(blockHeightBytes[:], blockHeight)
241✔
1465

241✔
1466
                var newTip [pruneTipBytes]byte
241✔
1467
                copy(newTip[:], blockHash[:])
241✔
1468

241✔
1469
                err = pruneBucket.Put(blockHeightBytes[:], newTip[:])
241✔
1470
                if err != nil {
241✔
1471
                        return err
×
1472
                }
×
1473

1474
                // Now that the graph has been pruned, we'll also attempt to
1475
                // prune any nodes that have had a channel closed within the
1476
                // latest block.
1477
                return c.pruneGraphNodes(nodes, edgeIndex)
241✔
1478
        }, func() {
241✔
1479
                chansClosed = nil
241✔
1480
        })
241✔
1481
        if err != nil {
241✔
1482
                return nil, err
×
1483
        }
×
1484

1485
        for _, channel := range chansClosed {
271✔
1486
                c.rejectCache.remove(channel.ChannelID)
30✔
1487
                c.chanCache.remove(channel.ChannelID)
30✔
1488
        }
30✔
1489

1490
        if c.graphCache != nil {
482✔
1491
                log.Debugf("Pruned graph, cache now has %s",
241✔
1492
                        c.graphCache.Stats())
241✔
1493
        }
241✔
1494

1495
        return chansClosed, nil
241✔
1496
}
1497

1498
// PruneGraphNodes is a garbage collection method which attempts to prune out
1499
// any nodes from the channel graph that are currently unconnected. This ensure
1500
// that we only maintain a graph of reachable nodes. In the event that a pruned
1501
// node gains more channels, it will be re-added back to the graph.
1502
func (c *ChannelGraph) PruneGraphNodes() error {
23✔
1503
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
46✔
1504
                nodes := tx.ReadWriteBucket(nodeBucket)
23✔
1505
                if nodes == nil {
23✔
1506
                        return ErrGraphNodesNotFound
×
1507
                }
×
1508
                edges := tx.ReadWriteBucket(edgeBucket)
23✔
1509
                if edges == nil {
23✔
1510
                        return ErrGraphNotFound
×
1511
                }
×
1512
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
23✔
1513
                if edgeIndex == nil {
23✔
1514
                        return ErrGraphNoEdgesFound
×
1515
                }
×
1516

1517
                return c.pruneGraphNodes(nodes, edgeIndex)
23✔
1518
        }, func() {})
23✔
1519
}
1520

1521
// pruneGraphNodes attempts to remove any nodes from the graph who have had a
1522
// channel closed within the current block. If the node still has existing
1523
// channels in the graph, this will act as a no-op.
1524
func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket,
1525
        edgeIndex kvdb.RwBucket) error {
264✔
1526

264✔
1527
        log.Trace("Pruning nodes from graph with no open channels")
264✔
1528

264✔
1529
        // We'll retrieve the graph's source node to ensure we don't remove it
264✔
1530
        // even if it no longer has any open channels.
264✔
1531
        sourceNode, err := c.sourceNode(nodes)
264✔
1532
        if err != nil {
264✔
1533
                return err
×
1534
        }
×
1535

1536
        // We'll use this map to keep count the number of references to a node
1537
        // in the graph. A node should only be removed once it has no more
1538
        // references in the graph.
1539
        nodeRefCounts := make(map[[33]byte]int)
264✔
1540
        err = nodes.ForEach(func(pubKey, nodeBytes []byte) error {
1,574✔
1541
                // If this is the source key, then we skip this
1,310✔
1542
                // iteration as the value for this key is a pubKey
1,310✔
1543
                // rather than raw node information.
1,310✔
1544
                if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
2,102✔
1545
                        return nil
792✔
1546
                }
792✔
1547

1548
                var nodePub [33]byte
518✔
1549
                copy(nodePub[:], pubKey)
518✔
1550
                nodeRefCounts[nodePub] = 0
518✔
1551

518✔
1552
                return nil
518✔
1553
        })
1554
        if err != nil {
264✔
1555
                return err
×
1556
        }
×
1557

1558
        // To ensure we never delete the source node, we'll start off by
1559
        // bumping its ref count to 1.
1560
        nodeRefCounts[sourceNode.PubKeyBytes] = 1
264✔
1561

264✔
1562
        // Next, we'll run through the edgeIndex which maps a channel ID to the
264✔
1563
        // edge info. We'll use this scan to populate our reference count map
264✔
1564
        // above.
264✔
1565
        err = edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
478✔
1566
                // The first 66 bytes of the edge info contain the pubkeys of
214✔
1567
                // the nodes that this edge attaches. We'll extract them, and
214✔
1568
                // add them to the ref count map.
214✔
1569
                var node1, node2 [33]byte
214✔
1570
                copy(node1[:], edgeInfoBytes[:33])
214✔
1571
                copy(node2[:], edgeInfoBytes[33:])
214✔
1572

214✔
1573
                // With the nodes extracted, we'll increase the ref count of
214✔
1574
                // each of the nodes.
214✔
1575
                nodeRefCounts[node1]++
214✔
1576
                nodeRefCounts[node2]++
214✔
1577

214✔
1578
                return nil
214✔
1579
        })
214✔
1580
        if err != nil {
264✔
1581
                return err
×
1582
        }
×
1583

1584
        // Finally, we'll make a second pass over the set of nodes, and delete
1585
        // any nodes that have a ref count of zero.
1586
        var numNodesPruned int
264✔
1587
        for nodePubKey, refCount := range nodeRefCounts {
782✔
1588
                // If the ref count of the node isn't zero, then we can safely
518✔
1589
                // skip it as it still has edges to or from it within the
518✔
1590
                // graph.
518✔
1591
                if refCount != 0 {
974✔
1592
                        continue
456✔
1593
                }
1594

1595
                if c.graphCache != nil {
124✔
1596
                        c.graphCache.RemoveNode(nodePubKey)
62✔
1597
                }
62✔
1598

1599
                // If we reach this point, then there are no longer any edges
1600
                // that connect this node, so we can delete it.
1601
                err := c.deleteLightningNode(nodes, nodePubKey[:])
62✔
1602
                if err != nil {
62✔
1603
                        if errors.Is(err, ErrGraphNodeNotFound) ||
×
1604
                                errors.Is(err, ErrGraphNodesNotFound) {
×
1605

×
1606
                                log.Warnf("Unable to prune node %x from the "+
×
1607
                                        "graph: %v", nodePubKey, err)
×
1608
                                continue
×
1609
                        }
1610

1611
                        return err
×
1612
                }
1613

1614
                log.Infof("Pruned unconnected node %x from channel graph",
62✔
1615
                        nodePubKey[:])
62✔
1616

62✔
1617
                numNodesPruned++
62✔
1618
        }
1619

1620
        if numNodesPruned > 0 {
310✔
1621
                log.Infof("Pruned %v unconnected nodes from the channel graph",
46✔
1622
                        numNodesPruned)
46✔
1623
        }
46✔
1624

1625
        return nil
264✔
1626
}
1627

1628
// DisconnectBlockAtHeight is used to indicate that the block specified
1629
// by the passed height has been disconnected from the main chain. This
1630
// will "rewind" the graph back to the height below, deleting channels
1631
// that are no longer confirmed from the graph. The prune log will be
1632
// set to the last prune height valid for the remaining chain.
1633
// Channels that were removed from the graph resulting from the
1634
// disconnected block are returned.
1635
func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) (
1636
        []*models.ChannelEdgeInfo, error) {
153✔
1637

153✔
1638
        // Every channel having a ShortChannelID starting at 'height'
153✔
1639
        // will no longer be confirmed.
153✔
1640
        startShortChanID := lnwire.ShortChannelID{
153✔
1641
                BlockHeight: height,
153✔
1642
        }
153✔
1643

153✔
1644
        // Delete everything after this height from the db up until the
153✔
1645
        // SCID alias range.
153✔
1646
        endShortChanID := aliasmgr.StartingAlias
153✔
1647

153✔
1648
        // The block height will be the 3 first bytes of the channel IDs.
153✔
1649
        var chanIDStart [8]byte
153✔
1650
        byteOrder.PutUint64(chanIDStart[:], startShortChanID.ToUint64())
153✔
1651
        var chanIDEnd [8]byte
153✔
1652
        byteOrder.PutUint64(chanIDEnd[:], endShortChanID.ToUint64())
153✔
1653

153✔
1654
        c.cacheMu.Lock()
153✔
1655
        defer c.cacheMu.Unlock()
153✔
1656

153✔
1657
        // Keep track of the channels that are removed from the graph.
153✔
1658
        var removedChans []*models.ChannelEdgeInfo
153✔
1659

153✔
1660
        if err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
306✔
1661
                edges, err := tx.CreateTopLevelBucket(edgeBucket)
153✔
1662
                if err != nil {
153✔
1663
                        return err
×
1664
                }
×
1665
                edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
153✔
1666
                if err != nil {
153✔
1667
                        return err
×
1668
                }
×
1669
                chanIndex, err := edges.CreateBucketIfNotExists(
153✔
1670
                        channelPointBucket,
153✔
1671
                )
153✔
1672
                if err != nil {
153✔
1673
                        return err
×
1674
                }
×
1675
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
153✔
1676
                if err != nil {
153✔
1677
                        return err
×
1678
                }
×
1679

1680
                // Scan from chanIDStart to chanIDEnd, deleting every
1681
                // found edge.
1682
                // NOTE: we must delete the edges after the cursor loop, since
1683
                // modifying the bucket while traversing is not safe.
1684
                // NOTE: We use a < comparison in bytes.Compare instead of <=
1685
                // so that the StartingAlias itself isn't deleted.
1686
                var keys [][]byte
153✔
1687
                cursor := edgeIndex.ReadWriteCursor()
153✔
1688

153✔
1689
                //nolint:ll
153✔
1690
                for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
153✔
1691
                        bytes.Compare(k, chanIDEnd[:]) < 0; k, v = cursor.Next() {
245✔
1692
                        edgeInfoReader := bytes.NewReader(v)
92✔
1693
                        edgeInfo, err := deserializeChanEdgeInfo(edgeInfoReader)
92✔
1694
                        if err != nil {
92✔
1695
                                return err
×
1696
                        }
×
1697

1698
                        keys = append(keys, k)
92✔
1699
                        removedChans = append(removedChans, &edgeInfo)
92✔
1700
                }
1701

1702
                for _, k := range keys {
245✔
1703
                        err = c.delChannelEdgeUnsafe(
92✔
1704
                                edges, edgeIndex, chanIndex, zombieIndex,
92✔
1705
                                k, false, false,
92✔
1706
                        )
92✔
1707
                        if err != nil && !errors.Is(err, ErrEdgeNotFound) {
92✔
1708
                                return err
×
1709
                        }
×
1710
                }
1711

1712
                // Delete all the entries in the prune log having a height
1713
                // greater or equal to the block disconnected.
1714
                metaBucket, err := tx.CreateTopLevelBucket(graphMetaBucket)
153✔
1715
                if err != nil {
153✔
1716
                        return err
×
1717
                }
×
1718

1719
                pruneBucket, err := metaBucket.CreateBucketIfNotExists(
153✔
1720
                        pruneLogBucket,
153✔
1721
                )
153✔
1722
                if err != nil {
153✔
1723
                        return err
×
1724
                }
×
1725

1726
                var pruneKeyStart [4]byte
153✔
1727
                byteOrder.PutUint32(pruneKeyStart[:], height)
153✔
1728

153✔
1729
                var pruneKeyEnd [4]byte
153✔
1730
                byteOrder.PutUint32(pruneKeyEnd[:], math.MaxUint32)
153✔
1731

153✔
1732
                // To avoid modifying the bucket while traversing, we delete
153✔
1733
                // the keys in a second loop.
153✔
1734
                var pruneKeys [][]byte
153✔
1735
                pruneCursor := pruneBucket.ReadWriteCursor()
153✔
1736
                //nolint:ll
153✔
1737
                for k, _ := pruneCursor.Seek(pruneKeyStart[:]); k != nil &&
153✔
1738
                        bytes.Compare(k, pruneKeyEnd[:]) <= 0; k, _ = pruneCursor.Next() {
244✔
1739
                        pruneKeys = append(pruneKeys, k)
91✔
1740
                }
91✔
1741

1742
                for _, k := range pruneKeys {
244✔
1743
                        if err := pruneBucket.Delete(k); err != nil {
91✔
1744
                                return err
×
1745
                        }
×
1746
                }
1747

1748
                return nil
153✔
1749
        }, func() {
153✔
1750
                removedChans = nil
153✔
1751
        }); err != nil {
153✔
1752
                return nil, err
×
1753
        }
×
1754

1755
        for _, channel := range removedChans {
245✔
1756
                c.rejectCache.remove(channel.ChannelID)
92✔
1757
                c.chanCache.remove(channel.ChannelID)
92✔
1758
        }
92✔
1759

1760
        return removedChans, nil
153✔
1761
}
1762

1763
// PruneTip returns the block height and hash of the latest block that has been
1764
// used to prune channels in the graph. Knowing the "prune tip" allows callers
1765
// to tell if the graph is currently in sync with the current best known UTXO
1766
// state.
1767
func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) {
53✔
1768
        var (
53✔
1769
                tipHash   chainhash.Hash
53✔
1770
                tipHeight uint32
53✔
1771
        )
53✔
1772

53✔
1773
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
106✔
1774
                graphMeta := tx.ReadBucket(graphMetaBucket)
53✔
1775
                if graphMeta == nil {
53✔
1776
                        return ErrGraphNotFound
×
1777
                }
×
1778
                pruneBucket := graphMeta.NestedReadBucket(pruneLogBucket)
53✔
1779
                if pruneBucket == nil {
53✔
1780
                        return ErrGraphNeverPruned
×
1781
                }
×
1782

1783
                pruneCursor := pruneBucket.ReadCursor()
53✔
1784

53✔
1785
                // The prune key with the largest block height will be our
53✔
1786
                // prune tip.
53✔
1787
                k, v := pruneCursor.Last()
53✔
1788
                if k == nil {
71✔
1789
                        return ErrGraphNeverPruned
18✔
1790
                }
18✔
1791

1792
                // Once we have the prune tip, the value will be the block hash,
1793
                // and the key the block height.
1794
                copy(tipHash[:], v[:])
35✔
1795
                tipHeight = byteOrder.Uint32(k[:])
35✔
1796

35✔
1797
                return nil
35✔
1798
        }, func() {})
53✔
1799
        if err != nil {
71✔
1800
                return nil, 0, err
18✔
1801
        }
18✔
1802

1803
        return &tipHash, tipHeight, nil
35✔
1804
}
1805

1806
// DeleteChannelEdges removes edges with the given channel IDs from the
1807
// database and marks them as zombies. This ensures that we're unable to re-add
1808
// it to our database once again. If an edge does not exist within the
1809
// database, then ErrEdgeNotFound will be returned. If strictZombiePruning is
1810
// true, then when we mark these edges as zombies, we'll set up the keys such
1811
// that we require the node that failed to send the fresh update to be the one
1812
// that resurrects the channel from its zombie state. The markZombie bool
1813
// denotes whether or not to mark the channel as a zombie.
1814
func (c *ChannelGraph) DeleteChannelEdges(strictZombiePruning, markZombie bool,
1815
        chanIDs ...uint64) error {
135✔
1816

135✔
1817
        // TODO(roasbeef): possibly delete from node bucket if node has no more
135✔
1818
        // channels
135✔
1819
        // TODO(roasbeef): don't delete both edges?
135✔
1820

135✔
1821
        c.cacheMu.Lock()
135✔
1822
        defer c.cacheMu.Unlock()
135✔
1823

135✔
1824
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
270✔
1825
                edges := tx.ReadWriteBucket(edgeBucket)
135✔
1826
                if edges == nil {
135✔
1827
                        return ErrEdgeNotFound
×
1828
                }
×
1829
                edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
135✔
1830
                if edgeIndex == nil {
135✔
1831
                        return ErrEdgeNotFound
×
1832
                }
×
1833
                chanIndex := edges.NestedReadWriteBucket(channelPointBucket)
135✔
1834
                if chanIndex == nil {
135✔
1835
                        return ErrEdgeNotFound
×
1836
                }
×
1837
                nodes := tx.ReadWriteBucket(nodeBucket)
135✔
1838
                if nodes == nil {
135✔
1839
                        return ErrGraphNodeNotFound
×
1840
                }
×
1841
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
135✔
1842
                if err != nil {
135✔
1843
                        return err
×
1844
                }
×
1845

1846
                var rawChanID [8]byte
135✔
1847
                for _, chanID := range chanIDs {
220✔
1848
                        byteOrder.PutUint64(rawChanID[:], chanID)
85✔
1849
                        err := c.delChannelEdgeUnsafe(
85✔
1850
                                edges, edgeIndex, chanIndex, zombieIndex,
85✔
1851
                                rawChanID[:], markZombie, strictZombiePruning,
85✔
1852
                        )
85✔
1853
                        if err != nil {
144✔
1854
                                return err
59✔
1855
                        }
59✔
1856
                }
1857

1858
                return nil
76✔
1859
        }, func() {})
135✔
1860
        if err != nil {
194✔
1861
                return err
59✔
1862
        }
59✔
1863

1864
        for _, chanID := range chanIDs {
101✔
1865
                c.rejectCache.remove(chanID)
25✔
1866
                c.chanCache.remove(chanID)
25✔
1867
        }
25✔
1868

1869
        return nil
76✔
1870
}
1871

1872
// ChannelID attempt to lookup the 8-byte compact channel ID which maps to the
1873
// passed channel point (outpoint). If the passed channel doesn't exist within
1874
// the database, then ErrEdgeNotFound is returned.
1875
func (c *ChannelGraph) ChannelID(chanPoint *wire.OutPoint) (uint64, error) {
1✔
1876
        var chanID uint64
1✔
1877
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
2✔
1878
                var err error
1✔
1879
                chanID, err = getChanID(tx, chanPoint)
1✔
1880
                return err
1✔
1881
        }, func() {
2✔
1882
                chanID = 0
1✔
1883
        }); err != nil {
1✔
1884
                return 0, err
×
1885
        }
×
1886

1887
        return chanID, nil
1✔
1888
}
1889

1890
// getChanID returns the assigned channel ID for a given channel point.
1891
func getChanID(tx kvdb.RTx, chanPoint *wire.OutPoint) (uint64, error) {
1✔
1892
        var b bytes.Buffer
1✔
1893
        if err := WriteOutpoint(&b, chanPoint); err != nil {
1✔
1894
                return 0, err
×
1895
        }
×
1896

1897
        edges := tx.ReadBucket(edgeBucket)
1✔
1898
        if edges == nil {
1✔
1899
                return 0, ErrGraphNoEdgesFound
×
1900
        }
×
1901
        chanIndex := edges.NestedReadBucket(channelPointBucket)
1✔
1902
        if chanIndex == nil {
1✔
1903
                return 0, ErrGraphNoEdgesFound
×
1904
        }
×
1905

1906
        chanIDBytes := chanIndex.Get(b.Bytes())
1✔
1907
        if chanIDBytes == nil {
1✔
1908
                return 0, ErrEdgeNotFound
×
1909
        }
×
1910

1911
        chanID := byteOrder.Uint64(chanIDBytes)
1✔
1912

1✔
1913
        return chanID, nil
1✔
1914
}
1915

1916
// TODO(roasbeef): allow updates to use Batch?
1917

1918
// HighestChanID returns the "highest" known channel ID in the channel graph.
1919
// This represents the "newest" channel from the PoV of the chain. This method
1920
// can be used by peers to quickly determine if they're graphs are in sync.
1921
func (c *ChannelGraph) HighestChanID() (uint64, error) {
3✔
1922
        var cid uint64
3✔
1923

3✔
1924
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
6✔
1925
                edges := tx.ReadBucket(edgeBucket)
3✔
1926
                if edges == nil {
3✔
1927
                        return ErrGraphNoEdgesFound
×
1928
                }
×
1929
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
3✔
1930
                if edgeIndex == nil {
3✔
1931
                        return ErrGraphNoEdgesFound
×
1932
                }
×
1933

1934
                // In order to find the highest chan ID, we'll fetch a cursor
1935
                // and use that to seek to the "end" of our known rage.
1936
                cidCursor := edgeIndex.ReadCursor()
3✔
1937

3✔
1938
                lastChanID, _ := cidCursor.Last()
3✔
1939

3✔
1940
                // If there's no key, then this means that we don't actually
3✔
1941
                // know of any channels, so we'll return a predicable error.
3✔
1942
                if lastChanID == nil {
4✔
1943
                        return ErrGraphNoEdgesFound
1✔
1944
                }
1✔
1945

1946
                // Otherwise, we'll de serialize the channel ID and return it
1947
                // to the caller.
1948
                cid = byteOrder.Uint64(lastChanID)
2✔
1949
                return nil
2✔
1950
        }, func() {
3✔
1951
                cid = 0
3✔
1952
        })
3✔
1953
        if err != nil && err != ErrGraphNoEdgesFound {
3✔
1954
                return 0, err
×
1955
        }
×
1956

1957
        return cid, nil
3✔
1958
}
1959

1960
// ChannelEdge represents the complete set of information for a channel edge in
1961
// the known channel graph. This struct couples the core information of the
1962
// edge as well as each of the known advertised edge policies.
1963
type ChannelEdge struct {
1964
        // Info contains all the static information describing the channel.
1965
        Info *models.ChannelEdgeInfo
1966

1967
        // Policy1 points to the "first" edge policy of the channel containing
1968
        // the dynamic information required to properly route through the edge.
1969
        Policy1 *models.ChannelEdgePolicy
1970

1971
        // Policy2 points to the "second" edge policy of the channel containing
1972
        // the dynamic information required to properly route through the edge.
1973
        Policy2 *models.ChannelEdgePolicy
1974

1975
        // Node1 is "node 1" in the channel. This is the node that would have
1976
        // produced Policy1 if it exists.
1977
        Node1 *models.LightningNode
1978

1979
        // Node2 is "node 2" in the channel. This is the node that would have
1980
        // produced Policy2 if it exists.
1981
        Node2 *models.LightningNode
1982
}
1983

1984
// ChanUpdatesInHorizon returns all the known channel edges which have at least
1985
// one edge that has an update timestamp within the specified horizon.
1986
func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
1987
        endTime time.Time) ([]ChannelEdge, error) {
150✔
1988

150✔
1989
        // To ensure we don't return duplicate ChannelEdges, we'll use an
150✔
1990
        // additional map to keep track of the edges already seen to prevent
150✔
1991
        // re-adding it.
150✔
1992
        var edgesSeen map[uint64]struct{}
150✔
1993
        var edgesToCache map[uint64]ChannelEdge
150✔
1994
        var edgesInHorizon []ChannelEdge
150✔
1995

150✔
1996
        c.cacheMu.Lock()
150✔
1997
        defer c.cacheMu.Unlock()
150✔
1998

150✔
1999
        var hits int
150✔
2000
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
300✔
2001
                edges := tx.ReadBucket(edgeBucket)
150✔
2002
                if edges == nil {
150✔
2003
                        return ErrGraphNoEdgesFound
×
2004
                }
×
2005
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
150✔
2006
                if edgeIndex == nil {
150✔
2007
                        return ErrGraphNoEdgesFound
×
2008
                }
×
2009
                edgeUpdateIndex := edges.NestedReadBucket(edgeUpdateIndexBucket)
150✔
2010
                if edgeUpdateIndex == nil {
150✔
2011
                        return ErrGraphNoEdgesFound
×
2012
                }
×
2013

2014
                nodes := tx.ReadBucket(nodeBucket)
150✔
2015
                if nodes == nil {
150✔
2016
                        return ErrGraphNodesNotFound
×
2017
                }
×
2018

2019
                // We'll now obtain a cursor to perform a range query within
2020
                // the index to find all channels within the horizon.
2021
                updateCursor := edgeUpdateIndex.ReadCursor()
150✔
2022

150✔
2023
                var startTimeBytes, endTimeBytes [8 + 8]byte
150✔
2024
                byteOrder.PutUint64(
150✔
2025
                        startTimeBytes[:8], uint64(startTime.Unix()),
150✔
2026
                )
150✔
2027
                byteOrder.PutUint64(
150✔
2028
                        endTimeBytes[:8], uint64(endTime.Unix()),
150✔
2029
                )
150✔
2030

150✔
2031
                // With our start and end times constructed, we'll step through
150✔
2032
                // the index collecting the info and policy of each update of
150✔
2033
                // each channel that has a last update within the time range.
150✔
2034
                //
150✔
2035
                //nolint:ll
150✔
2036
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
150✔
2037
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
196✔
2038

46✔
2039
                        // We have a new eligible entry, so we'll slice of the
46✔
2040
                        // chan ID so we can query it in the DB.
46✔
2041
                        chanID := indexKey[8:]
46✔
2042

46✔
2043
                        // If we've already retrieved the info and policies for
46✔
2044
                        // this edge, then we can skip it as we don't need to do
46✔
2045
                        // so again.
46✔
2046
                        chanIDInt := byteOrder.Uint64(chanID)
46✔
2047
                        if _, ok := edgesSeen[chanIDInt]; ok {
65✔
2048
                                continue
19✔
2049
                        }
2050

2051
                        if channel, ok := c.chanCache.get(chanIDInt); ok {
36✔
2052
                                hits++
9✔
2053
                                edgesSeen[chanIDInt] = struct{}{}
9✔
2054
                                edgesInHorizon = append(edgesInHorizon, channel)
9✔
2055
                                continue
9✔
2056
                        }
2057

2058
                        // First, we'll fetch the static edge information.
2059
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
18✔
2060
                        if err != nil {
18✔
2061
                                chanID := byteOrder.Uint64(chanID)
×
2062
                                return fmt.Errorf("unable to fetch info for "+
×
2063
                                        "edge with chan_id=%v: %v", chanID, err)
×
2064
                        }
×
2065

2066
                        // With the static information obtained, we'll now
2067
                        // fetch the dynamic policy info.
2068
                        edge1, edge2, err := fetchChanEdgePolicies(
18✔
2069
                                edgeIndex, edges, chanID,
18✔
2070
                        )
18✔
2071
                        if err != nil {
18✔
2072
                                chanID := byteOrder.Uint64(chanID)
×
2073
                                return fmt.Errorf("unable to fetch policies "+
×
2074
                                        "for edge with chan_id=%v: %v", chanID,
×
2075
                                        err)
×
2076
                        }
×
2077

2078
                        node1, err := fetchLightningNode(
18✔
2079
                                nodes, edgeInfo.NodeKey1Bytes[:],
18✔
2080
                        )
18✔
2081
                        if err != nil {
18✔
2082
                                return err
×
2083
                        }
×
2084

2085
                        node2, err := fetchLightningNode(
18✔
2086
                                nodes, edgeInfo.NodeKey2Bytes[:],
18✔
2087
                        )
18✔
2088
                        if err != nil {
18✔
2089
                                return err
×
2090
                        }
×
2091

2092
                        // Finally, we'll collate this edge with the rest of
2093
                        // edges to be returned.
2094
                        edgesSeen[chanIDInt] = struct{}{}
18✔
2095
                        channel := ChannelEdge{
18✔
2096
                                Info:    &edgeInfo,
18✔
2097
                                Policy1: edge1,
18✔
2098
                                Policy2: edge2,
18✔
2099
                                Node1:   &node1,
18✔
2100
                                Node2:   &node2,
18✔
2101
                        }
18✔
2102
                        edgesInHorizon = append(edgesInHorizon, channel)
18✔
2103
                        edgesToCache[chanIDInt] = channel
18✔
2104
                }
2105

2106
                return nil
150✔
2107
        }, func() {
150✔
2108
                edgesSeen = make(map[uint64]struct{})
150✔
2109
                edgesToCache = make(map[uint64]ChannelEdge)
150✔
2110
                edgesInHorizon = nil
150✔
2111
        })
150✔
2112
        switch {
150✔
2113
        case err == ErrGraphNoEdgesFound:
×
2114
                fallthrough
×
2115
        case err == ErrGraphNodesNotFound:
×
2116
                break
×
2117

2118
        case err != nil:
×
2119
                return nil, err
×
2120
        }
2121

2122
        // Insert any edges loaded from disk into the cache.
2123
        for chanid, channel := range edgesToCache {
168✔
2124
                c.chanCache.insert(chanid, channel)
18✔
2125
        }
18✔
2126

2127
        log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
150✔
2128
                float64(hits)/float64(len(edgesInHorizon)), hits,
150✔
2129
                len(edgesInHorizon))
150✔
2130

150✔
2131
        return edgesInHorizon, nil
150✔
2132
}
2133

2134
// NodeUpdatesInHorizon returns all the known lightning node which have an
2135
// update timestamp within the passed range. This method can be used by two
2136
// nodes to quickly determine if they have the same set of up to date node
2137
// announcements.
2138
func (c *ChannelGraph) NodeUpdatesInHorizon(startTime,
2139
        endTime time.Time) ([]models.LightningNode, error) {
8✔
2140

8✔
2141
        var nodesInHorizon []models.LightningNode
8✔
2142

8✔
2143
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
16✔
2144
                nodes := tx.ReadBucket(nodeBucket)
8✔
2145
                if nodes == nil {
8✔
2146
                        return ErrGraphNodesNotFound
×
2147
                }
×
2148

2149
                nodeUpdateIndex := nodes.NestedReadBucket(nodeUpdateIndexBucket)
8✔
2150
                if nodeUpdateIndex == nil {
8✔
2151
                        return ErrGraphNodesNotFound
×
2152
                }
×
2153

2154
                // We'll now obtain a cursor to perform a range query within
2155
                // the index to find all node announcements within the horizon.
2156
                updateCursor := nodeUpdateIndex.ReadCursor()
8✔
2157

8✔
2158
                var startTimeBytes, endTimeBytes [8 + 33]byte
8✔
2159
                byteOrder.PutUint64(
8✔
2160
                        startTimeBytes[:8], uint64(startTime.Unix()),
8✔
2161
                )
8✔
2162
                byteOrder.PutUint64(
8✔
2163
                        endTimeBytes[:8], uint64(endTime.Unix()),
8✔
2164
                )
8✔
2165

8✔
2166
                // With our start and end times constructed, we'll step through
8✔
2167
                // the index collecting info for each node within the time
8✔
2168
                // range.
8✔
2169
                //
8✔
2170
                //nolint:ll
8✔
2171
                for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
8✔
2172
                        bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
37✔
2173

29✔
2174
                        nodePub := indexKey[8:]
29✔
2175
                        node, err := fetchLightningNode(nodes, nodePub)
29✔
2176
                        if err != nil {
29✔
2177
                                return err
×
2178
                        }
×
2179

2180
                        nodesInHorizon = append(nodesInHorizon, node)
29✔
2181
                }
2182

2183
                return nil
8✔
2184
        }, func() {
8✔
2185
                nodesInHorizon = nil
8✔
2186
        })
8✔
2187
        switch {
8✔
2188
        case err == ErrGraphNoEdgesFound:
×
2189
                fallthrough
×
2190
        case err == ErrGraphNodesNotFound:
×
2191
                break
×
2192

2193
        case err != nil:
×
2194
                return nil, err
×
2195
        }
2196

2197
        return nodesInHorizon, nil
8✔
2198
}
2199

2200
// FilterKnownChanIDs takes a set of channel IDs and return the subset of chan
2201
// ID's that we don't know and are not known zombies of the passed set. In other
2202
// words, we perform a set difference of our set of chan ID's and the ones
2203
// passed in. This method can be used by callers to determine the set of
2204
// channels another peer knows of that we don't.
2205
func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
2206
        isZombieChan func(time.Time, time.Time) bool) ([]uint64, error) {
122✔
2207

122✔
2208
        var newChanIDs []uint64
122✔
2209

122✔
2210
        c.cacheMu.Lock()
122✔
2211
        defer c.cacheMu.Unlock()
122✔
2212

122✔
2213
        err := kvdb.Update(c.db, func(tx kvdb.RwTx) error {
244✔
2214
                edges := tx.ReadBucket(edgeBucket)
122✔
2215
                if edges == nil {
122✔
2216
                        return ErrGraphNoEdgesFound
×
2217
                }
×
2218
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
122✔
2219
                if edgeIndex == nil {
122✔
2220
                        return ErrGraphNoEdgesFound
×
2221
                }
×
2222

2223
                // Fetch the zombie index, it may not exist if no edges have
2224
                // ever been marked as zombies. If the index has been
2225
                // initialized, we will use it later to skip known zombie edges.
2226
                zombieIndex := edges.NestedReadBucket(zombieBucket)
122✔
2227

122✔
2228
                // We'll run through the set of chanIDs and collate only the
122✔
2229
                // set of channel that are unable to be found within our db.
122✔
2230
                var cidBytes [8]byte
122✔
2231
                for _, info := range chansInfo {
247✔
2232
                        scid := info.ShortChannelID.ToUint64()
125✔
2233
                        byteOrder.PutUint64(cidBytes[:], scid)
125✔
2234

125✔
2235
                        // If the edge is already known, skip it.
125✔
2236
                        if v := edgeIndex.Get(cidBytes[:]); v != nil {
148✔
2237
                                continue
23✔
2238
                        }
2239

2240
                        // If the edge is a known zombie, skip it.
2241
                        if zombieIndex != nil {
204✔
2242
                                isZombie, _, _ := isZombieEdge(
102✔
2243
                                        zombieIndex, scid,
102✔
2244
                                )
102✔
2245

102✔
2246
                                // TODO(ziggie): Make sure that for the strict
102✔
2247
                                // pruning case we compare the pubkeys and
102✔
2248
                                // whether the right timestamp is not older than
102✔
2249
                                // the `ChannelPruneExpiry`.
102✔
2250
                                //
102✔
2251
                                // NOTE: The timestamp data has no verification
102✔
2252
                                // attached to it in the `ReplyChannelRange` msg
102✔
2253
                                // so we are trusting this data at this point.
102✔
2254
                                // However it is not critical because we are
102✔
2255
                                // just removing the channel from the db when
102✔
2256
                                // the timestamps are more recent. During the
102✔
2257
                                // querying of the gossip msg verification
102✔
2258
                                // happens as usual.
102✔
2259
                                // However we should start punishing peers when
102✔
2260
                                // they don't provide us honest data ?
102✔
2261
                                isStillZombie := isZombieChan(
102✔
2262
                                        info.Node1UpdateTimestamp,
102✔
2263
                                        info.Node2UpdateTimestamp,
102✔
2264
                                )
102✔
2265

102✔
2266
                                switch {
102✔
2267
                                // If the edge is a known zombie and if we
2268
                                // would still consider it a zombie given the
2269
                                // latest update timestamps, then we skip this
2270
                                // channel.
2271
                                case isZombie && isStillZombie:
24✔
2272
                                        continue
24✔
2273

2274
                                // Otherwise, if we have marked it as a zombie
2275
                                // but the latest update timestamps could bring
2276
                                // it back from the dead, then we mark it alive,
2277
                                // and we let it be added to the set of IDs to
2278
                                // query our peer for.
2279
                                case isZombie && !isStillZombie:
23✔
2280
                                        err := c.markEdgeLiveUnsafe(tx, scid)
23✔
2281
                                        if err != nil {
23✔
2282
                                                return err
×
2283
                                        }
×
2284
                                }
2285
                        }
2286

2287
                        newChanIDs = append(newChanIDs, scid)
78✔
2288
                }
2289

2290
                return nil
122✔
2291
        }, func() {
122✔
2292
                newChanIDs = nil
122✔
2293
        })
122✔
2294
        switch {
122✔
2295
        // If we don't know of any edges yet, then we'll return the entire set
2296
        // of chan IDs specified.
2297
        case err == ErrGraphNoEdgesFound:
×
2298
                ogChanIDs := make([]uint64, len(chansInfo))
×
2299
                for i, info := range chansInfo {
×
2300
                        ogChanIDs[i] = info.ShortChannelID.ToUint64()
×
2301
                }
×
2302

2303
                return ogChanIDs, nil
×
2304

2305
        case err != nil:
×
2306
                return nil, err
×
2307
        }
2308

2309
        return newChanIDs, nil
122✔
2310
}
2311

2312
// ChannelUpdateInfo couples the SCID of a channel with the timestamps of the
2313
// latest received channel updates for the channel.
2314
type ChannelUpdateInfo struct {
2315
        // ShortChannelID is the SCID identifier of the channel.
2316
        ShortChannelID lnwire.ShortChannelID
2317

2318
        // Node1UpdateTimestamp is the timestamp of the latest received update
2319
        // from the node 1 channel peer. This will be set to zero time if no
2320
        // update has yet been received from this node.
2321
        Node1UpdateTimestamp time.Time
2322

2323
        // Node2UpdateTimestamp is the timestamp of the latest received update
2324
        // from the node 2 channel peer. This will be set to zero time if no
2325
        // update has yet been received from this node.
2326
        Node2UpdateTimestamp time.Time
2327
}
2328

2329
// NewChannelUpdateInfo is a constructor which makes sure we initialize the
2330
// timestamps with zero seconds unix timestamp which equals
2331
// `January 1, 1970, 00:00:00 UTC` in case the value is `time.Time{}`.
2332
func NewChannelUpdateInfo(scid lnwire.ShortChannelID, node1Timestamp,
2333
        node2Timestamp time.Time) ChannelUpdateInfo {
218✔
2334

218✔
2335
        chanInfo := ChannelUpdateInfo{
218✔
2336
                ShortChannelID:       scid,
218✔
2337
                Node1UpdateTimestamp: node1Timestamp,
218✔
2338
                Node2UpdateTimestamp: node2Timestamp,
218✔
2339
        }
218✔
2340

218✔
2341
        if node1Timestamp.IsZero() {
426✔
2342
                chanInfo.Node1UpdateTimestamp = time.Unix(0, 0)
208✔
2343
        }
208✔
2344

2345
        if node2Timestamp.IsZero() {
426✔
2346
                chanInfo.Node2UpdateTimestamp = time.Unix(0, 0)
208✔
2347
        }
208✔
2348

2349
        return chanInfo
218✔
2350
}
2351

2352
// BlockChannelRange represents a range of channels for a given block height.
2353
type BlockChannelRange struct {
2354
        // Height is the height of the block all of the channels below were
2355
        // included in.
2356
        Height uint32
2357

2358
        // Channels is the list of channels identified by their short ID
2359
        // representation known to us that were included in the block height
2360
        // above. The list may include channel update timestamp information if
2361
        // requested.
2362
        Channels []ChannelUpdateInfo
2363
}
2364

2365
// FilterChannelRange returns the channel ID's of all known channels which were
2366
// mined in a block height within the passed range. The channel IDs are grouped
2367
// by their common block height. This method can be used to quickly share with a
2368
// peer the set of channels we know of within a particular range to catch them
2369
// up after a period of time offline. If withTimestamps is true then the
2370
// timestamp info of the latest received channel update messages of the channel
2371
// will be included in the response.
2372
func (c *ChannelGraph) FilterChannelRange(startHeight,
2373
        endHeight uint32, withTimestamps bool) ([]BlockChannelRange, error) {
11✔
2374

11✔
2375
        startChanID := &lnwire.ShortChannelID{
11✔
2376
                BlockHeight: startHeight,
11✔
2377
        }
11✔
2378

11✔
2379
        endChanID := lnwire.ShortChannelID{
11✔
2380
                BlockHeight: endHeight,
11✔
2381
                TxIndex:     math.MaxUint32 & 0x00ffffff,
11✔
2382
                TxPosition:  math.MaxUint16,
11✔
2383
        }
11✔
2384

11✔
2385
        // As we need to perform a range scan, we'll convert the starting and
11✔
2386
        // ending height to their corresponding values when encoded using short
11✔
2387
        // channel ID's.
11✔
2388
        var chanIDStart, chanIDEnd [8]byte
11✔
2389
        byteOrder.PutUint64(chanIDStart[:], startChanID.ToUint64())
11✔
2390
        byteOrder.PutUint64(chanIDEnd[:], endChanID.ToUint64())
11✔
2391

11✔
2392
        var channelsPerBlock map[uint32][]ChannelUpdateInfo
11✔
2393
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
22✔
2394
                edges := tx.ReadBucket(edgeBucket)
11✔
2395
                if edges == nil {
11✔
2396
                        return ErrGraphNoEdgesFound
×
2397
                }
×
2398
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
11✔
2399
                if edgeIndex == nil {
11✔
2400
                        return ErrGraphNoEdgesFound
×
2401
                }
×
2402

2403
                cursor := edgeIndex.ReadCursor()
11✔
2404

11✔
2405
                // We'll now iterate through the database, and find each
11✔
2406
                // channel ID that resides within the specified range.
11✔
2407
                //
11✔
2408
                //nolint:ll
11✔
2409
                for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
11✔
2410
                        bytes.Compare(k, chanIDEnd[:]) <= 0; k, v = cursor.Next() {
55✔
2411
                        // Don't send alias SCIDs during gossip sync.
44✔
2412
                        edgeReader := bytes.NewReader(v)
44✔
2413
                        edgeInfo, err := deserializeChanEdgeInfo(edgeReader)
44✔
2414
                        if err != nil {
44✔
2415
                                return err
×
2416
                        }
×
2417

2418
                        if edgeInfo.AuthProof == nil {
44✔
2419
                                continue
×
2420
                        }
2421

2422
                        // This channel ID rests within the target range, so
2423
                        // we'll add it to our returned set.
2424
                        rawCid := byteOrder.Uint64(k)
44✔
2425
                        cid := lnwire.NewShortChanIDFromInt(rawCid)
44✔
2426

44✔
2427
                        chanInfo := NewChannelUpdateInfo(
44✔
2428
                                cid, time.Time{}, time.Time{},
44✔
2429
                        )
44✔
2430

44✔
2431
                        if !withTimestamps {
66✔
2432
                                channelsPerBlock[cid.BlockHeight] = append(
22✔
2433
                                        channelsPerBlock[cid.BlockHeight],
22✔
2434
                                        chanInfo,
22✔
2435
                                )
22✔
2436

22✔
2437
                                continue
22✔
2438
                        }
2439

2440
                        node1Key, node2Key := computeEdgePolicyKeys(&edgeInfo)
22✔
2441

22✔
2442
                        rawPolicy := edges.Get(node1Key)
22✔
2443
                        if len(rawPolicy) != 0 {
28✔
2444
                                r := bytes.NewReader(rawPolicy)
6✔
2445

6✔
2446
                                edge, err := deserializeChanEdgePolicyRaw(r)
6✔
2447
                                if err != nil && !errors.Is(
6✔
2448
                                        err, ErrEdgePolicyOptionalFieldNotFound,
6✔
2449
                                ) {
6✔
2450

×
2451
                                        return err
×
2452
                                }
×
2453

2454
                                chanInfo.Node1UpdateTimestamp = edge.LastUpdate
6✔
2455
                        }
2456

2457
                        rawPolicy = edges.Get(node2Key)
22✔
2458
                        if len(rawPolicy) != 0 {
33✔
2459
                                r := bytes.NewReader(rawPolicy)
11✔
2460

11✔
2461
                                edge, err := deserializeChanEdgePolicyRaw(r)
11✔
2462
                                if err != nil && !errors.Is(
11✔
2463
                                        err, ErrEdgePolicyOptionalFieldNotFound,
11✔
2464
                                ) {
11✔
2465

×
2466
                                        return err
×
2467
                                }
×
2468

2469
                                chanInfo.Node2UpdateTimestamp = edge.LastUpdate
11✔
2470
                        }
2471

2472
                        channelsPerBlock[cid.BlockHeight] = append(
22✔
2473
                                channelsPerBlock[cid.BlockHeight], chanInfo,
22✔
2474
                        )
22✔
2475
                }
2476

2477
                return nil
11✔
2478
        }, func() {
11✔
2479
                channelsPerBlock = make(map[uint32][]ChannelUpdateInfo)
11✔
2480
        })
11✔
2481

2482
        switch {
11✔
2483
        // If we don't know of any channels yet, then there's nothing to
2484
        // filter, so we'll return an empty slice.
2485
        case err == ErrGraphNoEdgesFound || len(channelsPerBlock) == 0:
3✔
2486
                return nil, nil
3✔
2487

2488
        case err != nil:
×
2489
                return nil, err
×
2490
        }
2491

2492
        // Return the channel ranges in ascending block height order.
2493
        blocks := make([]uint32, 0, len(channelsPerBlock))
8✔
2494
        for block := range channelsPerBlock {
30✔
2495
                blocks = append(blocks, block)
22✔
2496
        }
22✔
2497
        sort.Slice(blocks, func(i, j int) bool {
30✔
2498
                return blocks[i] < blocks[j]
22✔
2499
        })
22✔
2500

2501
        channelRanges := make([]BlockChannelRange, 0, len(channelsPerBlock))
8✔
2502
        for _, block := range blocks {
30✔
2503
                channelRanges = append(channelRanges, BlockChannelRange{
22✔
2504
                        Height:   block,
22✔
2505
                        Channels: channelsPerBlock[block],
22✔
2506
                })
22✔
2507
        }
22✔
2508

2509
        return channelRanges, nil
8✔
2510
}
2511

2512
// FetchChanInfos returns the set of channel edges that correspond to the passed
2513
// channel ID's. If an edge is the query is unknown to the database, it will
2514
// skipped and the result will contain only those edges that exist at the time
2515
// of the query. This can be used to respond to peer queries that are seeking to
2516
// fill in gaps in their view of the channel graph.
2517
func (c *ChannelGraph) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) {
3✔
2518
        return c.fetchChanInfos(nil, chanIDs)
3✔
2519
}
3✔
2520

2521
// fetchChanInfos returns the set of channel edges that correspond to the passed
2522
// channel ID's. If an edge is the query is unknown to the database, it will
2523
// skipped and the result will contain only those edges that exist at the time
2524
// of the query. This can be used to respond to peer queries that are seeking to
2525
// fill in gaps in their view of the channel graph.
2526
//
2527
// NOTE: An optional transaction may be provided. If none is provided, then a
2528
// new one will be created.
2529
func (c *ChannelGraph) fetchChanInfos(tx kvdb.RTx, chanIDs []uint64) (
2530
        []ChannelEdge, error) {
27✔
2531
        // TODO(roasbeef): sort cids?
27✔
2532

27✔
2533
        var (
27✔
2534
                chanEdges []ChannelEdge
27✔
2535
                cidBytes  [8]byte
27✔
2536
        )
27✔
2537

27✔
2538
        fetchChanInfos := func(tx kvdb.RTx) error {
54✔
2539
                edges := tx.ReadBucket(edgeBucket)
27✔
2540
                if edges == nil {
27✔
2541
                        return ErrGraphNoEdgesFound
×
2542
                }
×
2543
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
27✔
2544
                if edgeIndex == nil {
27✔
2545
                        return ErrGraphNoEdgesFound
×
2546
                }
×
2547
                nodes := tx.ReadBucket(nodeBucket)
27✔
2548
                if nodes == nil {
27✔
2549
                        return ErrGraphNotFound
×
2550
                }
×
2551

2552
                for _, cid := range chanIDs {
61✔
2553
                        byteOrder.PutUint64(cidBytes[:], cid)
34✔
2554

34✔
2555
                        // First, we'll fetch the static edge information. If
34✔
2556
                        // the edge is unknown, we will skip the edge and
34✔
2557
                        // continue gathering all known edges.
34✔
2558
                        edgeInfo, err := fetchChanEdgeInfo(
34✔
2559
                                edgeIndex, cidBytes[:],
34✔
2560
                        )
34✔
2561
                        switch {
34✔
2562
                        case errors.Is(err, ErrEdgeNotFound):
26✔
2563
                                continue
26✔
2564
                        case err != nil:
×
2565
                                return err
×
2566
                        }
2567

2568
                        // With the static information obtained, we'll now
2569
                        // fetch the dynamic policy info.
2570
                        edge1, edge2, err := fetchChanEdgePolicies(
8✔
2571
                                edgeIndex, edges, cidBytes[:],
8✔
2572
                        )
8✔
2573
                        if err != nil {
8✔
2574
                                return err
×
2575
                        }
×
2576

2577
                        node1, err := fetchLightningNode(
8✔
2578
                                nodes, edgeInfo.NodeKey1Bytes[:],
8✔
2579
                        )
8✔
2580
                        if err != nil {
8✔
2581
                                return err
×
2582
                        }
×
2583

2584
                        node2, err := fetchLightningNode(
8✔
2585
                                nodes, edgeInfo.NodeKey2Bytes[:],
8✔
2586
                        )
8✔
2587
                        if err != nil {
8✔
2588
                                return err
×
2589
                        }
×
2590

2591
                        chanEdges = append(chanEdges, ChannelEdge{
8✔
2592
                                Info:    &edgeInfo,
8✔
2593
                                Policy1: edge1,
8✔
2594
                                Policy2: edge2,
8✔
2595
                                Node1:   &node1,
8✔
2596
                                Node2:   &node2,
8✔
2597
                        })
8✔
2598
                }
2599
                return nil
27✔
2600
        }
2601

2602
        if tx == nil {
31✔
2603
                err := kvdb.View(c.db, fetchChanInfos, func() {
8✔
2604
                        chanEdges = nil
4✔
2605
                })
4✔
2606
                if err != nil {
4✔
2607
                        return nil, err
×
2608
                }
×
2609

2610
                return chanEdges, nil
4✔
2611
        }
2612

2613
        err := fetchChanInfos(tx)
23✔
2614
        if err != nil {
23✔
2615
                return nil, err
×
2616
        }
×
2617

2618
        return chanEdges, nil
23✔
2619
}
2620

2621
func delEdgeUpdateIndexEntry(edgesBucket kvdb.RwBucket, chanID uint64,
2622
        edge1, edge2 *models.ChannelEdgePolicy) error {
148✔
2623

148✔
2624
        // First, we'll fetch the edge update index bucket which currently
148✔
2625
        // stores an entry for the channel we're about to delete.
148✔
2626
        updateIndex := edgesBucket.NestedReadWriteBucket(edgeUpdateIndexBucket)
148✔
2627
        if updateIndex == nil {
148✔
2628
                // No edges in bucket, return early.
×
2629
                return nil
×
2630
        }
×
2631

2632
        // Now that we have the bucket, we'll attempt to construct a template
2633
        // for the index key: updateTime || chanid.
2634
        var indexKey [8 + 8]byte
148✔
2635
        byteOrder.PutUint64(indexKey[8:], chanID)
148✔
2636

148✔
2637
        // With the template constructed, we'll attempt to delete an entry that
148✔
2638
        // would have been created by both edges: we'll alternate the update
148✔
2639
        // times, as one may had overridden the other.
148✔
2640
        if edge1 != nil {
158✔
2641
                byteOrder.PutUint64(
10✔
2642
                        indexKey[:8], uint64(edge1.LastUpdate.Unix()),
10✔
2643
                )
10✔
2644
                if err := updateIndex.Delete(indexKey[:]); err != nil {
10✔
2645
                        return err
×
2646
                }
×
2647
        }
2648

2649
        // We'll also attempt to delete the entry that may have been created by
2650
        // the second edge.
2651
        if edge2 != nil {
160✔
2652
                byteOrder.PutUint64(
12✔
2653
                        indexKey[:8], uint64(edge2.LastUpdate.Unix()),
12✔
2654
                )
12✔
2655
                if err := updateIndex.Delete(indexKey[:]); err != nil {
12✔
2656
                        return err
×
2657
                }
×
2658
        }
2659

2660
        return nil
148✔
2661
}
2662

2663
// delChannelEdgeUnsafe deletes the edge with the given chanID from the graph
2664
// cache. It then goes on to delete any policy info and edge info for this
2665
// channel from the DB and finally, if isZombie is true, it will add an entry
2666
// for this channel in the zombie index.
2667
//
2668
// NOTE: this method MUST only be called if the cacheMu has already been
2669
// acquired.
2670
func (c *ChannelGraph) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex,
2671
        zombieIndex kvdb.RwBucket, chanID []byte, isZombie,
2672
        strictZombie bool) error {
207✔
2673

207✔
2674
        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
207✔
2675
        if err != nil {
266✔
2676
                return err
59✔
2677
        }
59✔
2678

2679
        if c.graphCache != nil {
296✔
2680
                c.graphCache.RemoveChannel(
148✔
2681
                        edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes,
148✔
2682
                        edgeInfo.ChannelID,
148✔
2683
                )
148✔
2684
        }
148✔
2685

2686
        // We'll also remove the entry in the edge update index bucket before
2687
        // we delete the edges themselves so we can access their last update
2688
        // times.
2689
        cid := byteOrder.Uint64(chanID)
148✔
2690
        edge1, edge2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
148✔
2691
        if err != nil {
148✔
2692
                return err
×
2693
        }
×
2694
        err = delEdgeUpdateIndexEntry(edges, cid, edge1, edge2)
148✔
2695
        if err != nil {
148✔
2696
                return err
×
2697
        }
×
2698

2699
        // The edge key is of the format pubKey || chanID. First we construct
2700
        // the latter half, populating the channel ID.
2701
        var edgeKey [33 + 8]byte
148✔
2702
        copy(edgeKey[33:], chanID)
148✔
2703

148✔
2704
        // With the latter half constructed, copy over the first public key to
148✔
2705
        // delete the edge in this direction, then the second to delete the
148✔
2706
        // edge in the opposite direction.
148✔
2707
        copy(edgeKey[:33], edgeInfo.NodeKey1Bytes[:])
148✔
2708
        if edges.Get(edgeKey[:]) != nil {
296✔
2709
                if err := edges.Delete(edgeKey[:]); err != nil {
148✔
2710
                        return err
×
2711
                }
×
2712
        }
2713
        copy(edgeKey[:33], edgeInfo.NodeKey2Bytes[:])
148✔
2714
        if edges.Get(edgeKey[:]) != nil {
296✔
2715
                if err := edges.Delete(edgeKey[:]); err != nil {
148✔
2716
                        return err
×
2717
                }
×
2718
        }
2719

2720
        // As part of deleting the edge we also remove all disabled entries
2721
        // from the edgePolicyDisabledIndex bucket. We do that for both
2722
        // directions.
2723
        err = updateEdgePolicyDisabledIndex(edges, cid, false, false)
148✔
2724
        if err != nil {
148✔
2725
                return err
×
2726
        }
×
2727
        err = updateEdgePolicyDisabledIndex(edges, cid, true, false)
148✔
2728
        if err != nil {
148✔
2729
                return err
×
2730
        }
×
2731

2732
        // With the edge data deleted, we can purge the information from the two
2733
        // edge indexes.
2734
        if err := edgeIndex.Delete(chanID); err != nil {
148✔
2735
                return err
×
2736
        }
×
2737
        var b bytes.Buffer
148✔
2738
        if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
148✔
2739
                return err
×
2740
        }
×
2741
        if err := chanIndex.Delete(b.Bytes()); err != nil {
148✔
2742
                return err
×
2743
        }
×
2744

2745
        // Finally, we'll mark the edge as a zombie within our index if it's
2746
        // being removed due to the channel becoming a zombie. We do this to
2747
        // ensure we don't store unnecessary data for spent channels.
2748
        if !isZombie {
272✔
2749
                return nil
124✔
2750
        }
124✔
2751

2752
        nodeKey1, nodeKey2 := edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes
24✔
2753
        if strictZombie {
28✔
2754
                nodeKey1, nodeKey2 = makeZombiePubkeys(&edgeInfo, edge1, edge2)
4✔
2755
        }
4✔
2756

2757
        return markEdgeZombie(
24✔
2758
                zombieIndex, byteOrder.Uint64(chanID), nodeKey1, nodeKey2,
24✔
2759
        )
24✔
2760
}
2761

2762
// makeZombiePubkeys derives the node pubkeys to store in the zombie index for a
2763
// particular pair of channel policies. The return values are one of:
2764
//  1. (pubkey1, pubkey2)
2765
//  2. (pubkey1, blank)
2766
//  3. (blank, pubkey2)
2767
//
2768
// A blank pubkey means that corresponding node will be unable to resurrect a
2769
// channel on its own. For example, node1 may continue to publish recent
2770
// updates, but node2 has fallen way behind. After marking an edge as a zombie,
2771
// we don't want another fresh update from node1 to resurrect, as the edge can
2772
// only become live once node2 finally sends something recent.
2773
//
2774
// In the case where we have neither update, we allow either party to resurrect
2775
// the channel. If the channel were to be marked zombie again, it would be
2776
// marked with the correct lagging channel since we received an update from only
2777
// one side.
2778
func makeZombiePubkeys(info *models.ChannelEdgeInfo,
2779
        e1, e2 *models.ChannelEdgePolicy) ([33]byte, [33]byte) {
4✔
2780

4✔
2781
        switch {
4✔
2782
        // If we don't have either edge policy, we'll return both pubkeys so
2783
        // that the channel can be resurrected by either party.
2784
        case e1 == nil && e2 == nil:
1✔
2785
                return info.NodeKey1Bytes, info.NodeKey2Bytes
1✔
2786

2787
        // If we're missing edge1, or if both edges are present but edge1 is
2788
        // older, we'll return edge1's pubkey and a blank pubkey for edge2. This
2789
        // means that only an update from edge1 will be able to resurrect the
2790
        // channel.
2791
        case e1 == nil || (e2 != nil && e1.LastUpdate.Before(e2.LastUpdate)):
1✔
2792
                return info.NodeKey1Bytes, [33]byte{}
1✔
2793

2794
        // Otherwise, we're missing edge2 or edge2 is the older side, so we
2795
        // return a blank pubkey for edge1. In this case, only an update from
2796
        // edge2 can resurect the channel.
2797
        default:
2✔
2798
                return [33]byte{}, info.NodeKey2Bytes
2✔
2799
        }
2800
}
2801

2802
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
2803
// within the database for the referenced channel. The `flags` attribute within
2804
// the ChannelEdgePolicy determines which of the directed edges are being
2805
// updated. If the flag is 1, then the first node's information is being
2806
// updated, otherwise it's the second node's information. The node ordering is
2807
// determined by the lexicographical ordering of the identity public keys of the
2808
// nodes on either side of the channel.
2809
func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
2810
        op ...batch.SchedulerOption) error {
2,663✔
2811

2,663✔
2812
        var (
2,663✔
2813
                isUpdate1    bool
2,663✔
2814
                edgeNotFound bool
2,663✔
2815
        )
2,663✔
2816

2,663✔
2817
        r := &batch.Request{
2,663✔
2818
                Reset: func() {
5,326✔
2819
                        isUpdate1 = false
2,663✔
2820
                        edgeNotFound = false
2,663✔
2821
                },
2,663✔
2822
                Update: func(tx kvdb.RwTx) error {
2,663✔
2823
                        var err error
2,663✔
2824
                        isUpdate1, err = updateEdgePolicy(
2,663✔
2825
                                tx, edge, c.graphCache,
2,663✔
2826
                        )
2,663✔
2827

2,663✔
2828
                        if err != nil {
2,666✔
2829
                                log.Errorf("UpdateEdgePolicy faild: %v", err)
3✔
2830
                        }
3✔
2831

2832
                        // Silence ErrEdgeNotFound so that the batch can
2833
                        // succeed, but propagate the error via local state.
2834
                        if errors.Is(err, ErrEdgeNotFound) {
2,666✔
2835
                                edgeNotFound = true
3✔
2836
                                return nil
3✔
2837
                        }
3✔
2838

2839
                        return err
2,660✔
2840
                },
2841
                OnCommit: func(err error) error {
2,663✔
2842
                        switch {
2,663✔
2843
                        case err != nil:
×
2844
                                return err
×
2845
                        case edgeNotFound:
3✔
2846
                                return ErrEdgeNotFound
3✔
2847
                        default:
2,660✔
2848
                                c.updateEdgeCache(edge, isUpdate1)
2,660✔
2849
                                return nil
2,660✔
2850
                        }
2851
                },
2852
        }
2853

2854
        for _, f := range op {
2,663✔
2855
                f(r)
×
2856
        }
×
2857

2858
        return c.chanScheduler.Execute(r)
2,663✔
2859
}
2860

2861
func (c *ChannelGraph) updateEdgeCache(e *models.ChannelEdgePolicy,
2862
        isUpdate1 bool) {
2,660✔
2863

2,660✔
2864
        // If an entry for this channel is found in reject cache, we'll modify
2,660✔
2865
        // the entry with the updated timestamp for the direction that was just
2,660✔
2866
        // written. If the edge doesn't exist, we'll load the cache entry lazily
2,660✔
2867
        // during the next query for this edge.
2,660✔
2868
        if entry, ok := c.rejectCache.get(e.ChannelID); ok {
2,665✔
2869
                if isUpdate1 {
8✔
2870
                        entry.upd1Time = e.LastUpdate.Unix()
3✔
2871
                } else {
5✔
2872
                        entry.upd2Time = e.LastUpdate.Unix()
2✔
2873
                }
2✔
2874
                c.rejectCache.insert(e.ChannelID, entry)
5✔
2875
        }
2876

2877
        // If an entry for this channel is found in channel cache, we'll modify
2878
        // the entry with the updated policy for the direction that was just
2879
        // written. If the edge doesn't exist, we'll defer loading the info and
2880
        // policies and lazily read from disk during the next query.
2881
        if channel, ok := c.chanCache.get(e.ChannelID); ok {
2,660✔
2882
                if isUpdate1 {
×
2883
                        channel.Policy1 = e
×
2884
                } else {
×
2885
                        channel.Policy2 = e
×
2886
                }
×
2887
                c.chanCache.insert(e.ChannelID, channel)
×
2888
        }
2889
}
2890

2891
// updateEdgePolicy attempts to update an edge's policy within the relevant
2892
// buckets using an existing database transaction. The returned boolean will be
2893
// true if the updated policy belongs to node1, and false if the policy belonged
2894
// to node2.
2895
func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy,
2896
        graphCache *GraphCache) (bool, error) {
2,663✔
2897

2,663✔
2898
        edges := tx.ReadWriteBucket(edgeBucket)
2,663✔
2899
        if edges == nil {
2,663✔
2900
                return false, ErrEdgeNotFound
×
2901
        }
×
2902
        edgeIndex := edges.NestedReadWriteBucket(edgeIndexBucket)
2,663✔
2903
        if edgeIndex == nil {
2,663✔
2904
                return false, ErrEdgeNotFound
×
2905
        }
×
2906

2907
        // Create the channelID key be converting the channel ID
2908
        // integer into a byte slice.
2909
        var chanID [8]byte
2,663✔
2910
        byteOrder.PutUint64(chanID[:], edge.ChannelID)
2,663✔
2911

2,663✔
2912
        // With the channel ID, we then fetch the value storing the two
2,663✔
2913
        // nodes which connect this channel edge.
2,663✔
2914
        nodeInfo := edgeIndex.Get(chanID[:])
2,663✔
2915
        if nodeInfo == nil {
2,666✔
2916
                return false, ErrEdgeNotFound
3✔
2917
        }
3✔
2918

2919
        // Depending on the flags value passed above, either the first
2920
        // or second edge policy is being updated.
2921
        var fromNode, toNode []byte
2,660✔
2922
        var isUpdate1 bool
2,660✔
2923
        if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
3,994✔
2924
                fromNode = nodeInfo[:33]
1,334✔
2925
                toNode = nodeInfo[33:66]
1,334✔
2926
                isUpdate1 = true
1,334✔
2927
        } else {
2,660✔
2928
                fromNode = nodeInfo[33:66]
1,326✔
2929
                toNode = nodeInfo[:33]
1,326✔
2930
                isUpdate1 = false
1,326✔
2931
        }
1,326✔
2932

2933
        // Finally, with the direction of the edge being updated
2934
        // identified, we update the on-disk edge representation.
2935
        err := putChanEdgePolicy(edges, edge, fromNode, toNode)
2,660✔
2936
        if err != nil {
2,660✔
2937
                return false, err
×
2938
        }
×
2939

2940
        var (
2,660✔
2941
                fromNodePubKey route.Vertex
2,660✔
2942
                toNodePubKey   route.Vertex
2,660✔
2943
        )
2,660✔
2944
        copy(fromNodePubKey[:], fromNode)
2,660✔
2945
        copy(toNodePubKey[:], toNode)
2,660✔
2946

2,660✔
2947
        if graphCache != nil {
4,934✔
2948
                graphCache.UpdatePolicy(
2,274✔
2949
                        edge, fromNodePubKey, toNodePubKey, isUpdate1,
2,274✔
2950
                )
2,274✔
2951
        }
2,274✔
2952

2953
        return isUpdate1, nil
2,660✔
2954
}
2955

2956
// isPublic determines whether the node is seen as public within the graph from
2957
// the source node's point of view. An existing database transaction can also be
2958
// specified.
2959
func (c *ChannelGraph) isPublic(tx kvdb.RTx, nodePub route.Vertex,
2960
        sourcePubKey []byte) (bool, error) {
13✔
2961

13✔
2962
        // In order to determine whether this node is publicly advertised within
13✔
2963
        // the graph, we'll need to look at all of its edges and check whether
13✔
2964
        // they extend to any other node than the source node. errDone will be
13✔
2965
        // used to terminate the check early.
13✔
2966
        nodeIsPublic := false
13✔
2967
        errDone := errors.New("done")
13✔
2968
        err := c.ForEachNodeChannelTx(tx, nodePub, func(tx kvdb.RTx,
13✔
2969
                info *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy,
13✔
2970
                _ *models.ChannelEdgePolicy) error {
23✔
2971

10✔
2972
                // If this edge doesn't extend to the source node, we'll
10✔
2973
                // terminate our search as we can now conclude that the node is
10✔
2974
                // publicly advertised within the graph due to the local node
10✔
2975
                // knowing of the current edge.
10✔
2976
                if !bytes.Equal(info.NodeKey1Bytes[:], sourcePubKey) &&
10✔
2977
                        !bytes.Equal(info.NodeKey2Bytes[:], sourcePubKey) {
13✔
2978

3✔
2979
                        nodeIsPublic = true
3✔
2980
                        return errDone
3✔
2981
                }
3✔
2982

2983
                // Since the edge _does_ extend to the source node, we'll also
2984
                // need to ensure that this is a public edge.
2985
                if info.AuthProof != nil {
13✔
2986
                        nodeIsPublic = true
6✔
2987
                        return errDone
6✔
2988
                }
6✔
2989

2990
                // Otherwise, we'll continue our search.
2991
                return nil
1✔
2992
        })
2993
        if err != nil && err != errDone {
13✔
2994
                return false, err
×
2995
        }
×
2996

2997
        return nodeIsPublic, nil
13✔
2998
}
2999

3000
// FetchLightningNodeTx attempts to look up a target node by its identity
3001
// public key. If the node isn't found in the database, then
3002
// ErrGraphNodeNotFound is returned. An optional transaction may be provided.
3003
// If none is provided, then a new one will be created.
3004
func (c *ChannelGraph) FetchLightningNodeTx(tx kvdb.RTx, nodePub route.Vertex) (
3005
        *models.LightningNode, error) {
3,630✔
3006

3,630✔
3007
        return c.fetchLightningNode(tx, nodePub)
3,630✔
3008
}
3,630✔
3009

3010
// FetchLightningNode attempts to look up a target node by its identity public
3011
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
3012
// returned.
3013
func (c *ChannelGraph) FetchLightningNode(nodePub route.Vertex) (
3014
        *models.LightningNode, error) {
152✔
3015

152✔
3016
        return c.fetchLightningNode(nil, nodePub)
152✔
3017
}
152✔
3018

3019
// fetchLightningNode attempts to look up a target node by its identity public
3020
// key. If the node isn't found in the database, then ErrGraphNodeNotFound is
3021
// returned. An optional transaction may be provided. If none is provided, then
3022
// a new one will be created.
3023
func (c *ChannelGraph) fetchLightningNode(tx kvdb.RTx,
3024
        nodePub route.Vertex) (*models.LightningNode, error) {
3,782✔
3025

3,782✔
3026
        var node *models.LightningNode
3,782✔
3027
        fetch := func(tx kvdb.RTx) error {
7,564✔
3028
                // First grab the nodes bucket which stores the mapping from
3,782✔
3029
                // pubKey to node information.
3,782✔
3030
                nodes := tx.ReadBucket(nodeBucket)
3,782✔
3031
                if nodes == nil {
3,782✔
3032
                        return ErrGraphNotFound
×
3033
                }
×
3034

3035
                // If a key for this serialized public key isn't found, then
3036
                // the target node doesn't exist within the database.
3037
                nodeBytes := nodes.Get(nodePub[:])
3,782✔
3038
                if nodeBytes == nil {
3,796✔
3039
                        return ErrGraphNodeNotFound
14✔
3040
                }
14✔
3041

3042
                // If the node is found, then we can de deserialize the node
3043
                // information to return to the user.
3044
                nodeReader := bytes.NewReader(nodeBytes)
3,768✔
3045
                n, err := deserializeLightningNode(nodeReader)
3,768✔
3046
                if err != nil {
3,768✔
3047
                        return err
×
3048
                }
×
3049

3050
                node = &n
3,768✔
3051

3,768✔
3052
                return nil
3,768✔
3053
        }
3054

3055
        if tx == nil {
3,937✔
3056
                err := kvdb.View(
155✔
3057
                        c.db, fetch, func() {
310✔
3058
                                node = nil
155✔
3059
                        },
155✔
3060
                )
3061
                if err != nil {
158✔
3062
                        return nil, err
3✔
3063
                }
3✔
3064

3065
                return node, nil
152✔
3066
        }
3067

3068
        err := fetch(tx)
3,627✔
3069
        if err != nil {
3,638✔
3070
                return nil, err
11✔
3071
        }
11✔
3072

3073
        return node, nil
3,616✔
3074
}
3075

3076
// HasLightningNode determines if the graph has a vertex identified by the
3077
// target node identity public key. If the node exists in the database, a
3078
// timestamp of when the data for the node was lasted updated is returned along
3079
// with a true boolean. Otherwise, an empty time.Time is returned with a false
3080
// boolean.
3081
func (c *ChannelGraph) HasLightningNode(nodePub [33]byte) (time.Time, bool,
3082
        error) {
16✔
3083

16✔
3084
        var (
16✔
3085
                updateTime time.Time
16✔
3086
                exists     bool
16✔
3087
        )
16✔
3088

16✔
3089
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
32✔
3090
                // First grab the nodes bucket which stores the mapping from
16✔
3091
                // pubKey to node information.
16✔
3092
                nodes := tx.ReadBucket(nodeBucket)
16✔
3093
                if nodes == nil {
16✔
3094
                        return ErrGraphNotFound
×
3095
                }
×
3096

3097
                // If a key for this serialized public key isn't found, we can
3098
                // exit early.
3099
                nodeBytes := nodes.Get(nodePub[:])
16✔
3100
                if nodeBytes == nil {
19✔
3101
                        exists = false
3✔
3102
                        return nil
3✔
3103
                }
3✔
3104

3105
                // Otherwise we continue on to obtain the time stamp
3106
                // representing the last time the data for this node was
3107
                // updated.
3108
                nodeReader := bytes.NewReader(nodeBytes)
13✔
3109
                node, err := deserializeLightningNode(nodeReader)
13✔
3110
                if err != nil {
13✔
3111
                        return err
×
3112
                }
×
3113

3114
                exists = true
13✔
3115
                updateTime = node.LastUpdate
13✔
3116
                return nil
13✔
3117
        }, func() {
16✔
3118
                updateTime = time.Time{}
16✔
3119
                exists = false
16✔
3120
        })
16✔
3121
        if err != nil {
16✔
3122
                return time.Time{}, exists, err
×
3123
        }
×
3124

3125
        return updateTime, exists, nil
16✔
3126
}
3127

3128
// nodeTraversal is used to traverse all channels of a node given by its
3129
// public key and passes channel information into the specified callback.
3130
func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
3131
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3132
                *models.ChannelEdgePolicy) error) error {
1,266✔
3133

1,266✔
3134
        traversal := func(tx kvdb.RTx) error {
2,532✔
3135
                edges := tx.ReadBucket(edgeBucket)
1,266✔
3136
                if edges == nil {
1,266✔
3137
                        return ErrGraphNotFound
×
3138
                }
×
3139
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
1,266✔
3140
                if edgeIndex == nil {
1,266✔
3141
                        return ErrGraphNoEdgesFound
×
3142
                }
×
3143

3144
                // In order to reach all the edges for this node, we take
3145
                // advantage of the construction of the key-space within the
3146
                // edge bucket. The keys are stored in the form: pubKey ||
3147
                // chanID. Therefore, starting from a chanID of zero, we can
3148
                // scan forward in the bucket, grabbing all the edges for the
3149
                // node. Once the prefix no longer matches, then we know we're
3150
                // done.
3151
                var nodeStart [33 + 8]byte
1,266✔
3152
                copy(nodeStart[:], nodePub)
1,266✔
3153
                copy(nodeStart[33:], chanStart[:])
1,266✔
3154

1,266✔
3155
                // Starting from the key pubKey || 0, we seek forward in the
1,266✔
3156
                // bucket until the retrieved key no longer has the public key
1,266✔
3157
                // as its prefix. This indicates that we've stepped over into
1,266✔
3158
                // another node's edges, so we can terminate our scan.
1,266✔
3159
                edgeCursor := edges.ReadCursor()
1,266✔
3160
                for nodeEdge, _ := edgeCursor.Seek(nodeStart[:]); bytes.HasPrefix(nodeEdge, nodePub); nodeEdge, _ = edgeCursor.Next() { //nolint:ll
5,106✔
3161
                        // If the prefix still matches, the channel id is
3,840✔
3162
                        // returned in nodeEdge. Channel id is used to lookup
3,840✔
3163
                        // the node at the other end of the channel and both
3,840✔
3164
                        // edge policies.
3,840✔
3165
                        chanID := nodeEdge[33:]
3,840✔
3166
                        edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
3,840✔
3167
                        if err != nil {
3,840✔
3168
                                return err
×
3169
                        }
×
3170

3171
                        outgoingPolicy, err := fetchChanEdgePolicy(
3,840✔
3172
                                edges, chanID, nodePub,
3,840✔
3173
                        )
3,840✔
3174
                        if err != nil {
3,840✔
3175
                                return err
×
3176
                        }
×
3177

3178
                        otherNode, err := edgeInfo.OtherNodeKeyBytes(nodePub)
3,840✔
3179
                        if err != nil {
3,840✔
3180
                                return err
×
3181
                        }
×
3182

3183
                        incomingPolicy, err := fetchChanEdgePolicy(
3,840✔
3184
                                edges, chanID, otherNode[:],
3,840✔
3185
                        )
3,840✔
3186
                        if err != nil {
3,840✔
3187
                                return err
×
3188
                        }
×
3189

3190
                        // Finally, we execute the callback.
3191
                        err = cb(tx, &edgeInfo, outgoingPolicy, incomingPolicy)
3,840✔
3192
                        if err != nil {
3,849✔
3193
                                return err
9✔
3194
                        }
9✔
3195
                }
3196

3197
                return nil
1,257✔
3198
        }
3199

3200
        // If no transaction was provided, then we'll create a new transaction
3201
        // to execute the transaction within.
3202
        if tx == nil {
1,275✔
3203
                return kvdb.View(db, traversal, func() {})
18✔
3204
        }
3205

3206
        // Otherwise, we re-use the existing transaction to execute the graph
3207
        // traversal.
3208
        return traversal(tx)
1,257✔
3209
}
3210

3211
// ForEachNodeChannel iterates through all channels of the given node,
3212
// executing the passed callback with an edge info structure and the policies
3213
// of each end of the channel. The first edge policy is the outgoing edge *to*
3214
// the connecting node, while the second is the incoming edge *from* the
3215
// connecting node. If the callback returns an error, then the iteration is
3216
// halted with the error propagated back up to the caller.
3217
//
3218
// Unknown policies are passed into the callback as nil values.
3219
func (c *ChannelGraph) ForEachNodeChannel(nodePub route.Vertex,
3220
        cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3221
                *models.ChannelEdgePolicy) error) error {
6✔
3222

6✔
3223
        return nodeTraversal(nil, nodePub[:], c.db, cb)
6✔
3224
}
6✔
3225

3226
// ForEachNodeChannelTx iterates through all channels of the given node,
3227
// executing the passed callback with an edge info structure and the policies
3228
// of each end of the channel. The first edge policy is the outgoing edge *to*
3229
// the connecting node, while the second is the incoming edge *from* the
3230
// connecting node. If the callback returns an error, then the iteration is
3231
// halted with the error propagated back up to the caller.
3232
//
3233
// Unknown policies are passed into the callback as nil values.
3234
//
3235
// If the caller wishes to re-use an existing boltdb transaction, then it
3236
// should be passed as the first argument.  Otherwise, the first argument should
3237
// be nil and a fresh transaction will be created to execute the graph
3238
// traversal.
3239
func (c *ChannelGraph) ForEachNodeChannelTx(tx kvdb.RTx,
3240
        nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo,
3241
                *models.ChannelEdgePolicy,
3242
                *models.ChannelEdgePolicy) error) error {
1,018✔
3243

1,018✔
3244
        return nodeTraversal(tx, nodePub[:], c.db, cb)
1,018✔
3245
}
1,018✔
3246

3247
// FetchOtherNode attempts to fetch the full LightningNode that's opposite of
3248
// the target node in the channel. This is useful when one knows the pubkey of
3249
// one of the nodes, and wishes to obtain the full LightningNode for the other
3250
// end of the channel.
3251
func (c *ChannelGraph) FetchOtherNode(tx kvdb.RTx,
3252
        channel *models.ChannelEdgeInfo, thisNodeKey []byte) (
3253
        *models.LightningNode, error) {
×
3254

×
3255
        // Ensure that the node passed in is actually a member of the channel.
×
3256
        var targetNodeBytes [33]byte
×
3257
        switch {
×
3258
        case bytes.Equal(channel.NodeKey1Bytes[:], thisNodeKey):
×
3259
                targetNodeBytes = channel.NodeKey2Bytes
×
3260
        case bytes.Equal(channel.NodeKey2Bytes[:], thisNodeKey):
×
3261
                targetNodeBytes = channel.NodeKey1Bytes
×
3262
        default:
×
3263
                return nil, fmt.Errorf("node not participating in this channel")
×
3264
        }
3265

3266
        var targetNode *models.LightningNode
×
3267
        fetchNodeFunc := func(tx kvdb.RTx) error {
×
3268
                // First grab the nodes bucket which stores the mapping from
×
3269
                // pubKey to node information.
×
3270
                nodes := tx.ReadBucket(nodeBucket)
×
3271
                if nodes == nil {
×
3272
                        return ErrGraphNotFound
×
3273
                }
×
3274

3275
                node, err := fetchLightningNode(nodes, targetNodeBytes[:])
×
3276
                if err != nil {
×
3277
                        return err
×
3278
                }
×
3279

3280
                targetNode = &node
×
3281

×
3282
                return nil
×
3283
        }
3284

3285
        // If the transaction is nil, then we'll need to create a new one,
3286
        // otherwise we can use the existing db transaction.
3287
        var err error
×
3288
        if tx == nil {
×
3289
                err = kvdb.View(c.db, fetchNodeFunc, func() {
×
3290
                        targetNode = nil
×
3291
                })
×
3292
        } else {
×
3293
                err = fetchNodeFunc(tx)
×
3294
        }
×
3295

3296
        return targetNode, err
×
3297
}
3298

3299
// computeEdgePolicyKeys is a helper function that can be used to compute the
3300
// keys used to index the channel edge policy info for the two nodes of the
3301
// edge. The keys for node 1 and node 2 are returned respectively.
3302
func computeEdgePolicyKeys(info *models.ChannelEdgeInfo) ([]byte, []byte) {
22✔
3303
        var (
22✔
3304
                node1Key [33 + 8]byte
22✔
3305
                node2Key [33 + 8]byte
22✔
3306
        )
22✔
3307

22✔
3308
        copy(node1Key[:], info.NodeKey1Bytes[:])
22✔
3309
        copy(node2Key[:], info.NodeKey2Bytes[:])
22✔
3310

22✔
3311
        byteOrder.PutUint64(node1Key[33:], info.ChannelID)
22✔
3312
        byteOrder.PutUint64(node2Key[33:], info.ChannelID)
22✔
3313

22✔
3314
        return node1Key[:], node2Key[:]
22✔
3315
}
22✔
3316

3317
// FetchChannelEdgesByOutpoint attempts to lookup the two directed edges for
3318
// the channel identified by the funding outpoint. If the channel can't be
3319
// found, then ErrEdgeNotFound is returned. A struct which houses the general
3320
// information for the channel itself is returned as well as two structs that
3321
// contain the routing policies for the channel in either direction.
3322
func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (
3323
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3324
        *models.ChannelEdgePolicy, error) {
11✔
3325

11✔
3326
        var (
11✔
3327
                edgeInfo *models.ChannelEdgeInfo
11✔
3328
                policy1  *models.ChannelEdgePolicy
11✔
3329
                policy2  *models.ChannelEdgePolicy
11✔
3330
        )
11✔
3331

11✔
3332
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
22✔
3333
                // First, grab the node bucket. This will be used to populate
11✔
3334
                // the Node pointers in each edge read from disk.
11✔
3335
                nodes := tx.ReadBucket(nodeBucket)
11✔
3336
                if nodes == nil {
11✔
3337
                        return ErrGraphNotFound
×
3338
                }
×
3339

3340
                // Next, grab the edge bucket which stores the edges, and also
3341
                // the index itself so we can group the directed edges together
3342
                // logically.
3343
                edges := tx.ReadBucket(edgeBucket)
11✔
3344
                if edges == nil {
11✔
3345
                        return ErrGraphNoEdgesFound
×
3346
                }
×
3347
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
11✔
3348
                if edgeIndex == nil {
11✔
3349
                        return ErrGraphNoEdgesFound
×
3350
                }
×
3351

3352
                // If the channel's outpoint doesn't exist within the outpoint
3353
                // index, then the edge does not exist.
3354
                chanIndex := edges.NestedReadBucket(channelPointBucket)
11✔
3355
                if chanIndex == nil {
11✔
3356
                        return ErrGraphNoEdgesFound
×
3357
                }
×
3358
                var b bytes.Buffer
11✔
3359
                if err := WriteOutpoint(&b, op); err != nil {
11✔
3360
                        return err
×
3361
                }
×
3362
                chanID := chanIndex.Get(b.Bytes())
11✔
3363
                if chanID == nil {
21✔
3364
                        return fmt.Errorf("%w: op=%v", ErrEdgeNotFound, op)
10✔
3365
                }
10✔
3366

3367
                // If the channel is found to exists, then we'll first retrieve
3368
                // the general information for the channel.
3369
                edge, err := fetchChanEdgeInfo(edgeIndex, chanID)
1✔
3370
                if err != nil {
1✔
3371
                        return fmt.Errorf("%w: chanID=%x", err, chanID)
×
3372
                }
×
3373
                edgeInfo = &edge
1✔
3374

1✔
3375
                // Once we have the information about the channels' parameters,
1✔
3376
                // we'll fetch the routing policies for each for the directed
1✔
3377
                // edges.
1✔
3378
                e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, chanID)
1✔
3379
                if err != nil {
1✔
3380
                        return fmt.Errorf("failed to find policy: %w", err)
×
3381
                }
×
3382

3383
                policy1 = e1
1✔
3384
                policy2 = e2
1✔
3385
                return nil
1✔
3386
        }, func() {
11✔
3387
                edgeInfo = nil
11✔
3388
                policy1 = nil
11✔
3389
                policy2 = nil
11✔
3390
        })
11✔
3391
        if err != nil {
21✔
3392
                return nil, nil, nil, err
10✔
3393
        }
10✔
3394

3395
        return edgeInfo, policy1, policy2, nil
1✔
3396
}
3397

3398
// FetchChannelEdgesByID attempts to lookup the two directed edges for the
3399
// channel identified by the channel ID. If the channel can't be found, then
3400
// ErrEdgeNotFound is returned. A struct which houses the general information
3401
// for the channel itself is returned as well as two structs that contain the
3402
// routing policies for the channel in either direction.
3403
//
3404
// ErrZombieEdge an be returned if the edge is currently marked as a zombie
3405
// within the database. In this case, the ChannelEdgePolicy's will be nil, and
3406
// the ChannelEdgeInfo will only include the public keys of each node.
3407
func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (
3408
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3409
        *models.ChannelEdgePolicy, error) {
24✔
3410

24✔
3411
        var (
24✔
3412
                edgeInfo  *models.ChannelEdgeInfo
24✔
3413
                policy1   *models.ChannelEdgePolicy
24✔
3414
                policy2   *models.ChannelEdgePolicy
24✔
3415
                channelID [8]byte
24✔
3416
        )
24✔
3417

24✔
3418
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
48✔
3419
                // First, grab the node bucket. This will be used to populate
24✔
3420
                // the Node pointers in each edge read from disk.
24✔
3421
                nodes := tx.ReadBucket(nodeBucket)
24✔
3422
                if nodes == nil {
24✔
3423
                        return ErrGraphNotFound
×
3424
                }
×
3425

3426
                // Next, grab the edge bucket which stores the edges, and also
3427
                // the index itself so we can group the directed edges together
3428
                // logically.
3429
                edges := tx.ReadBucket(edgeBucket)
24✔
3430
                if edges == nil {
24✔
3431
                        return ErrGraphNoEdgesFound
×
3432
                }
×
3433
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
24✔
3434
                if edgeIndex == nil {
24✔
3435
                        return ErrGraphNoEdgesFound
×
3436
                }
×
3437

3438
                byteOrder.PutUint64(channelID[:], chanID)
24✔
3439

24✔
3440
                // Now, attempt to fetch edge.
24✔
3441
                edge, err := fetchChanEdgeInfo(edgeIndex, channelID[:])
24✔
3442

24✔
3443
                // If it doesn't exist, we'll quickly check our zombie index to
24✔
3444
                // see if we've previously marked it as so.
24✔
3445
                if errors.Is(err, ErrEdgeNotFound) {
25✔
3446
                        // If the zombie index doesn't exist, or the edge is not
1✔
3447
                        // marked as a zombie within it, then we'll return the
1✔
3448
                        // original ErrEdgeNotFound error.
1✔
3449
                        zombieIndex := edges.NestedReadBucket(zombieBucket)
1✔
3450
                        if zombieIndex == nil {
1✔
3451
                                return ErrEdgeNotFound
×
3452
                        }
×
3453

3454
                        isZombie, pubKey1, pubKey2 := isZombieEdge(
1✔
3455
                                zombieIndex, chanID,
1✔
3456
                        )
1✔
3457
                        if !isZombie {
1✔
3458
                                return ErrEdgeNotFound
×
3459
                        }
×
3460

3461
                        // Otherwise, the edge is marked as a zombie, so we'll
3462
                        // populate the edge info with the public keys of each
3463
                        // party as this is the only information we have about
3464
                        // it and return an error signaling so.
3465
                        edgeInfo = &models.ChannelEdgeInfo{
1✔
3466
                                NodeKey1Bytes: pubKey1,
1✔
3467
                                NodeKey2Bytes: pubKey2,
1✔
3468
                        }
1✔
3469
                        return ErrZombieEdge
1✔
3470
                }
3471

3472
                // Otherwise, we'll just return the error if any.
3473
                if err != nil {
23✔
3474
                        return err
×
3475
                }
×
3476

3477
                edgeInfo = &edge
23✔
3478

23✔
3479
                // Then we'll attempt to fetch the accompanying policies of this
23✔
3480
                // edge.
23✔
3481
                e1, e2, err := fetchChanEdgePolicies(
23✔
3482
                        edgeIndex, edges, channelID[:],
23✔
3483
                )
23✔
3484
                if err != nil {
23✔
3485
                        return err
×
3486
                }
×
3487

3488
                policy1 = e1
23✔
3489
                policy2 = e2
23✔
3490
                return nil
23✔
3491
        }, func() {
24✔
3492
                edgeInfo = nil
24✔
3493
                policy1 = nil
24✔
3494
                policy2 = nil
24✔
3495
        })
24✔
3496
        if err == ErrZombieEdge {
25✔
3497
                return edgeInfo, nil, nil, err
1✔
3498
        }
1✔
3499
        if err != nil {
23✔
3500
                return nil, nil, nil, err
×
3501
        }
×
3502

3503
        return edgeInfo, policy1, policy2, nil
23✔
3504
}
3505

3506
// IsPublicNode is a helper method that determines whether the node with the
3507
// given public key is seen as a public node in the graph from the graph's
3508
// source node's point of view.
3509
func (c *ChannelGraph) IsPublicNode(pubKey [33]byte) (bool, error) {
13✔
3510
        var nodeIsPublic bool
13✔
3511
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
26✔
3512
                nodes := tx.ReadBucket(nodeBucket)
13✔
3513
                if nodes == nil {
13✔
3514
                        return ErrGraphNodesNotFound
×
3515
                }
×
3516
                ourPubKey := nodes.Get(sourceKey)
13✔
3517
                if ourPubKey == nil {
13✔
3518
                        return ErrSourceNodeNotSet
×
3519
                }
×
3520
                node, err := fetchLightningNode(nodes, pubKey[:])
13✔
3521
                if err != nil {
13✔
3522
                        return err
×
3523
                }
×
3524

3525
                nodeIsPublic, err = c.isPublic(tx, node.PubKeyBytes, ourPubKey)
13✔
3526
                return err
13✔
3527
        }, func() {
13✔
3528
                nodeIsPublic = false
13✔
3529
        })
13✔
3530
        if err != nil {
13✔
3531
                return false, err
×
3532
        }
×
3533

3534
        return nodeIsPublic, nil
13✔
3535
}
3536

3537
// genMultiSigP2WSH generates the p2wsh'd multisig script for 2 of 2 pubkeys.
3538
func genMultiSigP2WSH(aPub, bPub []byte) ([]byte, error) {
46✔
3539
        witnessScript, err := input.GenMultiSigScript(aPub, bPub)
46✔
3540
        if err != nil {
46✔
3541
                return nil, err
×
3542
        }
×
3543

3544
        // With the witness script generated, we'll now turn it into a p2wsh
3545
        // script:
3546
        //  * OP_0 <sha256(script)>
3547
        bldr := txscript.NewScriptBuilder(
46✔
3548
                txscript.WithScriptAllocSize(input.P2WSHSize),
46✔
3549
        )
46✔
3550
        bldr.AddOp(txscript.OP_0)
46✔
3551
        scriptHash := sha256.Sum256(witnessScript)
46✔
3552
        bldr.AddData(scriptHash[:])
46✔
3553

46✔
3554
        return bldr.Script()
46✔
3555
}
3556

3557
// EdgePoint couples the outpoint of a channel with the funding script that it
3558
// creates. The FilteredChainView will use this to watch for spends of this
3559
// edge point on chain. We require both of these values as depending on the
3560
// concrete implementation, either the pkScript, or the out point will be used.
3561
type EdgePoint struct {
3562
        // FundingPkScript is the p2wsh multi-sig script of the target channel.
3563
        FundingPkScript []byte
3564

3565
        // OutPoint is the outpoint of the target channel.
3566
        OutPoint wire.OutPoint
3567
}
3568

3569
// String returns a human readable version of the target EdgePoint. We return
3570
// the outpoint directly as it is enough to uniquely identify the edge point.
3571
func (e *EdgePoint) String() string {
×
3572
        return e.OutPoint.String()
×
3573
}
×
3574

3575
// ChannelView returns the verifiable edge information for each active channel
3576
// within the known channel graph. The set of UTXO's (along with their scripts)
3577
// returned are the ones that need to be watched on chain to detect channel
3578
// closes on the resident blockchain.
3579
func (c *ChannelGraph) ChannelView() ([]EdgePoint, error) {
22✔
3580
        var edgePoints []EdgePoint
22✔
3581
        if err := kvdb.View(c.db, func(tx kvdb.RTx) error {
44✔
3582
                // We're going to iterate over the entire channel index, so
22✔
3583
                // we'll need to fetch the edgeBucket to get to the index as
22✔
3584
                // it's a sub-bucket.
22✔
3585
                edges := tx.ReadBucket(edgeBucket)
22✔
3586
                if edges == nil {
22✔
3587
                        return ErrGraphNoEdgesFound
×
3588
                }
×
3589
                chanIndex := edges.NestedReadBucket(channelPointBucket)
22✔
3590
                if chanIndex == nil {
22✔
3591
                        return ErrGraphNoEdgesFound
×
3592
                }
×
3593
                edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
22✔
3594
                if edgeIndex == nil {
22✔
3595
                        return ErrGraphNoEdgesFound
×
3596
                }
×
3597

3598
                // Once we have the proper bucket, we'll range over each key
3599
                // (which is the channel point for the channel) and decode it,
3600
                // accumulating each entry.
3601
                return chanIndex.ForEach(
22✔
3602
                        func(chanPointBytes, chanID []byte) error {
64✔
3603
                                chanPointReader := bytes.NewReader(
42✔
3604
                                        chanPointBytes,
42✔
3605
                                )
42✔
3606

42✔
3607
                                var chanPoint wire.OutPoint
42✔
3608
                                err := ReadOutpoint(chanPointReader, &chanPoint)
42✔
3609
                                if err != nil {
42✔
3610
                                        return err
×
3611
                                }
×
3612

3613
                                edgeInfo, err := fetchChanEdgeInfo(
42✔
3614
                                        edgeIndex, chanID,
42✔
3615
                                )
42✔
3616
                                if err != nil {
42✔
3617
                                        return err
×
3618
                                }
×
3619

3620
                                pkScript, err := genMultiSigP2WSH(
42✔
3621
                                        edgeInfo.BitcoinKey1Bytes[:],
42✔
3622
                                        edgeInfo.BitcoinKey2Bytes[:],
42✔
3623
                                )
42✔
3624
                                if err != nil {
42✔
3625
                                        return err
×
3626
                                }
×
3627

3628
                                edgePoints = append(edgePoints, EdgePoint{
42✔
3629
                                        FundingPkScript: pkScript,
42✔
3630
                                        OutPoint:        chanPoint,
42✔
3631
                                })
42✔
3632

42✔
3633
                                return nil
42✔
3634
                        },
3635
                )
3636
        }, func() {
22✔
3637
                edgePoints = nil
22✔
3638
        }); err != nil {
22✔
3639
                return nil, err
×
3640
        }
×
3641

3642
        return edgePoints, nil
22✔
3643
}
3644

3645
// MarkEdgeZombie attempts to mark a channel identified by its channel ID as a
3646
// zombie. This method is used on an ad-hoc basis, when channels need to be
3647
// marked as zombies outside the normal pruning cycle.
3648
func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
3649
        pubKey1, pubKey2 [33]byte) error {
117✔
3650

117✔
3651
        c.cacheMu.Lock()
117✔
3652
        defer c.cacheMu.Unlock()
117✔
3653

117✔
3654
        err := kvdb.Batch(c.db, func(tx kvdb.RwTx) error {
234✔
3655
                edges := tx.ReadWriteBucket(edgeBucket)
117✔
3656
                if edges == nil {
117✔
3657
                        return ErrGraphNoEdgesFound
×
3658
                }
×
3659
                zombieIndex, err := edges.CreateBucketIfNotExists(zombieBucket)
117✔
3660
                if err != nil {
117✔
3661
                        return fmt.Errorf("unable to create zombie "+
×
3662
                                "bucket: %w", err)
×
3663
                }
×
3664

3665
                if c.graphCache != nil {
234✔
3666
                        c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
117✔
3667
                }
117✔
3668

3669
                return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
117✔
3670
        })
3671
        if err != nil {
117✔
3672
                return err
×
3673
        }
×
3674

3675
        c.rejectCache.remove(chanID)
117✔
3676
        c.chanCache.remove(chanID)
117✔
3677

117✔
3678
        return nil
117✔
3679
}
3680

3681
// markEdgeZombie marks an edge as a zombie within our zombie index. The public
3682
// keys should represent the node public keys of the two parties involved in the
3683
// edge.
3684
func markEdgeZombie(zombieIndex kvdb.RwBucket, chanID uint64, pubKey1,
3685
        pubKey2 [33]byte) error {
141✔
3686

141✔
3687
        var k [8]byte
141✔
3688
        byteOrder.PutUint64(k[:], chanID)
141✔
3689

141✔
3690
        var v [66]byte
141✔
3691
        copy(v[:33], pubKey1[:])
141✔
3692
        copy(v[33:], pubKey2[:])
141✔
3693

141✔
3694
        return zombieIndex.Put(k[:], v[:])
141✔
3695
}
141✔
3696

3697
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
3698
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
2✔
3699
        c.cacheMu.Lock()
2✔
3700
        defer c.cacheMu.Unlock()
2✔
3701

2✔
3702
        return c.markEdgeLiveUnsafe(nil, chanID)
2✔
3703
}
2✔
3704

3705
// markEdgeLiveUnsafe clears an edge from the zombie index. This method can be
3706
// called with an existing kvdb.RwTx or the argument can be set to nil in which
3707
// case a new transaction will be created.
3708
//
3709
// NOTE: this method MUST only be called if the cacheMu has already been
3710
// acquired.
3711
func (c *ChannelGraph) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
25✔
3712
        dbFn := func(tx kvdb.RwTx) error {
50✔
3713
                edges := tx.ReadWriteBucket(edgeBucket)
25✔
3714
                if edges == nil {
25✔
3715
                        return ErrGraphNoEdgesFound
×
3716
                }
×
3717
                zombieIndex := edges.NestedReadWriteBucket(zombieBucket)
25✔
3718
                if zombieIndex == nil {
25✔
3719
                        return nil
×
3720
                }
×
3721

3722
                var k [8]byte
25✔
3723
                byteOrder.PutUint64(k[:], chanID)
25✔
3724

25✔
3725
                if len(zombieIndex.Get(k[:])) == 0 {
26✔
3726
                        return ErrZombieEdgeNotFound
1✔
3727
                }
1✔
3728

3729
                return zombieIndex.Delete(k[:])
24✔
3730
        }
3731

3732
        // If the transaction is nil, we'll create a new one. Otherwise, we use
3733
        // the existing transaction
3734
        var err error
25✔
3735
        if tx == nil {
27✔
3736
                err = kvdb.Update(c.db, dbFn, func() {})
4✔
3737
        } else {
23✔
3738
                err = dbFn(tx)
23✔
3739
        }
23✔
3740
        if err != nil {
26✔
3741
                return err
1✔
3742
        }
1✔
3743

3744
        c.rejectCache.remove(chanID)
24✔
3745
        c.chanCache.remove(chanID)
24✔
3746

24✔
3747
        // We need to add the channel back into our graph cache, otherwise we
24✔
3748
        // won't use it for path finding.
24✔
3749
        if c.graphCache != nil {
48✔
3750
                edgeInfos, err := c.fetchChanInfos(tx, []uint64{chanID})
24✔
3751
                if err != nil {
24✔
3752
                        return err
×
3753
                }
×
3754

3755
                for _, edgeInfo := range edgeInfos {
24✔
3756
                        c.graphCache.AddChannel(
×
3757
                                edgeInfo.Info, edgeInfo.Policy1,
×
3758
                                edgeInfo.Policy2,
×
3759
                        )
×
3760
                }
×
3761
        }
3762

3763
        return nil
24✔
3764
}
3765

3766
// IsZombieEdge returns whether the edge is considered zombie. If it is a
3767
// zombie, then the two node public keys corresponding to this edge are also
3768
// returned.
3769
func (c *ChannelGraph) IsZombieEdge(chanID uint64) (bool, [33]byte, [33]byte) {
5✔
3770
        var (
5✔
3771
                isZombie         bool
5✔
3772
                pubKey1, pubKey2 [33]byte
5✔
3773
        )
5✔
3774

5✔
3775
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
10✔
3776
                edges := tx.ReadBucket(edgeBucket)
5✔
3777
                if edges == nil {
5✔
3778
                        return ErrGraphNoEdgesFound
×
3779
                }
×
3780
                zombieIndex := edges.NestedReadBucket(zombieBucket)
5✔
3781
                if zombieIndex == nil {
5✔
3782
                        return nil
×
3783
                }
×
3784

3785
                isZombie, pubKey1, pubKey2 = isZombieEdge(zombieIndex, chanID)
5✔
3786
                return nil
5✔
3787
        }, func() {
5✔
3788
                isZombie = false
5✔
3789
                pubKey1 = [33]byte{}
5✔
3790
                pubKey2 = [33]byte{}
5✔
3791
        })
5✔
3792
        if err != nil {
5✔
3793
                return false, [33]byte{}, [33]byte{}
×
3794
        }
×
3795

3796
        return isZombie, pubKey1, pubKey2
5✔
3797
}
3798

3799
// isZombieEdge returns whether an entry exists for the given channel in the
3800
// zombie index. If an entry exists, then the two node public keys corresponding
3801
// to this edge are also returned.
3802
func isZombieEdge(zombieIndex kvdb.RBucket,
3803
        chanID uint64) (bool, [33]byte, [33]byte) {
193✔
3804

193✔
3805
        var k [8]byte
193✔
3806
        byteOrder.PutUint64(k[:], chanID)
193✔
3807

193✔
3808
        v := zombieIndex.Get(k[:])
193✔
3809
        if v == nil {
313✔
3810
                return false, [33]byte{}, [33]byte{}
120✔
3811
        }
120✔
3812

3813
        var pubKey1, pubKey2 [33]byte
73✔
3814
        copy(pubKey1[:], v[:33])
73✔
3815
        copy(pubKey2[:], v[33:])
73✔
3816

73✔
3817
        return true, pubKey1, pubKey2
73✔
3818
}
3819

3820
// NumZombies returns the current number of zombie channels in the graph.
3821
func (c *ChannelGraph) NumZombies() (uint64, error) {
4✔
3822
        var numZombies uint64
4✔
3823
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
8✔
3824
                edges := tx.ReadBucket(edgeBucket)
4✔
3825
                if edges == nil {
4✔
3826
                        return nil
×
3827
                }
×
3828
                zombieIndex := edges.NestedReadBucket(zombieBucket)
4✔
3829
                if zombieIndex == nil {
4✔
3830
                        return nil
×
3831
                }
×
3832

3833
                return zombieIndex.ForEach(func(_, _ []byte) error {
6✔
3834
                        numZombies++
2✔
3835
                        return nil
2✔
3836
                })
2✔
3837
        }, func() {
4✔
3838
                numZombies = 0
4✔
3839
        })
4✔
3840
        if err != nil {
4✔
3841
                return 0, err
×
3842
        }
×
3843

3844
        return numZombies, nil
4✔
3845
}
3846

3847
// PutClosedScid stores a SCID for a closed channel in the database. This is so
3848
// that we can ignore channel announcements that we know to be closed without
3849
// having to validate them and fetch a block.
3850
func (c *ChannelGraph) PutClosedScid(scid lnwire.ShortChannelID) error {
1✔
3851
        return kvdb.Update(c.db, func(tx kvdb.RwTx) error {
2✔
3852
                closedScids, err := tx.CreateTopLevelBucket(closedScidBucket)
1✔
3853
                if err != nil {
1✔
3854
                        return err
×
3855
                }
×
3856

3857
                var k [8]byte
1✔
3858
                byteOrder.PutUint64(k[:], scid.ToUint64())
1✔
3859

1✔
3860
                return closedScids.Put(k[:], []byte{})
1✔
3861
        }, func() {})
1✔
3862
}
3863

3864
// IsClosedScid checks whether a channel identified by the passed in scid is
3865
// closed. This helps avoid having to perform expensive validation checks.
3866
// TODO: Add an LRU cache to cut down on disc reads.
3867
func (c *ChannelGraph) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
2✔
3868
        var isClosed bool
2✔
3869
        err := kvdb.View(c.db, func(tx kvdb.RTx) error {
4✔
3870
                closedScids := tx.ReadBucket(closedScidBucket)
2✔
3871
                if closedScids == nil {
2✔
3872
                        return ErrClosedScidsNotFound
×
3873
                }
×
3874

3875
                var k [8]byte
2✔
3876
                byteOrder.PutUint64(k[:], scid.ToUint64())
2✔
3877

2✔
3878
                if closedScids.Get(k[:]) != nil {
3✔
3879
                        isClosed = true
1✔
3880
                        return nil
1✔
3881
                }
1✔
3882

3883
                return nil
1✔
3884
        }, func() {
2✔
3885
                isClosed = false
2✔
3886
        })
2✔
3887
        if err != nil {
2✔
3888
                return false, err
×
3889
        }
×
3890

3891
        return isClosed, nil
2✔
3892
}
3893

3894
// GraphSession will provide the call-back with access to a NodeTraverser
3895
// instance which can be used to perform queries against the channel graph. If
3896
// the graph cache is not enabled, then the call-back will  be provided with
3897
// access to the graph via a consistent read-only transaction.
3898
func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error {
133✔
3899
        if c.graphCache != nil {
212✔
3900
                return cb(&nodeTraverserSession{db: c})
79✔
3901
        }
79✔
3902

3903
        return c.db.View(func(tx walletdb.ReadTx) error {
108✔
3904
                return cb(&nodeTraverserSession{
54✔
3905
                        db: c,
54✔
3906
                        tx: tx,
54✔
3907
                })
54✔
3908
        }, func() {})
108✔
3909
}
3910

3911
// nodeTraverserSession implements the NodeTraverser interface but with a
3912
// backing read only transaction for a consistent view of the graph in the case
3913
// where the graph Cache has not been enabled.
3914
type nodeTraverserSession struct {
3915
        tx kvdb.RTx
3916
        db *ChannelGraph
3917
}
3918

3919
// ForEachNodeDirectedChannel calls the callback for every channel of the given
3920
// node.
3921
//
3922
// NOTE: Part of the NodeTraverser interface.
3923
func (c *nodeTraverserSession) ForEachNodeDirectedChannel(nodePub route.Vertex,
3924
        cb func(channel *DirectedChannel) error) error {
590✔
3925

590✔
3926
        return c.db.forEachNodeDirectedChannel(c.tx, nodePub, cb)
590✔
3927
}
590✔
3928

3929
// FetchNodeFeatures returns the features of the given node. If the node is
3930
// unknown, assume no additional features are supported.
3931
//
3932
// NOTE: Part of the NodeTraverser interface.
3933
func (c *nodeTraverserSession) FetchNodeFeatures(nodePub route.Vertex) (
3934
        *lnwire.FeatureVector, error) {
620✔
3935

620✔
3936
        return c.db.fetchNodeFeatures(c.tx, nodePub)
620✔
3937
}
620✔
3938

3939
func putLightningNode(nodeBucket kvdb.RwBucket, aliasBucket kvdb.RwBucket, // nolint:dupl
3940
        updateIndex kvdb.RwBucket, node *models.LightningNode) error {
989✔
3941

989✔
3942
        var (
989✔
3943
                scratch [16]byte
989✔
3944
                b       bytes.Buffer
989✔
3945
        )
989✔
3946

989✔
3947
        pub, err := node.PubKey()
989✔
3948
        if err != nil {
989✔
3949
                return err
×
3950
        }
×
3951
        nodePub := pub.SerializeCompressed()
989✔
3952

989✔
3953
        // If the node has the update time set, write it, else write 0.
989✔
3954
        updateUnix := uint64(0)
989✔
3955
        if node.LastUpdate.Unix() > 0 {
1,852✔
3956
                updateUnix = uint64(node.LastUpdate.Unix())
863✔
3957
        }
863✔
3958

3959
        byteOrder.PutUint64(scratch[:8], updateUnix)
989✔
3960
        if _, err := b.Write(scratch[:8]); err != nil {
989✔
3961
                return err
×
3962
        }
×
3963

3964
        if _, err := b.Write(nodePub); err != nil {
989✔
3965
                return err
×
3966
        }
×
3967

3968
        // If we got a node announcement for this node, we will have the rest
3969
        // of the data available. If not we don't have more data to write.
3970
        if !node.HaveNodeAnnouncement {
1,062✔
3971
                // Write HaveNodeAnnouncement=0.
73✔
3972
                byteOrder.PutUint16(scratch[:2], 0)
73✔
3973
                if _, err := b.Write(scratch[:2]); err != nil {
73✔
3974
                        return err
×
3975
                }
×
3976

3977
                return nodeBucket.Put(nodePub, b.Bytes())
73✔
3978
        }
3979

3980
        // Write HaveNodeAnnouncement=1.
3981
        byteOrder.PutUint16(scratch[:2], 1)
916✔
3982
        if _, err := b.Write(scratch[:2]); err != nil {
916✔
3983
                return err
×
3984
        }
×
3985

3986
        if err := binary.Write(&b, byteOrder, node.Color.R); err != nil {
916✔
3987
                return err
×
3988
        }
×
3989
        if err := binary.Write(&b, byteOrder, node.Color.G); err != nil {
916✔
3990
                return err
×
3991
        }
×
3992
        if err := binary.Write(&b, byteOrder, node.Color.B); err != nil {
916✔
3993
                return err
×
3994
        }
×
3995

3996
        if err := wire.WriteVarString(&b, 0, node.Alias); err != nil {
916✔
3997
                return err
×
3998
        }
×
3999

4000
        if err := node.Features.Encode(&b); err != nil {
916✔
4001
                return err
×
4002
        }
×
4003

4004
        numAddresses := uint16(len(node.Addresses))
916✔
4005
        byteOrder.PutUint16(scratch[:2], numAddresses)
916✔
4006
        if _, err := b.Write(scratch[:2]); err != nil {
916✔
4007
                return err
×
4008
        }
×
4009

4010
        for _, address := range node.Addresses {
2,060✔
4011
                if err := SerializeAddr(&b, address); err != nil {
1,144✔
4012
                        return err
×
4013
                }
×
4014
        }
4015

4016
        sigLen := len(node.AuthSigBytes)
916✔
4017
        if sigLen > 80 {
916✔
4018
                return fmt.Errorf("max sig len allowed is 80, had %v",
×
4019
                        sigLen)
×
4020
        }
×
4021

4022
        err = wire.WriteVarBytes(&b, 0, node.AuthSigBytes)
916✔
4023
        if err != nil {
916✔
4024
                return err
×
4025
        }
×
4026

4027
        if len(node.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
916✔
4028
                return ErrTooManyExtraOpaqueBytes(len(node.ExtraOpaqueData))
×
4029
        }
×
4030
        err = wire.WriteVarBytes(&b, 0, node.ExtraOpaqueData)
916✔
4031
        if err != nil {
916✔
4032
                return err
×
4033
        }
×
4034

4035
        if err := aliasBucket.Put(nodePub, []byte(node.Alias)); err != nil {
916✔
4036
                return err
×
4037
        }
×
4038

4039
        // With the alias bucket updated, we'll now update the index that
4040
        // tracks the time series of node updates.
4041
        var indexKey [8 + 33]byte
916✔
4042
        byteOrder.PutUint64(indexKey[:8], updateUnix)
916✔
4043
        copy(indexKey[8:], nodePub)
916✔
4044

916✔
4045
        // If there was already an old index entry for this node, then we'll
916✔
4046
        // delete the old one before we write the new entry.
916✔
4047
        if nodeBytes := nodeBucket.Get(nodePub); nodeBytes != nil {
1,020✔
4048
                // Extract out the old update time to we can reconstruct the
104✔
4049
                // prior index key to delete it from the index.
104✔
4050
                oldUpdateTime := nodeBytes[:8]
104✔
4051

104✔
4052
                var oldIndexKey [8 + 33]byte
104✔
4053
                copy(oldIndexKey[:8], oldUpdateTime)
104✔
4054
                copy(oldIndexKey[8:], nodePub)
104✔
4055

104✔
4056
                if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
104✔
4057
                        return err
×
4058
                }
×
4059
        }
4060

4061
        if err := updateIndex.Put(indexKey[:], nil); err != nil {
916✔
4062
                return err
×
4063
        }
×
4064

4065
        return nodeBucket.Put(nodePub, b.Bytes())
916✔
4066
}
4067

4068
func fetchLightningNode(nodeBucket kvdb.RBucket,
4069
        nodePub []byte) (models.LightningNode, error) {
3,629✔
4070

3,629✔
4071
        nodeBytes := nodeBucket.Get(nodePub)
3,629✔
4072
        if nodeBytes == nil {
3,701✔
4073
                return models.LightningNode{}, ErrGraphNodeNotFound
72✔
4074
        }
72✔
4075

4076
        nodeReader := bytes.NewReader(nodeBytes)
3,557✔
4077
        return deserializeLightningNode(nodeReader)
3,557✔
4078
}
4079

4080
func deserializeLightningNodeCacheable(r io.Reader) (route.Vertex,
4081
        *lnwire.FeatureVector, error) {
120✔
4082

120✔
4083
        var (
120✔
4084
                pubKey      route.Vertex
120✔
4085
                features    = lnwire.EmptyFeatureVector()
120✔
4086
                nodeScratch [8]byte
120✔
4087
        )
120✔
4088

120✔
4089
        // Skip ahead:
120✔
4090
        // - LastUpdate (8 bytes)
120✔
4091
        if _, err := r.Read(nodeScratch[:]); err != nil {
120✔
4092
                return pubKey, nil, err
×
4093
        }
×
4094

4095
        if _, err := io.ReadFull(r, pubKey[:]); err != nil {
120✔
4096
                return pubKey, nil, err
×
4097
        }
×
4098

4099
        // Read the node announcement flag.
4100
        if _, err := r.Read(nodeScratch[:2]); err != nil {
120✔
4101
                return pubKey, nil, err
×
4102
        }
×
4103
        hasNodeAnn := byteOrder.Uint16(nodeScratch[:2])
120✔
4104

120✔
4105
        // The rest of the data is optional, and will only be there if we got a
120✔
4106
        // node announcement for this node.
120✔
4107
        if hasNodeAnn == 0 {
120✔
4108
                return pubKey, features, nil
×
4109
        }
×
4110

4111
        // We did get a node announcement for this node, so we'll have the rest
4112
        // of the data available.
4113
        var rgb uint8
120✔
4114
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4115
                return pubKey, nil, err
×
4116
        }
×
4117
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4118
                return pubKey, nil, err
×
4119
        }
×
4120
        if err := binary.Read(r, byteOrder, &rgb); err != nil {
120✔
4121
                return pubKey, nil, err
×
4122
        }
×
4123

4124
        if _, err := wire.ReadVarString(r, 0); err != nil {
120✔
4125
                return pubKey, nil, err
×
4126
        }
×
4127

4128
        if err := features.Decode(r); err != nil {
120✔
4129
                return pubKey, nil, err
×
4130
        }
×
4131

4132
        return pubKey, features, nil
120✔
4133
}
4134

4135
func deserializeLightningNode(r io.Reader) (models.LightningNode, error) {
8,516✔
4136
        var (
8,516✔
4137
                node    models.LightningNode
8,516✔
4138
                scratch [8]byte
8,516✔
4139
                err     error
8,516✔
4140
        )
8,516✔
4141

8,516✔
4142
        // Always populate a feature vector, even if we don't have a node
8,516✔
4143
        // announcement and short circuit below.
8,516✔
4144
        node.Features = lnwire.EmptyFeatureVector()
8,516✔
4145

8,516✔
4146
        if _, err := r.Read(scratch[:]); err != nil {
8,516✔
4147
                return models.LightningNode{}, err
×
4148
        }
×
4149

4150
        unix := int64(byteOrder.Uint64(scratch[:]))
8,516✔
4151
        node.LastUpdate = time.Unix(unix, 0)
8,516✔
4152

8,516✔
4153
        if _, err := io.ReadFull(r, node.PubKeyBytes[:]); err != nil {
8,516✔
4154
                return models.LightningNode{}, err
×
4155
        }
×
4156

4157
        if _, err := r.Read(scratch[:2]); err != nil {
8,516✔
4158
                return models.LightningNode{}, err
×
4159
        }
×
4160

4161
        hasNodeAnn := byteOrder.Uint16(scratch[:2])
8,516✔
4162
        if hasNodeAnn == 1 {
16,884✔
4163
                node.HaveNodeAnnouncement = true
8,368✔
4164
        } else {
8,516✔
4165
                node.HaveNodeAnnouncement = false
148✔
4166
        }
148✔
4167

4168
        // The rest of the data is optional, and will only be there if we got a
4169
        // node announcement for this node.
4170
        if !node.HaveNodeAnnouncement {
8,664✔
4171
                return node, nil
148✔
4172
        }
148✔
4173

4174
        // We did get a node announcement for this node, so we'll have the rest
4175
        // of the data available.
4176
        if err := binary.Read(r, byteOrder, &node.Color.R); err != nil {
8,368✔
4177
                return models.LightningNode{}, err
×
4178
        }
×
4179
        if err := binary.Read(r, byteOrder, &node.Color.G); err != nil {
8,368✔
4180
                return models.LightningNode{}, err
×
4181
        }
×
4182
        if err := binary.Read(r, byteOrder, &node.Color.B); err != nil {
8,368✔
4183
                return models.LightningNode{}, err
×
4184
        }
×
4185

4186
        node.Alias, err = wire.ReadVarString(r, 0)
8,368✔
4187
        if err != nil {
8,368✔
4188
                return models.LightningNode{}, err
×
4189
        }
×
4190

4191
        err = node.Features.Decode(r)
8,368✔
4192
        if err != nil {
8,368✔
4193
                return models.LightningNode{}, err
×
4194
        }
×
4195

4196
        if _, err := r.Read(scratch[:2]); err != nil {
8,368✔
4197
                return models.LightningNode{}, err
×
4198
        }
×
4199
        numAddresses := int(byteOrder.Uint16(scratch[:2]))
8,368✔
4200

8,368✔
4201
        var addresses []net.Addr
8,368✔
4202
        for i := 0; i < numAddresses; i++ {
18,974✔
4203
                address, err := DeserializeAddr(r)
10,606✔
4204
                if err != nil {
10,606✔
4205
                        return models.LightningNode{}, err
×
4206
                }
×
4207
                addresses = append(addresses, address)
10,606✔
4208
        }
4209
        node.Addresses = addresses
8,368✔
4210

8,368✔
4211
        node.AuthSigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
8,368✔
4212
        if err != nil {
8,368✔
4213
                return models.LightningNode{}, err
×
4214
        }
×
4215

4216
        // We'll try and see if there are any opaque bytes left, if not, then
4217
        // we'll ignore the EOF error and return the node as is.
4218
        node.ExtraOpaqueData, err = wire.ReadVarBytes(
8,368✔
4219
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
8,368✔
4220
        )
8,368✔
4221
        switch {
8,368✔
4222
        case err == io.ErrUnexpectedEOF:
×
4223
        case err == io.EOF:
×
4224
        case err != nil:
×
4225
                return models.LightningNode{}, err
×
4226
        }
4227

4228
        return node, nil
8,368✔
4229
}
4230

4231
func putChanEdgeInfo(edgeIndex kvdb.RwBucket,
4232
        edgeInfo *models.ChannelEdgeInfo, chanID [8]byte) error {
1,489✔
4233

1,489✔
4234
        var b bytes.Buffer
1,489✔
4235

1,489✔
4236
        if _, err := b.Write(edgeInfo.NodeKey1Bytes[:]); err != nil {
1,489✔
4237
                return err
×
4238
        }
×
4239
        if _, err := b.Write(edgeInfo.NodeKey2Bytes[:]); err != nil {
1,489✔
4240
                return err
×
4241
        }
×
4242
        if _, err := b.Write(edgeInfo.BitcoinKey1Bytes[:]); err != nil {
1,489✔
4243
                return err
×
4244
        }
×
4245
        if _, err := b.Write(edgeInfo.BitcoinKey2Bytes[:]); err != nil {
1,489✔
4246
                return err
×
4247
        }
×
4248

4249
        if err := wire.WriteVarBytes(&b, 0, edgeInfo.Features); err != nil {
1,489✔
4250
                return err
×
4251
        }
×
4252

4253
        authProof := edgeInfo.AuthProof
1,489✔
4254
        var nodeSig1, nodeSig2, bitcoinSig1, bitcoinSig2 []byte
1,489✔
4255
        if authProof != nil {
2,895✔
4256
                nodeSig1 = authProof.NodeSig1Bytes
1,406✔
4257
                nodeSig2 = authProof.NodeSig2Bytes
1,406✔
4258
                bitcoinSig1 = authProof.BitcoinSig1Bytes
1,406✔
4259
                bitcoinSig2 = authProof.BitcoinSig2Bytes
1,406✔
4260
        }
1,406✔
4261

4262
        if err := wire.WriteVarBytes(&b, 0, nodeSig1); err != nil {
1,489✔
4263
                return err
×
4264
        }
×
4265
        if err := wire.WriteVarBytes(&b, 0, nodeSig2); err != nil {
1,489✔
4266
                return err
×
4267
        }
×
4268
        if err := wire.WriteVarBytes(&b, 0, bitcoinSig1); err != nil {
1,489✔
4269
                return err
×
4270
        }
×
4271
        if err := wire.WriteVarBytes(&b, 0, bitcoinSig2); err != nil {
1,489✔
4272
                return err
×
4273
        }
×
4274

4275
        if err := WriteOutpoint(&b, &edgeInfo.ChannelPoint); err != nil {
1,489✔
4276
                return err
×
4277
        }
×
4278
        err := binary.Write(&b, byteOrder, uint64(edgeInfo.Capacity))
1,489✔
4279
        if err != nil {
1,489✔
4280
                return err
×
4281
        }
×
4282
        if _, err := b.Write(chanID[:]); err != nil {
1,489✔
4283
                return err
×
4284
        }
×
4285
        if _, err := b.Write(edgeInfo.ChainHash[:]); err != nil {
1,489✔
4286
                return err
×
4287
        }
×
4288

4289
        if len(edgeInfo.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
1,489✔
4290
                return ErrTooManyExtraOpaqueBytes(len(edgeInfo.ExtraOpaqueData))
×
4291
        }
×
4292
        err = wire.WriteVarBytes(&b, 0, edgeInfo.ExtraOpaqueData)
1,489✔
4293
        if err != nil {
1,489✔
4294
                return err
×
4295
        }
×
4296

4297
        return edgeIndex.Put(chanID[:], b.Bytes())
1,489✔
4298
}
4299

4300
func fetchChanEdgeInfo(edgeIndex kvdb.RBucket,
4301
        chanID []byte) (models.ChannelEdgeInfo, error) {
4,197✔
4302

4,197✔
4303
        edgeInfoBytes := edgeIndex.Get(chanID)
4,197✔
4304
        if edgeInfoBytes == nil {
4,283✔
4305
                return models.ChannelEdgeInfo{}, ErrEdgeNotFound
86✔
4306
        }
86✔
4307

4308
        edgeInfoReader := bytes.NewReader(edgeInfoBytes)
4,111✔
4309
        return deserializeChanEdgeInfo(edgeInfoReader)
4,111✔
4310
}
4311

4312
func deserializeChanEdgeInfo(r io.Reader) (models.ChannelEdgeInfo, error) {
4,742✔
4313
        var (
4,742✔
4314
                err      error
4,742✔
4315
                edgeInfo models.ChannelEdgeInfo
4,742✔
4316
        )
4,742✔
4317

4,742✔
4318
        if _, err := io.ReadFull(r, edgeInfo.NodeKey1Bytes[:]); err != nil {
4,742✔
4319
                return models.ChannelEdgeInfo{}, err
×
4320
        }
×
4321
        if _, err := io.ReadFull(r, edgeInfo.NodeKey2Bytes[:]); err != nil {
4,742✔
4322
                return models.ChannelEdgeInfo{}, err
×
4323
        }
×
4324
        if _, err := io.ReadFull(r, edgeInfo.BitcoinKey1Bytes[:]); err != nil {
4,742✔
4325
                return models.ChannelEdgeInfo{}, err
×
4326
        }
×
4327
        if _, err := io.ReadFull(r, edgeInfo.BitcoinKey2Bytes[:]); err != nil {
4,742✔
4328
                return models.ChannelEdgeInfo{}, err
×
4329
        }
×
4330

4331
        edgeInfo.Features, err = wire.ReadVarBytes(r, 0, 900, "features")
4,742✔
4332
        if err != nil {
4,742✔
4333
                return models.ChannelEdgeInfo{}, err
×
4334
        }
×
4335

4336
        proof := &models.ChannelAuthProof{}
4,742✔
4337

4,742✔
4338
        proof.NodeSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,742✔
4339
        if err != nil {
4,742✔
4340
                return models.ChannelEdgeInfo{}, err
×
4341
        }
×
4342
        proof.NodeSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,742✔
4343
        if err != nil {
4,742✔
4344
                return models.ChannelEdgeInfo{}, err
×
4345
        }
×
4346
        proof.BitcoinSig1Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,742✔
4347
        if err != nil {
4,742✔
4348
                return models.ChannelEdgeInfo{}, err
×
4349
        }
×
4350
        proof.BitcoinSig2Bytes, err = wire.ReadVarBytes(r, 0, 80, "sigs")
4,742✔
4351
        if err != nil {
4,742✔
4352
                return models.ChannelEdgeInfo{}, err
×
4353
        }
×
4354

4355
        if !proof.IsEmpty() {
6,535✔
4356
                edgeInfo.AuthProof = proof
1,793✔
4357
        }
1,793✔
4358

4359
        edgeInfo.ChannelPoint = wire.OutPoint{}
4,742✔
4360
        if err := ReadOutpoint(r, &edgeInfo.ChannelPoint); err != nil {
4,742✔
4361
                return models.ChannelEdgeInfo{}, err
×
4362
        }
×
4363
        if err := binary.Read(r, byteOrder, &edgeInfo.Capacity); err != nil {
4,742✔
4364
                return models.ChannelEdgeInfo{}, err
×
4365
        }
×
4366
        if err := binary.Read(r, byteOrder, &edgeInfo.ChannelID); err != nil {
4,742✔
4367
                return models.ChannelEdgeInfo{}, err
×
4368
        }
×
4369

4370
        if _, err := io.ReadFull(r, edgeInfo.ChainHash[:]); err != nil {
4,742✔
4371
                return models.ChannelEdgeInfo{}, err
×
4372
        }
×
4373

4374
        // We'll try and see if there are any opaque bytes left, if not, then
4375
        // we'll ignore the EOF error and return the edge as is.
4376
        edgeInfo.ExtraOpaqueData, err = wire.ReadVarBytes(
4,742✔
4377
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
4,742✔
4378
        )
4,742✔
4379
        switch {
4,742✔
4380
        case err == io.ErrUnexpectedEOF:
×
4381
        case err == io.EOF:
×
4382
        case err != nil:
×
4383
                return models.ChannelEdgeInfo{}, err
×
4384
        }
4385

4386
        return edgeInfo, nil
4,742✔
4387
}
4388

4389
func putChanEdgePolicy(edges kvdb.RwBucket, edge *models.ChannelEdgePolicy,
4390
        from, to []byte) error {
2,660✔
4391

2,660✔
4392
        var edgeKey [33 + 8]byte
2,660✔
4393
        copy(edgeKey[:], from)
2,660✔
4394
        byteOrder.PutUint64(edgeKey[33:], edge.ChannelID)
2,660✔
4395

2,660✔
4396
        var b bytes.Buffer
2,660✔
4397
        if err := serializeChanEdgePolicy(&b, edge, to); err != nil {
2,660✔
4398
                return err
×
4399
        }
×
4400

4401
        // Before we write out the new edge, we'll create a new entry in the
4402
        // update index in order to keep it fresh.
4403
        updateUnix := uint64(edge.LastUpdate.Unix())
2,660✔
4404
        var indexKey [8 + 8]byte
2,660✔
4405
        byteOrder.PutUint64(indexKey[:8], updateUnix)
2,660✔
4406
        byteOrder.PutUint64(indexKey[8:], edge.ChannelID)
2,660✔
4407

2,660✔
4408
        updateIndex, err := edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
2,660✔
4409
        if err != nil {
2,660✔
4410
                return err
×
4411
        }
×
4412

4413
        // If there was already an entry for this edge, then we'll need to
4414
        // delete the old one to ensure we don't leave around any after-images.
4415
        // An unknown policy value does not have a update time recorded, so
4416
        // it also does not need to be removed.
4417
        if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil &&
2,660✔
4418
                !bytes.Equal(edgeBytes[:], unknownPolicy) {
2,684✔
4419

24✔
4420
                // In order to delete the old entry, we'll need to obtain the
24✔
4421
                // *prior* update time in order to delete it. To do this, we'll
24✔
4422
                // need to deserialize the existing policy within the database
24✔
4423
                // (now outdated by the new one), and delete its corresponding
24✔
4424
                // entry within the update index. We'll ignore any
24✔
4425
                // ErrEdgePolicyOptionalFieldNotFound error, as we only need
24✔
4426
                // the channel ID and update time to delete the entry.
24✔
4427
                // TODO(halseth): get rid of these invalid policies in a
24✔
4428
                // migration.
24✔
4429
                oldEdgePolicy, err := deserializeChanEdgePolicy(
24✔
4430
                        bytes.NewReader(edgeBytes),
24✔
4431
                )
24✔
4432
                if err != nil && err != ErrEdgePolicyOptionalFieldNotFound {
24✔
4433
                        return err
×
4434
                }
×
4435

4436
                oldUpdateTime := uint64(oldEdgePolicy.LastUpdate.Unix())
24✔
4437

24✔
4438
                var oldIndexKey [8 + 8]byte
24✔
4439
                byteOrder.PutUint64(oldIndexKey[:8], oldUpdateTime)
24✔
4440
                byteOrder.PutUint64(oldIndexKey[8:], edge.ChannelID)
24✔
4441

24✔
4442
                if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
24✔
4443
                        return err
×
4444
                }
×
4445
        }
4446

4447
        if err := updateIndex.Put(indexKey[:], nil); err != nil {
2,660✔
4448
                return err
×
4449
        }
×
4450

4451
        err = updateEdgePolicyDisabledIndex(
2,660✔
4452
                edges, edge.ChannelID,
2,660✔
4453
                edge.ChannelFlags&lnwire.ChanUpdateDirection > 0,
2,660✔
4454
                edge.IsDisabled(),
2,660✔
4455
        )
2,660✔
4456
        if err != nil {
2,660✔
4457
                return err
×
4458
        }
×
4459

4460
        return edges.Put(edgeKey[:], b.Bytes()[:])
2,660✔
4461
}
4462

4463
// updateEdgePolicyDisabledIndex is used to update the disabledEdgePolicyIndex
4464
// bucket by either add a new disabled ChannelEdgePolicy or remove an existing
4465
// one.
4466
// The direction represents the direction of the edge and disabled is used for
4467
// deciding whether to remove or add an entry to the bucket.
4468
// In general a channel is disabled if two entries for the same chanID exist
4469
// in this bucket.
4470
// Maintaining the bucket this way allows a fast retrieval of disabled
4471
// channels, for example when prune is needed.
4472
func updateEdgePolicyDisabledIndex(edges kvdb.RwBucket, chanID uint64,
4473
        direction bool, disabled bool) error {
2,956✔
4474

2,956✔
4475
        var disabledEdgeKey [8 + 1]byte
2,956✔
4476
        byteOrder.PutUint64(disabledEdgeKey[0:], chanID)
2,956✔
4477
        if direction {
4,430✔
4478
                disabledEdgeKey[8] = 1
1,474✔
4479
        }
1,474✔
4480

4481
        disabledEdgePolicyIndex, err := edges.CreateBucketIfNotExists(
2,956✔
4482
                disabledEdgePolicyBucket,
2,956✔
4483
        )
2,956✔
4484
        if err != nil {
2,956✔
4485
                return err
×
4486
        }
×
4487

4488
        if disabled {
2,982✔
4489
                return disabledEdgePolicyIndex.Put(disabledEdgeKey[:], []byte{})
26✔
4490
        }
26✔
4491

4492
        return disabledEdgePolicyIndex.Delete(disabledEdgeKey[:])
2,930✔
4493
}
4494

4495
// putChanEdgePolicyUnknown marks the edge policy as unknown
4496
// in the edges bucket.
4497
func putChanEdgePolicyUnknown(edges kvdb.RwBucket, channelID uint64,
4498
        from []byte) error {
2,976✔
4499

2,976✔
4500
        var edgeKey [33 + 8]byte
2,976✔
4501
        copy(edgeKey[:], from)
2,976✔
4502
        byteOrder.PutUint64(edgeKey[33:], channelID)
2,976✔
4503

2,976✔
4504
        if edges.Get(edgeKey[:]) != nil {
2,976✔
4505
                return fmt.Errorf("cannot write unknown policy for channel %v "+
×
4506
                        " when there is already a policy present", channelID)
×
4507
        }
×
4508

4509
        return edges.Put(edgeKey[:], unknownPolicy)
2,976✔
4510
}
4511

4512
func fetchChanEdgePolicy(edges kvdb.RBucket, chanID []byte,
4513
        nodePub []byte) (*models.ChannelEdgePolicy, error) {
8,180✔
4514

8,180✔
4515
        var edgeKey [33 + 8]byte
8,180✔
4516
        copy(edgeKey[:], nodePub)
8,180✔
4517
        copy(edgeKey[33:], chanID[:])
8,180✔
4518

8,180✔
4519
        edgeBytes := edges.Get(edgeKey[:])
8,180✔
4520
        if edgeBytes == nil {
8,180✔
4521
                return nil, ErrEdgeNotFound
×
4522
        }
×
4523

4524
        // No need to deserialize unknown policy.
4525
        if bytes.Equal(edgeBytes[:], unknownPolicy) {
8,563✔
4526
                return nil, nil
383✔
4527
        }
383✔
4528

4529
        edgeReader := bytes.NewReader(edgeBytes)
7,797✔
4530

7,797✔
4531
        ep, err := deserializeChanEdgePolicy(edgeReader)
7,797✔
4532
        switch {
7,797✔
4533
        // If the db policy was missing an expected optional field, we return
4534
        // nil as if the policy was unknown.
4535
        case err == ErrEdgePolicyOptionalFieldNotFound:
1✔
4536
                return nil, nil
1✔
4537

4538
        case err != nil:
×
4539
                return nil, err
×
4540
        }
4541

4542
        return ep, nil
7,796✔
4543
}
4544

4545
func fetchChanEdgePolicies(edgeIndex kvdb.RBucket, edges kvdb.RBucket,
4546
        chanID []byte) (*models.ChannelEdgePolicy, *models.ChannelEdgePolicy,
4547
        error) {
250✔
4548

250✔
4549
        edgeInfo := edgeIndex.Get(chanID)
250✔
4550
        if edgeInfo == nil {
250✔
4551
                return nil, nil, fmt.Errorf("%w: chanID=%x", ErrEdgeNotFound,
×
4552
                        chanID)
×
4553
        }
×
4554

4555
        // The first node is contained within the first half of the edge
4556
        // information. We only propagate the error here and below if it's
4557
        // something other than edge non-existence.
4558
        node1Pub := edgeInfo[:33]
250✔
4559
        edge1, err := fetchChanEdgePolicy(edges, chanID, node1Pub)
250✔
4560
        if err != nil {
250✔
4561
                return nil, nil, fmt.Errorf("%w: node1Pub=%x", ErrEdgeNotFound,
×
4562
                        node1Pub)
×
4563
        }
×
4564

4565
        // Similarly, the second node is contained within the latter
4566
        // half of the edge information.
4567
        node2Pub := edgeInfo[33:66]
250✔
4568
        edge2, err := fetchChanEdgePolicy(edges, chanID, node2Pub)
250✔
4569
        if err != nil {
250✔
4570
                return nil, nil, fmt.Errorf("%w: node2Pub=%x", ErrEdgeNotFound,
×
4571
                        node2Pub)
×
4572
        }
×
4573

4574
        return edge1, edge2, nil
250✔
4575
}
4576

4577
func serializeChanEdgePolicy(w io.Writer, edge *models.ChannelEdgePolicy,
4578
        to []byte) error {
2,662✔
4579

2,662✔
4580
        err := wire.WriteVarBytes(w, 0, edge.SigBytes)
2,662✔
4581
        if err != nil {
2,662✔
4582
                return err
×
4583
        }
×
4584

4585
        if err := binary.Write(w, byteOrder, edge.ChannelID); err != nil {
2,662✔
4586
                return err
×
4587
        }
×
4588

4589
        var scratch [8]byte
2,662✔
4590
        updateUnix := uint64(edge.LastUpdate.Unix())
2,662✔
4591
        byteOrder.PutUint64(scratch[:], updateUnix)
2,662✔
4592
        if _, err := w.Write(scratch[:]); err != nil {
2,662✔
4593
                return err
×
4594
        }
×
4595

4596
        if err := binary.Write(w, byteOrder, edge.MessageFlags); err != nil {
2,662✔
4597
                return err
×
4598
        }
×
4599
        if err := binary.Write(w, byteOrder, edge.ChannelFlags); err != nil {
2,662✔
4600
                return err
×
4601
        }
×
4602
        if err := binary.Write(w, byteOrder, edge.TimeLockDelta); err != nil {
2,662✔
4603
                return err
×
4604
        }
×
4605
        if err := binary.Write(w, byteOrder, uint64(edge.MinHTLC)); err != nil {
2,662✔
4606
                return err
×
4607
        }
×
4608
        err = binary.Write(w, byteOrder, uint64(edge.FeeBaseMSat))
2,662✔
4609
        if err != nil {
2,662✔
4610
                return err
×
4611
        }
×
4612
        err = binary.Write(
2,662✔
4613
                w, byteOrder, uint64(edge.FeeProportionalMillionths),
2,662✔
4614
        )
2,662✔
4615
        if err != nil {
2,662✔
4616
                return err
×
4617
        }
×
4618

4619
        if _, err := w.Write(to); err != nil {
2,662✔
4620
                return err
×
4621
        }
×
4622

4623
        // If the max_htlc field is present, we write it. To be compatible with
4624
        // older versions that wasn't aware of this field, we write it as part
4625
        // of the opaque data.
4626
        // TODO(halseth): clean up when moving to TLV.
4627
        var opaqueBuf bytes.Buffer
2,662✔
4628
        if edge.MessageFlags.HasMaxHtlc() {
4,940✔
4629
                err := binary.Write(&opaqueBuf, byteOrder, uint64(edge.MaxHTLC))
2,278✔
4630
                if err != nil {
2,278✔
4631
                        return err
×
4632
                }
×
4633
        }
4634

4635
        if len(edge.ExtraOpaqueData) > MaxAllowedExtraOpaqueBytes {
2,662✔
4636
                return ErrTooManyExtraOpaqueBytes(len(edge.ExtraOpaqueData))
×
4637
        }
×
4638
        if _, err := opaqueBuf.Write(edge.ExtraOpaqueData); err != nil {
2,662✔
4639
                return err
×
4640
        }
×
4641

4642
        if err := wire.WriteVarBytes(w, 0, opaqueBuf.Bytes()); err != nil {
2,662✔
4643
                return err
×
4644
        }
×
4645
        return nil
2,662✔
4646
}
4647

4648
func deserializeChanEdgePolicy(r io.Reader) (*models.ChannelEdgePolicy, error) {
7,822✔
4649
        // Deserialize the policy. Note that in case an optional field is not
7,822✔
4650
        // found, both an error and a populated policy object are returned.
7,822✔
4651
        edge, deserializeErr := deserializeChanEdgePolicyRaw(r)
7,822✔
4652
        if deserializeErr != nil &&
7,822✔
4653
                deserializeErr != ErrEdgePolicyOptionalFieldNotFound {
7,822✔
4654

×
4655
                return nil, deserializeErr
×
4656
        }
×
4657

4658
        return edge, deserializeErr
7,822✔
4659
}
4660

4661
func deserializeChanEdgePolicyRaw(r io.Reader) (*models.ChannelEdgePolicy,
4662
        error) {
8,829✔
4663

8,829✔
4664
        edge := &models.ChannelEdgePolicy{}
8,829✔
4665

8,829✔
4666
        var err error
8,829✔
4667
        edge.SigBytes, err = wire.ReadVarBytes(r, 0, 80, "sig")
8,829✔
4668
        if err != nil {
8,829✔
4669
                return nil, err
×
4670
        }
×
4671

4672
        if err := binary.Read(r, byteOrder, &edge.ChannelID); err != nil {
8,829✔
4673
                return nil, err
×
4674
        }
×
4675

4676
        var scratch [8]byte
8,829✔
4677
        if _, err := r.Read(scratch[:]); err != nil {
8,829✔
4678
                return nil, err
×
4679
        }
×
4680
        unix := int64(byteOrder.Uint64(scratch[:]))
8,829✔
4681
        edge.LastUpdate = time.Unix(unix, 0)
8,829✔
4682

8,829✔
4683
        if err := binary.Read(r, byteOrder, &edge.MessageFlags); err != nil {
8,829✔
4684
                return nil, err
×
4685
        }
×
4686
        if err := binary.Read(r, byteOrder, &edge.ChannelFlags); err != nil {
8,829✔
4687
                return nil, err
×
4688
        }
×
4689
        if err := binary.Read(r, byteOrder, &edge.TimeLockDelta); err != nil {
8,829✔
4690
                return nil, err
×
4691
        }
×
4692

4693
        var n uint64
8,829✔
4694
        if err := binary.Read(r, byteOrder, &n); err != nil {
8,829✔
4695
                return nil, err
×
4696
        }
×
4697
        edge.MinHTLC = lnwire.MilliSatoshi(n)
8,829✔
4698

8,829✔
4699
        if err := binary.Read(r, byteOrder, &n); err != nil {
8,829✔
4700
                return nil, err
×
4701
        }
×
4702
        edge.FeeBaseMSat = lnwire.MilliSatoshi(n)
8,829✔
4703

8,829✔
4704
        if err := binary.Read(r, byteOrder, &n); err != nil {
8,829✔
4705
                return nil, err
×
4706
        }
×
4707
        edge.FeeProportionalMillionths = lnwire.MilliSatoshi(n)
8,829✔
4708

8,829✔
4709
        if _, err := r.Read(edge.ToNode[:]); err != nil {
8,829✔
4710
                return nil, err
×
4711
        }
×
4712

4713
        // We'll try and see if there are any opaque bytes left, if not, then
4714
        // we'll ignore the EOF error and return the edge as is.
4715
        edge.ExtraOpaqueData, err = wire.ReadVarBytes(
8,829✔
4716
                r, 0, MaxAllowedExtraOpaqueBytes, "blob",
8,829✔
4717
        )
8,829✔
4718
        switch {
8,829✔
4719
        case err == io.ErrUnexpectedEOF:
×
4720
        case err == io.EOF:
3✔
4721
        case err != nil:
×
4722
                return nil, err
×
4723
        }
4724

4725
        // See if optional fields are present.
4726
        if edge.MessageFlags.HasMaxHtlc() {
17,284✔
4727
                // The max_htlc field should be at the beginning of the opaque
8,455✔
4728
                // bytes.
8,455✔
4729
                opq := edge.ExtraOpaqueData
8,455✔
4730

8,455✔
4731
                // If the max_htlc field is not present, it might be old data
8,455✔
4732
                // stored before this field was validated. We'll return the
8,455✔
4733
                // edge along with an error.
8,455✔
4734
                if len(opq) < 8 {
8,458✔
4735
                        return edge, ErrEdgePolicyOptionalFieldNotFound
3✔
4736
                }
3✔
4737

4738
                maxHtlc := byteOrder.Uint64(opq[:8])
8,452✔
4739
                edge.MaxHTLC = lnwire.MilliSatoshi(maxHtlc)
8,452✔
4740

8,452✔
4741
                // Exclude the parsed field from the rest of the opaque data.
8,452✔
4742
                edge.ExtraOpaqueData = opq[8:]
8,452✔
4743
        }
4744

4745
        return edge, nil
8,826✔
4746
}
4747

4748
// chanGraphNodeTx is an implementation of the NodeRTx interface backed by the
4749
// ChannelGraph and a kvdb.RTx.
4750
type chanGraphNodeTx struct {
4751
        tx   kvdb.RTx
4752
        db   *ChannelGraph
4753
        node *models.LightningNode
4754
}
4755

4756
// A compile-time constraint to ensure chanGraphNodeTx implements the NodeRTx
4757
// interface.
4758
var _ NodeRTx = (*chanGraphNodeTx)(nil)
4759

4760
func newChanGraphNodeTx(tx kvdb.RTx, db *ChannelGraph,
4761
        node *models.LightningNode) *chanGraphNodeTx {
3,914✔
4762

3,914✔
4763
        return &chanGraphNodeTx{
3,914✔
4764
                tx:   tx,
3,914✔
4765
                db:   db,
3,914✔
4766
                node: node,
3,914✔
4767
        }
3,914✔
4768
}
3,914✔
4769

4770
// Node returns the raw information of the node.
4771
//
4772
// NOTE: This is a part of the NodeRTx interface.
4773
func (c *chanGraphNodeTx) Node() *models.LightningNode {
4,839✔
4774
        return c.node
4,839✔
4775
}
4,839✔
4776

4777
// FetchNode fetches the node with the given pub key under the same transaction
4778
// used to fetch the current node. The returned node is also a NodeRTx and any
4779
// operations on that NodeRTx will also be done under the same transaction.
4780
//
4781
// NOTE: This is a part of the NodeRTx interface.
4782
func (c *chanGraphNodeTx) FetchNode(nodePub route.Vertex) (NodeRTx, error) {
2,944✔
4783
        node, err := c.db.FetchLightningNodeTx(c.tx, nodePub)
2,944✔
4784
        if err != nil {
2,944✔
4785
                return nil, err
×
4786
        }
×
4787

4788
        return newChanGraphNodeTx(c.tx, c.db, node), nil
2,944✔
4789
}
4790

4791
// ForEachChannel can be used to iterate over the node's channels under
4792
// the same transaction used to fetch the node.
4793
//
4794
// NOTE: This is a part of the NodeRTx interface.
4795
func (c *chanGraphNodeTx) ForEachChannel(f func(*models.ChannelEdgeInfo,
4796
        *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
965✔
4797

965✔
4798
        return c.db.ForEachNodeChannelTx(c.tx, c.node.PubKeyBytes,
965✔
4799
                func(_ kvdb.RTx, info *models.ChannelEdgeInfo, policy1,
965✔
4800
                        policy2 *models.ChannelEdgePolicy) error {
3,909✔
4801

2,944✔
4802
                        return f(info, policy1, policy2)
2,944✔
4803
                },
2,944✔
4804
        )
4805
}
4806

4807
// MakeTestGraph creates a new instance of the ChannelGraph for testing
4808
// purposes.
4809
func MakeTestGraph(t testing.TB, modifiers ...OptionModifier) (*ChannelGraph,
4810
        error) {
40✔
4811

40✔
4812
        opts := DefaultOptions()
40✔
4813
        for _, modifier := range modifiers {
40✔
4814
                modifier(opts)
×
4815
        }
×
4816

4817
        // Next, create channelgraph for the first time.
4818
        backend, backendCleanup, err := kvdb.GetTestBackend(t.TempDir(), "cgr")
40✔
4819
        if err != nil {
40✔
4820
                backendCleanup()
×
4821
                return nil, err
×
4822
        }
×
4823

4824
        graph, err := NewChannelGraph(backend)
40✔
4825
        if err != nil {
40✔
4826
                backendCleanup()
×
4827
                return nil, err
×
4828
        }
×
4829

4830
        t.Cleanup(func() {
80✔
4831
                _ = backend.Close()
40✔
4832
                backendCleanup()
40✔
4833
        })
40✔
4834

4835
        return graph, nil
40✔
4836
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc