lightningnetwork / lnd / 10426952143 (via github)

16 Aug 2024 10:17PM UTC coverage: 49.856% (+0.01%) from 49.843%

Pull Request #8512: [3/4] - lnwallet/chancloser: add new protofsm based RBF chan closer
Roasbeef - lnwallet/chancloser: add unit tests for new rbf coop close

6 of 1064 new or added lines in 6 files covered. (0.56%)
159 existing lines in 21 files now uncovered.
96167 of 192890 relevant lines covered (49.86%)
1.55 hits per line

Source File

/graph/builder.go (58.03%)

package graph

import (
        "bytes"
        "fmt"
        "runtime"
        "strings"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcec/v2"
        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/wire"
        "github.com/go-errors/errors"
        "github.com/lightningnetwork/lnd/batch"
        "github.com/lightningnetwork/lnd/chainntnfs"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/channeldb/models"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/multimutex"
        "github.com/lightningnetwork/lnd/routing/chainview"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/ticker"
)

const (
        // DefaultChannelPruneExpiry is the default duration used to determine
        // if a channel should be pruned or not.
        DefaultChannelPruneExpiry = time.Hour * 24 * 14

        // DefaultFirstTimePruneDelay is the time we'll wait after startup
        // before attempting to prune the graph for zombie channels. We don't
        // do it immediately after startup to allow lnd to start up without
        // getting blocked by this job.
        DefaultFirstTimePruneDelay = 30 * time.Second

        // defaultStatInterval governs how often the router will log non-empty
        // stats related to processing new channels, updates, or node
        // announcements.
        defaultStatInterval = time.Minute
)

var (
        // ErrGraphBuilderShuttingDown is returned if the graph builder is in
        // the process of shutting down.
        ErrGraphBuilderShuttingDown = fmt.Errorf("graph builder shutting down")
)

// Config holds the configuration required by the Builder.
type Config struct {
        // SelfNode is the public key of the node that this channel router
        // belongs to.
        SelfNode route.Vertex

        // Graph is the channel graph that the ChannelRouter will use to gather
        // metrics from and also to carry out path finding queries.
        Graph DB

        // Chain is the router's source to the most up-to-date blockchain data.
        // All incoming advertised channels will be checked against the chain
        // to ensure that the channels advertised are still open.
        Chain lnwallet.BlockChainIO

        // ChainView is an instance of a FilteredChainView which is used to
        // watch the sub-set of the UTXO set (the set of active channels) that
        // we need in order to properly maintain the channel graph.
        ChainView chainview.FilteredChainView

        // Notifier is a reference to the ChainNotifier, used to grab
        // the latest blocks if the router is missing any.
        Notifier chainntnfs.ChainNotifier

        // ChannelPruneExpiry is the duration used to determine if a channel
        // should be pruned or not. If the delta between now and when the
        // channel was last updated is greater than ChannelPruneExpiry, then
        // the channel is marked as a zombie channel eligible for pruning.
        ChannelPruneExpiry time.Duration

        // GraphPruneInterval is used as an interval to determine how often we
        // should examine the channel graph to garbage collect zombie channels.
        GraphPruneInterval time.Duration

        // FirstTimePruneDelay is the time we'll wait after startup before
        // attempting to prune the graph for zombie channels. We don't do it
        // immediately after startup to allow lnd to start up without getting
        // blocked by this job.
        FirstTimePruneDelay time.Duration

        // AssumeChannelValid toggles whether the router will check for
        // spentness of channel outpoints. For neutrino, this saves long rescans
        // from blocking initial usage of the daemon.
        AssumeChannelValid bool

        // StrictZombiePruning determines if we attempt to prune zombie
        // channels according to a stricter criteria. If true, then we'll prune
        // a channel if only *one* of the edges is considered a zombie.
        // Otherwise, we'll only prune the channel when both edges have a very
        // dated last update.
        StrictZombiePruning bool

        // IsAlias returns whether a passed ShortChannelID is an alias. This is
        // only used for our local channels.
        IsAlias func(scid lnwire.ShortChannelID) bool
}

// Builder builds and maintains a view of the Lightning Network graph.
type Builder struct {
        started atomic.Bool
        stopped atomic.Bool

        ntfnClientCounter atomic.Uint64
        bestHeight        atomic.Uint32

        cfg *Config

        // newBlocks is a channel in which new blocks connected to the end of
        // the main chain are sent over, and blocks updated after a call to
        // UpdateFilter.
        newBlocks <-chan *chainview.FilteredBlock

        // staleBlocks is a channel in which blocks disconnected from the end
        // of our currently known best chain are sent over.
        staleBlocks <-chan *chainview.FilteredBlock

        // networkUpdates is a channel that carries new topology updates
        // messages from outside the Builder to be processed by the
        // networkHandler.
        networkUpdates chan *routingMsg

        // topologyClients maps a client's unique notification ID to a
        // topologyClient client that contains its notification dispatch
        // channel.
        topologyClients *lnutils.SyncMap[uint64, *topologyClient]

        // ntfnClientUpdates is a channel that's used to send new updates to
        // topology notification clients to the Builder. Updates either
        // add a new notification client, or cancel notifications for an
        // existing client.
        ntfnClientUpdates chan *topologyClientUpdate

        // channelEdgeMtx is a mutex we use to make sure we process only one
        // ChannelEdgePolicy at a time for a given channelID, to ensure
        // consistency between the various database accesses.
        channelEdgeMtx *multimutex.Mutex[uint64]

        // statTicker is a resumable ticker that logs the router's progress as
        // it discovers channels or receives updates.
        statTicker ticker.Ticker

        // stats tracks newly processed channels, updates, and node
        // announcements over a window of defaultStatInterval.
        stats *routerStats

        quit chan struct{}
        wg   sync.WaitGroup
}

// A compile time check to ensure Builder implements the
// ChannelGraphSource interface.
var _ ChannelGraphSource = (*Builder)(nil)

// NewBuilder constructs a new Builder.
func NewBuilder(cfg *Config) (*Builder, error) {
        return &Builder{
                cfg:               cfg,
                networkUpdates:    make(chan *routingMsg),
                topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
                ntfnClientUpdates: make(chan *topologyClientUpdate),
                channelEdgeMtx:    multimutex.NewMutex[uint64](),
                statTicker:        ticker.New(defaultStatInterval),
                stats:             new(routerStats),
                quit:              make(chan struct{}),
        }, nil
}

// Start launches all the goroutines the Builder requires to carry out its
// duties. If the builder has already been started, then this method is a noop.
func (b *Builder) Start() error {
        if !b.started.CompareAndSwap(false, true) {
                return nil
        }

        log.Info("Builder starting")

        bestHash, bestHeight, err := b.cfg.Chain.GetBestBlock()
        if err != nil {
                return err
        }

        // If the graph has never been pruned, or hasn't fully been created yet,
        // then we don't treat this as an explicit error.
        if _, _, err := b.cfg.Graph.PruneTip(); err != nil {
                switch {
                case errors.Is(err, channeldb.ErrGraphNeverPruned):
                        fallthrough

                case errors.Is(err, channeldb.ErrGraphNotFound):
                        // If the graph has never been pruned, then we'll set
                        // the prune height to the current best height of the
                        // chain backend.
                        _, err = b.cfg.Graph.PruneGraph(
                                nil, bestHash, uint32(bestHeight),
                        )
                        if err != nil {
                                return err
                        }

                default:
                        return err
                }
        }

        // If AssumeChannelValid is present, then we won't rely on pruning
        // channels from the graph based on their spentness, but whether they
        // are considered zombies or not. We will start zombie pruning after a
        // small delay, to avoid slowing down startup of lnd.
        if b.cfg.AssumeChannelValid { //nolint:nestif
                time.AfterFunc(b.cfg.FirstTimePruneDelay, func() {
                        select {
                        case <-b.quit:
                                return
                        default:
                        }

                        log.Info("Initial zombie prune starting")
                        if err := b.pruneZombieChans(); err != nil {
                                log.Errorf("Unable to prune zombies: %v", err)
                        }
                })
        } else {
                // Otherwise, we'll use our filtered chain view to prune
                // channels as soon as they are detected as spent on-chain.
                if err := b.cfg.ChainView.Start(); err != nil {
                        return err
                }

                // Once the instance is active, we'll fetch the channel we'll
                // receive notifications over.
                b.newBlocks = b.cfg.ChainView.FilteredBlocks()
                b.staleBlocks = b.cfg.ChainView.DisconnectedBlocks()

                // Before we perform our manual block pruning, we'll construct
                // and apply a fresh chain filter to the active
                // FilteredChainView instance.  We do this before, as otherwise
                // we may miss on-chain events as the filter hasn't properly
                // been applied.
                channelView, err := b.cfg.Graph.ChannelView()
                if err != nil && !errors.Is(
                        err, channeldb.ErrGraphNoEdgesFound,
                ) {

                        return err
                }

                log.Infof("Filtering chain using %v channels active",
                        len(channelView))

                if len(channelView) != 0 {
                        err = b.cfg.ChainView.UpdateFilter(
                                channelView, uint32(bestHeight),
                        )
                        if err != nil {
                                return err
                        }
                }

                // The graph pruning might have taken a while and there could be
                // new blocks available.
                _, bestHeight, err = b.cfg.Chain.GetBestBlock()
                if err != nil {
                        return err
                }
                b.bestHeight.Store(uint32(bestHeight))

                // Before we begin normal operation of the router, we first need
                // to synchronize the channel graph to the latest state of the
                // UTXO set.
                if err := b.syncGraphWithChain(); err != nil {
                        return err
                }

                // Finally, before we proceed, we'll prune any unconnected nodes
                // from the graph in order to ensure we maintain a tight graph
                // of "useful" nodes.
                err = b.cfg.Graph.PruneGraphNodes()
                if err != nil &&
                        !errors.Is(err, channeldb.ErrGraphNodesNotFound) {

                        return err
                }
        }

        b.wg.Add(1)
        go b.networkHandler()

        log.Debug("Builder started")

        return nil
}

// Stop signals to the Builder that it should halt all routines. This method
// will *block* until all goroutines have exited. If the builder has already
// stopped then this method will return immediately.
func (b *Builder) Stop() error {
        if !b.stopped.CompareAndSwap(false, true) {
                return nil
        }

        log.Info("Builder shutting down...")

        // Our filtered chain view could've only been started if
        // AssumeChannelValid isn't present.
        if !b.cfg.AssumeChannelValid {
                if err := b.cfg.ChainView.Stop(); err != nil {
                        return err
                }
        }

        close(b.quit)
        b.wg.Wait()

        log.Debug("Builder shutdown complete")

        return nil
}

// syncGraphWithChain attempts to synchronize the current channel graph with
// the latest UTXO set state. This process involves pruning from the channel
// graph any channels which have been closed by spending their funding output
// since we've been down.
func (b *Builder) syncGraphWithChain() error {
        // First, we'll need to check to see if we're already in sync with the
        // latest state of the UTXO set.
        bestHash, bestHeight, err := b.cfg.Chain.GetBestBlock()
        if err != nil {
                return err
        }
        b.bestHeight.Store(uint32(bestHeight))

        pruneHash, pruneHeight, err := b.cfg.Graph.PruneTip()
        if err != nil {
                switch {
                // If the graph has never been pruned, or hasn't fully been
                // created yet, then we don't treat this as an explicit error.
                case errors.Is(err, channeldb.ErrGraphNeverPruned):
                case errors.Is(err, channeldb.ErrGraphNotFound):
                default:
                        return err
                }
        }

        log.Infof("Prune tip for Channel Graph: height=%v, hash=%v",
                pruneHeight, pruneHash)

        switch {
        // If the graph has never been pruned, then we can exit early as this
        // entails it's being created for the first time and hasn't seen any
        // block or created channels.
        case pruneHeight == 0 || pruneHash == nil:
                return nil

        // If the block hashes and heights match exactly, then we don't need to
        // prune the channel graph as we're already fully in sync.
        case bestHash.IsEqual(pruneHash) && uint32(bestHeight) == pruneHeight:
                return nil
        }

        // If the main chain blockhash at prune height is different from the
        // prune hash, this might indicate the database is on a stale branch.
        mainBlockHash, err := b.cfg.Chain.GetBlockHash(int64(pruneHeight))
        if err != nil {
                return err
        }

        // While we are on a stale branch of the chain, walk backwards to find
        // first common block.
        for !pruneHash.IsEqual(mainBlockHash) {
                log.Infof("channel graph is stale. Disconnecting block %v "+
                        "(hash=%v)", pruneHeight, pruneHash)
                // Prune the graph for every channel that was opened at height
                // >= pruneHeight.
                _, err := b.cfg.Graph.DisconnectBlockAtHeight(pruneHeight)
                if err != nil {
                        return err
                }

                pruneHash, pruneHeight, err = b.cfg.Graph.PruneTip()
                switch {
                // If at this point the graph has never been pruned, we can exit
                // as this entails we are back to the point where it hasn't seen
                // any block or created channels, alas there's nothing left to
                // prune.
                case errors.Is(err, channeldb.ErrGraphNeverPruned):
                        return nil

                case errors.Is(err, channeldb.ErrGraphNotFound):
                        return nil

                case err != nil:
                        return err

                default:
                }

                mainBlockHash, err = b.cfg.Chain.GetBlockHash(
                        int64(pruneHeight),
                )
                if err != nil {
                        return err
                }
        }

        log.Infof("Syncing channel graph from height=%v (hash=%v) to "+
                "height=%v (hash=%v)", pruneHeight, pruneHash, bestHeight,
                bestHash)

        // If we're not yet caught up, then we'll walk forward in the chain
        // pruning the channel graph with each new block that hasn't yet been
        // consumed by the channel graph.
        var spentOutputs []*wire.OutPoint
        for nextHeight := pruneHeight + 1; nextHeight <= uint32(bestHeight); nextHeight++ { //nolint:lll
                // Break out of the rescan early if a shutdown has been
                // requested, otherwise long rescans will block the daemon from
                // shutting down promptly.
                select {
                case <-b.quit:
                        return ErrGraphBuilderShuttingDown
                default:
                }

                // Using the next height, request a manual block pruning from
                // the chainview for the particular block hash.
                log.Infof("Filtering block for closed channels, at height: %v",
                        int64(nextHeight))
                nextHash, err := b.cfg.Chain.GetBlockHash(int64(nextHeight))
                if err != nil {
                        return err
                }
                log.Tracef("Running block filter on block with hash: %v",
                        nextHash)
                filterBlock, err := b.cfg.ChainView.FilterBlock(nextHash)
                if err != nil {
                        return err
                }

                // We're only interested in all prior outputs that have been
                // spent in the block, so collate all the referenced previous
                // outpoints within each tx and input.
                for _, tx := range filterBlock.Transactions {
                        for _, txIn := range tx.TxIn {
                                spentOutputs = append(spentOutputs,
                                        &txIn.PreviousOutPoint)
                        }
                }
        }

        // With the spent outputs gathered, attempt to prune the channel graph,
        // also passing in the best hash+height so the prune tip can be updated.
        closedChans, err := b.cfg.Graph.PruneGraph(
                spentOutputs, bestHash, uint32(bestHeight),
        )
        if err != nil {
                return err
        }

        log.Infof("Graph pruning complete: %v channels were closed since "+
                "height %v", len(closedChans), pruneHeight)

        return nil
}

// isZombieChannel takes two edge policy updates and determines if the
// corresponding channel should be considered a zombie. The first boolean is
// true if the policy update from node 1 is considered a zombie, the second
// boolean is that of node 2, and the final boolean is true if the channel
// is considered a zombie.
func (b *Builder) isZombieChannel(e1,
        e2 *models.ChannelEdgePolicy) (bool, bool, bool) {

        chanExpiry := b.cfg.ChannelPruneExpiry

        e1Zombie := e1 == nil || time.Since(e1.LastUpdate) >= chanExpiry
        e2Zombie := e2 == nil || time.Since(e2.LastUpdate) >= chanExpiry

        var e1Time, e2Time time.Time
        if e1 != nil {
                e1Time = e1.LastUpdate
        }
        if e2 != nil {
                e2Time = e2.LastUpdate
        }

        return e1Zombie, e2Zombie, b.IsZombieChannel(e1Time, e2Time)
}

// IsZombieChannel takes the timestamps of the latest channel updates for a
// channel and returns true if the channel should be considered a zombie based
// on these timestamps.
func (b *Builder) IsZombieChannel(updateTime1,
        updateTime2 time.Time) bool {

        chanExpiry := b.cfg.ChannelPruneExpiry

        e1Zombie := updateTime1.IsZero() ||
                time.Since(updateTime1) >= chanExpiry

        e2Zombie := updateTime2.IsZero() ||
                time.Since(updateTime2) >= chanExpiry

        // If we're using strict zombie pruning, then a channel is only
        // considered live if both edges have a recent update we know of.
        if b.cfg.StrictZombiePruning {
                return e1Zombie || e2Zombie
        }

        // Otherwise, if we're using the less strict variant, then a channel is
        // considered live if either of the edges have a recent update.
        return e1Zombie && e2Zombie
}

// pruneZombieChans is a method that will be called periodically to prune out
// any "zombie" channels. We consider channels zombies if *both* edges haven't
// been updated since our zombie horizon. If AssumeChannelValid is present,
// we'll also consider channels zombies if *both* edges are disabled. This
// usually signals that a channel has been closed on-chain. We do this
// periodically to keep a healthy, lively routing table.
func (b *Builder) pruneZombieChans() error {
        chansToPrune := make(map[uint64]struct{})
        chanExpiry := b.cfg.ChannelPruneExpiry

        log.Infof("Examining channel graph for zombie channels")

        // A helper method to detect if the channel belongs to this node
        isSelfChannelEdge := func(info *models.ChannelEdgeInfo) bool {
                return info.NodeKey1Bytes == b.cfg.SelfNode ||
                        info.NodeKey2Bytes == b.cfg.SelfNode
        }

        // First, we'll collect all the channels which are eligible for garbage
        // collection due to being zombies.
        filterPruneChans := func(info *models.ChannelEdgeInfo,
                e1, e2 *models.ChannelEdgePolicy) error {

                // Exit early in case this channel is already marked to be
                // pruned
                _, markedToPrune := chansToPrune[info.ChannelID]
                if markedToPrune {
                        return nil
                }

                // We'll ensure that we don't attempt to prune our *own*
                // channels from the graph, as in any case this should be
                // re-advertised by the sub-system above us.
                if isSelfChannelEdge(info) {
                        return nil
                }

                e1Zombie, e2Zombie, isZombieChan := b.isZombieChannel(e1, e2)

                if e1Zombie {
                        log.Tracef("Node1 pubkey=%x of chan_id=%v is zombie",
                                info.NodeKey1Bytes, info.ChannelID)
                }

                if e2Zombie {
                        log.Tracef("Node2 pubkey=%x of chan_id=%v is zombie",
                                info.NodeKey2Bytes, info.ChannelID)
                }

                // If either edge hasn't been updated for a period of
                // chanExpiry, then we'll mark the channel itself as eligible
                // for graph pruning.
                if !isZombieChan {
                        return nil
                }

                log.Debugf("ChannelID(%v) is a zombie, collecting to prune",
                        info.ChannelID)

                // TODO(roasbeef): add ability to delete single directional edge
                chansToPrune[info.ChannelID] = struct{}{}

                return nil
        }

        // If AssumeChannelValid is present we'll look at the disabled bit for
        // both edges. If they're both disabled, then we can interpret this as
        // the channel being closed and can prune it from our graph.
        if b.cfg.AssumeChannelValid {
                disabledChanIDs, err := b.cfg.Graph.DisabledChannelIDs()
                if err != nil {
                        return fmt.Errorf("unable to get disabled channels "+
                                "ids chans: %v", err)
                }

                disabledEdges, err := b.cfg.Graph.FetchChanInfos(
                        disabledChanIDs,
                )
                if err != nil {
                        return fmt.Errorf("unable to fetch disabled channels "+
                                "edges chans: %v", err)
                }

                // Ensuring we won't prune our own channel from the graph.
                for _, disabledEdge := range disabledEdges {
                        if !isSelfChannelEdge(disabledEdge.Info) {
                                chansToPrune[disabledEdge.Info.ChannelID] =
                                        struct{}{}
                        }
                }
        }

        startTime := time.Unix(0, 0)
        endTime := time.Now().Add(-1 * chanExpiry)
        oldEdges, err := b.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime)
        if err != nil {
                return fmt.Errorf("unable to fetch expired channel updates "+
                        "chans: %v", err)
        }

        for _, u := range oldEdges {
                err = filterPruneChans(u.Info, u.Policy1, u.Policy2)
                if err != nil {
                        return fmt.Errorf("error filtering channels to "+
                                "prune: %w", err)
                }
        }

        log.Infof("Pruning %v zombie channels", len(chansToPrune))
        if len(chansToPrune) == 0 {
                return nil
        }

        // With the set of zombie-like channels obtained, we'll do another pass
        // to delete them from the channel graph.
        toPrune := make([]uint64, 0, len(chansToPrune))
        for chanID := range chansToPrune {
                toPrune = append(toPrune, chanID)
                log.Tracef("Pruning zombie channel with ChannelID(%v)", chanID)
        }
        err = b.cfg.Graph.DeleteChannelEdges(
                b.cfg.StrictZombiePruning, true, toPrune...,
        )
        if err != nil {
                return fmt.Errorf("unable to delete zombie channels: %w", err)
        }

        // With the channels pruned, we'll also attempt to prune any nodes that
        // were a part of them.
        err = b.cfg.Graph.PruneGraphNodes()
        if err != nil && !errors.Is(err, channeldb.ErrGraphNodesNotFound) {
                return fmt.Errorf("unable to prune graph nodes: %w", err)
        }

        return nil
}

// handleNetworkUpdate is responsible for processing the update message and
// notifies topology changes, if any.
//
// NOTE: must be run inside goroutine.
func (b *Builder) handleNetworkUpdate(vb *ValidationBarrier,
        update *routingMsg) {

        defer b.wg.Done()
        defer vb.CompleteJob()

        // If this message has an existing dependency, then we'll wait until
        // that has been fully validated before we proceed.
        err := vb.WaitForDependants(update.msg)
        if err != nil {
                switch {
                case IsError(err, ErrVBarrierShuttingDown):
                        update.err <- err

                case IsError(err, ErrParentValidationFailed):
                        update.err <- newErrf(ErrIgnored, err.Error())

                default:
                        log.Warnf("unexpected error during validation "+
                                "barrier shutdown: %v", err)
                        update.err <- err
                }

                return
        }

        // Process the routing update to determine if this is either a new
        // update from our PoV or an update to a prior vertex/edge we
        // previously accepted.
        err = b.processUpdate(update.msg, update.op...)
        update.err <- err

        // If this message had any dependencies, then we can now signal them to
        // continue.
        allowDependents := err == nil || IsError(err, ErrIgnored, ErrOutdated)
        vb.SignalDependants(update.msg, allowDependents)

        // If the error is not nil here, there's no need to send topology
        // change.
        if err != nil {
                // We now decide to log an error or not. If allowDependents is
                // false, it means there is an error and the error is neither
                // ErrIgnored or ErrOutdated. In this case, we'll log an error.
                // Otherwise, we'll add debug log only.
                if allowDependents {
                        log.Debugf("process network updates got: %v", err)
                } else {
                        log.Errorf("process network updates got: %v", err)
                }

                return
        }

        // Otherwise, we'll send off a new notification for the newly accepted
        // update, if any.
        topChange := &TopologyChange{}
        err = addToTopologyChange(b.cfg.Graph, topChange, update.msg)
        if err != nil {
                log.Errorf("unable to update topology change notification: %v",
                        err)
                return
        }

        if !topChange.isEmpty() {
                b.notifyTopologyChange(topChange)
        }
}

// networkHandler is the primary goroutine for the Builder. The roles of
// this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification, applying network
// updates, and registering new topology clients.
//
// NOTE: This MUST be run as a goroutine.
func (b *Builder) networkHandler() {
        defer b.wg.Done()

        graphPruneTicker := time.NewTicker(b.cfg.GraphPruneInterval)
        defer graphPruneTicker.Stop()

        defer b.statTicker.Stop()

        b.stats.Reset()

        // We'll use this validation barrier to ensure that we process all jobs
        // in the proper order during parallel validation.
        //
        // NOTE: For AssumeChannelValid, we bump up the maximum number of
        // concurrent validation requests since there are no blocks being
        // fetched. This significantly increases the performance of IGD for
        // neutrino nodes.
        //
        // However, we dial back to use multiple of the number of cores when
        // fully validating, to avoid fetching up to 1000 blocks from the
        // backend. On bitcoind, this will empirically cause massive latency
        // spikes when executing this many concurrent RPC calls. Critical
        // subsystems or basic rpc calls that rely on calls such as GetBestBlock
        // will hang due to excessive load.
        //
        // See https://github.com/lightningnetwork/lnd/issues/4892.
        var validationBarrier *ValidationBarrier
        if b.cfg.AssumeChannelValid {
                validationBarrier = NewValidationBarrier(1000, b.quit)
        } else {
                validationBarrier = NewValidationBarrier(
                        4*runtime.NumCPU(), b.quit,
                )
        }

        for {
                // If there are stats, resume the statTicker.
                if !b.stats.Empty() {
                        b.statTicker.Resume()
                }

                select {
                // A new fully validated network update has just arrived. As a
                // result we'll modify the channel graph accordingly depending
                // on the exact type of the message.
                case update := <-b.networkUpdates:
                        // We'll set up any dependants, and wait until a free
                        // slot for this job opens up, this allows us to not
                        // have thousands of goroutines active.
                        validationBarrier.InitJobDependencies(update.msg)

                        b.wg.Add(1)
                        go b.handleNetworkUpdate(validationBarrier, update)

                        // TODO(roasbeef): remove all unconnected vertexes
                        // after N blocks pass with no corresponding
                        // announcements.

                case chainUpdate, ok := <-b.staleBlocks:
                        // If the channel has been closed, then this indicates
                        // the daemon is shutting down, so we exit ourselves.
                        if !ok {
                                return
                        }

                        // Since this block is stale, we update our best height
                        // to the previous block.
                        blockHeight := chainUpdate.Height
                        b.bestHeight.Store(blockHeight - 1)

                        // Update the channel graph to reflect that this block
                        // was disconnected.
                        _, err := b.cfg.Graph.DisconnectBlockAtHeight(
                                blockHeight,
                        )
                        if err != nil {
                                log.Errorf("unable to prune graph with stale "+
                                        "block: %v", err)
                                continue
                        }

                        // TODO(halseth): notify client about the reorg?

                // A new block has arrived, so we can prune the channel graph
                // of any channels which were closed in the block.
                case chainUpdate, ok := <-b.newBlocks:
                        // If the channel has been closed, then this indicates
                        // the daemon is shutting down, so we exit ourselves.
                        if !ok {
                                return
                        }

                        // We'll ensure that any new blocks received attach
                        // directly to the end of our main chain. If not, then
                        // we've somehow missed some blocks. Here we'll catch
                        // up the chain with the latest blocks.
                        currentHeight := b.bestHeight.Load()
                        switch {
                        case chainUpdate.Height == currentHeight+1:
                                err := b.updateGraphWithClosedChannels(
                                        chainUpdate,
                                )
                                if err != nil {
                                        log.Errorf("unable to prune graph "+
                                                "with closed channels: %v", err)
                                }

                        case chainUpdate.Height > currentHeight+1:
                                log.Errorf("out of order block: expecting "+
                                        "height=%v, got height=%v",
                                        currentHeight+1, chainUpdate.Height)

                                err := b.getMissingBlocks(
                                        currentHeight, chainUpdate,
                                )
                                if err != nil {
                                        log.Errorf("unable to retrieve missing"+
                                                "blocks: %v", err)
                                }

                        case chainUpdate.Height < currentHeight+1:
                                log.Errorf("out of order block: expecting "+
                                        "height=%v, got height=%v",
                                        currentHeight+1, chainUpdate.Height)

                                log.Infof("Skipping channel pruning since "+
                                        "received block height %v was already"+
                                        " processed.", chainUpdate.Height)
                        }

                // A new notification client update has arrived. We're either
                // gaining a new client, or cancelling notifications for an
                // existing client.
                case ntfnUpdate := <-b.ntfnClientUpdates:
                        clientID := ntfnUpdate.clientID

                        if ntfnUpdate.cancel {
                                client, ok := b.topologyClients.LoadAndDelete(
                                        clientID,
                                )
                                if ok {
                                        close(client.exit)
                                        client.wg.Wait()

                                        close(client.ntfnChan)
                                }

                                continue
                        }

                        b.topologyClients.Store(clientID, &topologyClient{
                                ntfnChan: ntfnUpdate.ntfnChan,
                                exit:     make(chan struct{}),
                        })

                // The graph prune ticker has ticked, so we'll examine the
                // state of the known graph to filter out any zombie channels
                // for pruning.
                case <-graphPruneTicker.C:
                        if err := b.pruneZombieChans(); err != nil {
                                log.Errorf("Unable to prune zombies: %v", err)
                        }

                // Log any stats if we've processed a non-empty number of
                // channels, updates, or nodes. We'll only pause the ticker if
                // the last window contained no updates to avoid resuming and
                // pausing while consecutive windows contain new info.
                case <-b.statTicker.Ticks():
                        if !b.stats.Empty() {
                                log.Infof(b.stats.String())
                        } else {
                                b.statTicker.Pause()
                        }
                        b.stats.Reset()

                // The router has been signalled to exit, so we exit our main
                // loop so the wait group can be decremented.
                case <-b.quit:
                        return
                }
        }
}

// getMissingBlocks walks through all missing blocks and updates the graph
// closed channels accordingly.
func (b *Builder) getMissingBlocks(currentHeight uint32,
        chainUpdate *chainview.FilteredBlock) error {

        outdatedHash, err := b.cfg.Chain.GetBlockHash(int64(currentHeight))
        if err != nil {
                return err
        }

        outdatedBlock := &chainntnfs.BlockEpoch{
                Height: int32(currentHeight),
                Hash:   outdatedHash,
        }

        epochClient, err := b.cfg.Notifier.RegisterBlockEpochNtfn(
                outdatedBlock,
        )
        if err != nil {
                return err
        }
        defer epochClient.Cancel()

        blockDifference := int(chainUpdate.Height - currentHeight)

        // We'll walk through all the outdated blocks and make sure we're able
        // to update the graph with any closed channels from them.
        for i := 0; i < blockDifference; i++ {
                var (
                        missingBlock *chainntnfs.BlockEpoch
                        ok           bool
                )

                select {
                case missingBlock, ok = <-epochClient.Epochs:
                        if !ok {
                                return nil
                        }

                case <-b.quit:
                        return nil
                }

                filteredBlock, err := b.cfg.ChainView.FilterBlock(
                        missingBlock.Hash,
                )
                if err != nil {
                        return err
                }

                err = b.updateGraphWithClosedChannels(
                        filteredBlock,
                )
                if err != nil {
                        return err
                }
        }

        return nil
}

// updateGraphWithClosedChannels prunes the channel graph of closed channels
// that are no longer needed.
func (b *Builder) updateGraphWithClosedChannels(
        chainUpdate *chainview.FilteredBlock) error {

        // Once a new block arrives, we update our running track of the height
        // of the chain tip.
        blockHeight := chainUpdate.Height

        b.bestHeight.Store(blockHeight)
        log.Infof("Pruning channel graph using block %v (height=%v)",
                chainUpdate.Hash, blockHeight)

        // We're only interested in all prior outputs that have been spent in
        // the block, so collate all the referenced previous outpoints within
        // each tx and input.
        var spentOutputs []*wire.OutPoint
        for _, tx := range chainUpdate.Transactions {
                for _, txIn := range tx.TxIn {
                        spentOutputs = append(spentOutputs,
                                &txIn.PreviousOutPoint)
                }
        }

        // With the spent outputs gathered, attempt to prune the channel graph,
        // also passing in the hash+height of the block being pruned so the
        // prune tip can be updated.
        chansClosed, err := b.cfg.Graph.PruneGraph(spentOutputs,
                &chainUpdate.Hash, chainUpdate.Height)
        if err != nil {
                log.Errorf("unable to prune routing table: %v", err)
                return err
        }

        log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash,
                blockHeight, len(chansClosed))

        if len(chansClosed) == 0 {
                return err
        }

        // Notify all currently registered clients of the newly closed channels.
        closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
        b.notifyTopologyChange(&TopologyChange{
                ClosedChannels: closeSummaries,
        })

        return nil
}
1036

1037
// assertNodeAnnFreshness returns a non-nil error if we have an announcement in
1038
// the database for the passed node with a timestamp newer than the passed
1039
// timestamp. ErrIgnored will be returned if we already have the node, and
1040
// ErrOutdated will be returned if we have a timestamp that's after the new
1041
// timestamp.
1042
func (b *Builder) assertNodeAnnFreshness(node route.Vertex,
1043
        msgTimestamp time.Time) error {
3✔
1044

3✔
1045
        // If we are not already aware of this node, it means that we don't
3✔
1046
        // know about any channel using this node. To avoid a DoS attack by
3✔
1047
        // node announcements, we will ignore such nodes. If we do know about
3✔
1048
        // this node, check that this update brings info newer than what we
3✔
1049
        // already have.
3✔
1050
        lastUpdate, exists, err := b.cfg.Graph.HasLightningNode(node)
3✔
1051
        if err != nil {
3✔
1052
                return errors.Errorf("unable to query for the "+
×
1053
                        "existence of node: %v", err)
×
1054
        }
×
1055
        if !exists {
6✔
1056
                return newErrf(ErrIgnored, "Ignoring node announcement"+
3✔
1057
                        " for node not found in channel graph (%x)",
3✔
1058
                        node[:])
3✔
1059
        }
3✔
1060

1061
        // If we've reached this point then we're aware of the vertex being
1062
        // advertised. So we now check if the new message has a new time stamp,
1063
        // if not then we won't accept the new data as it would override newer
1064
        // data.
1065
        if !lastUpdate.Before(msgTimestamp) {
6✔
1066
                return newErrf(ErrOutdated, "Ignoring outdated "+
3✔
1067
                        "announcement for %x", node[:])
3✔
1068
        }
3✔
1069

1070
        return nil
3✔
1071
}
1072

1073
// addZombieEdge adds a channel that failed complete validation into the zombie
1074
// index so we can avoid having to re-validate it in the future.
1075
func (b *Builder) addZombieEdge(chanID uint64) error {
×
1076
        // If the edge fails validation we'll mark the edge itself as a zombie
×
1077
        // so we don't continue to request it. We use the "zero key" for both
×
1078
        // node pubkeys so this edge can't be resurrected.
×
1079
        var zeroKey [33]byte
×
1080
        err := b.cfg.Graph.MarkEdgeZombie(chanID, zeroKey, zeroKey)
×
1081
        if err != nil {
×
1082
                return fmt.Errorf("unable to mark spent chan(id=%v) as a "+
×
1083
                        "zombie: %w", chanID, err)
×
1084
        }
×
1085

1086
        return nil
×
1087
}
1088

1089
// makeFundingScript is used to make the funding script for both segwit v0 and
1090
// segwit v1 (taproot) channels.
1091
//
1092
// TODO(roasbeef: export and use elsewhere?
1093
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte,
1094
        chanFeatures []byte) ([]byte, error) {
3✔
1095

3✔
1096
        legacyFundingScript := func() ([]byte, error) {
6✔
1097
                witnessScript, err := input.GenMultiSigScript(
3✔
1098
                        bitcoinKey1, bitcoinKey2,
3✔
1099
                )
3✔
1100
                if err != nil {
3✔
1101
                        return nil, err
×
1102
                }
×
1103
                pkScript, err := input.WitnessScriptHash(witnessScript)
3✔
1104
                if err != nil {
3✔
1105
                        return nil, err
×
1106
                }
×
1107

1108
                return pkScript, nil
3✔
1109
        }
1110

1111
        if len(chanFeatures) == 0 {
3✔
1112
                return legacyFundingScript()
×
1113
        }
×
1114

1115
        // In order to make the correct funding script, we'll need to parse the
1116
        // chanFeatures bytes into a feature vector we can interact with.
1117
        rawFeatures := lnwire.NewRawFeatureVector()
3✔
1118
        err := rawFeatures.Decode(bytes.NewReader(chanFeatures))
3✔
1119
        if err != nil {
3✔
1120
                return nil, fmt.Errorf("unable to parse chan feature "+
×
1121
                        "bits: %w", err)
×
1122
        }
×
1123

1124
        chanFeatureBits := lnwire.NewFeatureVector(
3✔
1125
                rawFeatures, lnwire.Features,
3✔
1126
        )
3✔
1127
        if chanFeatureBits.HasFeature(
3✔
1128
                lnwire.SimpleTaprootChannelsOptionalStaging,
3✔
1129
        ) {
6✔
1130

3✔
1131
                pubKey1, err := btcec.ParsePubKey(bitcoinKey1)
3✔
1132
                if err != nil {
3✔
1133
                        return nil, err
×
1134
                }
×
1135
                pubKey2, err := btcec.ParsePubKey(bitcoinKey2)
3✔
1136
                if err != nil {
3✔
1137
                        return nil, err
×
1138
                }
×
1139

1140
                fundingScript, _, err := input.GenTaprootFundingScript(
3✔
1141
                        pubKey1, pubKey2, 0,
3✔
1142
                )
3✔
1143
                if err != nil {
3✔
1144
                        return nil, err
×
1145
                }
×
1146

1147
                return fundingScript, nil
3✔
1148
        }
1149

1150
        return legacyFundingScript()
3✔
1151
}
1152

1153
// processUpdate processes a new relate authenticated channel/edge, node or
1154
// channel/edge update network update. If the update didn't affect the internal
1155
// state of the draft due to either being out of date, invalid, or redundant,
1156
// then error is returned.
1157
//
1158
//nolint:funlen
1159
func (b *Builder) processUpdate(msg interface{},
1160
        op ...batch.SchedulerOption) error {
3✔
1161

3✔
1162
        switch msg := msg.(type) {
3✔
1163
        case *channeldb.LightningNode:
3✔
1164
                // Before we add the node to the database, we'll check to see
3✔
1165
                // if the announcement is "fresh" or not. If it isn't, then
3✔
1166
                // we'll return an error.
3✔
1167
                err := b.assertNodeAnnFreshness(msg.PubKeyBytes, msg.LastUpdate)
3✔
1168
                if err != nil {
6✔
1169
                        return err
3✔
1170
                }
3✔
1171

1172
                if err := b.cfg.Graph.AddLightningNode(msg, op...); err != nil {
3✔
1173
                        return errors.Errorf("unable to add node %x to the "+
×
1174
                                "graph: %v", msg.PubKeyBytes, err)
×
1175
                }
×
1176

1177
                log.Tracef("Updated vertex data for node=%x", msg.PubKeyBytes)
3✔
1178
                b.stats.incNumNodeUpdates()
3✔
1179

1180
        case *models.ChannelEdgeInfo:
                log.Debugf("Received ChannelEdgeInfo for channel %v",
                        msg.ChannelID)

                // Prior to processing the announcement we first check if we
                // already know of this channel, if so, then we can exit early.
                _, _, exists, isZombie, err := b.cfg.Graph.HasChannelEdge(
                        msg.ChannelID,
                )
                if err != nil &&
                        !errors.Is(err, channeldb.ErrGraphNoEdgesFound) {

                        return errors.Errorf("unable to check for edge "+
                                "existence: %v", err)
                }
                if isZombie {
                        return newErrf(ErrIgnored, "ignoring msg for zombie "+
                                "chan_id=%v", msg.ChannelID)
                }
                if exists {
                        return newErrf(ErrIgnored, "ignoring msg for known "+
                                "chan_id=%v", msg.ChannelID)
                }

                // If AssumeChannelValid is present, then we are unable to
                // perform any of the expensive checks below, so we'll
                // short-circuit our path straight to adding the edge to our
                // graph. If the passed ShortChannelID is an alias, then we'll
                // skip validation as it will not map to a legitimate tx. This
                // is not a DoS vector as only we can add an alias
                // ChannelAnnouncement from the gossiper.
                scid := lnwire.NewShortChanIDFromInt(msg.ChannelID)
                if b.cfg.AssumeChannelValid || b.cfg.IsAlias(scid) {
                        err := b.cfg.Graph.AddChannelEdge(msg, op...)
                        if err != nil {
                                return fmt.Errorf("unable to add edge: %w", err)
                        }
                        log.Tracef("New channel discovered! Link "+
                                "connects %x and %x with ChannelID(%v)",
                                msg.NodeKey1Bytes, msg.NodeKey2Bytes,
                                msg.ChannelID)
                        b.stats.incNumEdgesDiscovered()

                        break
                }

                // Before we can add the channel to the channel graph, we need
                // to obtain the full funding outpoint that's encoded within
                // the channel ID.
                channelID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
                fundingTx, err := b.fetchFundingTxWrapper(&channelID)
                if err != nil {
                        //nolint:lll
                        //
                        // In order to ensure we don't erroneously mark a
                        // channel as a zombie due to an RPC failure, we'll
                        // attempt to string match for the relevant errors.
                        //
                        // * btcd:
                        //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
                        //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
                        // * bitcoind:
                        //    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
                        //     * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
                        switch {
                        case strings.Contains(err.Error(), "not found"):
                                fallthrough

                        case strings.Contains(err.Error(), "out of range"):
                                // If the funding transaction isn't found at
                                // all, then we'll mark the edge itself as a
                                // zombie so we don't continue to request it.
                                // We use the "zero key" for both node pubkeys
                                // so this edge can't be resurrected.
                                zErr := b.addZombieEdge(msg.ChannelID)
                                if zErr != nil {
                                        return zErr
                                }

                        default:
                        }

                        return newErrf(ErrNoFundingTransaction, "unable to "+
                                "locate funding tx: %v", err)
                }

                // Recreate the witness output to be sure that the bitcoin
                // keys and channel value declared in the channel edge
                // correspond to reality.
                fundingPkScript, err := makeFundingScript(
                        msg.BitcoinKey1Bytes[:], msg.BitcoinKey2Bytes[:],
                        msg.Features,
                )
                if err != nil {
                        return err
                }

                // Next we'll validate that this channel is actually well
                // formed. If this check fails, then this channel either
                // doesn't exist, or isn't the one that was meant to be created
                // according to the passed channel proofs.
                fundingPoint, err := chanvalidate.Validate(
                        &chanvalidate.Context{
                                Locator: &chanvalidate.ShortChanIDChanLocator{
                                        ID: channelID,
                                },
                                MultiSigPkScript: fundingPkScript,
                                FundingTx:        fundingTx,
                        },
                )
                if err != nil {
                        // Mark the edge as a zombie so we won't try to
                        // re-validate it on start up.
                        if err := b.addZombieEdge(msg.ChannelID); err != nil {
                                return err
                        }

                        return newErrf(ErrInvalidFundingOutput, "output "+
                                "failed validation: %w", err)
                }

                // Now that we have the funding outpoint of the channel, ensure
                // that it hasn't yet been spent. If so, then this channel has
                // been closed so we'll ignore it.
                chanUtxo, err := b.cfg.Chain.GetUtxo(
                        fundingPoint, fundingPkScript, channelID.BlockHeight,
                        b.quit,
                )
                if err != nil {
                        if errors.Is(err, btcwallet.ErrOutputSpent) {
                                zErr := b.addZombieEdge(msg.ChannelID)
                                if zErr != nil {
                                        return zErr
                                }
                        }

                        return newErrf(ErrChannelSpent, "unable to fetch utxo "+
                                "for chan_id=%v, chan_point=%v: %v",
                                msg.ChannelID, fundingPoint, err)
                }

                // TODO(roasbeef): this is a hack, needs to be removed
                // after commitment fees are dynamic.
                msg.Capacity = btcutil.Amount(chanUtxo.Value)
                msg.ChannelPoint = *fundingPoint
                if err := b.cfg.Graph.AddChannelEdge(msg, op...); err != nil {
                        return errors.Errorf("unable to add edge: %v", err)
                }

                log.Debugf("New channel discovered! Link "+
                        "connects %x and %x with ChannelPoint(%v): "+
                        "chan_id=%v, capacity=%v",
                        msg.NodeKey1Bytes, msg.NodeKey2Bytes,
                        fundingPoint, msg.ChannelID, msg.Capacity)
                b.stats.incNumEdgesDiscovered()

                // As a new edge has been added to the channel graph, we'll
                // update the current UTXO filter within our active
                // FilteredChainView so we are notified if/when this channel is
                // closed.
                filterUpdate := []channeldb.EdgePoint{
                        {
                                FundingPkScript: fundingPkScript,
                                OutPoint:        *fundingPoint,
                        },
                }
                err = b.cfg.ChainView.UpdateFilter(
                        filterUpdate, b.bestHeight.Load(),
                )
                if err != nil {
                        return errors.Errorf("unable to update chain "+
                                "view: %v", err)
                }

        case *models.ChannelEdgePolicy:
                log.Debugf("Received ChannelEdgePolicy for channel %v",
                        msg.ChannelID)

                // We make sure to hold the mutex for this channel ID,
                // such that no other goroutine is concurrently doing
                // database accesses for the same channel ID.
                b.channelEdgeMtx.Lock(msg.ChannelID)
                defer b.channelEdgeMtx.Unlock(msg.ChannelID)

                edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
                        b.cfg.Graph.HasChannelEdge(msg.ChannelID)
                if err != nil && !errors.Is(
                        err, channeldb.ErrGraphNoEdgesFound,
                ) {

                        return errors.Errorf("unable to check for edge "+
                                "existence: %v", err)
                }

                // If the channel is marked as a zombie in our database, and
                // we consider this a stale update, then we should not apply
                // the policy.
                isStaleUpdate := time.Since(msg.LastUpdate) >
                        b.cfg.ChannelPruneExpiry

                if isZombie && isStaleUpdate {
                        return newErrf(ErrIgnored, "ignoring stale update "+
                                "(flags=%v|%v) for zombie chan_id=%v",
                                msg.MessageFlags, msg.ChannelFlags,
                                msg.ChannelID)
                }

                // If the channel doesn't exist in our database, we cannot
                // apply the updated policy.
                if !exists {
                        return newErrf(ErrIgnored, "ignoring update "+
                                "(flags=%v|%v) for unknown chan_id=%v",
                                msg.MessageFlags, msg.ChannelFlags,
                                msg.ChannelID)
                }

                // As edges are directional, each node has a unique policy for
                // the direction of the edge it controls. Therefore, we first
                // check if we already have the most up-to-date information for
                // that edge. If this message has a timestamp that isn't
                // strictly newer than what we already know of, we can exit
                // early.
                switch {
                // A flag set of 0 indicates this is an announcement for the
                // "first" node in the channel.
                case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0:

                        // Ignore outdated message.
                        if !edge1Timestamp.Before(msg.LastUpdate) {
                                return newErrf(ErrOutdated, "Ignoring "+
                                        "outdated update (flags=%v|%v) for "+
                                        "known chan_id=%v", msg.MessageFlags,
                                        msg.ChannelFlags, msg.ChannelID)
                        }

                // Similarly, a flag set of 1 indicates this is an announcement
                // for the "second" node in the channel.
                case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1:

                        // Ignore outdated message.
                        if !edge2Timestamp.Before(msg.LastUpdate) {
                                return newErrf(ErrOutdated, "Ignoring "+
                                        "outdated update (flags=%v|%v) for "+
                                        "known chan_id=%v", msg.MessageFlags,
                                        msg.ChannelFlags, msg.ChannelID)
                        }
                }

                // Now that we know this isn't a stale update, we'll apply the
                // new edge policy to the proper directional edge within the
                // channel graph.
                if err = b.cfg.Graph.UpdateEdgePolicy(msg, op...); err != nil {
                        err := errors.Errorf("unable to add channel: %v", err)
                        log.Error(err)
                        return err
                }

                log.Tracef("New channel update applied: %v",
                        lnutils.SpewLogClosure(msg))
                b.stats.incNumChannelUpdates()

        default:
                return errors.Errorf("wrong routing update message type")
        }

        return nil
}

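// examplePolicyDirection is an illustrative, hedged helper that is not part
// of the original lnd source: it spells out the direction convention used in
// processUpdate above, where a cleared ChanUpdateDirection bit means the
// update was signed by the channel's "first" node and a set bit means it was
// signed by the "second" node.
func examplePolicyDirection(flags lnwire.ChanUpdateChanFlags) int {
        if flags&lnwire.ChanUpdateDirection == 0 {
                // The update applies to the edge controlled by node 1.
                return 1
        }

        // Otherwise, the update applies to the edge controlled by node 2.
        return 2
}
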
// fetchFundingTxWrapper is a wrapper around fetchFundingTx, except that it
// will exit if the router has stopped.
func (b *Builder) fetchFundingTxWrapper(chanID *lnwire.ShortChannelID) (
        *wire.MsgTx, error) {

        txChan := make(chan *wire.MsgTx, 1)
        errChan := make(chan error, 1)

        go func() {
                tx, err := b.fetchFundingTx(chanID)
                if err != nil {
                        errChan <- err
                        return
                }

                txChan <- tx
        }()

        select {
        case tx := <-txChan:
                return tx, nil

        case err := <-errChan:
                return nil, err

        case <-b.quit:
                return nil, ErrGraphBuilderShuttingDown
        }
}

// fetchFundingTx returns the funding transaction identified by the passed
// short channel ID.
//
// TODO(roasbeef): replace with call to GetBlockTransaction? (would allow to
// later use getblocktxn).
func (b *Builder) fetchFundingTx(
        chanID *lnwire.ShortChannelID) (*wire.MsgTx, error) {

        // First fetch the block hash by the encoded block number, then use
        // that hash to fetch the block itself.
        blockNum := int64(chanID.BlockHeight)
        blockHash, err := b.cfg.Chain.GetBlockHash(blockNum)
        if err != nil {
                return nil, err
        }
        fundingBlock, err := b.cfg.Chain.GetBlock(blockHash)
        if err != nil {
                return nil, err
        }

        // As a sanity check, ensure that the advertised transaction index is
        // within the bounds of the total number of transactions within a
        // block.
        numTxns := uint32(len(fundingBlock.Transactions))
        if chanID.TxIndex > numTxns-1 {
                return nil, fmt.Errorf("tx_index=#%v "+
                        "is out of range (max_index=%v), network_chan_id=%v",
                        chanID.TxIndex, numTxns-1, chanID)
        }

        return fundingBlock.Transactions[chanID.TxIndex].Copy(), nil
}

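// exampleScidComponents is an illustrative, hedged sketch that is not part of
// the original lnd source: it unpacks the short channel ID layout that
// fetchFundingTx relies on, where the block height locates the block, TxIndex
// picks the funding transaction within that block, and TxPosition selects the
// funding output.
func exampleScidComponents(chanID uint64) (uint32, uint32, uint16) {
        scid := lnwire.NewShortChanIDFromInt(chanID)

        return scid.BlockHeight, scid.TxIndex, scid.TxPosition
}
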
// routingMsg couples a routing-related topology update with the error channel
// that reports the result of processing it.
type routingMsg struct {
        msg interface{}
        op  []batch.SchedulerOption
        err chan error
}

// ApplyChannelUpdate validates a channel update and, if valid, applies it to
// the database. It returns a bool indicating whether the update was
// successful.
func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate) bool {
        ch, _, _, err := b.GetChannelByID(msg.ShortChannelID)
        if err != nil {
                log.Errorf("Unable to retrieve channel by id: %v", err)
                return false
        }

        var pubKey *btcec.PublicKey

        switch msg.ChannelFlags & lnwire.ChanUpdateDirection {
        case 0:
                pubKey, _ = ch.NodeKey1()

        case 1:
                pubKey, _ = ch.NodeKey2()
        }

        // Exit early if the pubkey cannot be decided.
        if pubKey == nil {
                log.Errorf("Unable to decide pubkey with ChannelFlags=%v",
                        msg.ChannelFlags)
                return false
        }

        err = ValidateChannelUpdateAnn(pubKey, ch.Capacity, msg)
        if err != nil {
                log.Errorf("Unable to validate channel update: %v", err)
                return false
        }

        err = b.UpdateEdge(&models.ChannelEdgePolicy{
                SigBytes:                  msg.Signature.ToSignatureBytes(),
                ChannelID:                 msg.ShortChannelID.ToUint64(),
                LastUpdate:                time.Unix(int64(msg.Timestamp), 0),
                MessageFlags:              msg.MessageFlags,
                ChannelFlags:              msg.ChannelFlags,
                TimeLockDelta:             msg.TimeLockDelta,
                MinHTLC:                   msg.HtlcMinimumMsat,
                MaxHTLC:                   msg.HtlcMaximumMsat,
                FeeBaseMSat:               lnwire.MilliSatoshi(msg.BaseFee),
                FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate),
                ExtraOpaqueData:           msg.ExtraOpaqueData,
        })
        if err != nil && !IsError(err, ErrIgnored, ErrOutdated) {
                log.Errorf("Unable to apply channel update: %v", err)
                return false
        }

        return true
}

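// exampleApplyChannelUpdate is an illustrative, hedged usage sketch that is
// not part of the original lnd source: ApplyChannelUpdate only reports
// success or failure via its boolean return value, with the details logged
// inside the method, so callers typically just branch on the result.
func exampleApplyChannelUpdate(b *Builder, update *lnwire.ChannelUpdate) {
        if !b.ApplyChannelUpdate(update) {
                log.Debugf("Channel update for %v was not applied",
                        update.ShortChannelID)
        }
}
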
// AddNode is used to add information about a node to the router database. If
// the node with this pubkey is not present in an existing channel, it will
// be ignored.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) AddNode(node *channeldb.LightningNode,
        op ...batch.SchedulerOption) error {

        rMsg := &routingMsg{
                msg: node,
                op:  op,
                err: make(chan error, 1),
        }

        select {
        case b.networkUpdates <- rMsg:
                select {
                case err := <-rMsg.err:
                        return err
                case <-b.quit:
                        return ErrGraphBuilderShuttingDown
                }
        case <-b.quit:
                return ErrGraphBuilderShuttingDown
        }
}

// AddEdge is used to add an edge/channel to the topology of the router. Once
// all information about the channel has been gathered, this edge/channel may
// be used in the construction of a payment path.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
        op ...batch.SchedulerOption) error {

        rMsg := &routingMsg{
                msg: edge,
                op:  op,
                err: make(chan error, 1),
        }

        select {
        case b.networkUpdates <- rMsg:
                select {
                case err := <-rMsg.err:
                        return err
                case <-b.quit:
                        return ErrGraphBuilderShuttingDown
                }
        case <-b.quit:
                return ErrGraphBuilderShuttingDown
        }
}

// UpdateEdge is used to update edge information; without this message the
// edge is considered not fully constructed.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
        op ...batch.SchedulerOption) error {

        rMsg := &routingMsg{
                msg: update,
                op:  op,
                err: make(chan error, 1),
        }

        select {
        case b.networkUpdates <- rMsg:
                select {
                case err := <-rMsg.err:
                        return err
                case <-b.quit:
                        return ErrGraphBuilderShuttingDown
                }
        case <-b.quit:
                return ErrGraphBuilderShuttingDown
        }
}

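// exampleLazyUpdateEdge is an illustrative, hedged sketch that is not part of
// the original lnd source: it shows how a caller might pass a scheduler
// option when applying a policy update. batch.LazyAdd is assumed to behave as
// in lnd's batch package, allowing the write to be coalesced with other
// pending graph updates.
func exampleLazyUpdateEdge(b *Builder,
        policy *models.ChannelEdgePolicy) error {

        return b.UpdateEdge(policy, batch.LazyAdd())
}
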
// CurrentBlockHeight returns the block height from the PoV of the router
// subsystem.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) CurrentBlockHeight() (uint32, error) {
        _, height, err := b.cfg.Chain.GetBestBlock()
        return uint32(height), err
}

// SyncedHeight returns the block height to which the router subsystem is
// currently synced. This can differ from the above chain height if the
// goroutine responsible for processing the blocks isn't yet up to speed.
func (b *Builder) SyncedHeight() uint32 {
        return b.bestHeight.Load()
}

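// exampleSyncLag is an illustrative, hedged sketch that is not part of the
// original lnd source: it contrasts the two heights above. CurrentBlockHeight
// queries the chain backend, while SyncedHeight reports how far the
// block-processing goroutine has gotten, so any positive difference is the
// router's processing lag in blocks.
func exampleSyncLag(b *Builder) (uint32, error) {
        chainHeight, err := b.CurrentBlockHeight()
        if err != nil {
                return 0, err
        }

        synced := b.SyncedHeight()
        if synced >= chainHeight {
                // Fully caught up (or the backend briefly reported a lower
                // height); report zero lag.
                return 0, nil
        }

        return chainHeight - synced, nil
}
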
// GetChannelByID returns the channel by the channel id.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) GetChannelByID(chanID lnwire.ShortChannelID) (
        *models.ChannelEdgeInfo,
        *models.ChannelEdgePolicy,
        *models.ChannelEdgePolicy, error) {

        return b.cfg.Graph.FetchChannelEdgesByID(chanID.ToUint64())
}

// FetchLightningNode attempts to look up a target node by its identity public
// key. channeldb.ErrGraphNodeNotFound is returned if the node doesn't exist
// within the graph.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) FetchLightningNode(
        node route.Vertex) (*channeldb.LightningNode, error) {

        return b.cfg.Graph.FetchLightningNode(node)
}

// ForEachNode is used to iterate over every node in the router topology.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) ForEachNode(
        cb func(*channeldb.LightningNode) error) error {

        return b.cfg.Graph.ForEachNode(
                func(_ kvdb.RTx, n *channeldb.LightningNode) error {
                        return cb(n)
                })
}

// ForAllOutgoingChannels is used to iterate over all outgoing channels owned by
// the router.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) ForAllOutgoingChannels(cb func(kvdb.RTx,
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy) error) error {

        return b.cfg.Graph.ForEachNodeChannel(b.cfg.SelfNode,
                func(tx kvdb.RTx, c *models.ChannelEdgeInfo,
                        e *models.ChannelEdgePolicy,
                        _ *models.ChannelEdgePolicy) error {

                        if e == nil {
                                return fmt.Errorf("channel from self node " +
                                        "has no policy")
                        }

                        return cb(tx, c, e)
                },
        )
}

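// exampleCountOutgoingChannels is an illustrative, hedged usage sketch that
// is not part of the original lnd source: it walks every channel owned by the
// source node via ForAllOutgoingChannels and simply tallies them, ignoring
// the read transaction and the policies.
func exampleCountOutgoingChannels(b *Builder) (int, error) {
        var count int
        err := b.ForAllOutgoingChannels(func(_ kvdb.RTx,
                _ *models.ChannelEdgeInfo,
                _ *models.ChannelEdgePolicy) error {

                count++
                return nil
        })

        return count, err
}
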
// AddProof updates the channel edge info with the proof that is needed to
// properly announce the edge to the rest of the network.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) AddProof(chanID lnwire.ShortChannelID,
        proof *models.ChannelAuthProof) error {

        info, _, _, err := b.cfg.Graph.FetchChannelEdgesByID(chanID.ToUint64())
        if err != nil {
                return err
        }

        info.AuthProof = proof

        return b.cfg.Graph.UpdateChannelEdge(info)
}

// IsStaleNode returns true if the graph source has a node announcement for the
// target node with a more recent timestamp.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) IsStaleNode(node route.Vertex,
        timestamp time.Time) bool {

        // If our attempt to assert that the node announcement is fresh fails,
        // then we know that this is actually a stale announcement.
        err := b.assertNodeAnnFreshness(node, timestamp)
        if err != nil {
                log.Debugf("Checking stale node %x got %v", node, err)
                return true
        }

        return false
}

// IsPublicNode determines whether the given vertex is seen as a public node in
// the graph from the graph's source node's point of view.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) IsPublicNode(node route.Vertex) (bool, error) {
        return b.cfg.Graph.IsPublicNode(node)
}

// IsKnownEdge returns true if the graph source already knows of the passed
// channel ID either as a live or zombie edge.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
        _, _, exists, isZombie, _ := b.cfg.Graph.HasChannelEdge(
                chanID.ToUint64(),
        )

        return exists || isZombie
}

// IsStaleEdgePolicy returns true if the graph source has a channel edge for
// the passed channel ID (and flags) that has a more recent timestamp.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
        timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {

        edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
                b.cfg.Graph.HasChannelEdge(chanID.ToUint64())
        if err != nil {
                log.Debugf("Check stale edge policy got error: %v", err)
                return false
        }

        // If we know of the edge as a zombie, then we'll make some additional
        // checks to determine if the new policy is fresh.
        if isZombie {
                // When running with AssumeChannelValid, we also prune channels
                // if both of their edges are disabled. We'll mark the new
                // policy as stale if it remains disabled.
                if b.cfg.AssumeChannelValid {
                        isDisabled := flags&lnwire.ChanUpdateDisabled ==
                                lnwire.ChanUpdateDisabled
                        if isDisabled {
                                return true
                        }
                }

                // Otherwise, we'll fall back to our usual ChannelPruneExpiry.
                return time.Since(timestamp) > b.cfg.ChannelPruneExpiry
        }

        // If we don't know of the edge, then it means it's fresh (thus not
        // stale).
        if !exists {
                return false
        }

        // As edges are directional, each node has a unique policy for the
        // direction of the edge it controls. Therefore, we first check if we
        // already have the most up-to-date information for that edge. If so,
        // then we can exit early.
        switch {
        // A flag set of 0 indicates this is an announcement for the "first"
        // node in the channel.
        case flags&lnwire.ChanUpdateDirection == 0:
                return !edge1Timestamp.Before(timestamp)

        // Similarly, a flag set of 1 indicates this is an announcement for the
        // "second" node in the channel.
        case flags&lnwire.ChanUpdateDirection == 1:
                return !edge2Timestamp.Before(timestamp)
        }

        return false
}

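// exampleShouldApplyPolicy is an illustrative, hedged sketch that is not part
// of the original lnd source: a gossip consumer could combine IsKnownEdge and
// IsStaleEdgePolicy to decide whether a freshly received policy for the given
// direction flags is worth processing at all.
func exampleShouldApplyPolicy(b *Builder, scid lnwire.ShortChannelID,
        timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {

        // An update for a channel we don't know at all can't be stale; the
        // caller will still need the channel announcement first.
        if !b.IsKnownEdge(scid) {
                return true
        }

        return !b.IsStaleEdgePolicy(scid, timestamp, flags)
}
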
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) MarkEdgeLive(chanID lnwire.ShortChannelID) error {
        return b.cfg.Graph.MarkEdgeLive(chanID.ToUint64())
}