• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12343072627

15 Dec 2024 11:09PM UTC coverage: 57.504% (-1.1%) from 58.636%
12343072627

Pull #9315

github

yyforyongyu
contractcourt: offer outgoing htlc one block earlier before its expiry

We need to offer the outgoing htlc one block earlier to make sure when
the expiry height hits, the sweeper will not miss sweeping it in the
same block. This also means the outgoing contest resolver now only does
one thing - watch for preimage spend till height expiry-1, which can
easily be moved into the timeout resolver instead in the future.
Pull Request #9315: Implement `blockbeat`

1445 of 2007 new or added lines in 26 files covered. (72.0%)

19246 existing lines in 249 files now uncovered.

102342 of 177975 relevant lines covered (57.5%)

24772.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.36
/graph/builder.go
1
package graph
2

3
import (
4
        "bytes"
5
        "fmt"
6
        "runtime"
7
        "strings"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        "github.com/btcsuite/btcd/btcec/v2"
13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/btcsuite/btcd/chaincfg/chainhash"
15
        "github.com/btcsuite/btcd/wire"
16
        "github.com/go-errors/errors"
17
        "github.com/lightningnetwork/lnd/batch"
18
        "github.com/lightningnetwork/lnd/chainntnfs"
19
        "github.com/lightningnetwork/lnd/fn/v2"
20
        graphdb "github.com/lightningnetwork/lnd/graph/db"
21
        "github.com/lightningnetwork/lnd/graph/db/models"
22
        "github.com/lightningnetwork/lnd/input"
23
        "github.com/lightningnetwork/lnd/kvdb"
24
        "github.com/lightningnetwork/lnd/lnutils"
25
        "github.com/lightningnetwork/lnd/lnwallet"
26
        "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
27
        "github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
28
        "github.com/lightningnetwork/lnd/lnwire"
29
        "github.com/lightningnetwork/lnd/multimutex"
30
        "github.com/lightningnetwork/lnd/netann"
31
        "github.com/lightningnetwork/lnd/routing/chainview"
32
        "github.com/lightningnetwork/lnd/routing/route"
33
        "github.com/lightningnetwork/lnd/ticker"
34
)
35

36
const (
37
        // DefaultChannelPruneExpiry is the default duration used to determine
38
        // if a channel should be pruned or not.
39
        DefaultChannelPruneExpiry = time.Hour * 24 * 14
40

41
        // DefaultFirstTimePruneDelay is the time we'll wait after startup
42
        // before attempting to prune the graph for zombie channels. We don't
43
        // do it immediately after startup to allow lnd to start up without
44
        // getting blocked by this job.
45
        DefaultFirstTimePruneDelay = 30 * time.Second
46

47
        // defaultStatInterval governs how often the router will log non-empty
48
        // stats related to processing new channels, updates, or node
49
        // announcements.
50
        defaultStatInterval = time.Minute
51
)
52

53
var (
54
        // ErrGraphBuilderShuttingDown is returned if the graph builder is in
55
        // the process of shutting down.
56
        ErrGraphBuilderShuttingDown = fmt.Errorf("graph builder shutting down")
57
)
58

59
// Config holds the configuration required by the Builder.
60
type Config struct {
61
        // SelfNode is the public key of the node that this channel router
62
        // belongs to.
63
        SelfNode route.Vertex
64

65
        // Graph is the channel graph that the ChannelRouter will use to gather
66
        // metrics from and also to carry out path finding queries.
67
        Graph DB
68

69
        // Chain is the router's source to the most up-to-date blockchain data.
70
        // All incoming advertised channels will be checked against the chain
71
        // to ensure that the channels advertised are still open.
72
        Chain lnwallet.BlockChainIO
73

74
        // ChainView is an instance of a FilteredChainView which is used to
75
        // watch the sub-set of the UTXO set (the set of active channels) that
76
        // we need in order to properly maintain the channel graph.
77
        ChainView chainview.FilteredChainView
78

79
        // Notifier is a reference to the ChainNotifier, used to grab
80
        // the latest blocks if the router is missing any.
81
        Notifier chainntnfs.ChainNotifier
82

83
        // ChannelPruneExpiry is the duration used to determine if a channel
84
        // should be pruned or not. If the delta between now and when the
85
        // channel was last updated is greater than ChannelPruneExpiry, then
86
        // the channel is marked as a zombie channel eligible for pruning.
87
        ChannelPruneExpiry time.Duration
88

89
        // GraphPruneInterval is used as an interval to determine how often we
90
        // should examine the channel graph to garbage collect zombie channels.
91
        GraphPruneInterval time.Duration
92

93
        // FirstTimePruneDelay is the time we'll wait after startup before
94
        // attempting to prune the graph for zombie channels. We don't do it
95
        // immediately after startup to allow lnd to start up without getting
96
        // blocked by this job.
97
        FirstTimePruneDelay time.Duration
98

99
        // AssumeChannelValid toggles whether the router will check for
100
        // spentness of channel outpoints. For neutrino, this saves long rescans
101
        // from blocking initial usage of the daemon.
102
        AssumeChannelValid bool
103

104
        // StrictZombiePruning determines if we attempt to prune zombie
105
        // channels according to a stricter criteria. If true, then we'll prune
106
        // a channel if only *one* of the edges is considered a zombie.
107
        // Otherwise, we'll only prune the channel when both edges have a very
108
        // dated last update.
109
        StrictZombiePruning bool
110

111
        // IsAlias returns whether a passed ShortChannelID is an alias. This is
112
        // only used for our local channels.
113
        IsAlias func(scid lnwire.ShortChannelID) bool
114
}
115

116
// Builder builds and maintains a view of the Lightning Network graph.
117
type Builder struct {
118
        started atomic.Bool
119
        stopped atomic.Bool
120

121
        ntfnClientCounter atomic.Uint64
122
        bestHeight        atomic.Uint32
123

124
        cfg *Config
125

126
        // newBlocks is a channel in which new blocks connected to the end of
127
        // the main chain are sent over, and blocks updated after a call to
128
        // UpdateFilter.
129
        newBlocks <-chan *chainview.FilteredBlock
130

131
        // staleBlocks is a channel in which blocks disconnected from the end
132
        // of our currently known best chain are sent over.
133
        staleBlocks <-chan *chainview.FilteredBlock
134

135
        // networkUpdates is a channel that carries new topology updates
136
        // messages from outside the Builder to be processed by the
137
        // networkHandler.
138
        networkUpdates chan *routingMsg
139

140
        // topologyClients maps a client's unique notification ID to a
141
        // topologyClient client that contains its notification dispatch
142
        // channel.
143
        topologyClients *lnutils.SyncMap[uint64, *topologyClient]
144

145
        // ntfnClientUpdates is a channel that's used to send new updates to
146
        // topology notification clients to the Builder. Updates either
147
        // add a new notification client, or cancel notifications for an
148
        // existing client.
149
        ntfnClientUpdates chan *topologyClientUpdate
150

151
        // channelEdgeMtx is a mutex we use to make sure we process only one
152
        // ChannelEdgePolicy at a time for a given channelID, to ensure
153
        // consistency between the various database accesses.
154
        channelEdgeMtx *multimutex.Mutex[uint64]
155

156
        // statTicker is a resumable ticker that logs the router's progress as
157
        // it discovers channels or receives updates.
158
        statTicker ticker.Ticker
159

160
        // stats tracks newly processed channels, updates, and node
161
        // announcements over a window of defaultStatInterval.
162
        stats *routerStats
163

164
        quit chan struct{}
165
        wg   sync.WaitGroup
166
}
167

168
// A compile time check to ensure Builder implements the
169
// ChannelGraphSource interface.
170
var _ ChannelGraphSource = (*Builder)(nil)
171

172
// NewBuilder constructs a new Builder.
173
func NewBuilder(cfg *Config) (*Builder, error) {
21✔
174
        return &Builder{
21✔
175
                cfg:               cfg,
21✔
176
                networkUpdates:    make(chan *routingMsg),
21✔
177
                topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
21✔
178
                ntfnClientUpdates: make(chan *topologyClientUpdate),
21✔
179
                channelEdgeMtx:    multimutex.NewMutex[uint64](),
21✔
180
                statTicker:        ticker.New(defaultStatInterval),
21✔
181
                stats:             new(routerStats),
21✔
182
                quit:              make(chan struct{}),
21✔
183
        }, nil
21✔
184
}
21✔
185

