
lightningnetwork / lnd / 13440912774

20 Feb 2025 05:14PM UTC coverage: 57.697% (-1.1%) from 58.802%

Pull #9535 by guggero: GitHub: remove duplicate caching

Turns out that actions/setup-go starting with @v4 also adds caching.
With that, our cache size on disk has almost doubled, leading to the
GitHub runner running out of space in certain situations.
We fix that by disabling the automated caching since we already have our
own, custom-tailored version.
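
The change described above amounts to a GitHub Actions workflow tweak. A minimal sketch of what such a step could look like (hypothetical snippet, assuming actions/setup-go@v5 and its cache input; the PR's actual diff may differ):

  - name: Setup Go
    uses: actions/setup-go@v5
    with:
      go-version: '1.22'   # illustrative version only
      # Turn off the action's built-in caching; the workflow keeps its own
      # custom-tailored cache step instead.
      cache: false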

103519 of 179417 relevant lines covered (57.7%)

24825.3 hits per line

Source File

/graph/builder.go (69.68% covered)
package graph

import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcec/v2"
        "github.com/btcsuite/btcd/wire"
        "github.com/go-errors/errors"
        "github.com/lightningnetwork/lnd/batch"
        "github.com/lightningnetwork/lnd/chainntnfs"
        graphdb "github.com/lightningnetwork/lnd/graph/db"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/multimutex"
        "github.com/lightningnetwork/lnd/netann"
        "github.com/lightningnetwork/lnd/routing/chainview"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/ticker"
)

const (
        // DefaultChannelPruneExpiry is the default duration used to determine
        // if a channel should be pruned or not.
        DefaultChannelPruneExpiry = time.Hour * 24 * 14

        // DefaultFirstTimePruneDelay is the time we'll wait after startup
        // before attempting to prune the graph for zombie channels. We don't
        // do it immediately after startup to allow lnd to start up without
        // getting blocked by this job.
        DefaultFirstTimePruneDelay = 30 * time.Second

        // defaultStatInterval governs how often the router will log non-empty
        // stats related to processing new channels, updates, or node
        // announcements.
        defaultStatInterval = time.Minute
)

var (
        // ErrGraphBuilderShuttingDown is returned if the graph builder is in
        // the process of shutting down.
        ErrGraphBuilderShuttingDown = fmt.Errorf("graph builder shutting down")
)

// Config holds the configuration required by the Builder.
type Config struct {
        // SelfNode is the public key of the node that this channel router
        // belongs to.
        SelfNode route.Vertex

        // Graph is the channel graph that the ChannelRouter will use to gather
        // metrics from and also to carry out path finding queries.
        Graph DB

        // Chain is the router's source to the most up-to-date blockchain data.
        // All incoming advertised channels will be checked against the chain
        // to ensure that the channels advertised are still open.
        Chain lnwallet.BlockChainIO

        // ChainView is an instance of a FilteredChainView which is used to
        // watch the sub-set of the UTXO set (the set of active channels) that
        // we need in order to properly maintain the channel graph.
        ChainView chainview.FilteredChainView

        // Notifier is a reference to the ChainNotifier, used to grab
        // the latest blocks if the router is missing any.
        Notifier chainntnfs.ChainNotifier

        // ChannelPruneExpiry is the duration used to determine if a channel
        // should be pruned or not. If the delta between now and when the
        // channel was last updated is greater than ChannelPruneExpiry, then
        // the channel is marked as a zombie channel eligible for pruning.
        ChannelPruneExpiry time.Duration

        // GraphPruneInterval is used as an interval to determine how often we
        // should examine the channel graph to garbage collect zombie channels.
        GraphPruneInterval time.Duration

        // FirstTimePruneDelay is the time we'll wait after startup before
        // attempting to prune the graph for zombie channels. We don't do it
        // immediately after startup to allow lnd to start up without getting
        // blocked by this job.
        FirstTimePruneDelay time.Duration

        // AssumeChannelValid toggles whether the builder will prune channels
        // based on their spentness vs using the fact that they are considered
        // zombies.
        AssumeChannelValid bool

        // StrictZombiePruning determines if we attempt to prune zombie
        // channels according to a stricter criteria. If true, then we'll prune
        // a channel if only *one* of the edges is considered a zombie.
        // Otherwise, we'll only prune the channel when both edges have a very
        // dated last update.
        StrictZombiePruning bool

        // IsAlias returns whether a passed ShortChannelID is an alias. This is
        // only used for our local channels.
        IsAlias func(scid lnwire.ShortChannelID) bool
}

// Builder builds and maintains a view of the Lightning Network graph.
type Builder struct {
        started atomic.Bool
        stopped atomic.Bool

        ntfnClientCounter atomic.Uint64
        bestHeight        atomic.Uint32

        cfg *Config

        // newBlocks is a channel in which new blocks connected to the end of
        // the main chain are sent over, and blocks updated after a call to
        // UpdateFilter.
        newBlocks <-chan *chainview.FilteredBlock

        // staleBlocks is a channel in which blocks disconnected from the end
        // of our currently known best chain are sent over.
        staleBlocks <-chan *chainview.FilteredBlock

        // networkUpdates is a channel that carries new topology updates
        // messages from outside the Builder to be processed by the
        // networkHandler.
        networkUpdates chan *routingMsg

        // topologyClients maps a client's unique notification ID to a
        // topologyClient client that contains its notification dispatch
        // channel.
        topologyClients *lnutils.SyncMap[uint64, *topologyClient]

        // ntfnClientUpdates is a channel that's used to send new updates to
        // topology notification clients to the Builder. Updates either
        // add a new notification client, or cancel notifications for an
        // existing client.
        ntfnClientUpdates chan *topologyClientUpdate

        // channelEdgeMtx is a mutex we use to make sure we process only one
        // ChannelEdgePolicy at a time for a given channelID, to ensure
        // consistency between the various database accesses.
        channelEdgeMtx *multimutex.Mutex[uint64]

        // statTicker is a resumable ticker that logs the router's progress as
        // it discovers channels or receives updates.
        statTicker ticker.Ticker

        // stats tracks newly processed channels, updates, and node
        // announcements over a window of defaultStatInterval.
        stats *builderStats

        quit chan struct{}
        wg   sync.WaitGroup
}

// A compile time check to ensure Builder implements the
// ChannelGraphSource interface.
var _ ChannelGraphSource = (*Builder)(nil)

// NewBuilder constructs a new Builder.
func NewBuilder(cfg *Config) (*Builder, error) {
        return &Builder{
                cfg:               cfg,
                networkUpdates:    make(chan *routingMsg),
                topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
                ntfnClientUpdates: make(chan *topologyClientUpdate),
                channelEdgeMtx:    multimutex.NewMutex[uint64](),
                statTicker:        ticker.New(defaultStatInterval),
                stats:             new(builderStats),
                quit:              make(chan struct{}),
        }, nil
}

// Start launches all the goroutines the Builder requires to carry out its
// duties. If the builder has already been started, then this method is a noop.
func (b *Builder) Start() error {
        if !b.started.CompareAndSwap(false, true) {
                return nil
        }

        log.Info("Builder starting")

        bestHash, bestHeight, err := b.cfg.Chain.GetBestBlock()
        if err != nil {
                return err
        }

        // If the graph has never been pruned, or hasn't fully been created yet,
        // then we don't treat this as an explicit error.
        if _, _, err := b.cfg.Graph.PruneTip(); err != nil {
                switch {
                case errors.Is(err, graphdb.ErrGraphNeverPruned):
                        fallthrough

                case errors.Is(err, graphdb.ErrGraphNotFound):
                        // If the graph has never been pruned, then we'll set
                        // the prune height to the current best height of the
                        // chain backend.
                        _, err = b.cfg.Graph.PruneGraph(
                                nil, bestHash, uint32(bestHeight),
                        )
                        if err != nil {
                                return err
                        }

                default:
                        return err
                }
        }

        // If AssumeChannelValid is present, then we won't rely on pruning
        // channels from the graph based on their spentness, but whether they
        // are considered zombies or not. We will start zombie pruning after a
        // small delay, to avoid slowing down startup of lnd.
        if b.cfg.AssumeChannelValid { //nolint:nestif
                time.AfterFunc(b.cfg.FirstTimePruneDelay, func() {
                        select {
                        case <-b.quit:
                                return
                        default:
                        }

                        log.Info("Initial zombie prune starting")
                        if err := b.pruneZombieChans(); err != nil {
                                log.Errorf("Unable to prune zombies: %v", err)
                        }
                })
        } else {
                // Otherwise, we'll use our filtered chain view to prune
                // channels as soon as they are detected as spent on-chain.
                if err := b.cfg.ChainView.Start(); err != nil {
                        return err
                }

                // Once the instance is active, we'll fetch the channel we'll
                // receive notifications over.
                b.newBlocks = b.cfg.ChainView.FilteredBlocks()
                b.staleBlocks = b.cfg.ChainView.DisconnectedBlocks()

                // Before we perform our manual block pruning, we'll construct
                // and apply a fresh chain filter to the active
                // FilteredChainView instance.  We do this before, as otherwise
                // we may miss on-chain events as the filter hasn't properly
                // been applied.
                channelView, err := b.cfg.Graph.ChannelView()
                if err != nil && !errors.Is(
                        err, graphdb.ErrGraphNoEdgesFound,
                ) {

                        return err
                }

                log.Infof("Filtering chain using %v channels active",
                        len(channelView))

                if len(channelView) != 0 {
                        err = b.cfg.ChainView.UpdateFilter(
                                channelView, uint32(bestHeight),
                        )
                        if err != nil {
                                return err
                        }
                }

                // The graph pruning might have taken a while and there could be
                // new blocks available.
                _, bestHeight, err = b.cfg.Chain.GetBestBlock()
                if err != nil {
                        return err
                }
                b.bestHeight.Store(uint32(bestHeight))

                // Before we begin normal operation of the router, we first need
                // to synchronize the channel graph to the latest state of the
                // UTXO set.
                if err := b.syncGraphWithChain(); err != nil {
                        return err
                }

                // Finally, before we proceed, we'll prune any unconnected nodes
                // from the graph in order to ensure we maintain a tight graph
                // of "useful" nodes.
                err = b.cfg.Graph.PruneGraphNodes()
                if err != nil &&
                        !errors.Is(err, graphdb.ErrGraphNodesNotFound) {

                        return err
                }
        }

        b.wg.Add(1)
        go b.networkHandler()

        log.Debug("Builder started")

        return nil
}

// Stop signals to the Builder that it should halt all routines. This method
// will *block* until all goroutines have exited. If the builder has already
// stopped then this method will return immediately.
func (b *Builder) Stop() error {
        if !b.stopped.CompareAndSwap(false, true) {
                return nil
        }

        log.Info("Builder shutting down...")

        // Our filtered chain view could've only been started if
        // AssumeChannelValid isn't present.
        if !b.cfg.AssumeChannelValid {
                if err := b.cfg.ChainView.Stop(); err != nil {
                        return err
                }
        }

        close(b.quit)
        b.wg.Wait()

        log.Debug("Builder shutdown complete")

        return nil
}

// syncGraphWithChain attempts to synchronize the current channel graph with
// the latest UTXO set state. This process involves pruning from the channel
// graph any channels which have been closed by spending their funding output
// since we've been down.
func (b *Builder) syncGraphWithChain() error {
        // First, we'll need to check to see if we're already in sync with the
        // latest state of the UTXO set.
        bestHash, bestHeight, err := b.cfg.Chain.GetBestBlock()
        if err != nil {
                return err
        }
        b.bestHeight.Store(uint32(bestHeight))

        pruneHash, pruneHeight, err := b.cfg.Graph.PruneTip()
        if err != nil {
                switch {
                // If the graph has never been pruned, or hasn't fully been
                // created yet, then we don't treat this as an explicit error.
                case errors.Is(err, graphdb.ErrGraphNeverPruned):
                case errors.Is(err, graphdb.ErrGraphNotFound):
                default:
                        return err
                }
        }

        log.Infof("Prune tip for Channel Graph: height=%v, hash=%v",
                pruneHeight, pruneHash)

        switch {
        // If the graph has never been pruned, then we can exit early as this
        // entails it's being created for the first time and hasn't seen any
        // block or created channels.
        case pruneHeight == 0 || pruneHash == nil:
                return nil

        // If the block hashes and heights match exactly, then we don't need to
        // prune the channel graph as we're already fully in sync.
        case bestHash.IsEqual(pruneHash) && uint32(bestHeight) == pruneHeight:
                return nil
        }

        // If the main chain blockhash at prune height is different from the
        // prune hash, this might indicate the database is on a stale branch.
        mainBlockHash, err := b.cfg.Chain.GetBlockHash(int64(pruneHeight))
        if err != nil {
                return err
        }

        // While we are on a stale branch of the chain, walk backwards to find
        // first common block.
        for !pruneHash.IsEqual(mainBlockHash) {
                log.Infof("channel graph is stale. Disconnecting block %v "+
                        "(hash=%v)", pruneHeight, pruneHash)
                // Prune the graph for every channel that was opened at height
                // >= pruneHeight.
                _, err := b.cfg.Graph.DisconnectBlockAtHeight(pruneHeight)
                if err != nil {
                        return err
                }

                pruneHash, pruneHeight, err = b.cfg.Graph.PruneTip()
                switch {
                // If at this point the graph has never been pruned, we can exit
                // as this entails we are back to the point where it hasn't seen
                // any block or created channels, alas there's nothing left to
                // prune.
                case errors.Is(err, graphdb.ErrGraphNeverPruned):
                        return nil

                case errors.Is(err, graphdb.ErrGraphNotFound):
                        return nil

                case err != nil:
                        return err

                default:
                }

                mainBlockHash, err = b.cfg.Chain.GetBlockHash(
                        int64(pruneHeight),
                )
                if err != nil {
                        return err
                }
        }

        log.Infof("Syncing channel graph from height=%v (hash=%v) to "+
                "height=%v (hash=%v)", pruneHeight, pruneHash, bestHeight,
                bestHash)

        // If we're not yet caught up, then we'll walk forward in the chain
        // pruning the channel graph with each new block that hasn't yet been
        // consumed by the channel graph.
        var spentOutputs []*wire.OutPoint
        for nextHeight := pruneHeight + 1; nextHeight <= uint32(bestHeight); nextHeight++ { //nolint:ll
                // Break out of the rescan early if a shutdown has been
                // requested, otherwise long rescans will block the daemon from
                // shutting down promptly.
                select {
                case <-b.quit:
                        return ErrGraphBuilderShuttingDown
                default:
                }

                // Using the next height, request a manual block pruning from
                // the chainview for the particular block hash.
                log.Infof("Filtering block for closed channels, at height: %v",
                        int64(nextHeight))
                nextHash, err := b.cfg.Chain.GetBlockHash(int64(nextHeight))
                if err != nil {
                        return err
                }
                log.Tracef("Running block filter on block with hash: %v",
                        nextHash)
                filterBlock, err := b.cfg.ChainView.FilterBlock(nextHash)
                if err != nil {
                        return err
                }

                // We're only interested in all prior outputs that have been
                // spent in the block, so collate all the referenced previous
                // outpoints within each tx and input.
                for _, tx := range filterBlock.Transactions {
                        for _, txIn := range tx.TxIn {
                                spentOutputs = append(spentOutputs,
                                        &txIn.PreviousOutPoint)
                        }
                }
        }

        // With the spent outputs gathered, attempt to prune the channel graph,
        // also passing in the best hash+height so the prune tip can be updated.
        closedChans, err := b.cfg.Graph.PruneGraph(
                spentOutputs, bestHash, uint32(bestHeight),
        )
        if err != nil {
                return err
        }

        log.Infof("Graph pruning complete: %v channels were closed since "+
                "height %v", len(closedChans), pruneHeight)

        return nil
}

// isZombieChannel takes two edge policy updates and determines if the
// corresponding channel should be considered a zombie. The first boolean is
// true if the policy update from node 1 is considered a zombie, the second
// boolean is that of node 2, and the final boolean is true if the channel
// is considered a zombie.
func (b *Builder) isZombieChannel(e1,
        e2 *models.ChannelEdgePolicy) (bool, bool, bool) {

        chanExpiry := b.cfg.ChannelPruneExpiry

        e1Zombie := e1 == nil || time.Since(e1.LastUpdate) >= chanExpiry
        e2Zombie := e2 == nil || time.Since(e2.LastUpdate) >= chanExpiry

        var e1Time, e2Time time.Time
        if e1 != nil {
                e1Time = e1.LastUpdate
        }
        if e2 != nil {
                e2Time = e2.LastUpdate
        }

        return e1Zombie, e2Zombie, b.IsZombieChannel(e1Time, e2Time)
}

// IsZombieChannel takes the timestamps of the latest channel updates for a
// channel and returns true if the channel should be considered a zombie based
// on these timestamps.
func (b *Builder) IsZombieChannel(updateTime1,
        updateTime2 time.Time) bool {

        chanExpiry := b.cfg.ChannelPruneExpiry

        e1Zombie := updateTime1.IsZero() ||
                time.Since(updateTime1) >= chanExpiry

        e2Zombie := updateTime2.IsZero() ||
                time.Since(updateTime2) >= chanExpiry

        // If we're using strict zombie pruning, then a channel is only
        // considered live if both edges have a recent update we know of.
        if b.cfg.StrictZombiePruning {
                return e1Zombie || e2Zombie
        }

        // Otherwise, if we're using the less strict variant, then a channel is
        // considered live if either of the edges have a recent update.
        return e1Zombie && e2Zombie
}

// pruneZombieChans is a method that will be called periodically to prune out
// any "zombie" channels. We consider channels zombies if *both* edges haven't
// been updated since our zombie horizon. If AssumeChannelValid is present,
// we'll also consider channels zombies if *both* edges are disabled. This
// usually signals that a channel has been closed on-chain. We do this
// periodically to keep a healthy, lively routing table.
func (b *Builder) pruneZombieChans() error {
        chansToPrune := make(map[uint64]struct{})
        chanExpiry := b.cfg.ChannelPruneExpiry

        log.Infof("Examining channel graph for zombie channels")

        // A helper method to detect if the channel belongs to this node
        isSelfChannelEdge := func(info *models.ChannelEdgeInfo) bool {
                return info.NodeKey1Bytes == b.cfg.SelfNode ||
                        info.NodeKey2Bytes == b.cfg.SelfNode
        }

        // First, we'll collect all the channels which are eligible for garbage
        // collection due to being zombies.
        filterPruneChans := func(info *models.ChannelEdgeInfo,
                e1, e2 *models.ChannelEdgePolicy) error {

                // Exit early in case this channel is already marked to be
                // pruned
                _, markedToPrune := chansToPrune[info.ChannelID]
                if markedToPrune {
                        return nil
                }

                // We'll ensure that we don't attempt to prune our *own*
                // channels from the graph, as in any case this should be
                // re-advertised by the sub-system above us.
                if isSelfChannelEdge(info) {
                        return nil
                }

                e1Zombie, e2Zombie, isZombieChan := b.isZombieChannel(e1, e2)

                if e1Zombie {
                        log.Tracef("Node1 pubkey=%x of chan_id=%v is zombie",
                                info.NodeKey1Bytes, info.ChannelID)
                }

                if e2Zombie {
                        log.Tracef("Node2 pubkey=%x of chan_id=%v is zombie",
                                info.NodeKey2Bytes, info.ChannelID)
                }

                // If either edge hasn't been updated for a period of
                // chanExpiry, then we'll mark the channel itself as eligible
                // for graph pruning.
                if !isZombieChan {
                        return nil
                }

                log.Debugf("ChannelID(%v) is a zombie, collecting to prune",
                        info.ChannelID)

                // TODO(roasbeef): add ability to delete single directional edge
                chansToPrune[info.ChannelID] = struct{}{}

                return nil
        }

        // If AssumeChannelValid is present we'll look at the disabled bit for
        // both edges. If they're both disabled, then we can interpret this as
        // the channel being closed and can prune it from our graph.
        if b.cfg.AssumeChannelValid {
                disabledChanIDs, err := b.cfg.Graph.DisabledChannelIDs()
                if err != nil {
                        return fmt.Errorf("unable to get disabled channels "+
                                "ids chans: %v", err)
                }

                disabledEdges, err := b.cfg.Graph.FetchChanInfos(
                        disabledChanIDs,
                )
                if err != nil {
                        return fmt.Errorf("unable to fetch disabled channels "+
                                "edges chans: %v", err)
                }

                // Ensuring we won't prune our own channel from the graph.
                for _, disabledEdge := range disabledEdges {
                        if !isSelfChannelEdge(disabledEdge.Info) {
                                chansToPrune[disabledEdge.Info.ChannelID] =
                                        struct{}{}
                        }
                }
        }

        startTime := time.Unix(0, 0)
        endTime := time.Now().Add(-1 * chanExpiry)
        oldEdges, err := b.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime)
        if err != nil {
                return fmt.Errorf("unable to fetch expired channel updates "+
                        "chans: %v", err)
        }

        for _, u := range oldEdges {
                err = filterPruneChans(u.Info, u.Policy1, u.Policy2)
                if err != nil {
                        return fmt.Errorf("error filtering channels to "+
                                "prune: %w", err)
                }
        }

        log.Infof("Pruning %v zombie channels", len(chansToPrune))
        if len(chansToPrune) == 0 {
                return nil
        }

        // With the set of zombie-like channels obtained, we'll do another pass
        // to delete them from the channel graph.
        toPrune := make([]uint64, 0, len(chansToPrune))
        for chanID := range chansToPrune {
                toPrune = append(toPrune, chanID)
                log.Tracef("Pruning zombie channel with ChannelID(%v)", chanID)
        }
        err = b.cfg.Graph.DeleteChannelEdges(
                b.cfg.StrictZombiePruning, true, toPrune...,
        )
        if err != nil {
                return fmt.Errorf("unable to delete zombie channels: %w", err)
        }

        // With the channels pruned, we'll also attempt to prune any nodes that
        // were a part of them.
        err = b.cfg.Graph.PruneGraphNodes()
        if err != nil && !errors.Is(err, graphdb.ErrGraphNodesNotFound) {
                return fmt.Errorf("unable to prune graph nodes: %w", err)
        }

        return nil
}

// handleNetworkUpdate is responsible for processing the update message and
// notifies topology changes, if any.
//
// NOTE: must be run inside goroutine.
func (b *Builder) handleNetworkUpdate(update *routingMsg) {
        defer b.wg.Done()

        // Process the routing update to determine if this is either a new
        // update from our PoV or an update to a prior vertex/edge we
        // previously accepted.
        var err error
        switch msg := update.msg.(type) {
        case *models.LightningNode:
                err = b.addNode(msg, update.op...)

        case *models.ChannelEdgeInfo:
                err = b.addEdge(msg, update.op...)

        case *models.ChannelEdgePolicy:
                err = b.updateEdge(msg, update.op...)

        default:
                err = errors.Errorf("wrong routing update message type")
        }
        update.err <- err

        // If the error is not nil here, there's no need to send topology
        // change.
        if err != nil {
                // Log as a debug message if this is not an error we need to be
                // concerned about.
                if IsError(err, ErrIgnored, ErrOutdated) {
                        log.Debugf("process network updates got: %v", err)
                } else {
                        log.Errorf("process network updates got: %v", err)
                }

                return
        }

        // Otherwise, we'll send off a new notification for the newly accepted
        // update, if any.
        topChange := &TopologyChange{}
        err = addToTopologyChange(b.cfg.Graph, topChange, update.msg)
        if err != nil {
                log.Errorf("unable to update topology change notification: %v",
                        err)
                return
        }

        if !topChange.isEmpty() {
                b.notifyTopologyChange(topChange)
        }
}

// networkHandler is the primary goroutine for the Builder. The roles of
// this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification, applying network
// updates, and registering new topology clients.
//
// NOTE: This MUST be run as a goroutine.
func (b *Builder) networkHandler() {
        defer b.wg.Done()

        graphPruneTicker := time.NewTicker(b.cfg.GraphPruneInterval)
        defer graphPruneTicker.Stop()

        defer b.statTicker.Stop()

        b.stats.Reset()

        for {
                // If there are stats, resume the statTicker.
                if !b.stats.Empty() {
                        b.statTicker.Resume()
                }

                select {
                // A new fully validated network update has just arrived. As a
                // result we'll modify the channel graph accordingly depending
                // on the exact type of the message.
                case update := <-b.networkUpdates:
                        b.wg.Add(1)
                        go b.handleNetworkUpdate(update)

                        // TODO(roasbeef): remove all unconnected vertexes
                        // after N blocks pass with no corresponding
                        // announcements.

                case chainUpdate, ok := <-b.staleBlocks:
                        // If the channel has been closed, then this indicates
                        // the daemon is shutting down, so we exit ourselves.
                        if !ok {
                                return
                        }

                        // Since this block is stale, we update our best height
                        // to the previous block.
                        blockHeight := chainUpdate.Height
                        b.bestHeight.Store(blockHeight - 1)

                        // Update the channel graph to reflect that this block
                        // was disconnected.
                        _, err := b.cfg.Graph.DisconnectBlockAtHeight(
                                blockHeight,
                        )
                        if err != nil {
                                log.Errorf("unable to prune graph with stale "+
                                        "block: %v", err)
                                continue
                        }

                        // TODO(halseth): notify client about the reorg?

                // A new block has arrived, so we can prune the channel graph
                // of any channels which were closed in the block.
                case chainUpdate, ok := <-b.newBlocks:
                        // If the channel has been closed, then this indicates
                        // the daemon is shutting down, so we exit ourselves.
                        if !ok {
                                return
                        }

                        // We'll ensure that any new blocks received attach
                        // directly to the end of our main chain. If not, then
                        // we've somehow missed some blocks. Here we'll catch
                        // up the chain with the latest blocks.
                        currentHeight := b.bestHeight.Load()
                        switch {
                        case chainUpdate.Height == currentHeight+1:
                                err := b.updateGraphWithClosedChannels(
                                        chainUpdate,
                                )
                                if err != nil {
                                        log.Errorf("unable to prune graph "+
                                                "with closed channels: %v", err)
                                }

                        case chainUpdate.Height > currentHeight+1:
                                log.Errorf("out of order block: expecting "+
                                        "height=%v, got height=%v",
                                        currentHeight+1, chainUpdate.Height)

                                err := b.getMissingBlocks(
                                        currentHeight, chainUpdate,
                                )
                                if err != nil {
                                        log.Errorf("unable to retrieve missing"+
                                                "blocks: %v", err)
                                }

                        case chainUpdate.Height < currentHeight+1:
                                log.Errorf("out of order block: expecting "+
                                        "height=%v, got height=%v",
                                        currentHeight+1, chainUpdate.Height)

                                log.Infof("Skipping channel pruning since "+
                                        "received block height %v was already"+
                                        " processed.", chainUpdate.Height)
                        }

                // A new notification client update has arrived. We're either
                // gaining a new client, or cancelling notifications for an
                // existing client.
                case ntfnUpdate := <-b.ntfnClientUpdates:
                        clientID := ntfnUpdate.clientID

                        if ntfnUpdate.cancel {
                                client, ok := b.topologyClients.LoadAndDelete(
                                        clientID,
                                )
                                if ok {
                                        close(client.exit)
                                        client.wg.Wait()

                                        close(client.ntfnChan)
                                }

                                continue
                        }

                        b.topologyClients.Store(clientID, &topologyClient{
                                ntfnChan: ntfnUpdate.ntfnChan,
                                exit:     make(chan struct{}),
                        })

                // The graph prune ticker has ticked, so we'll examine the
                // state of the known graph to filter out any zombie channels
                // for pruning.
                case <-graphPruneTicker.C:
                        if err := b.pruneZombieChans(); err != nil {
                                log.Errorf("Unable to prune zombies: %v", err)
                        }

                // Log any stats if we've processed a non-empty number of
                // channels, updates, or nodes. We'll only pause the ticker if
                // the last window contained no updates to avoid resuming and
                // pausing while consecutive windows contain new info.
                case <-b.statTicker.Ticks():
                        if !b.stats.Empty() {
                                log.Infof(b.stats.String())
                        } else {
                                b.statTicker.Pause()
                        }
                        b.stats.Reset()

                // The router has been signalled to exit, so we exit our main
                // loop so the wait group can be decremented.
                case <-b.quit:
                        return
                }
        }
}

// getMissingBlocks walks through all missing blocks and updates the graph
// closed channels accordingly.
func (b *Builder) getMissingBlocks(currentHeight uint32,
        chainUpdate *chainview.FilteredBlock) error {

        outdatedHash, err := b.cfg.Chain.GetBlockHash(int64(currentHeight))
        if err != nil {
                return err
        }

        outdatedBlock := &chainntnfs.BlockEpoch{
                Height: int32(currentHeight),
                Hash:   outdatedHash,
        }

        epochClient, err := b.cfg.Notifier.RegisterBlockEpochNtfn(
                outdatedBlock,
        )
        if err != nil {
                return err
        }
        defer epochClient.Cancel()

        blockDifference := int(chainUpdate.Height - currentHeight)

        // We'll walk through all the outdated blocks and make sure we're able
        // to update the graph with any closed channels from them.
        for i := 0; i < blockDifference; i++ {
                var (
                        missingBlock *chainntnfs.BlockEpoch
                        ok           bool
                )

                select {
                case missingBlock, ok = <-epochClient.Epochs:
                        if !ok {
                                return nil
                        }

                case <-b.quit:
                        return nil
                }

                filteredBlock, err := b.cfg.ChainView.FilterBlock(
                        missingBlock.Hash,
                )
                if err != nil {
                        return err
                }

                err = b.updateGraphWithClosedChannels(
                        filteredBlock,
                )
                if err != nil {
                        return err
                }
        }

        return nil
}

// updateGraphWithClosedChannels prunes the channel graph of closed channels
// that are no longer needed.
func (b *Builder) updateGraphWithClosedChannels(
        chainUpdate *chainview.FilteredBlock) error {

        // Once a new block arrives, we update our running track of the height
        // of the chain tip.
        blockHeight := chainUpdate.Height

        b.bestHeight.Store(blockHeight)
        log.Infof("Pruning channel graph using block %v (height=%v)",
                chainUpdate.Hash, blockHeight)

        // We're only interested in all prior outputs that have been spent in
        // the block, so collate all the referenced previous outpoints within
        // each tx and input.
        var spentOutputs []*wire.OutPoint
        for _, tx := range chainUpdate.Transactions {
                for _, txIn := range tx.TxIn {
                        spentOutputs = append(spentOutputs,
                                &txIn.PreviousOutPoint)
                }
        }

        // With the spent outputs gathered, attempt to prune the channel graph,
        // also passing in the hash+height of the block being pruned so the
        // prune tip can be updated.
        chansClosed, err := b.cfg.Graph.PruneGraph(spentOutputs,
                &chainUpdate.Hash, chainUpdate.Height)
        if err != nil {
                log.Errorf("unable to prune routing table: %v", err)
                return err
        }

        log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash,
                blockHeight, len(chansClosed))

        if len(chansClosed) == 0 {
                return err
        }

        // Notify all currently registered clients of the newly closed channels.
        closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
        b.notifyTopologyChange(&TopologyChange{
                ClosedChannels: closeSummaries,
        })

        return nil
}

// assertNodeAnnFreshness returns a non-nil error if we have an announcement in
// the database for the passed node with a timestamp newer than the passed
// timestamp. ErrIgnored will be returned if we already have the node, and
// ErrOutdated will be returned if we have a timestamp that's after the new
// timestamp.
func (b *Builder) assertNodeAnnFreshness(node route.Vertex,
        msgTimestamp time.Time) error {

        // If we are not already aware of this node, it means that we don't
        // know about any channel using this node. To avoid a DoS attack by
        // node announcements, we will ignore such nodes. If we do know about
        // this node, check that this update brings info newer than what we
        // already have.
        lastUpdate, exists, err := b.cfg.Graph.HasLightningNode(node)
        if err != nil {
                return errors.Errorf("unable to query for the "+
                        "existence of node: %v", err)
        }
        if !exists {
                return NewErrf(ErrIgnored, "Ignoring node announcement"+
                        " for node not found in channel graph (%x)",
                        node[:])
        }

        // If we've reached this point then we're aware of the vertex being
        // advertised. So we now check if the new message has a new time stamp,
        // if not then we won't accept the new data as it would override newer
        // data.
        if !lastUpdate.Before(msgTimestamp) {
                return NewErrf(ErrOutdated, "Ignoring outdated "+
                        "announcement for %x", node[:])
        }

        return nil
}

// MarkZombieEdge adds a channel that failed complete validation into the zombie
// index so we can avoid having to re-validate it in the future.
func (b *Builder) MarkZombieEdge(chanID uint64) error {
        // If the edge fails validation we'll mark the edge itself as a zombie
        // so we don't continue to request it. We use the "zero key" for both
        // node pubkeys so this edge can't be resurrected.
        var zeroKey [33]byte
        err := b.cfg.Graph.MarkEdgeZombie(chanID, zeroKey, zeroKey)
        if err != nil {
                return fmt.Errorf("unable to mark spent chan(id=%v) as a "+
                        "zombie: %w", chanID, err)
        }

        return nil
}

// routingMsg couples a routing-related topology update to the
// error channel.
type routingMsg struct {
        msg interface{}
        op  []batch.SchedulerOption
        err chan error
}

// ApplyChannelUpdate validates a channel update and if valid, applies it to the
// database. It returns a bool indicating whether the updates were successful.
func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
        ch, _, _, err := b.GetChannelByID(msg.ShortChannelID)
        if err != nil {
                log.Errorf("Unable to retrieve channel by id: %v", err)
                return false
        }

        var pubKey *btcec.PublicKey

        switch msg.ChannelFlags & lnwire.ChanUpdateDirection {
        case 0:
                pubKey, _ = ch.NodeKey1()

        case 1:
                pubKey, _ = ch.NodeKey2()
        }

        // Exit early if the pubkey cannot be decided.
        if pubKey == nil {
                log.Errorf("Unable to decide pubkey with ChannelFlags=%v",
                        msg.ChannelFlags)
                return false
        }

        err = netann.ValidateChannelUpdateAnn(pubKey, ch.Capacity, msg)
        if err != nil {
                log.Errorf("Unable to validate channel update: %v", err)
                return false
        }

        err = b.UpdateEdge(&models.ChannelEdgePolicy{
                SigBytes:                  msg.Signature.ToSignatureBytes(),
                ChannelID:                 msg.ShortChannelID.ToUint64(),
                LastUpdate:                time.Unix(int64(msg.Timestamp), 0),
                MessageFlags:              msg.MessageFlags,
                ChannelFlags:              msg.ChannelFlags,
                TimeLockDelta:             msg.TimeLockDelta,
                MinHTLC:                   msg.HtlcMinimumMsat,
                MaxHTLC:                   msg.HtlcMaximumMsat,
                FeeBaseMSat:               lnwire.MilliSatoshi(msg.BaseFee),
                FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate),
                ExtraOpaqueData:           msg.ExtraOpaqueData,
        })
        if err != nil && !IsError(err, ErrIgnored, ErrOutdated) {
                log.Errorf("Unable to apply channel update: %v", err)
                return false
        }

        return true
}

// AddNode is used to add information about a node to the router database. If
// the node with this pubkey is not present in an existing channel, it will
// be ignored.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) AddNode(node *models.LightningNode,
        op ...batch.SchedulerOption) error {

        rMsg := &routingMsg{
                msg: node,
                op:  op,
                err: make(chan error, 1),
        }

        select {
        case b.networkUpdates <- rMsg:
                select {
                case err := <-rMsg.err:
                        return err
                case <-b.quit:
                        return ErrGraphBuilderShuttingDown
                }
        case <-b.quit:
                return ErrGraphBuilderShuttingDown
        }
}

// addNode does some basic checks on the given LightningNode against what we
// currently have persisted in the graph, and then adds it to the graph. If we
// already know about the node, then we only update our DB if the new update
// has a newer timestamp than the last one we received.
func (b *Builder) addNode(node *models.LightningNode,
        op ...batch.SchedulerOption) error {

        // Before we add the node to the database, we'll check to see if the
        // announcement is "fresh" or not. If it isn't, then we'll return an
        // error.
        err := b.assertNodeAnnFreshness(node.PubKeyBytes, node.LastUpdate)
        if err != nil {
                return err
        }

        if err := b.cfg.Graph.AddLightningNode(node, op...); err != nil {
                return errors.Errorf("unable to add node %x to the "+
                        "graph: %v", node.PubKeyBytes, err)
        }

        log.Tracef("Updated vertex data for node=%x", node.PubKeyBytes)
        b.stats.incNumNodeUpdates()

        return nil
}

// AddEdge is used to add an edge/channel to the topology of the router. After
// all information about the channel has been gathered, this edge/channel might
// be used in the construction of a payment path.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
        op ...batch.SchedulerOption) error {

        rMsg := &routingMsg{
                msg: edge,
                op:  op,
                err: make(chan error, 1),
        }

        select {
        case b.networkUpdates <- rMsg:
                select {
                case err := <-rMsg.err:
                        return err
                case <-b.quit:
                        return ErrGraphBuilderShuttingDown
                }
        case <-b.quit:
                return ErrGraphBuilderShuttingDown
        }
}

// addEdge does some validation on the new channel edge against what we
// currently have persisted in the graph, and then adds it to the graph. The
// Chain View is updated with the new edge if it is successfully added to the
// graph. We only persist the channel if we currently don't have it at all in
// our graph.
//
// TODO(elle): this currently also does funding-transaction validation. But this
// should be moved to the gossiper instead.
func (b *Builder) addEdge(edge *models.ChannelEdgeInfo,
        op ...batch.SchedulerOption) error {

        log.Debugf("Received ChannelEdgeInfo for channel %v", edge.ChannelID)

        // Prior to processing the announcement we first check if we
        // already know of this channel, if so, then we can exit early.
        _, _, exists, isZombie, err := b.cfg.Graph.HasChannelEdge(
                edge.ChannelID,
        )
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
                return errors.Errorf("unable to check for edge existence: %v",
                        err)
        }
        if isZombie {
                return NewErrf(ErrIgnored, "ignoring msg for zombie chan_id=%v",
                        edge.ChannelID)
        }
        if exists {
                return NewErrf(ErrIgnored, "ignoring msg for known chan_id=%v",
                        edge.ChannelID)
        }

        if err := b.cfg.Graph.AddChannelEdge(edge, op...); err != nil {
                return fmt.Errorf("unable to add edge: %w", err)
        }

        b.stats.incNumEdgesDiscovered()

        // If AssumeChannelValid is present, or if the SCID is an alias, then
        // the gossiper would not have done the expensive work of fetching
        // a funding transaction and validating it. So we won't have the channel
        // capacity nor the funding script. So we just log and return here.
        scid := lnwire.NewShortChanIDFromInt(edge.ChannelID)
        if b.cfg.AssumeChannelValid || b.cfg.IsAlias(scid) {
                log.Tracef("New channel discovered! Link connects %x and %x "+
                        "with ChannelID(%v)", edge.NodeKey1Bytes,
                        edge.NodeKey2Bytes, edge.ChannelID)

                return nil
        }

        log.Debugf("New channel discovered! Link connects %x and %x with "+
                "ChannelPoint(%v): chan_id=%v, capacity=%v", edge.NodeKey1Bytes,
                edge.NodeKey2Bytes, edge.ChannelPoint, edge.ChannelID,
                edge.Capacity)

        // Otherwise, then we expect the funding script to be present on the
        // edge since it would have been fetched when the gossiper validated the
        // announcement.
        fundingPkScript, err := edge.FundingScript.UnwrapOrErr(fmt.Errorf(
                "expected the funding transaction script to be set",
        ))
        if err != nil {
                return err
        }

        // As a new edge has been added to the channel graph, we'll update the
        // current UTXO filter within our active FilteredChainView so we are
        // notified if/when this channel is closed.
        filterUpdate := []graphdb.EdgePoint{
                {
                        FundingPkScript: fundingPkScript,
                        OutPoint:        edge.ChannelPoint,
                },
        }

        err = b.cfg.ChainView.UpdateFilter(filterUpdate, b.bestHeight.Load())
        if err != nil {
                return errors.Errorf("unable to update chain "+
                        "view: %v", err)
        }

        return nil
}
1260

1261
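// buildFilterUpdate is an editorial sketch (a hypothetical helper, not lnd
// code): it mirrors the filter registration step in addEdge above, collecting
// one graphdb.EdgePoint per confirmed edge so a batch of channels could be
// watched for spends in a single ChainView.UpdateFilter call. Edges without a
// funding script (alias/AssumeChannelValid cases) are simply skipped here.
func buildFilterUpdate(edges []*models.ChannelEdgeInfo) []graphdb.EdgePoint {
        points := make([]graphdb.EdgePoint, 0, len(edges))
        for _, edge := range edges {
                fundingPkScript, err := edge.FundingScript.UnwrapOrErr(
                        fmt.Errorf("funding script not set"),
                )
                if err != nil {
                        // No funding script available, so there is nothing to
                        // watch for this edge.
                        continue
                }

                points = append(points, graphdb.EdgePoint{
                        FundingPkScript: fundingPkScript,
                        OutPoint:        edge.ChannelPoint,
                })
        }

        return points
}
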
// UpdateEdge is used to update edge information. Without this message, the
// edge is considered not fully constructed.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
        op ...batch.SchedulerOption) error {

        rMsg := &routingMsg{
                msg: update,
                op:  op,
                err: make(chan error, 1),
        }

        select {
        case b.networkUpdates <- rMsg:
                select {
                case err := <-rMsg.err:
                        return err
                case <-b.quit:
                        return ErrGraphBuilderShuttingDown
                }
        case <-b.quit:
                return ErrGraphBuilderShuttingDown
        }
}

// updateEdge validates the new edge policy against what we currently have
// persisted in the graph, and then applies it to the graph if the update is
// considered fresh enough and if we actually have a channel persisted for the
// given update.
func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy,
        op ...batch.SchedulerOption) error {

        log.Debugf("Received ChannelEdgePolicy for channel %v",
                policy.ChannelID)

        // We make sure to hold the mutex for this channel ID, such that no
        // other goroutine is concurrently doing database accesses for the same
        // channel ID.
        b.channelEdgeMtx.Lock(policy.ChannelID)
        defer b.channelEdgeMtx.Unlock(policy.ChannelID)

        edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
                b.cfg.Graph.HasChannelEdge(policy.ChannelID)
        if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
                return errors.Errorf("unable to check for edge existence: %v",
                        err)
        }

        // If the channel is marked as a zombie in our database, and we
        // consider this a stale update, then we should not apply the policy.
        isStaleUpdate := time.Since(policy.LastUpdate) >
                b.cfg.ChannelPruneExpiry

        if isZombie && isStaleUpdate {
                return NewErrf(ErrIgnored, "ignoring stale update "+
                        "(flags=%v|%v) for zombie chan_id=%v",
                        policy.MessageFlags, policy.ChannelFlags,
                        policy.ChannelID)
        }

        // If the channel doesn't exist in our database, we cannot apply the
        // updated policy.
        if !exists {
                return NewErrf(ErrIgnored, "ignoring update (flags=%v|%v) for "+
                        "unknown chan_id=%v", policy.MessageFlags,
                        policy.ChannelFlags, policy.ChannelID)
        }

        log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v",
                edge1Timestamp, edge2Timestamp)

        // As edges are directional, each node has a unique policy for the
        // direction of the edge it controls. Therefore, we first check if we
        // already have the most up-to-date information for that edge. If this
        // message has a timestamp that isn't strictly newer than what we
        // already know of, we can exit early.
        switch policy.ChannelFlags & lnwire.ChanUpdateDirection {
        // A flag set of 0 indicates this is an announcement for the "first"
        // node in the channel.
        case 0:
                // Ignore outdated message.
                if !edge1Timestamp.Before(policy.LastUpdate) {
                        return NewErrf(ErrOutdated, "Ignoring "+
                                "outdated update (flags=%v|%v) for "+
                                "known chan_id=%v", policy.MessageFlags,
                                policy.ChannelFlags, policy.ChannelID)
                }

        // Similarly, a flag set of 1 indicates this is an announcement
        // for the "second" node in the channel.
        case 1:
                // Ignore outdated message.
                if !edge2Timestamp.Before(policy.LastUpdate) {
                        return NewErrf(ErrOutdated, "Ignoring "+
                                "outdated update (flags=%v|%v) for "+
                                "known chan_id=%v", policy.MessageFlags,
                                policy.ChannelFlags, policy.ChannelID)
                }
        }

        // Now that we know this isn't a stale update, we'll apply the new edge
        // policy to the proper directional edge within the channel graph.
        if err = b.cfg.Graph.UpdateEdgePolicy(policy, op...); err != nil {
                err := errors.Errorf("unable to add channel: %v", err)
                log.Error(err)
                return err
        }

        log.Tracef("New channel update applied: %v",
                lnutils.SpewLogClosure(policy))
        b.stats.incNumChannelUpdates()

        return nil
}

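// The helper below is an editorial sketch (not part of lnd): it restates the
// direction check used in updateEdge above, mapping the ChanUpdateDirection
// bit of a policy's channel flags to the node whose outgoing edge the policy
// describes. The helper name is an assumption made for illustration only.
func policyIsForNode1(policy *models.ChannelEdgePolicy) bool {
        // A cleared direction bit (0) means the update was produced by the
        // channel's "first" node; a set bit (1) means the "second" node.
        return policy.ChannelFlags&lnwire.ChanUpdateDirection == 0
}
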
// CurrentBlockHeight returns the block height from the PoV of the router
// subsystem.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) CurrentBlockHeight() (uint32, error) {
        _, height, err := b.cfg.Chain.GetBestBlock()
        return uint32(height), err
}

// SyncedHeight returns the block height that the router subsystem is
// currently synced to. This can differ from the above chain height if the
// goroutine responsible for processing the blocks isn't yet up to speed.
func (b *Builder) SyncedHeight() uint32 {
        return b.bestHeight.Load()
}

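// builderIsCaughtUp is an editorial sketch (a hypothetical helper, not lnd
// code): it contrasts the two heights documented above, treating the builder
// as caught up once the last block it processed is at least the backend's
// best height. The name and the >= comparison are assumptions for
// illustration.
func builderIsCaughtUp(b *Builder) (bool, error) {
        chainHeight, err := b.CurrentBlockHeight()
        if err != nil {
                return false, err
        }

        return b.SyncedHeight() >= chainHeight, nil
}
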
// GetChannelByID returns the channel by the channel ID.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) GetChannelByID(chanID lnwire.ShortChannelID) (
        *models.ChannelEdgeInfo,
        *models.ChannelEdgePolicy,
        *models.ChannelEdgePolicy, error) {

        return b.cfg.Graph.FetchChannelEdgesByID(chanID.ToUint64())
}

// FetchLightningNode attempts to look up a target node by its identity public
// key. graphdb.ErrGraphNodeNotFound is returned if the node doesn't exist
// within the graph.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) FetchLightningNode(
        node route.Vertex) (*models.LightningNode, error) {

        return b.cfg.Graph.FetchLightningNode(node)
}

// ForAllOutgoingChannels is used to iterate over all outgoing channels owned by
// the router.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) ForAllOutgoingChannels(cb func(*models.ChannelEdgeInfo,
        *models.ChannelEdgePolicy) error) error {

        return b.cfg.Graph.ForEachNodeChannel(b.cfg.SelfNode,
                func(_ kvdb.RTx, c *models.ChannelEdgeInfo,
                        e *models.ChannelEdgePolicy,
                        _ *models.ChannelEdgePolicy) error {

                        if e == nil {
                                return fmt.Errorf("channel from self node " +
                                        "has no policy")
                        }

                        return cb(c, e)
                },
        )
}

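// countOutgoingChannels is an editorial sketch (a hypothetical helper, not
// part of lnd): it shows how ForAllOutgoingChannels can be used to tally the
// router's own channels that carry an outgoing policy.
func countOutgoingChannels(b *Builder) (int, error) {
        var count int
        err := b.ForAllOutgoingChannels(func(_ *models.ChannelEdgeInfo,
                _ *models.ChannelEdgePolicy) error {

                count++
                return nil
        })

        return count, err
}
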
// AddProof updates the channel edge info with proof which is needed to
// properly announce the edge to the rest of the network.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) AddProof(chanID lnwire.ShortChannelID,
        proof *models.ChannelAuthProof) error {

        return b.cfg.Graph.AddEdgeProof(chanID, proof)
}

// IsStaleNode returns true if the graph source has a node announcement for the
// target node with a more recent timestamp.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) IsStaleNode(node route.Vertex,
        timestamp time.Time) bool {

        // If our attempt to assert that the node announcement is fresh fails,
        // then we know that this is actually a stale announcement.
        err := b.assertNodeAnnFreshness(node, timestamp)
        if err != nil {
                log.Debugf("Checking stale node %x got %v", node, err)
                return true
        }

        return false
}

// IsPublicNode determines whether the given vertex is seen as a public node in
// the graph from the graph's source node's point of view.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) IsPublicNode(node route.Vertex) (bool, error) {
        return b.cfg.Graph.IsPublicNode(node)
}

// IsKnownEdge returns true if the graph source already knows of the passed
// channel ID either as a live or zombie edge.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
        _, _, exists, isZombie, _ := b.cfg.Graph.HasChannelEdge(
                chanID.ToUint64(),
        )

        return exists || isZombie
}

// IsZombieEdge returns true if the graph source has marked the given channel
// ID as a zombie edge.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) IsZombieEdge(chanID lnwire.ShortChannelID) (bool, error) {
        _, _, _, isZombie, err := b.cfg.Graph.HasChannelEdge(chanID.ToUint64())

        return isZombie, err
}

// IsStaleEdgePolicy returns true if the graph source has a channel edge for
// the passed channel ID (and flags) that has a more recent timestamp.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
        timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {

        edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
                b.cfg.Graph.HasChannelEdge(chanID.ToUint64())
        if err != nil {
                log.Debugf("Check stale edge policy got error: %v", err)
                return false
        }

        // If we know of the edge as a zombie, then we'll make some additional
        // checks to determine if the new policy is fresh.
        if isZombie {
                // When running with AssumeChannelValid, we also prune channels
                // if both of their edges are disabled. We'll mark the new
                // policy as stale if it remains disabled.
                if b.cfg.AssumeChannelValid {
                        isDisabled := flags&lnwire.ChanUpdateDisabled ==
                                lnwire.ChanUpdateDisabled
                        if isDisabled {
                                return true
                        }
                }

                // Otherwise, we'll fall back to our usual ChannelPruneExpiry.
                return time.Since(timestamp) > b.cfg.ChannelPruneExpiry
        }

        // If we don't know of the edge, then it means it's fresh (thus not
        // stale).
        if !exists {
                return false
        }

        // As edges are directional, each node has a unique policy for the
        // direction of the edge it controls. Therefore, we first check if we
        // already have the most up-to-date information for that edge. If so,
        // then we can exit early.
        switch {
        // A flag set of 0 indicates this is an announcement for the "first"
        // node in the channel.
        case flags&lnwire.ChanUpdateDirection == 0:
                return !edge1Timestamp.Before(timestamp)

        // Similarly, a flag set of 1 indicates this is an announcement for the
        // "second" node in the channel.
        case flags&lnwire.ChanUpdateDirection == 1:
                return !edge2Timestamp.Before(timestamp)
        }

        return false
}

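// shouldProcessUpdate is an editorial sketch (a hypothetical pre-filter, not
// part of lnd): a gossip-style caller could use IsStaleEdgePolicy, as defined
// above, to skip channel updates whose timestamp is not strictly newer than
// what the graph already stores for that direction.
func shouldProcessUpdate(b *Builder, scid lnwire.ShortChannelID,
        ts time.Time, flags lnwire.ChanUpdateChanFlags) bool {

        return !b.IsStaleEdgePolicy(scid, ts, flags)
}
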
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) MarkEdgeLive(chanID lnwire.ShortChannelID) error {
        return b.cfg.Graph.MarkEdgeLive(chanID.ToUint64())
}