186
// Start launches all the goroutines the Builder requires to carry out its
187
// duties. If the builder has already been started, then this method is a noop.
188
func (b *Builder) Start() error {
21✔
189
        if !b.started.CompareAndSwap(false, true) {
21✔
190
                return nil
×
191
        }
×
192

193
        log.Info("Builder starting")
21✔
194

21✔
195
        bestHash, bestHeight, err := b.cfg.Chain.GetBestBlock()
21✔
196
        if err != nil {
21✔
197
                return err
×
198
        }
×
199

200
        // If the graph has never been pruned, or hasn't fully been created yet,
201
        // then we don't treat this as an explicit error.
202
        if _, _, err := b.cfg.Graph.PruneTip(); err != nil {
40✔
203
                switch {
19✔
204
                case errors.Is(err, graphdb.ErrGraphNeverPruned):
19✔
205
                        fallthrough
19✔
206

207
                case errors.Is(err, graphdb.ErrGraphNotFound):
19✔
208
                        // If the graph has never been pruned, then we'll set
19✔
209
                        // the prune height to the current best height of the
19✔
210
                        // chain backend.
19✔
211
                        _, err = b.cfg.Graph.PruneGraph(
19✔
212
                                nil, bestHash, uint32(bestHeight),
19✔
213
                        )
19✔
214
                        if err != nil {
19✔
215
                                return err
×
216
                        }
×
217

218
                default:
×
219
                        return err
×
220
                }
221
        }
222

223
        // If AssumeChannelValid is present, then we won't rely on pruning
224
        // channels from the graph based on their spentness, but whether they
225
        // are considered zombies or not. We will start zombie pruning after a
226
        // small delay, to avoid slowing down startup of lnd.
227
        if b.cfg.AssumeChannelValid { //nolint:nestif
22✔
228
                time.AfterFunc(b.cfg.FirstTimePruneDelay, func() {
2✔
229
                        select {
1✔
230
                        case <-b.quit:
×
231
                                return
×
232
                        default:
1✔
233
                        }
234

235
                        log.Info("Initial zombie prune starting")
1✔
236
                        if err := b.pruneZombieChans(); err != nil {
1✔
237
                                log.Errorf("Unable to prune zombies: %v", err)
×
238
                        }
×
239
                })
240
        } else {
20✔
241
                // Otherwise, we'll use our filtered chain view to prune
20✔
242
                // channels as soon as they are detected as spent on-chain.
20✔
243
                if err := b.cfg.ChainView.Start(); err != nil {
20✔
244
                        return err
×
245
                }
×
246

247
                // Once the instance is active, we'll fetch the channel we'll
248
                // receive notifications over.
249
                b.newBlocks = b.cfg.ChainView.FilteredBlocks()
20✔
250
                b.staleBlocks = b.cfg.ChainView.DisconnectedBlocks()
20✔
251

20✔
252
                // Before we perform our manual block pruning, we'll construct
20✔
253
                // and apply a fresh chain filter to the active
20✔
254
                // FilteredChainView instance.  We do this before, as otherwise
20✔
255
                // we may miss on-chain events as the filter hasn't properly
20✔
256
                // been applied.
20✔
257
                channelView, err := b.cfg.Graph.ChannelView()
20✔
258
                if err != nil && !errors.Is(
20✔
259
                        err, graphdb.ErrGraphNoEdgesFound,
20✔
260
                ) {
20✔
261

×
262
                        return err
×
263
                }
×
264

265
                log.Infof("Filtering chain using %v channels active",
20✔
266
                        len(channelView))
20✔
267

20✔
268
                if len(channelView) != 0 {
27✔
269
                        err = b.cfg.ChainView.UpdateFilter(
7✔
270
                                channelView, uint32(bestHeight),
7✔
271
                        )
7✔
272
                        if err != nil {
7✔
273
                                return err
×
274
                        }
×
275
                }
276

277
                // The graph pruning might have taken a while and there could be
278
                // new blocks available.
279
                _, bestHeight, err = b.cfg.Chain.GetBestBlock()
20✔
280
                if err != nil {
20✔
281
                        return err
×
282
                }
×
283
                b.bestHeight.Store(uint32(bestHeight))
20✔
284

20✔
285
                // Before we begin normal operation of the router, we first need
20✔
286
                // to synchronize the channel graph to the latest state of the
20✔
287
                // UTXO set.
20✔
288
                if err := b.syncGraphWithChain(); err != nil {
20✔
289
                        return err
×
290
                }
×
291

292
                // Finally, before we proceed, we'll prune any unconnected nodes
293
                // from the graph in order to ensure we maintain a tight graph
294
                // of "useful" nodes.
295
                err = b.cfg.Graph.PruneGraphNodes()
20✔
296
                if err != nil &&
20✔
297
                        !errors.Is(err, graphdb.ErrGraphNodesNotFound) {
20✔
298

×
299
                        return err
×
300
                }
×
301
        }
302

303
        b.wg.Add(1)
21✔
304
        go b.networkHandler()
21✔
305

21✔
306
        log.Debug("Builder started")
21✔
307

21✔
308
        return nil
21✔
309
}
310

311
// Stop signals to the Builder that it should halt all routines. This method
312
// will *block* until all goroutines have excited. If the builder has already
313
// stopped then this method will return immediately.
314
func (b *Builder) Stop() error {
21✔
315
        if !b.stopped.CompareAndSwap(false, true) {
23✔
316
                return nil
2✔
317
        }
2✔
318

319
        log.Info("Builder shutting down...")
19✔
320

19✔
321
        // Our filtered chain view could've only been started if
19✔
322
        // AssumeChannelValid isn't present.
19✔
323
        if !b.cfg.AssumeChannelValid {
37✔
324
                if err := b.cfg.ChainView.Stop(); err != nil {
18✔
325
                        return err
×
326
                }
×
327
        }
328

329
        close(b.quit)
19✔
330
        b.wg.Wait()
19✔
331

19✔
332
        log.Debug("Builder shutdown complete")
19✔
333

19✔
334
        return nil
19✔
335
}
336

337
// syncGraphWithChain attempts to synchronize the current channel graph with
338
// the latest UTXO set state. This process involves pruning from the channel
339
// graph any channels which have been closed by spending their funding output
340
// since we've been down.
341
func (b *Builder) syncGraphWithChain() error {
20✔
342
        // First, we'll need to check to see if we're already in sync with the
20✔
343
        // latest state of the UTXO set.
20✔
344
        bestHash, bestHeight, err := b.cfg.Chain.GetBestBlock()
20✔
345
        if err != nil {
20✔
346
                return err
×
347
        }
×
348
        b.bestHeight.Store(uint32(bestHeight))
20✔
349

20✔
350
        pruneHash, pruneHeight, err := b.cfg.Graph.PruneTip()
20✔
351
        if err != nil {
20✔
352
                switch {
×
353
                // If the graph has never been pruned, or hasn't fully been
354
                // created yet, then we don't treat this as an explicit error.
355
                case errors.Is(err, graphdb.ErrGraphNeverPruned):
×
356
                case errors.Is(err, graphdb.ErrGraphNotFound):
×
357
                default:
×
358
                        return err
×
359
                }
360
        }
361

362
        log.Infof("Prune tip for Channel Graph: height=%v, hash=%v",
20✔
363
                pruneHeight, pruneHash)
20✔
364

20✔
365
        switch {
20✔
366
        // If the graph has never been pruned, then we can exit early as this
367
        // entails it's being created for the first time and hasn't seen any
368
        // block or created channels.
369
        case pruneHeight == 0 || pruneHash == nil:
4✔
370
                return nil
4✔
371

372
        // If the block hashes and heights match exactly, then we don't need to
373
        // prune the channel graph as we're already fully in sync.
374
        case bestHash.IsEqual(pruneHash) && uint32(bestHeight) == pruneHeight:
14✔
375
                return nil
14✔
376
        }
377

378
        // If the main chain blockhash at prune height is different from the
379
        // prune hash, this might indicate the database is on a stale branch.
380
        mainBlockHash, err := b.cfg.Chain.GetBlockHash(int64(pruneHeight))
2✔
381
        if err != nil {
2✔
382
                return err
×
383
        }
×
384

385
        // While we are on a stale branch of the chain, walk backwards to find
386
        // first common block.
387
        for !pruneHash.IsEqual(mainBlockHash) {
12✔
388
                log.Infof("channel graph is stale. Disconnecting block %v "+
10✔
389
                        "(hash=%v)", pruneHeight, pruneHash)
10✔
390
                // Prune the graph for every channel that was opened at height
10✔
391
                // >= pruneHeight.
10✔
392
                _, err := b.cfg.Graph.DisconnectBlockAtHeight(pruneHeight)
10✔
393
                if err != nil {
10✔
394
                        return err
×
395
                }
×
396

397
                pruneHash, pruneHeight, err = b.cfg.Graph.PruneTip()
10✔
398
                switch {
10✔
399
                // If at this point the graph has never been pruned, we can exit
400
                // as this entails we are back to the point where it hasn't seen
401
                // any block or created channels, alas there's nothing left to
402
                // prune.
403
                case errors.Is(err, graphdb.ErrGraphNeverPruned):
×
404
                        return nil
×
405

406
                case errors.Is(err, graphdb.ErrGraphNotFound):
×
407
                        return nil
×
408

409
                case err != nil:
×
410
                        return err
×
411

412
                default:
10✔
413
                }
414

415
                mainBlockHash, err = b.cfg.Chain.GetBlockHash(
10✔
416
                        int64(pruneHeight),
10✔
417
                )
10✔
418
                if err != nil {
10✔
419
                        return err
×
420
                }
×
421
        }
422

423
        log.Infof("Syncing channel graph from height=%v (hash=%v) to "+
2✔
424
                "height=%v (hash=%v)", pruneHeight, pruneHash, bestHeight,
2✔
425
                bestHash)
2✔
426

2✔
427
        // If we're not yet caught up, then we'll walk forward in the chain
2✔
428
        // pruning the channel graph with each new block that hasn't yet been
2✔
429
        // consumed by the channel graph.
2✔
430
        var spentOutputs []*wire.OutPoint
2✔
431
        for nextHeight := pruneHeight + 1; nextHeight <= uint32(bestHeight); nextHeight++ { //nolint:ll
24✔
432
                // Break out of the rescan early if a shutdown has been
22✔
433
                // requested, otherwise long rescans will block the daemon from
22✔
434
                // shutting down promptly.
22✔
435
                select {
22✔
436
                case <-b.quit:
×
437
                        return ErrGraphBuilderShuttingDown
×
438
                default:
22✔
439
                }
440

441
                // Using the next height, request a manual block pruning from
442
                // the chainview for the particular block hash.
443
                log.Infof("Filtering block for closed channels, at height: %v",
22✔
444
                        int64(nextHeight))
22✔
445
                nextHash, err := b.cfg.Chain.GetBlockHash(int64(nextHeight))
22✔
446
                if err != nil {
22✔
447
                        return err
×
448
                }
×
449
                log.Tracef("Running block filter on block with hash: %v",
22✔
450
                        nextHash)
22✔
451
                filterBlock, err := b.cfg.ChainView.FilterBlock(nextHash)
22✔
452
                if err != nil {
22✔
453
                        return err
×
454
                }
×
455

456
                // We're only interested in all prior outputs that have been
457
                // spent in the block, so collate all the referenced previous
458
                // outpoints within each tx and input.
459
                for _, tx := range filterBlock.Transactions {
23✔
460
                        for _, txIn := range tx.TxIn {
2✔
461
                                spentOutputs = append(spentOutputs,
1✔
462
                                        &txIn.PreviousOutPoint)
1✔
463
                        }
1✔
464
                }
465
        }
466

467
        // With the spent outputs gathered, attempt to prune the channel graph,
468
        // also passing in the best hash+height so the prune tip can be updated.
469
        closedChans, err := b.cfg.Graph.PruneGraph(
2✔
470
                spentOutputs, bestHash, uint32(bestHeight),
2✔
471
        )
2✔
472
        if err != nil {
2✔
473
                return err
×
474
        }
×
475

476
        log.Infof("Graph pruning complete: %v channels were closed since "+
2✔
477
                "height %v", len(closedChans), pruneHeight)
2✔
478

2✔
479
        return nil
2✔
480
}
481

482
// isZombieChannel takes two edge policy updates and determines if the
483
// corresponding channel should be considered a zombie. The first boolean is
484
// true if the policy update from node 1 is considered a zombie, the second
485
// boolean is that of node 2, and the final boolean is true if the channel
486
// is considered a zombie.
487
func (b *Builder) isZombieChannel(e1,
488
        e2 *models.ChannelEdgePolicy) (bool, bool, bool) {
6✔
489

6✔
490
        chanExpiry := b.cfg.ChannelPruneExpiry
6✔
491

6✔
492
        e1Zombie := e1 == nil || time.Since(e1.LastUpdate) >= chanExpiry
6✔
493
        e2Zombie := e2 == nil || time.Since(e2.LastUpdate) >= chanExpiry
6✔
494

6✔
495
        var e1Time, e2Time time.Time
6✔
496
        if e1 != nil {
10✔
497
                e1Time = e1.LastUpdate
4✔
498
        }
4✔
499
        if e2 != nil {
12✔
500
                e2Time = e2.LastUpdate
6✔
501
        }
6✔
502

503
        return e1Zombie, e2Zombie, b.IsZombieChannel(e1Time, e2Time)
6✔
504
}
505

506
// IsZombieChannel takes the timestamps of the latest channel updates for a
507
// channel and returns true if the channel should be considered a zombie based
508
// on these timestamps.
509
func (b *Builder) IsZombieChannel(updateTime1,
510
        updateTime2 time.Time) bool {
6✔
511

6✔
512
        chanExpiry := b.cfg.ChannelPruneExpiry
6✔
513

6✔
514
        e1Zombie := updateTime1.IsZero() ||
6✔
515
                time.Since(updateTime1) >= chanExpiry
6✔
516

6✔
517
        e2Zombie := updateTime2.IsZero() ||
6✔
518
                time.Since(updateTime2) >= chanExpiry
6✔
519

6✔
520
        // If we're using strict zombie pruning, then a channel is only
6✔
521
        // considered live if both edges have a recent update we know of.
6✔
522
        if b.cfg.StrictZombiePruning {
9✔
523
                return e1Zombie || e2Zombie
3✔
524
        }
3✔
525

526
        // Otherwise, if we're using the less strict variant, then a channel is
527
        // considered live if either of the edges have a recent update.
528
        return e1Zombie && e2Zombie
3✔
529
}
530

531
// pruneZombieChans is a method that will be called periodically to prune out
532
// any "zombie" channels. We consider channels zombies if *both* edges haven't
533
// been updated since our zombie horizon. If AssumeChannelValid is present,
534
// we'll also consider channels zombies if *both* edges are disabled. This
535
// usually signals that a channel has been closed on-chain. We do this
536
// periodically to keep a healthy, lively routing table.
537
func (b *Builder) pruneZombieChans() error {
5✔
538
        chansToPrune := make(map[uint64]struct{})
5✔
539
        chanExpiry := b.cfg.ChannelPruneExpiry
5✔
540

5✔
541
        log.Infof("Examining channel graph for zombie channels")
5✔
542

5✔
543
        // A helper method to detect if the channel belongs to this node
5✔
544
        isSelfChannelEdge := func(info *models.ChannelEdgeInfo) bool {
16✔
545
                return info.NodeKey1Bytes == b.cfg.SelfNode ||
11✔
546
                        info.NodeKey2Bytes == b.cfg.SelfNode
11✔
547
        }
11✔
548

549
        // First, we'll collect all the channels which are eligible for garbage
550
        // collection due to being zombies.
551
        filterPruneChans := func(info *models.ChannelEdgeInfo,
5✔
552
                e1, e2 *models.ChannelEdgePolicy) error {
13✔
553

8✔
554
                // Exit early in case this channel is already marked to be
8✔
555
                // pruned
8✔
556
                _, markedToPrune := chansToPrune[info.ChannelID]
8✔
557
                if markedToPrune {
8✔
558
                        return nil
×
559
                }
×
560

561
                // We'll ensure that we don't attempt to prune our *own*
562
                // channels from the graph, as in any case this should be
563
                // re-advertised by the sub-system above us.
564
                if isSelfChannelEdge(info) {
10✔
565
                        return nil
2✔
566
                }
2✔
567

568
                e1Zombie, e2Zombie, isZombieChan := b.isZombieChannel(e1, e2)
6✔
569

6✔
570
                if e1Zombie {
10✔
571
                        log.Tracef("Node1 pubkey=%x of chan_id=%v is zombie",
4✔
572
                                info.NodeKey1Bytes, info.ChannelID)
4✔
573
                }
4✔
574

575
                if e2Zombie {
12✔
576
                        log.Tracef("Node2 pubkey=%x of chan_id=%v is zombie",
6✔
577
                                info.NodeKey2Bytes, info.ChannelID)
6✔
578
                }
6✔
579

580
                // If either edge hasn't been updated for a period of
581
                // chanExpiry, then we'll mark the channel itself as eligible
582
                // for graph pruning.
583
                if !isZombieChan {
7✔
584
                        return nil
1✔
585
                }
1✔
586

587
                log.Debugf("ChannelID(%v) is a zombie, collecting to prune",
5✔
588
                        info.ChannelID)
5✔
589

5✔
590
                // TODO(roasbeef): add ability to delete single directional edge
5✔
591
                chansToPrune[info.ChannelID] = struct{}{}
5✔
592

5✔
593
                return nil
5✔
594
        }
595

596
        // If AssumeChannelValid is present we'll look at the disabled bit for
597
        // both edges. If they're both disabled, then we can interpret this as
598
        // the channel being closed and can prune it from our graph.
599
        if b.cfg.AssumeChannelValid {
7✔
600
                disabledChanIDs, err := b.cfg.Graph.DisabledChannelIDs()
2✔
601
                if err != nil {
2✔
602
                        return fmt.Errorf("unable to get disabled channels "+
×
603
                                "ids chans: %v", err)
×
604
                }
×
605

606
                disabledEdges, err := b.cfg.Graph.FetchChanInfos(
2✔
607
                        disabledChanIDs,
2✔
608
                )
2✔
609
                if err != nil {
2✔
610
                        return fmt.Errorf("unable to fetch disabled channels "+
×
611
                                "edges chans: %v", err)
×
612
                }
×
613

614
                // Ensuring we won't prune our own channel from the graph.
615
                for _, disabledEdge := range disabledEdges {
5✔
616
                        if !isSelfChannelEdge(disabledEdge.Info) {
4✔
617
                                chansToPrune[disabledEdge.Info.ChannelID] =
1✔
618
                                        struct{}{}
1✔
619
                        }
1✔
620
                }
621
        }
622

623
        startTime := time.Unix(0, 0)
5✔
624
        endTime := time.Now().Add(-1 * chanExpiry)
5✔
625
        oldEdges, err := b.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime)
5✔
626
        if err != nil {
5✔
627
                return fmt.Errorf("unable to fetch expired channel updates "+
×
628
                        "chans: %v", err)
×
629
        }
×
630

631
        for _, u := range oldEdges {
13✔
632
                err = filterPruneChans(u.Info, u.Policy1, u.Policy2)
8✔
633
                if err != nil {
8✔
634
                        return fmt.Errorf("error filtering channels to "+
×
635
                                "prune: %w", err)
×
636
                }
×
637
        }
638

639
        log.Infof("Pruning %v zombie channels", len(chansToPrune))
5✔
640
        if len(chansToPrune) == 0 {
7✔
641
                return nil
2✔
642
        }
2✔
643

644
        // With the set of zombie-like channels obtained, we'll do another pass
645
        // to delete them from the channel graph.
646
        toPrune := make([]uint64, 0, len(chansToPrune))
3✔
647
        for chanID := range chansToPrune {
9✔
648
                toPrune = append(toPrune, chanID)
6✔
649
                log.Tracef("Pruning zombie channel with ChannelID(%v)", chanID)
6✔
650
        }
6✔
651
        err = b.cfg.Graph.DeleteChannelEdges(
3✔
652
                b.cfg.StrictZombiePruning, true, toPrune...,
3✔
653
        )
3✔
654
        if err != nil {
3✔
655
                return fmt.Errorf("unable to delete zombie channels: %w", err)
×
656
        }
×
657

658
        // With the channels pruned, we'll also attempt to prune any nodes that
659
        // were a part of them.
660
        err = b.cfg.Graph.PruneGraphNodes()
3✔
661
        if err != nil && !errors.Is(err, graphdb.ErrGraphNodesNotFound) {
3✔
662
                return fmt.Errorf("unable to prune graph nodes: %w", err)
×
663
        }
×
664

665
        return nil
3✔
666
}
667

668
// handleNetworkUpdate is responsible for processing the update message and
669
// notifies topology changes, if any.
670
//
671
// NOTE: must be run inside goroutine.
672
func (b *Builder) handleNetworkUpdate(vb *ValidationBarrier,
673
        update *routingMsg) {
30✔
674

30✔
675
        defer b.wg.Done()
30✔
676
        defer vb.CompleteJob()
30✔
677

30✔
678
        // If this message has an existing dependency, then we'll wait until
30✔
679
        // that has been fully validated before we proceed.
30✔
680
        err := vb.WaitForDependants(update.msg)
30✔
681
        if err != nil {
30✔
682
                switch {
×
683
                case IsError(err, ErrVBarrierShuttingDown):
×
684
                        update.err <- err
×
685

686
                case IsError(err, ErrParentValidationFailed):
×
687
                        update.err <- NewErrf(ErrIgnored, err.Error()) //nolint
×
688

689
                default:
×
690
                        log.Warnf("unexpected error during validation "+
×
691
                                "barrier shutdown: %v", err)
×
692
                        update.err <- err
×
693
                }
694

695
                return
×
696
        }
697

698
        // Process the routing update to determine if this is either a new
699
        // update from our PoV or an update to a prior vertex/edge we
700
        // previously accepted.
701
        err = b.processUpdate(update.msg, update.op...)
30✔
702
        update.err <- err
30✔
703

30✔
704
        // If this message had any dependencies, then we can now signal them to
30✔
705
        // continue.
30✔
706
        allowDependents := err == nil || IsError(err, ErrIgnored, ErrOutdated)
30✔
707
        vb.SignalDependants(update.msg, allowDependents)
30✔
708

30✔
709
        // If the error is not nil here, there's no need to send topology
30✔
710
        // change.
30✔
711
        if err != nil {
35✔
712
                // We now decide to log an error or not. If allowDependents is
5✔
713
                // false, it means there is an error and the error is neither
5✔
714
                // ErrIgnored or ErrOutdated. In this case, we'll log an error.
5✔
715
                // Otherwise, we'll add debug log only.
5✔
716
                if allowDependents {
7✔
717
                        log.Debugf("process network updates got: %v", err)
2✔
718
                } else {
5✔
719
                        log.Errorf("process network updates got: %v", err)
3✔
720
                }
3✔
721

722
                return
5✔
723
        }
724

725
        // Otherwise, we'll send off a new notification for the newly accepted
726
        // update, if any.
727
        topChange := &TopologyChange{}
25✔
728
        err = addToTopologyChange(b.cfg.Graph, topChange, update.msg)
25✔
729
        if err != nil {
25✔
730
                log.Errorf("unable to update topology change notification: %v",
×
731
                        err)
×
732
                return
×
733
        }
×
734

735
        if !topChange.isEmpty() {
36✔
736
                b.notifyTopologyChange(topChange)
11✔
737
        }
11✔
738
}
739

740
// networkHandler is the primary goroutine for the Builder. The roles of
741
// this goroutine include answering queries related to the state of the
742
// network, pruning the graph on new block notification, applying network
743
// updates, and registering new topology clients.
744
//
745
// NOTE: This MUST be run as a goroutine.
746
func (b *Builder) networkHandler() {
21✔
747
        defer b.wg.Done()
21✔
748

21✔
749
        graphPruneTicker := time.NewTicker(b.cfg.GraphPruneInterval)
21✔
750
        defer graphPruneTicker.Stop()
21✔
751

21✔
752
        defer b.statTicker.Stop()
21✔
753

21✔
754
        b.stats.Reset()
21✔
755

21✔
756
        // We'll use this validation barrier to ensure that we process all jobs
21✔
757
        // in the proper order during parallel validation.
21✔
758
        //
21✔
759
        // NOTE: For AssumeChannelValid, we bump up the maximum number of
21✔
760
        // concurrent validation requests since there are no blocks being
21✔
761
        // fetched. This significantly increases the performance of IGD for
21✔
762
        // neutrino nodes.
21✔
763
        //
21✔
764
        // However, we dial back to use multiple of the number of cores when
21✔
765
        // fully validating, to avoid fetching up to 1000 blocks from the
21✔
766
        // backend. On bitcoind, this will empirically cause massive latency
21✔
767
        // spikes when executing this many concurrent RPC calls. Critical
21✔
768
        // subsystems or basic rpc calls that rely on calls such as GetBestBlock
21✔
769
        // will hang due to excessive load.
21✔
770
        //
21✔
771
        // See https://github.com/lightningnetwork/lnd/issues/4892.
21✔
772
        var validationBarrier *ValidationBarrier
21✔
773
        if b.cfg.AssumeChannelValid {
22✔
774
                validationBarrier = NewValidationBarrier(1000, b.quit)
1✔
775
        } else {
21✔
776
                validationBarrier = NewValidationBarrier(
20✔
777
                        4*runtime.NumCPU(), b.quit,
20✔
778
                )
20✔
779
        }
20✔
780

781
        for {
153✔
782
                // If there are stats, resume the statTicker.
132✔
783
                if !b.stats.Empty() {
177✔
784
                        b.statTicker.Resume()
45✔
785
                }
45✔
786

787
                select {
132✔
788
                // A new fully validated network update has just arrived. As a
789
                // result we'll modify the channel graph accordingly depending
790
                // on the exact type of the message.
791
                case update := <-b.networkUpdates:
30✔
792
                        // We'll set up any dependants, and wait until a free
30✔
793
                        // slot for this job opens up, this allows us to not
30✔
794
                        // have thousands of goroutines active.
30✔
795
                        validationBarrier.InitJobDependencies(update.msg)
30✔
796

30✔
797
                        b.wg.Add(1)
30✔
798
                        go b.handleNetworkUpdate(validationBarrier, update)
30✔
799

800
                        // TODO(roasbeef): remove all unconnected vertexes
801
                        // after N blocks pass with no corresponding
802
                        // announcements.
803

804
                case chainUpdate, ok := <-b.staleBlocks:
10✔
805
                        // If the channel has been closed, then this indicates
10✔
806
                        // the daemon is shutting down, so we exit ourselves.
10✔
807
                        if !ok {
10✔
808
                                return
×
809
                        }
×
810

811
                        // Since this block is stale, we update our best height
812
                        // to the previous block.
813
                        blockHeight := chainUpdate.Height
10✔
814
                        b.bestHeight.Store(blockHeight - 1)
10✔
815

10✔
816
                        // Update the channel graph to reflect that this block
10✔
817
                        // was disconnected.
10✔
818
                        _, err := b.cfg.Graph.DisconnectBlockAtHeight(
10✔
819
                                blockHeight,
10✔
820
                        )
10✔
821
                        if err != nil {
10✔
822
                                log.Errorf("unable to prune graph with stale "+
×
823
                                        "block: %v", err)
×
824
                                continue
×
825
                        }
826

827
                        // TODO(halseth): notify client about the reorg?
828

829
                // A new block has arrived, so we can prune the channel graph
830
                // of any channels which were closed in the block.
831
                case chainUpdate, ok := <-b.newBlocks:
66✔
832
                        // If the channel has been closed, then this indicates
66✔
833
                        // the daemon is shutting down, so we exit ourselves.
66✔
834
                        if !ok {
66✔
835
                                return
×
836
                        }
×
837

838
                        // We'll ensure that any new blocks received attach
839
                        // directly to the end of our main chain. If not, then
840
                        // we've somehow missed some blocks. Here we'll catch
841
                        // up the chain with the latest blocks.
842
                        currentHeight := b.bestHeight.Load()
66✔
843
                        switch {
66✔
844
                        case chainUpdate.Height == currentHeight+1:
60✔
845
                                err := b.updateGraphWithClosedChannels(
60✔
846
                                        chainUpdate,
60✔
847
                                )
60✔
848
                                if err != nil {
60✔
849
                                        log.Errorf("unable to prune graph "+
×
850
                                                "with closed channels: %v", err)
×
851
                                }
×
852

853
                        case chainUpdate.Height > currentHeight+1:
1✔
854
                                log.Errorf("out of order block: expecting "+
1✔
855
                                        "height=%v, got height=%v",
1✔
856
                                        currentHeight+1, chainUpdate.Height)
1✔
857

1✔
858
                                err := b.getMissingBlocks(
1✔
859
                                        currentHeight, chainUpdate,
1✔
860
                                )
1✔
861
                                if err != nil {
1✔
862
                                        log.Errorf("unable to retrieve missing"+
×
863
                                                "blocks: %v", err)
×
864
                                }
×
865

866
                        case chainUpdate.Height < currentHeight+1:
5✔
867
                                log.Errorf("out of order block: expecting "+
5✔
868
                                        "height=%v, got height=%v",
5✔
869
                                        currentHeight+1, chainUpdate.Height)
5✔
870

5✔
871
                                log.Infof("Skipping channel pruning since "+
5✔
872
                                        "received block height %v was already"+
5✔
873
                                        " processed.", chainUpdate.Height)
5✔
874
                        }
875

876
                // A new notification client update has arrived. We're either
877
                // gaining a new client, or cancelling notifications for an
878
                // existing client.
879
                case ntfnUpdate := <-b.ntfnClientUpdates:
5✔
880
                        clientID := ntfnUpdate.clientID
5✔
881

5✔
882
                        if ntfnUpdate.cancel {
6✔
883
                                client, ok := b.topologyClients.LoadAndDelete(
1✔
884
                                        clientID,
1✔
885
                                )
1✔
886
                                if ok {
2✔
887
                                        close(client.exit)
1✔
888
                                        client.wg.Wait()
1✔
889

1✔
890
                                        close(client.ntfnChan)
1✔
891
                                }
1✔
892

893
                                continue
1✔
894
                        }
895

896
                        b.topologyClients.Store(clientID, &topologyClient{
4✔
897
                                ntfnChan: ntfnUpdate.ntfnChan,
4✔
898
                                exit:     make(chan struct{}),
4✔
899
                        })
4✔
900

901
                // The graph prune ticker has ticked, so we'll examine the
902
                // state of the known graph to filter out any zombie channels
903
                // for pruning.
904
                case <-graphPruneTicker.C:
×
905
                        if err := b.pruneZombieChans(); err != nil {
×
906
                                log.Errorf("Unable to prune zombies: %v", err)
×
907
                        }
×
908

909
                // Log any stats if we've processed a non-empty number of
910
                // channels, updates, or nodes. We'll only pause the ticker if
911
                // the last window contained no updates to avoid resuming and
912
                // pausing while consecutive windows contain new info.
UNCOV
913
                case <-b.statTicker.Ticks():
×
UNCOV
914
                        if !b.stats.Empty() {
×
UNCOV
915
                                log.Infof(b.stats.String())
×
UNCOV
916
                        } else {
×
917
                                b.statTicker.Pause()
×
918
                        }
×
UNCOV
919
                        b.stats.Reset()
×
920

921
                // The router has been signalled to exit, to we exit our main
922
                // loop so the wait group can be decremented.
923
                case <-b.quit:
19✔
924
                        return
19✔
925
                }
926
        }
927
}
928

929
// getMissingBlocks walks through all missing blocks and updates the graph
930
// closed channels accordingly.
931
func (b *Builder) getMissingBlocks(currentHeight uint32,
932
        chainUpdate *chainview.FilteredBlock) error {
1✔
933

1✔
934
        outdatedHash, err := b.cfg.Chain.GetBlockHash(int64(currentHeight))
1✔
935
        if err != nil {
1✔
936
                return err
×
937
        }
×
938

939
        outdatedBlock := &chainntnfs.BlockEpoch{
1✔
940
                Height: int32(currentHeight),
1✔
941
                Hash:   outdatedHash,
1✔
942
        }
1✔
943

1✔
944
        epochClient, err := b.cfg.Notifier.RegisterBlockEpochNtfn(
1✔
945
                outdatedBlock,
1✔
946
        )
1✔
947
        if err != nil {
1✔
948
                return err
×
949
        }
×
950
        defer epochClient.Cancel()
1✔
951

1✔
952
        blockDifference := int(chainUpdate.Height - currentHeight)
1✔
953

1✔
954
        // We'll walk through all the outdated blocks and make sure we're able
1✔
955
        // to update the graph with any closed channels from them.
1✔
956
        for i := 0; i < blockDifference; i++ {
6✔
957
                var (
5✔
958
                        missingBlock *chainntnfs.BlockEpoch
5✔
959
                        ok           bool
5✔
960
                )
5✔
961

5✔
962
                select {
5✔
963
                case missingBlock, ok = <-epochClient.Epochs:
5✔
964
                        if !ok {
5✔
965
                                return nil
×
966
                        }
×
967

968
                case <-b.quit:
×
969
                        return nil
×
970
                }
971

972
                filteredBlock, err := b.cfg.ChainView.FilterBlock(
5✔
973
                        missingBlock.Hash,
5✔
974
                )
5✔
975
                if err != nil {
5✔
976
                        return err
×
977
                }
×
978

979
                err = b.updateGraphWithClosedChannels(
5✔
980
                        filteredBlock,
5✔
981
                )
5✔
982
                if err != nil {
5✔
983
                        return err
×
984
                }
×
985
        }
986

987
        return nil
1✔
988
}
989

990
// updateGraphWithClosedChannels prunes the channel graph of closed channels
991
// that are no longer needed.
992
func (b *Builder) updateGraphWithClosedChannels(
993
        chainUpdate *chainview.FilteredBlock) error {
65✔
994

65✔
995
        // Once a new block arrives, we update our running track of the height
65✔
996
        // of the chain tip.
65✔
997
        blockHeight := chainUpdate.Height
65✔
998

65✔
999
        b.bestHeight.Store(blockHeight)
65✔
1000
        log.Infof("Pruning channel graph using block %v (height=%v)",
65✔
1001
                chainUpdate.Hash, blockHeight)
65✔
1002

65✔
1003
        // We're only interested in all prior outputs that have been spent in
65✔
1004
        // the block, so collate all the referenced previous outpoints within
65✔
1005
        // each tx and input.
65✔
1006
        var spentOutputs []*wire.OutPoint
65✔
1007
        for _, tx := range chainUpdate.Transactions {
66✔
1008
                for _, txIn := range tx.TxIn {
2✔
1009
                        spentOutputs = append(spentOutputs,
1✔
1010
                                &txIn.PreviousOutPoint)
1✔
1011
                }
1✔
1012
        }
1013

1014
        // With the spent outputs gathered, attempt to prune the channel graph,
1015
        // also passing in the hash+height of the block being pruned so the
1016
        // prune tip can be updated.
1017
        chansClosed, err := b.cfg.Graph.PruneGraph(spentOutputs,
65✔
1018
                &chainUpdate.Hash, chainUpdate.Height)
65✔
1019
        if err != nil {
65✔
1020
                log.Errorf("unable to prune routing table: %v", err)
×
1021
                return err
×
1022
        }
×
1023

1024
        log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash,
65✔
1025
                blockHeight, len(chansClosed))
65✔
1026

65✔
1027
        if len(chansClosed) == 0 {
129✔
1028
                return err
64✔
1029
        }
64✔
1030

1031
        // Notify all currently registered clients of the newly closed channels.
1032
        closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
1✔
1033
        b.notifyTopologyChange(&TopologyChange{
1✔
1034
                ClosedChannels: closeSummaries,
1✔
1035
        })
1✔
1036

1✔
1037
        return nil
1✔
1038
}
1039

1040
// assertNodeAnnFreshness returns a non-nil error if we have an announcement in
1041
// the database for the passed node with a timestamp newer than the passed
1042
// timestamp. ErrIgnored will be returned if we already have the node, and
1043
// ErrOutdated will be returned if we have a timestamp that's after the new
1044
// timestamp.
1045
func (b *Builder) assertNodeAnnFreshness(node route.Vertex,
1046
        msgTimestamp time.Time) error {
10✔
1047

10✔
1048
        // If we are not already aware of this node, it means that we don't
10✔
1049
        // know about any channel using this node. To avoid a DoS attack by
10✔
1050
        // node announcements, we will ignore such nodes. If we do know about
10✔
1051
        // this node, check that this update brings info newer than what we
10✔
1052
        // already have.
10✔
1053
        lastUpdate, exists, err := b.cfg.Graph.HasLightningNode(node)
10✔
1054
        if err != nil {
10✔
1055
                return errors.Errorf("unable to query for the "+
×
1056
                        "existence of node: %v", err)
×
1057
        }
×
1058
        if !exists {
11✔
1059
                return NewErrf(ErrIgnored, "Ignoring node announcement"+
1✔
1060
                        " for node not found in channel graph (%x)",
1✔
1061
                        node[:])
1✔
1062
        }
1✔
1063

1064
        // If we've reached this point then we're aware of the vertex being
1065
        // advertised. So we now check if the new message has a new time stamp,
1066
        // if not then we won't accept the new data as it would override newer
1067
        // data.
1068
        if !lastUpdate.Before(msgTimestamp) {
10✔
1069
                return NewErrf(ErrOutdated, "Ignoring outdated "+
1✔
1070
                        "announcement for %x", node[:])
1✔
1071
        }
1✔
1072

1073
        return nil
8✔
1074
}
1075

1076
// addZombieEdge adds a channel that failed complete validation into the zombie
1077
// index so we can avoid having to re-validate it in the future.
1078
func (b *Builder) addZombieEdge(chanID uint64) error {
3✔
1079
        // If the edge fails validation we'll mark the edge itself as a zombie
3✔
1080
        // so we don't continue to request it. We use the "zero key" for both
3✔
1081
        // node pubkeys so this edge can't be resurrected.
3✔
1082
        var zeroKey [33]byte
3✔
1083
        err := b.cfg.Graph.MarkEdgeZombie(chanID, zeroKey, zeroKey)
3✔
1084
        if err != nil {
3✔
1085
                return fmt.Errorf("unable to mark spent chan(id=%v) as a "+
×
1086
                        "zombie: %w", chanID, err)
×
1087
        }
×
1088

1089
        return nil
3✔
1090
}
1091

1092
// makeFundingScript is used to make the funding script for both segwit v0 and
1093
// segwit v1 (taproot) channels.
1094
//
1095
// TODO(roasbeef: export and use elsewhere?
1096
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte, chanFeatures []byte,
1097
        tapscriptRoot fn.Option[chainhash.Hash]) ([]byte, error) {
16✔
1098

16✔
1099
        legacyFundingScript := func() ([]byte, error) {
32✔
1100
                witnessScript, err := input.GenMultiSigScript(
16✔
1101
                        bitcoinKey1, bitcoinKey2,
16✔
1102
                )
16✔
1103
                if err != nil {
16✔
1104
                        return nil, err
×
1105
                }
×
1106
                pkScript, err := input.WitnessScriptHash(witnessScript)
16✔
1107
                if err != nil {
16✔
1108
                        return nil, err
×
1109
                }
×
1110

1111
                return pkScript, nil
16✔
1112
        }
1113

1114
        if len(chanFeatures) == 0 {
32✔
1115
                return legacyFundingScript()
16✔
1116
        }
16✔
1117

1118
        // In order to make the correct funding script, we'll need to parse the
1119
        // chanFeatures bytes into a feature vector we can interact with.
UNCOV
1120
        rawFeatures := lnwire.NewRawFeatureVector()
×
UNCOV
1121
        err := rawFeatures.Decode(bytes.NewReader(chanFeatures))
×
UNCOV
1122
        if err != nil {
×
1123
                return nil, fmt.Errorf("unable to parse chan feature "+
×
1124
                        "bits: %w", err)
×
1125
        }
×
1126

UNCOV
1127
        chanFeatureBits := lnwire.NewFeatureVector(
×
UNCOV
1128
                rawFeatures, lnwire.Features,
×
UNCOV
1129
        )
×
UNCOV
1130
        if chanFeatureBits.HasFeature(
×
UNCOV
1131
                lnwire.SimpleTaprootChannelsOptionalStaging,
×
UNCOV
1132
        ) {
×
UNCOV
1133

×
UNCOV
1134
                pubKey1, err := btcec.ParsePubKey(bitcoinKey1)
×
UNCOV
1135
                if err != nil {
×
1136
                        return nil, err
×
1137
                }
×
UNCOV
1138
                pubKey2, err := btcec.ParsePubKey(bitcoinKey2)
×
UNCOV
1139
                if err != nil {
×
1140
                        return nil, err
×
1141
                }
×
1142

UNCOV
1143
                fundingScript, _, err := input.GenTaprootFundingScript(
×
UNCOV
1144
                        pubKey1, pubKey2, 0, tapscriptRoot,
×
UNCOV
1145
                )
×
UNCOV
1146
                if err != nil {
×
1147
                        return nil, err
×
1148
                }
×
1149

1150
                // TODO(roasbeef): add tapscript root to gossip v1.5
1151

UNCOV
1152
                return fundingScript, nil
×
1153
        }
1154

UNCOV
1155
        return legacyFundingScript()
×
1156
}
1157

1158
// processUpdate processes a new relate authenticated channel/edge, node or
1159
// channel/edge update network update. If the update didn't affect the internal
1160
// state of the draft due to either being out of date, invalid, or redundant,
1161
// then error is returned.
1162
//
1163
//nolint:funlen
1164
func (b *Builder) processUpdate(msg interface{},
1165
        op ...batch.SchedulerOption) error {
30✔
1166

30✔
1167
        switch msg := msg.(type) {
30✔
1168
        case *models.LightningNode:
7✔
1169
                // Before we add the node to the database, we'll check to see
7✔
1170
                // if the announcement is "fresh" or not. If it isn't, then
7✔
1171
                // we'll return an error.
7✔
1172
                err := b.assertNodeAnnFreshness(msg.PubKeyBytes, msg.LastUpdate)
7✔
1173
                if err != nil {
8✔
1174
                        return err
1✔
1175
                }
1✔
1176

1177
                if err := b.cfg.Graph.AddLightningNode(msg, op...); err != nil {
6✔
1178
                        return errors.Errorf("unable to add node %x to the "+
×
1179
                                "graph: %v", msg.PubKeyBytes, err)
×
1180
                }
×
1181

1182
                log.Tracef("Updated vertex data for node=%x", msg.PubKeyBytes)
6✔
1183
                b.stats.incNumNodeUpdates()
6✔
1184

1185
        case *models.ChannelEdgeInfo:
17✔
1186
                log.Debugf("Received ChannelEdgeInfo for channel %v",
17✔
1187
                        msg.ChannelID)
17✔
1188

17✔
1189
                // Prior to processing the announcement we first check if we
17✔
1190
                // already know of this channel, if so, then we can exit early.
17✔
1191
                _, _, exists, isZombie, err := b.cfg.Graph.HasChannelEdge(
17✔
1192
                        msg.ChannelID,
17✔
1193
                )
17✔
1194
                if err != nil &&
17✔
1195
                        !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
17✔
1196

×
1197
                        return errors.Errorf("unable to check for edge "+
×
1198
                                "existence: %v", err)
×
1199
                }
×
1200
                if isZombie {
17✔
1201
                        return NewErrf(ErrIgnored, "ignoring msg for zombie "+
×
1202
                                "chan_id=%v", msg.ChannelID)
×
1203
                }
×
1204
                if exists {
17✔
UNCOV
1205
                        return NewErrf(ErrIgnored, "ignoring msg for known "+
×
UNCOV
1206
                                "chan_id=%v", msg.ChannelID)
×
UNCOV
1207
                }
×
1208

1209
                // If AssumeChannelValid is present, then we are unable to
1210
                // perform any of the expensive checks below, so we'll
1211
                // short-circuit our path straight to adding the edge to our
1212
                // graph. If the passed ShortChannelID is an alias, then we'll
1213
                // skip validation as it will not map to a legitimate tx. This
1214
                // is not a DoS vector as only we can add an alias
1215
                // ChannelAnnouncement from the gossiper.
1216
                scid := lnwire.NewShortChanIDFromInt(msg.ChannelID)
17✔
1217
                if b.cfg.AssumeChannelValid || b.cfg.IsAlias(scid) {
17✔
UNCOV
1218
                        err := b.cfg.Graph.AddChannelEdge(msg, op...)
×
UNCOV
1219
                        if err != nil {
×
1220
                                return fmt.Errorf("unable to add edge: %w", err)
×
1221
                        }
×
UNCOV
1222
                        log.Tracef("New channel discovered! Link "+
×
UNCOV
1223
                                "connects %x and %x with ChannelID(%v)",
×
UNCOV
1224
                                msg.NodeKey1Bytes, msg.NodeKey2Bytes,
×
UNCOV
1225
                                msg.ChannelID)
×
UNCOV
1226
                        b.stats.incNumEdgesDiscovered()
×
UNCOV
1227

×
UNCOV
1228
                        break
×
1229
                }
1230

1231
                // Before we can add the channel to the channel graph, we need
1232
                // to obtain the full funding outpoint that's encoded within
1233
                // the channel ID.
1234
                channelID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
17✔
1235
                fundingTx, err := lnwallet.FetchFundingTxWrapper(
17✔
1236
                        b.cfg.Chain, &channelID, b.quit,
17✔
1237
                )
17✔
1238
                if err != nil {
18✔
1239
                        //nolint:ll
1✔
1240
                        //
1✔
1241
                        // In order to ensure we don't erroneously mark a
1✔
1242
                        // channel as a zombie due to an RPC failure, we'll
1✔
1243
                        // attempt to string match for the relevant errors.
1✔
1244
                        //
1✔
1245
                        // * btcd:
1✔
1246
                        //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
1✔
1247
                        //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
1✔
1248
                        // * bitcoind:
1✔
1249
                        //    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
1✔
1250
                        //     * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
1✔
1251
                        switch {
1✔
1252
                        case strings.Contains(err.Error(), "not found"):
×
1253
                                fallthrough
×
1254

1255
                        case strings.Contains(err.Error(), "out of range"):
1✔
1256
                                // If the funding transaction isn't found at
1✔
1257
                                // all, then we'll mark the edge itself as a
1✔
1258
                                // zombie so we don't continue to request it.
1✔
1259
                                // We use the "zero key" for both node pubkeys
1✔
1260
                                // so this edge can't be resurrected.
1✔
1261
                                zErr := b.addZombieEdge(msg.ChannelID)
1✔
1262
                                if zErr != nil {
1✔
1263
                                        return zErr
×
1264
                                }
×
1265

1266
                        default:
×
1267
                        }
1268

1269
                        return NewErrf(ErrNoFundingTransaction, "unable to "+
1✔
1270
                                "locate funding tx: %v", err)
1✔
1271
                }
1272

1273
                // Recreate witness output to be sure that declared in channel
1274
                // edge bitcoin keys and channel value corresponds to the
1275
                // reality.
1276
                fundingPkScript, err := makeFundingScript(
16✔
1277
                        msg.BitcoinKey1Bytes[:], msg.BitcoinKey2Bytes[:],
16✔
1278
                        msg.Features, msg.TapscriptRoot,
16✔
1279
                )
16✔
1280
                if err != nil {
16✔
1281
                        return err
×
1282
                }
×
1283

1284
                // Next we'll validate that this channel is actually well
1285
                // formed. If this check fails, then this channel either
1286
                // doesn't exist, or isn't the one that was meant to be created
1287
                // according to the passed channel proofs.
1288
                fundingPoint, err := chanvalidate.Validate(
16✔
1289
                        &chanvalidate.Context{
16✔
1290
                                Locator: &chanvalidate.ShortChanIDChanLocator{
16✔
1291
                                        ID: channelID,
16✔
1292
                                },
16✔
1293
                                MultiSigPkScript: fundingPkScript,
16✔
1294
                                FundingTx:        fundingTx,
16✔
1295
                        },
16✔
1296
                )
16✔
1297
                if err != nil {
17✔
1298
                        // Mark the edge as a zombie so we won't try to
1✔
1299
                        // re-validate it on start up.
1✔
1300
                        if err := b.addZombieEdge(msg.ChannelID); err != nil {
1✔
1301
                                return err
×
1302
                        }
×
1303

1304
                        return NewErrf(ErrInvalidFundingOutput, "output "+
1✔
1305
                                "failed validation: %w", err)
1✔
1306
                }
1307

1308
                // Now that we have the funding outpoint of the channel, ensure
1309
                // that it hasn't yet been spent. If so, then this channel has
1310
                // been closed so we'll ignore it.
1311
                chanUtxo, err := b.cfg.Chain.GetUtxo(
15✔
1312
                        fundingPoint, fundingPkScript, channelID.BlockHeight,
15✔
1313
                        b.quit,
15✔
1314
                )
15✔
1315
                if err != nil {
16✔
1316
                        if errors.Is(err, btcwallet.ErrOutputSpent) {
2✔
1317
                                zErr := b.addZombieEdge(msg.ChannelID)
1✔
1318
                                if zErr != nil {
1✔
1319
                                        return zErr
×
1320
                                }
×
1321
                        }
1322

1323
                        return NewErrf(ErrChannelSpent, "unable to fetch utxo "+
1✔
1324
                                "for chan_id=%v, chan_point=%v: %v",
1✔
1325
                                msg.ChannelID, fundingPoint, err)
1✔
1326
                }
1327

1328
                // TODO(roasbeef): this is a hack, needs to be removed
1329
                // after commitment fees are dynamic.
1330
                msg.Capacity = btcutil.Amount(chanUtxo.Value)
14✔
1331
                msg.ChannelPoint = *fundingPoint
14✔
1332
                if err := b.cfg.Graph.AddChannelEdge(msg, op...); err != nil {
14✔
1333
                        return errors.Errorf("unable to add edge: %v", err)
×
1334
                }
×
1335

1336
                log.Debugf("New channel discovered! Link "+
14✔
1337
                        "connects %x and %x with ChannelPoint(%v): "+
14✔
1338
                        "chan_id=%v, capacity=%v",
14✔
1339
                        msg.NodeKey1Bytes, msg.NodeKey2Bytes,
14✔
1340
                        fundingPoint, msg.ChannelID, msg.Capacity)
14✔
1341
                b.stats.incNumEdgesDiscovered()
14✔
1342

14✔
1343
                // As a new edge has been added to the channel graph, we'll
14✔
1344
                // update the current UTXO filter within our active
14✔
1345
                // FilteredChainView so we are notified if/when this channel is
14✔
1346
                // closed.
14✔
1347
                filterUpdate := []graphdb.EdgePoint{
14✔
1348
                        {
14✔
1349
                                FundingPkScript: fundingPkScript,
14✔
1350
                                OutPoint:        *fundingPoint,
14✔
1351
                        },
14✔
1352
                }
14✔
1353
                err = b.cfg.ChainView.UpdateFilter(
14✔
1354
                        filterUpdate, b.bestHeight.Load(),
14✔
1355
                )
14✔
1356
                if err != nil {
14✔
1357
                        return errors.Errorf("unable to update chain "+
×
1358
                                "view: %v", err)
×
1359
                }
×
1360

1361
        case *models.ChannelEdgePolicy:
6✔
1362
                log.Debugf("Received ChannelEdgePolicy for channel %v",
6✔
1363
                        msg.ChannelID)
6✔
1364

6✔
1365
                // We make sure to hold the mutex for this channel ID,
6✔
1366
                // such that no other goroutine is concurrently doing
6✔
1367
                // database accesses for the same channel ID.
6✔
1368
                b.channelEdgeMtx.Lock(msg.ChannelID)
6✔
1369
                defer b.channelEdgeMtx.Unlock(msg.ChannelID)
6✔
1370

6✔
1371
                edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
6✔
1372
                        b.cfg.Graph.HasChannelEdge(msg.ChannelID)
6✔
1373
                if err != nil && !errors.Is(
6✔
1374
                        err, graphdb.ErrGraphNoEdgesFound,
6✔
1375
                ) {
6✔
1376

×
1377
                        return errors.Errorf("unable to check for edge "+
×
1378
                                "existence: %v", err)
×
1379
                }
×
1380

1381
                // If the channel is marked as a zombie in our database, and
1382
                // we consider this a stale update, then we should not apply the
1383
                // policy.
1384
                isStaleUpdate := time.Since(msg.LastUpdate) >
6✔
1385
                        b.cfg.ChannelPruneExpiry
6✔
1386

6✔
1387
                if isZombie && isStaleUpdate {
6✔
1388
                        return NewErrf(ErrIgnored, "ignoring stale update "+
×
1389
                                "(flags=%v|%v) for zombie chan_id=%v",
×
1390
                                msg.MessageFlags, msg.ChannelFlags,
×
1391
                                msg.ChannelID)
×
1392
                }
×
1393

1394
                // If the channel doesn't exist in our database, we cannot
1395
                // apply the updated policy.
1396
                if !exists {
7✔
1397
                        return NewErrf(ErrIgnored, "ignoring update "+
1✔
1398
                                "(flags=%v|%v) for unknown chan_id=%v",
1✔
1399
                                msg.MessageFlags, msg.ChannelFlags,
1✔
1400
                                msg.ChannelID)
1✔
1401
                }
1✔
1402

1403
                // As edges are directional edge node has a unique policy for
1404
                // the direction of the edge they control. Therefore, we first
1405
                // check if we already have the most up-to-date information for
1406
                // that edge. If this message has a timestamp not strictly
1407
                // newer than what we already know of we can exit early.
1408
                switch {
5✔
1409
                // A flag set of 0 indicates this is an announcement for the
1410
                // "first" node in the channel.
1411
                case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0:
3✔
1412

3✔
1413
                        // Ignore outdated message.
3✔
1414
                        if !edge1Timestamp.Before(msg.LastUpdate) {
3✔
UNCOV
1415
                                return NewErrf(ErrOutdated, "Ignoring "+
×
UNCOV
1416
                                        "outdated update (flags=%v|%v) for "+
×
UNCOV
1417
                                        "known chan_id=%v", msg.MessageFlags,
×
UNCOV
1418
                                        msg.ChannelFlags, msg.ChannelID)
×
UNCOV
1419
                        }
×
1420

1421
                // Similarly, a flag set of 1 indicates this is an announcement
1422
                // for the "second" node in the channel.
1423
                case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1:
2✔
1424

2✔
1425
                        // Ignore outdated message.
2✔
1426
                        if !edge2Timestamp.Before(msg.LastUpdate) {
2✔
UNCOV
1427
                                return NewErrf(ErrOutdated, "Ignoring "+
×
UNCOV
1428
                                        "outdated update (flags=%v|%v) for "+
×
UNCOV
1429
                                        "known chan_id=%v", msg.MessageFlags,
×
UNCOV
1430
                                        msg.ChannelFlags, msg.ChannelID)
×
UNCOV
1431
                        }
×
1432
                }
1433

1434
                // Now that we know this isn't a stale update, we'll apply the
1435
                // new edge policy to the proper directional edge within the
1436
                // channel graph.
1437
                if err = b.cfg.Graph.UpdateEdgePolicy(msg, op...); err != nil {
5✔
1438
                        err := errors.Errorf("unable to add channel: %v", err)
×
1439
                        log.Error(err)
×
1440
                        return err
×
1441
                }
×
1442

1443
                log.Tracef("New channel update applied: %v",
5✔
1444
                        lnutils.SpewLogClosure(msg))
5✔
1445
                b.stats.incNumChannelUpdates()
5✔
1446

1447
        default:
×
1448
                return errors.Errorf("wrong routing update message type")
×
1449
        }
1450

1451
        return nil
25✔
1452
}
1453

1454
// routingMsg couples a routing related routing topology update to the
1455
// error channel.
1456
type routingMsg struct {
1457
        msg interface{}
1458
        op  []batch.SchedulerOption
1459
        err chan error
1460
}
1461

1462
// ApplyChannelUpdate validates a channel update and if valid, applies it to the
1463
// database. It returns a bool indicating whether the updates were successful.
UNCOV
1464
func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
×
UNCOV
1465
        ch, _, _, err := b.GetChannelByID(msg.ShortChannelID)
×
UNCOV
1466
        if err != nil {
×
UNCOV
1467
                log.Errorf("Unable to retrieve channel by id: %v", err)
×
UNCOV
1468
                return false
×
UNCOV
1469
        }
×
1470

UNCOV
1471
        var pubKey *btcec.PublicKey
×
UNCOV
1472

×
UNCOV
1473
        switch msg.ChannelFlags & lnwire.ChanUpdateDirection {
×
UNCOV
1474
        case 0:
×
UNCOV
1475
                pubKey, _ = ch.NodeKey1()
×
1476

UNCOV
1477
        case 1:
×
UNCOV
1478
                pubKey, _ = ch.NodeKey2()
×
1479
        }
1480

1481
        // Exit early if the pubkey cannot be decided.
UNCOV
1482
        if pubKey == nil {
×
1483
                log.Errorf("Unable to decide pubkey with ChannelFlags=%v",
×
1484
                        msg.ChannelFlags)
×
1485
                return false
×
1486
        }
×
1487

UNCOV
1488
        err = netann.ValidateChannelUpdateAnn(pubKey, ch.Capacity, msg)
×
UNCOV
1489
        if err != nil {
×
1490
                log.Errorf("Unable to validate channel update: %v", err)
×
1491
                return false
×
1492
        }
×
1493

UNCOV
1494
        err = b.UpdateEdge(&models.ChannelEdgePolicy{
×
UNCOV
1495
                SigBytes:                  msg.Signature.ToSignatureBytes(),
×
UNCOV
1496
                ChannelID:                 msg.ShortChannelID.ToUint64(),
×
UNCOV
1497
                LastUpdate:                time.Unix(int64(msg.Timestamp), 0),
×
UNCOV
1498
                MessageFlags:              msg.MessageFlags,
×
UNCOV
1499
                ChannelFlags:              msg.ChannelFlags,
×
UNCOV
1500
                TimeLockDelta:             msg.TimeLockDelta,
×
UNCOV
1501
                MinHTLC:                   msg.HtlcMinimumMsat,
×
UNCOV
1502
                MaxHTLC:                   msg.HtlcMaximumMsat,
×
UNCOV
1503
                FeeBaseMSat:               lnwire.MilliSatoshi(msg.BaseFee),
×
UNCOV
1504
                FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate),
×
UNCOV
1505
                ExtraOpaqueData:           msg.ExtraOpaqueData,
×
UNCOV
1506
        })
×
UNCOV
1507
        if err != nil && !IsError(err, ErrIgnored, ErrOutdated) {
×
1508
                log.Errorf("Unable to apply channel update: %v", err)
×
1509
                return false
×
1510
        }
×
1511

UNCOV
1512
        return true
×
1513
}
1514

1515
// AddNode is used to add information about a node to the router database. If
1516
// the node with this pubkey is not present in an existing channel, it will
1517
// be ignored.
1518
//
1519
// NOTE: This method is part of the ChannelGraphSource interface.
1520
func (b *Builder) AddNode(node *models.LightningNode,
1521
        op ...batch.SchedulerOption) error {
7✔
1522

7✔
1523
        rMsg := &routingMsg{
7✔
1524
                msg: node,
7✔
1525
                op:  op,
7✔
1526
                err: make(chan error, 1),
7✔
1527
        }
7✔
1528

7✔
1529
        select {
7✔
1530
        case b.networkUpdates <- rMsg:
7✔
1531
                select {
7✔
1532
                case err := <-rMsg.err:
7✔
1533
                        return err
7✔
1534
                case <-b.quit:
×
1535
                        return ErrGraphBuilderShuttingDown
×
1536
                }
1537
        case <-b.quit:
×
1538
                return ErrGraphBuilderShuttingDown
×
1539
        }
1540
}
1541

1542
// AddEdge is used to add edge/channel to the topology of the router, after all
1543
// information about channel will be gathered this edge/channel might be used
1544
// in construction of payment path.
1545
//
1546
// NOTE: This method is part of the ChannelGraphSource interface.
1547
func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
1548
        op ...batch.SchedulerOption) error {
17✔
1549

17✔
1550
        rMsg := &routingMsg{
17✔
1551
                msg: edge,
17✔
1552
                op:  op,
17✔
1553
                err: make(chan error, 1),
17✔
1554
        }
17✔
1555

17✔
1556
        select {
17✔
1557
        case b.networkUpdates <- rMsg:
17✔
1558
                select {
17✔
1559
                case err := <-rMsg.err:
17✔
1560
                        return err
17✔
1561
                case <-b.quit:
×
1562
                        return ErrGraphBuilderShuttingDown
×
1563
                }
1564
        case <-b.quit:
×
1565
                return ErrGraphBuilderShuttingDown
×
1566
        }
1567
}
1568

1569
// UpdateEdge is used to update edge information, without this message edge
1570
// considered as not fully constructed.
1571
//
1572
// NOTE: This method is part of the ChannelGraphSource interface.
1573
func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
1574
        op ...batch.SchedulerOption) error {
6✔
1575

6✔
1576
        rMsg := &routingMsg{
6✔
1577
                msg: update,
6✔
1578
                op:  op,
6✔
1579
                err: make(chan error, 1),
6✔
1580
        }
6✔
1581

6✔
1582
        select {
6✔
1583
        case b.networkUpdates <- rMsg:
6✔
1584
                select {
6✔
1585
                case err := <-rMsg.err:
6✔
1586
                        return err
6✔
1587
                case <-b.quit:
×
1588
                        return ErrGraphBuilderShuttingDown
×
1589
                }
1590
        case <-b.quit:
×
1591
                return ErrGraphBuilderShuttingDown
×
1592
        }
1593
}
1594

1595
// CurrentBlockHeight returns the block height from POV of the router subsystem.
1596
//
1597
// NOTE: This method is part of the ChannelGraphSource interface.
UNCOV
1598
func (b *Builder) CurrentBlockHeight() (uint32, error) {
×
UNCOV
1599
        _, height, err := b.cfg.Chain.GetBestBlock()
×
UNCOV
1600
        return uint32(height), err
×
UNCOV
1601
}
×
1602

1603
// SyncedHeight returns the block height to which the router subsystem currently
1604
// is synced to. This can differ from the above chain height if the goroutine
1605
// responsible for processing the blocks isn't yet up to speed.
UNCOV
1606
func (b *Builder) SyncedHeight() uint32 {
×
UNCOV
1607
        return b.bestHeight.Load()
×
UNCOV
1608
}
×
1609

1610
// GetChannelByID return the channel by the channel id.
1611
//
1612
// NOTE: This method is part of the ChannelGraphSource interface.
1613
func (b *Builder) GetChannelByID(chanID lnwire.ShortChannelID) (
1614
        *models.ChannelEdgeInfo,
1615
        *models.ChannelEdgePolicy,
1616
        *models.ChannelEdgePolicy, error) {
1✔
1617

1✔
1618
        return b.cfg.Graph.FetchChannelEdgesByID(chanID.ToUint64())
1✔
1619
}
1✔
1620

1621
// FetchLightningNode attempts to look up a target node by its identity public
1622
// key. graphdb.ErrGraphNodeNotFound is returned if the node doesn't exist
1623
// within the graph.
1624
//
1625
// NOTE: This method is part of the ChannelGraphSource interface.
1626
func (b *Builder) FetchLightningNode(
UNCOV
1627
        node route.Vertex) (*models.LightningNode, error) {
×
UNCOV
1628

×
UNCOV
1629
        return b.cfg.Graph.FetchLightningNode(node)
×
UNCOV
1630
}
×
1631

1632
// ForEachNode is used to iterate over every node in router topology.
1633
//
1634
// NOTE: This method is part of the ChannelGraphSource interface.
1635
func (b *Builder) ForEachNode(
1636
        cb func(*models.LightningNode) error) error {
×
1637

×
1638
        return b.cfg.Graph.ForEachNode(
×
1639
                func(_ kvdb.RTx, n *models.LightningNode) error {
×
1640
                        return cb(n)
×
1641
                })
×
1642
}
1643

1644
// ForAllOutgoingChannels is used to iterate over all outgoing channels owned by
1645
// the router.
1646
//
1647
// NOTE: This method is part of the ChannelGraphSource interface.
1648
func (b *Builder) ForAllOutgoingChannels(cb func(*models.ChannelEdgeInfo,
UNCOV
1649
        *models.ChannelEdgePolicy) error) error {
×
UNCOV
1650

×
UNCOV
1651
        return b.cfg.Graph.ForEachNodeChannel(b.cfg.SelfNode,
×
UNCOV
1652
                func(_ kvdb.RTx, c *models.ChannelEdgeInfo,
×
UNCOV
1653
                        e *models.ChannelEdgePolicy,
×
UNCOV
1654
                        _ *models.ChannelEdgePolicy) error {
×
UNCOV
1655

×
UNCOV
1656
                        if e == nil {
×
1657
                                return fmt.Errorf("channel from self node " +
×
1658
                                        "has no policy")
×
1659
                        }
×
1660

UNCOV
1661
                        return cb(c, e)
×
1662
                },
1663
        )
1664
}
1665

1666
// AddProof updates the channel edge info with proof which is needed to
1667
// properly announce the edge to the rest of the network.
1668
//
1669
// NOTE: This method is part of the ChannelGraphSource interface.
1670
func (b *Builder) AddProof(chanID lnwire.ShortChannelID,
1671
        proof *models.ChannelAuthProof) error {
1✔
1672

1✔
1673
        info, _, _, err := b.cfg.Graph.FetchChannelEdgesByID(chanID.ToUint64())
1✔
1674
        if err != nil {
1✔
1675
                return err
×
1676
        }
×
1677

1678
        info.AuthProof = proof
1✔
1679

1✔
1680
        return b.cfg.Graph.UpdateChannelEdge(info)
1✔
1681
}
1682

1683
// IsStaleNode returns true if the graph source has a node announcement for the
1684
// target node with a more recent timestamp.
1685
//
1686
// NOTE: This method is part of the ChannelGraphSource interface.
1687
func (b *Builder) IsStaleNode(node route.Vertex,
1688
        timestamp time.Time) bool {
3✔
1689

3✔
1690
        // If our attempt to assert that the node announcement is fresh fails,
3✔
1691
        // then we know that this is actually a stale announcement.
3✔
1692
        err := b.assertNodeAnnFreshness(node, timestamp)
3✔
1693
        if err != nil {
4✔
1694
                log.Debugf("Checking stale node %x got %v", node, err)
1✔
1695
                return true
1✔
1696
        }
1✔
1697

1698
        return false
2✔
1699
}
1700

1701
// IsPublicNode determines whether the given vertex is seen as a public node in
1702
// the graph from the graph's source node's point of view.
1703
//
1704
// NOTE: This method is part of the ChannelGraphSource interface.
UNCOV
1705
func (b *Builder) IsPublicNode(node route.Vertex) (bool, error) {
×
UNCOV
1706
        return b.cfg.Graph.IsPublicNode(node)
×
UNCOV
1707
}
×
1708

1709
// IsKnownEdge returns true if the graph source already knows of the passed
1710
// channel ID either as a live or zombie edge.
1711
//
1712
// NOTE: This method is part of the ChannelGraphSource interface.
1713
func (b *Builder) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
1✔
1714
        _, _, exists, isZombie, _ := b.cfg.Graph.HasChannelEdge(
1✔
1715
                chanID.ToUint64(),
1✔
1716
        )
1✔
1717

1✔
1718
        return exists || isZombie
1✔
1719
}
1✔
1720

1721
// IsStaleEdgePolicy returns true if the graph source has a channel edge for
1722
// the passed channel ID (and flags) that have a more recent timestamp.
1723
//
1724
// NOTE: This method is part of the ChannelGraphSource interface.
1725
func (b *Builder) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
1726
        timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {
6✔
1727

6✔
1728
        edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
6✔
1729
                b.cfg.Graph.HasChannelEdge(chanID.ToUint64())
6✔
1730
        if err != nil {
6✔
1731
                log.Debugf("Check stale edge policy got error: %v", err)
×
1732
                return false
×
1733
        }
×
1734

1735
        // If we know of the edge as a zombie, then we'll make some additional
1736
        // checks to determine if the new policy is fresh.
1737
        if isZombie {
6✔
UNCOV
1738
                // When running with AssumeChannelValid, we also prune channels
×
UNCOV
1739
                // if both of their edges are disabled. We'll mark the new
×
UNCOV
1740
                // policy as stale if it remains disabled.
×
UNCOV
1741
                if b.cfg.AssumeChannelValid {
×
1742
                        isDisabled := flags&lnwire.ChanUpdateDisabled ==
×
1743
                                lnwire.ChanUpdateDisabled
×
1744
                        if isDisabled {
×
1745
                                return true
×
1746
                        }
×
1747
                }
1748

1749
                // Otherwise, we'll fall back to our usual ChannelPruneExpiry.
UNCOV
1750
                return time.Since(timestamp) > b.cfg.ChannelPruneExpiry
×
1751
        }
1752

1753
        // If we don't know of the edge, then it means it's fresh (thus not
1754
        // stale).
1755
        if !exists {
8✔
1756
                return false
2✔
1757
        }
2✔
1758

1759
        // As edges are directional edge node has a unique policy for the
1760
        // direction of the edge they control. Therefore, we first check if we
1761
        // already have the most up-to-date information for that edge. If so,
1762
        // then we can exit early.
1763
        switch {
4✔
1764
        // A flag set of 0 indicates this is an announcement for the "first"
1765
        // node in the channel.
1766
        case flags&lnwire.ChanUpdateDirection == 0:
2✔
1767
                return !edge1Timestamp.Before(timestamp)
2✔
1768

1769
        // Similarly, a flag set of 1 indicates this is an announcement for the
1770
        // "second" node in the channel.
1771
        case flags&lnwire.ChanUpdateDirection == 1:
2✔
1772
                return !edge2Timestamp.Before(timestamp)
2✔
1773
        }
1774

1775
        return false
×
1776
}
1777

1778
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
1779
//
1780
// NOTE: This method is part of the ChannelGraphSource interface.
UNCOV
1781
func (b *Builder) MarkEdgeLive(chanID lnwire.ShortChannelID) error {
×
UNCOV
1782
        return b.cfg.Graph.MarkEdgeLive(chanID.ToUint64())
×
UNCOV
1783
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc