lightningnetwork / lnd / 9935147745

15 Jul 2024 07:07AM UTC coverage: 49.819% (+0.6%) from 49.268%

Pull Request #8900 (guggero): Makefile: add GOCC variable

93876 of 188433 relevant lines covered (49.82%)
2.07 hits per line

Source File: /routing/router.go (64.41% covered)

package routing

import (
	"bytes"
	"context"
	"fmt"
	"math"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/btcec/v2"
	"github.com/btcsuite/btcd/btcutil"
	"github.com/btcsuite/btcd/wire"
	"github.com/davecgh/go-spew/spew"
	"github.com/go-errors/errors"
	sphinx "github.com/lightningnetwork/lightning-onion"
	"github.com/lightningnetwork/lnd/amp"
	"github.com/lightningnetwork/lnd/batch"
	"github.com/lightningnetwork/lnd/chainntnfs"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/channeldb/models"
	"github.com/lightningnetwork/lnd/clock"
	"github.com/lightningnetwork/lnd/fn"
	"github.com/lightningnetwork/lnd/htlcswitch"
	"github.com/lightningnetwork/lnd/input"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lntypes"
	"github.com/lightningnetwork/lnd/lnutils"
	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwallet/btcwallet"
	"github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/multimutex"
	"github.com/lightningnetwork/lnd/record"
	"github.com/lightningnetwork/lnd/routing/chainview"
	"github.com/lightningnetwork/lnd/routing/route"
	"github.com/lightningnetwork/lnd/routing/shards"
	"github.com/lightningnetwork/lnd/ticker"
	"github.com/lightningnetwork/lnd/zpay32"
)

const (
	// DefaultPayAttemptTimeout is the default payment attempt timeout. The
	// payment attempt timeout defines the duration after which we stop
	// trying more routes for a payment.
	DefaultPayAttemptTimeout = time.Second * 60

	// DefaultChannelPruneExpiry is the default duration used to determine
	// if a channel should be pruned or not.
	DefaultChannelPruneExpiry = time.Hour * 24 * 14

	// DefaultFirstTimePruneDelay is the time we'll wait after startup
	// before attempting to prune the graph for zombie channels. We don't
	// do it immediately after startup to allow lnd to start up without
	// getting blocked by this job.
	DefaultFirstTimePruneDelay = 30 * time.Second

	// defaultStatInterval governs how often the router will log non-empty
	// stats related to processing new channels, updates, or node
	// announcements.
	defaultStatInterval = time.Minute

	// MinCLTVDelta is the minimum CLTV value accepted by LND for all
	// timelock deltas. This includes both forwarding CLTV deltas set on
	// channel updates, as well as final CLTV deltas used to create BOLT 11
	// payment requests.
	//
	// NOTE: For payment requests, BOLT 11 stipulates that a final CLTV
	// delta of 9 should be used when no value is decoded. This however
	// leads to inflexibility in upgrading this default parameter, since it
	// can create inconsistencies around the assumed value between sender
	// and receiver. Specifically, if the receiver assumes a higher value
	// than the sender, the receiver will always see the received HTLCs as
	// invalid due to their timelock not meeting the required delta.
	//
	// We skirt this by always setting an explicit CLTV delta when creating
	// invoices. This allows LND nodes to freely update the minimum without
	// creating incompatibilities during the upgrade process. For some time
	// LND has used an explicit default final CLTV delta of 40 blocks for
	// bitcoin, though we now clamp the lower end of this range for
	// user-chosen deltas to 18 blocks to be conservative.
	MinCLTVDelta = 18

	// MaxCLTVDelta is the maximum CLTV value accepted by LND for all
	// timelock deltas.
	MaxCLTVDelta = math.MaxUint16
)
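
// --- Illustrative example (editor's addition, not part of lnd) ---
// A minimal sketch of clamping a user-chosen timelock delta into the
// [MinCLTVDelta, MaxCLTVDelta] range described above. The helper name
// clampCLTVDelta is hypothetical.
func clampCLTVDelta(delta uint32) uint32 {
	if delta < MinCLTVDelta {
		return MinCLTVDelta
	}
	if delta > MaxCLTVDelta {
		return MaxCLTVDelta
	}

	return delta
}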

var (
	// ErrRouterShuttingDown is returned if the router is in the process of
	// shutting down.
	ErrRouterShuttingDown = fmt.Errorf("router shutting down")

	// ErrSelfIntro is a failure returned when the source node of a
	// route request is also the introduction node. This is not yet
	// supported because LND does not support blinded forwarding.
	ErrSelfIntro = errors.New("introduction point as own node not " +
		"supported")

	// ErrHintsAndBlinded is returned if a route request has both
	// bolt 11 route hints and a blinded path set.
	ErrHintsAndBlinded = errors.New("bolt 11 route hints and blinded " +
		"paths are mutually exclusive")

	// ErrExpiryAndBlinded is returned if a final cltv and a blinded path
	// are provided, as the cltv should be provided within the blinded
	// path.
	ErrExpiryAndBlinded = errors.New("final cltv delta and blinded " +
		"paths are mutually exclusive")

	// ErrTargetAndBlinded is returned if a target destination and a
	// blinded path are both set (as the target is inferred from the
	// blinded path).
	ErrTargetAndBlinded = errors.New("target node and blinded paths " +
		"are mutually exclusive")

	// ErrNoTarget is returned when the target node for a route is not
	// provided by either a blinded route or a cleartext pubkey.
	ErrNoTarget = errors.New("destination not set in target or blinded " +
		"path")

	// ErrSkipTempErr is returned when a non-MPP payment is made yet the
	// skipTempErr flag is set.
	ErrSkipTempErr = errors.New("cannot skip temp error for non-MPP")
)

// ChannelGraphSource represents the source of information about the topology
// of the lightning network. It's responsible for the addition of nodes, edges,
// applying edge updates, and returning the current block height with which the
// topology is synchronized.
type ChannelGraphSource interface {
	// AddNode is used to add information about a node to the router
	// database. If the node with this pubkey is not present in an existing
	// channel, it will be ignored.
	AddNode(node *channeldb.LightningNode,
		op ...batch.SchedulerOption) error

	// AddEdge is used to add an edge/channel to the topology of the
	// router. Once all information about the channel has been gathered,
	// this edge/channel might be used in the construction of a payment
	// path.
	AddEdge(edge *models.ChannelEdgeInfo,
		op ...batch.SchedulerOption) error

	// AddProof updates the channel edge info with proof which is needed to
	// properly announce the edge to the rest of the network.
	AddProof(chanID lnwire.ShortChannelID,
		proof *models.ChannelAuthProof) error

	// UpdateEdge is used to update edge information; without this message
	// the edge is considered not fully constructed.
	UpdateEdge(policy *models.ChannelEdgePolicy,
		op ...batch.SchedulerOption) error

	// IsStaleNode returns true if the graph source has a node announcement
	// for the target node with a more recent timestamp. This method will
	// also return true if we don't have an active channel announcement for
	// the target node.
	IsStaleNode(node route.Vertex, timestamp time.Time) bool

	// IsPublicNode determines whether the given vertex is seen as a public
	// node in the graph from the graph's source node's point of view.
	IsPublicNode(node route.Vertex) (bool, error)

	// IsKnownEdge returns true if the graph source already knows of the
	// passed channel ID either as a live or zombie edge.
	IsKnownEdge(chanID lnwire.ShortChannelID) bool

	// IsStaleEdgePolicy returns true if the graph source has a channel
	// edge for the passed channel ID (and flags) that has a more recent
	// timestamp.
	IsStaleEdgePolicy(chanID lnwire.ShortChannelID, timestamp time.Time,
		flags lnwire.ChanUpdateChanFlags) bool

	// MarkEdgeLive clears an edge from our zombie index, deeming it as
	// live.
	MarkEdgeLive(chanID lnwire.ShortChannelID) error

	// ForAllOutgoingChannels is used to iterate over all channels
	// emanating from the "source" node which is the center of the
	// star-graph.
	ForAllOutgoingChannels(cb func(tx kvdb.RTx,
		c *models.ChannelEdgeInfo,
		e *models.ChannelEdgePolicy) error) error

	// CurrentBlockHeight returns the block height from the PoV of the
	// router subsystem.
	CurrentBlockHeight() (uint32, error)

	// GetChannelByID returns the channel by the channel id.
	GetChannelByID(chanID lnwire.ShortChannelID) (
		*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
		*models.ChannelEdgePolicy, error)

	// FetchLightningNode attempts to look up a target node by its identity
	// public key. channeldb.ErrGraphNodeNotFound is returned if the node
	// doesn't exist within the graph.
	FetchLightningNode(route.Vertex) (*channeldb.LightningNode, error)

	// ForEachNode is used to iterate over every node in the known graph.
	ForEachNode(func(node *channeldb.LightningNode) error) error
}
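
// --- Illustrative example (editor's addition, not part of lnd) ---
// A minimal sketch of how a gossip consumer could use IsKnownEdge and
// IsStaleEdgePolicy to decide whether an incoming channel update warrants
// processing. The helper name shouldProcessUpdate is hypothetical.
func shouldProcessUpdate(src ChannelGraphSource,
	chanID lnwire.ShortChannelID, timestamp time.Time,
	flags lnwire.ChanUpdateChanFlags) bool {

	// Unknown channels always need processing; known channels only when
	// the update is not stale with respect to what we already have.
	if !src.IsKnownEdge(chanID) {
		return true
	}

	return !src.IsStaleEdgePolicy(chanID, timestamp, flags)
}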

// PaymentAttemptDispatcher is used by the router to send payment attempts onto
// the network, and receive their results.
type PaymentAttemptDispatcher interface {
	// SendHTLC is a function that directs a link-layer switch to
	// forward a fully encoded payment to the first hop in the route
	// denoted by its public key. A non-nil error is to be returned if the
	// payment was unsuccessful.
	SendHTLC(firstHop lnwire.ShortChannelID,
		attemptID uint64,
		htlcAdd *lnwire.UpdateAddHTLC) error

	// GetAttemptResult returns the result of the payment attempt with
	// the given attemptID. The paymentHash should be set to the payment's
	// overall hash, or in case of AMP payments the payment's unique
	// identifier.
	//
	// The method returns a channel where the payment result will be sent
	// when available, or an error is encountered during forwarding. When a
	// result is received on the channel, the HTLC is guaranteed to no
	// longer be in flight. The switch shutting down is signaled by
	// closing the channel. If the attemptID is unknown,
	// ErrPaymentIDNotFound will be returned.
	GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
		deobfuscator htlcswitch.ErrorDecrypter) (
		<-chan *htlcswitch.PaymentResult, error)

	// CleanStore calls the underlying result store, telling it that it is
	// safe to delete all entries except the ones in the keepPids map. This
	// should be called periodically to let the switch clean up payment
	// results that we have handled.
	//
	// NOTE: New payment attempts MUST NOT be made after the keepPids map
	// has been created and this method has returned.
	CleanStore(keepPids map[uint64]struct{}) error
}
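
// --- Illustrative example (editor's addition, not part of lnd) ---
// A minimal sketch of the dispatch-then-wait pattern described by the
// interface above: send the HTLC, then block on the result channel, treating
// a closed channel as a switch shutdown. The helper name sendAndWait is
// hypothetical.
func sendAndWait(p PaymentAttemptDispatcher, firstHop lnwire.ShortChannelID,
	attemptID uint64, htlcAdd *lnwire.UpdateAddHTLC, hash lntypes.Hash,
	dec htlcswitch.ErrorDecrypter) (*htlcswitch.PaymentResult, error) {

	if err := p.SendHTLC(firstHop, attemptID, htlcAdd); err != nil {
		return nil, err
	}

	resultChan, err := p.GetAttemptResult(attemptID, hash, dec)
	if err != nil {
		return nil, err
	}

	// Per the contract above, a closed channel signals that the switch is
	// shutting down.
	result, ok := <-resultChan
	if !ok {
		return nil, ErrRouterShuttingDown
	}

	return result, nil
}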

// PaymentSessionSource is an interface that defines a source for the router to
// retrieve new payment sessions.
type PaymentSessionSource interface {
	// NewPaymentSession creates a new payment session that will produce
	// routes to the given target. An optional set of routing hints can be
	// provided in order to populate additional edges to explore when
	// finding a path to the payment's destination.
	NewPaymentSession(p *LightningPayment) (PaymentSession, error)

	// NewPaymentSessionEmpty creates a new paymentSession instance that is
	// empty, and will be exhausted immediately. Used for failure reporting
	// to mission control for resumed payments we don't want to make more
	// attempts for.
	NewPaymentSessionEmpty() PaymentSession
}

// MissionController is an interface that exposes failure reporting and
// probability estimation.
type MissionController interface {
	// ReportPaymentFail reports a failed payment to mission control as
	// input for future probability estimates. It returns a bool indicating
	// whether this error is a final error and no further payment attempts
	// need to be made.
	ReportPaymentFail(attemptID uint64, rt *route.Route,
		failureSourceIdx *int, failure lnwire.FailureMessage) (
		*channeldb.FailureReason, error)

	// ReportPaymentSuccess reports a successful payment to mission control
	// as input for future probability estimates.
	ReportPaymentSuccess(attemptID uint64, rt *route.Route) error

	// GetProbability is expected to return the success probability of a
	// payment from fromNode along edge.
	GetProbability(fromNode, toNode route.Vertex,
		amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64
}
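
// --- Illustrative example (editor's addition, not part of lnd) ---
// A minimal sketch of using GetProbability to rank two candidate next hops
// for the same amount. The helper name betterHop is hypothetical.
func betterHop(mc MissionController, from, hopA, hopB route.Vertex,
	amt lnwire.MilliSatoshi, capA, capB btcutil.Amount) route.Vertex {

	if mc.GetProbability(from, hopA, amt, capA) >=
		mc.GetProbability(from, hopB, amt, capB) {

		return hopA
	}

	return hopB
}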

// FeeSchema is the set fee configuration for a Lightning Node on the network.
// Using the coefficients described within the schema, the required fee to
// forward outgoing payments can be derived.
type FeeSchema struct {
	// BaseFee is the base amount of milli-satoshis that will be charged
	// for ANY payment forwarded.
	BaseFee lnwire.MilliSatoshi

	// FeeRate is the rate that will be charged for forwarding payments.
	// This value should be interpreted as the numerator for a fraction
	// (fixed point arithmetic) whose denominator is 1 million. As a result
	// the effective fee rate charged per mSAT will be: (amount *
	// FeeRate/1,000,000).
	FeeRate uint32

	// InboundFee is the inbound fee schedule that applies to forwards
	// coming in through a channel to which this FeeSchema pertains.
	InboundFee fn.Option[models.InboundFee]
}
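
// --- Illustrative example (editor's addition, not part of lnd) ---
// A minimal sketch of the fee computation implied by the FeeSchema comments
// above: base fee plus the proportional fee at FeeRate parts per million.
// The helper name forwardingFee is hypothetical.
func forwardingFee(schema FeeSchema,
	amt lnwire.MilliSatoshi) lnwire.MilliSatoshi {

	proportional := uint64(amt) * uint64(schema.FeeRate) / 1_000_000

	return schema.BaseFee + lnwire.MilliSatoshi(proportional)
}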

// ChannelPolicy holds the parameters that determine the policy we enforce
// when forwarding payments on a channel. These parameters are communicated
// to the rest of the network in ChannelUpdate messages.
type ChannelPolicy struct {
	// FeeSchema holds the fee configuration for a channel.
	FeeSchema

	// TimeLockDelta is the required HTLC timelock delta to be used
	// when forwarding payments.
	TimeLockDelta uint32

	// MaxHTLC is the maximum HTLC size including fees we are allowed to
	// forward over this channel.
	MaxHTLC lnwire.MilliSatoshi

	// MinHTLC is the minimum HTLC size including fees we are allowed to
	// forward over this channel.
	MinHTLC *lnwire.MilliSatoshi
}

// Config defines the configuration for the ChannelRouter. ALL elements within
// the configuration MUST be non-nil for the ChannelRouter to carry out its
// duties.
type Config struct {
	// Graph is the channel graph that the ChannelRouter will use to gather
	// metrics from and also to carry out path finding queries.
	// TODO(roasbeef): make into an interface
	Graph *channeldb.ChannelGraph

	// Chain is the router's source to the most up-to-date blockchain data.
	// All incoming advertised channels will be checked against the chain
	// to ensure that the channels advertised are still open.
	Chain lnwallet.BlockChainIO

	// ChainView is an instance of a FilteredChainView which is used to
	// watch the sub-set of the UTXO set (the set of active channels) that
	// we need in order to properly maintain the channel graph.
	ChainView chainview.FilteredChainView

	// Notifier is a reference to the ChainNotifier, used to grab
	// the latest blocks if the router is missing any.
	Notifier chainntnfs.ChainNotifier

	// Payer is an instance of a PaymentAttemptDispatcher and is used by
	// the router to send payment attempts onto the network, and receive
	// their results.
	Payer PaymentAttemptDispatcher

	// Control keeps track of the status of ongoing payments, ensuring we
	// can properly resume them across restarts.
	Control ControlTower

	// MissionControl is a shared memory of sorts that executions of
	// payment path finding use in order to remember which vertexes/edges
	// were pruned from prior attempts. During SendPayment execution,
	// errors sent by nodes are mapped into a vertex or edge to be pruned.
	// Each run will then take into account this set of pruned
	// vertexes/edges to reduce route failure and pass on graph information
	// gained to the next execution.
	MissionControl MissionController

	// SessionSource defines a source for the router to retrieve new
	// payment sessions.
	SessionSource PaymentSessionSource

	// ChannelPruneExpiry is the duration used to determine if a channel
	// should be pruned or not. If the delta between now and when the
	// channel was last updated is greater than ChannelPruneExpiry, then
	// the channel is marked as a zombie channel eligible for pruning.
	ChannelPruneExpiry time.Duration

	// GraphPruneInterval is used as an interval to determine how often we
	// should examine the channel graph to garbage collect zombie channels.
	GraphPruneInterval time.Duration

	// FirstTimePruneDelay is the time we'll wait after startup before
	// attempting to prune the graph for zombie channels. We don't do it
	// immediately after startup to allow lnd to start up without getting
	// blocked by this job.
	FirstTimePruneDelay time.Duration

	// GetLink is a method that allows the router to query the lower link
	// layer to determine the up-to-date available bandwidth at a
	// prospective link to be traversed. If the link isn't available, then
	// a value of zero should be returned. Otherwise, the current up-to-
	// date knowledge of the available bandwidth of the link should be
	// returned.
	GetLink getLinkQuery

	// NextPaymentID is a method that guarantees to return a new, unique ID
	// each time it is called. This is used by the router to generate a
	// unique payment ID for each payment it attempts to send, such that
	// the switch can properly handle the HTLC.
	NextPaymentID func() (uint64, error)

	// AssumeChannelValid toggles whether the router will check for
	// spentness of channel outpoints. For neutrino, this saves long
	// rescans from blocking initial usage of the daemon.
	AssumeChannelValid bool

	// PathFindingConfig defines global path finding parameters.
	PathFindingConfig PathFindingConfig

	// Clock is a mockable time provider.
	Clock clock.Clock

	// StrictZombiePruning determines if we attempt to prune zombie
	// channels according to stricter criteria. If true, then we'll prune
	// a channel if only *one* of the edges is considered a zombie.
	// Otherwise, we'll only prune the channel when both edges have a very
	// dated last update.
	StrictZombiePruning bool

	// IsAlias returns whether a passed ShortChannelID is an alias. This is
	// only used for our local channels.
	IsAlias func(scid lnwire.ShortChannelID) bool
}

// EdgeLocator is a struct used to identify a specific edge.
type EdgeLocator struct {
	// ChannelID is the channel of this edge.
	ChannelID uint64

	// Direction takes the value of 0 or 1 and is identical in definition
	// to the channel direction flag. A value of 0 means the direction from
	// the lower node pubkey to the higher.
	Direction uint8
}

// String returns a human-readable version of the EdgeLocator values.
func (e *EdgeLocator) String() string {
	return fmt.Sprintf("%v:%v", e.ChannelID, e.Direction)
}

// ChannelRouter is the layer 3 router within the Lightning stack. Below the
// ChannelRouter is the HtlcSwitch, and below that is the Bitcoin blockchain
// itself. The primary role of the ChannelRouter is to respond to queries for
// potential routes that can support a payment amount, and also general graph
// reachability questions. The router will prune the channel graph
// automatically as new blocks are discovered which spend certain known funding
// outpoints, thereby closing their respective channels.
type ChannelRouter struct {
	ntfnClientCounter uint64 // To be used atomically.

	started uint32 // To be used atomically.
	stopped uint32 // To be used atomically.

	bestHeight uint32 // To be used atomically.

	// cfg is a copy of the configuration struct that the ChannelRouter was
	// initialized with.
	cfg *Config

	// selfNode is the center of the star-graph centered around the
	// ChannelRouter. The ChannelRouter uses this node as a starting point
	// when doing any path finding.
	selfNode *channeldb.LightningNode

	// cachedGraph is an instance of routingGraph that caches the source
	// node as well as the channel graph itself in memory.
	cachedGraph routingGraph

	// newBlocks is a channel in which new blocks connected to the end of
	// the main chain are sent over, and blocks updated after a call to
	// UpdateFilter.
	newBlocks <-chan *chainview.FilteredBlock

	// staleBlocks is a channel in which blocks disconnected from the end
	// of our currently known best chain are sent over.
	staleBlocks <-chan *chainview.FilteredBlock

	// networkUpdates is a channel that carries new topology update
	// messages from outside the ChannelRouter to be processed by the
	// networkHandler.
	networkUpdates chan *routingMsg

	// topologyClients maps a client's unique notification ID to a
	// topologyClient client that contains its notification dispatch
	// channel.
	topologyClients *lnutils.SyncMap[uint64, *topologyClient]

	// ntfnClientUpdates is a channel that's used to send new updates to
	// topology notification clients to the ChannelRouter. Updates either
	// add a new notification client, or cancel notifications for an
	// existing client.
	ntfnClientUpdates chan *topologyClientUpdate

	// channelEdgeMtx is a mutex we use to make sure we process only one
	// ChannelEdgePolicy at a time for a given channelID, to ensure
	// consistency between the various database accesses.
	channelEdgeMtx *multimutex.Mutex[uint64]

	// statTicker is a resumable ticker that logs the router's progress as
	// it discovers channels or receives updates.
	statTicker ticker.Ticker

	// stats tracks newly processed channels, updates, and node
	// announcements over a window of defaultStatInterval.
	stats *routerStats

	quit chan struct{}
	wg   sync.WaitGroup
}

// A compile time check to ensure ChannelRouter implements the
// ChannelGraphSource interface.
var _ ChannelGraphSource = (*ChannelRouter)(nil)

// New creates a new instance of the ChannelRouter with the specified
// configuration parameters. As part of initialization, if the router detects
// that the channel graph isn't fully in sync with the latest UTXO set (since
// the channel graph is a subset of the UTXO set), then the router will
// proceed to fully sync to the latest state of the UTXO set.
func New(cfg Config) (*ChannelRouter, error) {
	selfNode, err := cfg.Graph.SourceNode()
	if err != nil {
		return nil, err
	}

	r := &ChannelRouter{
		cfg: &cfg,
		cachedGraph: &CachedGraph{
			graph:  cfg.Graph,
			source: selfNode.PubKeyBytes,
		},
		networkUpdates:    make(chan *routingMsg),
		topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
		ntfnClientUpdates: make(chan *topologyClientUpdate),
		channelEdgeMtx:    multimutex.NewMutex[uint64](),
		selfNode:          selfNode,
		statTicker:        ticker.New(defaultStatInterval),
		stats:             new(routerStats),
		quit:              make(chan struct{}),
	}

	return r, nil
}

// Start launches all the goroutines the ChannelRouter requires to carry out
// its duties. If the router has already been started, then this method is a
// noop.
func (r *ChannelRouter) Start() error {
	if !atomic.CompareAndSwapUint32(&r.started, 0, 1) {
		return nil
	}

	log.Info("Channel Router starting")

	bestHash, bestHeight, err := r.cfg.Chain.GetBestBlock()
	if err != nil {
		return err
	}

	// If the graph has never been pruned, or hasn't fully been created
	// yet, then we don't treat this as an explicit error.
	if _, _, err := r.cfg.Graph.PruneTip(); err != nil {
		switch {
		case errors.Is(err, channeldb.ErrGraphNeverPruned):
			fallthrough

		case errors.Is(err, channeldb.ErrGraphNotFound):
			// If the graph has never been pruned, then we'll set
			// the prune height to the current best height of the
			// chain backend.
			_, err = r.cfg.Graph.PruneGraph(
				nil, bestHash, uint32(bestHeight),
			)
			if err != nil {
				return err
			}

		default:
			return err
		}
	}

	// If AssumeChannelValid is present, then we won't rely on pruning
	// channels from the graph based on their spentness, but whether they
	// are considered zombies or not. We will start zombie pruning after a
	// small delay, to avoid slowing down startup of lnd.
	if r.cfg.AssumeChannelValid {
		time.AfterFunc(r.cfg.FirstTimePruneDelay, func() {
			select {
			case <-r.quit:
				return
			default:
			}

			log.Info("Initial zombie prune starting")
			if err := r.pruneZombieChans(); err != nil {
				log.Errorf("Unable to prune zombies: %v", err)
			}
		})
	} else {
		// Otherwise, we'll use our filtered chain view to prune
		// channels as soon as they are detected as spent on-chain.
		if err := r.cfg.ChainView.Start(); err != nil {
			return err
		}

		// Once the instance is active, we'll fetch the channels we'll
		// receive notifications over.
		r.newBlocks = r.cfg.ChainView.FilteredBlocks()
		r.staleBlocks = r.cfg.ChainView.DisconnectedBlocks()

		// Before we perform our manual block pruning, we'll construct
		// and apply a fresh chain filter to the active
		// FilteredChainView instance. We do this before, as otherwise
		// we may miss on-chain events as the filter hasn't properly
		// been applied.
		channelView, err := r.cfg.Graph.ChannelView()
		if err != nil && !errors.Is(
			err, channeldb.ErrGraphNoEdgesFound,
		) {

			return err
		}

		log.Infof("Filtering chain using %v channels active",
			len(channelView))

		if len(channelView) != 0 {
			err = r.cfg.ChainView.UpdateFilter(
				channelView, uint32(bestHeight),
			)
			if err != nil {
				return err
			}
		}

		// The graph pruning might have taken a while and there could
		// be new blocks available.
		_, bestHeight, err = r.cfg.Chain.GetBestBlock()
		if err != nil {
			return err
		}
		r.bestHeight = uint32(bestHeight)

		// Before we begin normal operation of the router, we first
		// need to synchronize the channel graph to the latest state of
		// the UTXO set.
		if err := r.syncGraphWithChain(); err != nil {
			return err
		}

		// Finally, before we proceed, we'll prune any unconnected
		// nodes from the graph in order to ensure we maintain a tight
		// graph of "useful" nodes.
		err = r.cfg.Graph.PruneGraphNodes()
		if err != nil && !errors.Is(
			err, channeldb.ErrGraphNodesNotFound,
		) {

			return err
		}
	}

	// If any payments are still in flight, we resume, to make sure their
	// results are properly handled.
	payments, err := r.cfg.Control.FetchInFlightPayments()
	if err != nil {
		return err
	}

	// Before we restart existing payments and start accepting more
	// payments to be made, we clean the network result store of the
	// Switch. We do this here at startup to ensure no more payments can be
	// made concurrently, so we know the toKeep map will be up-to-date
	// until the cleaning has finished.
	toKeep := make(map[uint64]struct{})
	for _, p := range payments {
		for _, a := range p.HTLCs {
			toKeep[a.AttemptID] = struct{}{}
		}
	}

	log.Debugf("Cleaning network result store.")
	if err := r.cfg.Payer.CleanStore(toKeep); err != nil {
		return err
	}

	for _, payment := range payments {
		log.Infof("Resuming payment %v", payment.Info.PaymentIdentifier)
		r.wg.Add(1)
		go func(payment *channeldb.MPPayment) {
			defer r.wg.Done()

			// Get the hashes used for the outstanding HTLCs.
			htlcs := make(map[uint64]lntypes.Hash)
			for _, a := range payment.HTLCs {
				a := a

				// We check whether the individual attempts
				// have their HTLC hash set, if not we'll fall
				// back to the overall payment hash.
				hash := payment.Info.PaymentIdentifier
				if a.Hash != nil {
					hash = *a.Hash
				}

				htlcs[a.AttemptID] = hash
			}

			// Since we are not supporting creating more shards
			// after a restart (only receiving the result of the
			// shards already outstanding), we create a simple
			// shard tracker that will map the attempt IDs to
			// hashes used for the HTLCs. This will be enough also
			// for AMP payments, since we only need the hashes for
			// the individual HTLCs to regenerate the circuits, and
			// we don't currently persist the root share necessary
			// to re-derive them.
			shardTracker := shards.NewSimpleShardTracker(
				payment.Info.PaymentIdentifier, htlcs,
			)

			// We create a dummy, empty payment session such that
			// we won't make another payment attempt when the
			// result for the in-flight attempt is received.
			paySession := r.cfg.SessionSource.NewPaymentSessionEmpty()

			// We pass in a non-timeout context, to indicate we
			// don't need it to timeout. It will stop immediately
			// after the existing attempt has finished anyway. We
			// also set a zero fee limit, as no more routes should
			// be tried.
			noTimeout := time.Duration(0)
			_, _, err := r.sendPayment(
				context.Background(), 0,
				payment.Info.PaymentIdentifier, noTimeout,
				paySession, shardTracker,
			)
			if err != nil {
				log.Errorf("Resuming payment %v failed: %v.",
					payment.Info.PaymentIdentifier, err)
				return
			}

			log.Infof("Resumed payment %v completed.",
				payment.Info.PaymentIdentifier)
		}(payment)
	}

	r.wg.Add(1)
	go r.networkHandler()

	return nil
}

// Stop signals the ChannelRouter to gracefully halt all routines. This method
// will *block* until all goroutines have exited. If the channel router has
// already stopped then this method will return immediately.
func (r *ChannelRouter) Stop() error {
	if !atomic.CompareAndSwapUint32(&r.stopped, 0, 1) {
		return nil
	}

	log.Info("Channel Router shutting down...")
	defer log.Debug("Channel Router shutdown complete")

	// Our filtered chain view could've only been started if
	// AssumeChannelValid isn't present.
	if !r.cfg.AssumeChannelValid {
		if err := r.cfg.ChainView.Stop(); err != nil {
			return err
		}
	}

	close(r.quit)
	r.wg.Wait()

	return nil
}

// syncGraphWithChain attempts to synchronize the current channel graph with
// the latest UTXO set state. This process involves pruning from the channel
// graph any channels which have been closed by spending their funding output
// since we've been down.
func (r *ChannelRouter) syncGraphWithChain() error {
	// First, we'll need to check to see if we're already in sync with the
	// latest state of the UTXO set.
	bestHash, bestHeight, err := r.cfg.Chain.GetBestBlock()
	if err != nil {
		return err
	}
	r.bestHeight = uint32(bestHeight)

	pruneHash, pruneHeight, err := r.cfg.Graph.PruneTip()
	if err != nil {
		switch {
		// If the graph has never been pruned, or hasn't fully been
		// created yet, then we don't treat this as an explicit error.
		case errors.Is(err, channeldb.ErrGraphNeverPruned):
		case errors.Is(err, channeldb.ErrGraphNotFound):
		default:
			return err
		}
	}

	log.Infof("Prune tip for Channel Graph: height=%v, hash=%v",
		pruneHeight, pruneHash)

	switch {
	// If the graph has never been pruned, then we can exit early as this
	// entails it's being created for the first time and hasn't seen any
	// block or created channels.
	case pruneHeight == 0 || pruneHash == nil:
		return nil

	// If the block hashes and heights match exactly, then we don't need
	// to prune the channel graph as we're already fully in sync.
	case bestHash.IsEqual(pruneHash) && uint32(bestHeight) == pruneHeight:
		return nil
	}

	// If the main chain blockhash at prune height is different from the
	// prune hash, this might indicate the database is on a stale branch.
	mainBlockHash, err := r.cfg.Chain.GetBlockHash(int64(pruneHeight))
	if err != nil {
		return err
	}

	// While we are on a stale branch of the chain, walk backwards to find
	// the first common block.
	for !pruneHash.IsEqual(mainBlockHash) {
		log.Infof("channel graph is stale. Disconnecting block %v "+
			"(hash=%v)", pruneHeight, pruneHash)

		// Prune the graph for every channel that was opened at height
		// >= pruneHeight.
		_, err := r.cfg.Graph.DisconnectBlockAtHeight(pruneHeight)
		if err != nil {
			return err
		}

		pruneHash, pruneHeight, err = r.cfg.Graph.PruneTip()
		if err != nil {
			switch {
			// If at this point the graph has never been pruned, we
			// can exit as this entails we are back to the point
			// where it hasn't seen any block or created channels,
			// alas there's nothing left to prune.
			case errors.Is(err, channeldb.ErrGraphNeverPruned):
				return nil

			case errors.Is(err, channeldb.ErrGraphNotFound):
				return nil

			default:
				return err
			}
		}
		mainBlockHash, err = r.cfg.Chain.GetBlockHash(int64(pruneHeight))
		if err != nil {
			return err
		}
	}

	log.Infof("Syncing channel graph from height=%v (hash=%v) to "+
		"height=%v (hash=%v)", pruneHeight, pruneHash, bestHeight,
		bestHash)

	// If we're not yet caught up, then we'll walk forward in the chain
	// pruning the channel graph with each new block that hasn't yet been
	// consumed by the channel graph.
	var spentOutputs []*wire.OutPoint
	for nextHeight := pruneHeight + 1; nextHeight <= uint32(bestHeight); nextHeight++ {
		// Break out of the rescan early if a shutdown has been
		// requested, otherwise long rescans will block the daemon from
		// shutting down promptly.
		select {
		case <-r.quit:
			return ErrRouterShuttingDown
		default:
		}

		// Using the next height, request a manual block pruning from
		// the chainview for the particular block hash.
		log.Infof("Filtering block for closed channels, at height: %v",
			int64(nextHeight))
		nextHash, err := r.cfg.Chain.GetBlockHash(int64(nextHeight))
		if err != nil {
			return err
		}
		log.Tracef("Running block filter on block with hash: %v",
			nextHash)
		filterBlock, err := r.cfg.ChainView.FilterBlock(nextHash)
		if err != nil {
			return err
		}

		// We're only interested in all prior outputs that have been
		// spent in the block, so collate all the referenced previous
		// outpoints within each tx and input.
		for _, tx := range filterBlock.Transactions {
			for _, txIn := range tx.TxIn {
				spentOutputs = append(spentOutputs,
					&txIn.PreviousOutPoint)
			}
		}
	}

	// With the spent outputs gathered, attempt to prune the channel graph,
	// also passing in the best hash+height so the prune tip can be
	// updated.
	closedChans, err := r.cfg.Graph.PruneGraph(
		spentOutputs, bestHash, uint32(bestHeight),
	)
	if err != nil {
		return err
	}

	log.Infof("Graph pruning complete: %v channels were closed since "+
		"height %v", len(closedChans), pruneHeight)

	return nil
}

// isZombieChannel takes two edge policy updates and determines if the
// corresponding channel should be considered a zombie. The first boolean is
// true if the policy update from node 1 is considered a zombie, the second
// boolean is that of node 2, and the final boolean is true if the channel
// is considered a zombie.
func (r *ChannelRouter) isZombieChannel(e1,
	e2 *models.ChannelEdgePolicy) (bool, bool, bool) {

	chanExpiry := r.cfg.ChannelPruneExpiry

	e1Zombie := e1 == nil || time.Since(e1.LastUpdate) >= chanExpiry
	e2Zombie := e2 == nil || time.Since(e2.LastUpdate) >= chanExpiry

	var e1Time, e2Time time.Time
	if e1 != nil {
		e1Time = e1.LastUpdate
	}
	if e2 != nil {
		e2Time = e2.LastUpdate
	}

	return e1Zombie, e2Zombie, r.IsZombieChannel(e1Time, e2Time)
}

// IsZombieChannel takes the timestamps of the latest channel updates for a
// channel and returns true if the channel should be considered a zombie based
// on these timestamps.
func (r *ChannelRouter) IsZombieChannel(updateTime1,
	updateTime2 time.Time) bool {

	chanExpiry := r.cfg.ChannelPruneExpiry

	e1Zombie := updateTime1.IsZero() ||
		time.Since(updateTime1) >= chanExpiry

	e2Zombie := updateTime2.IsZero() ||
		time.Since(updateTime2) >= chanExpiry

	// If we're using strict zombie pruning, then a channel is only
	// considered live if both edges have a recent update we know of.
	if r.cfg.StrictZombiePruning {
		return e1Zombie || e2Zombie
	}

	// Otherwise, if we're using the less strict variant, then a channel is
	// considered live if either of the edges have a recent update.
	return e1Zombie && e2Zombie
}
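
// --- Illustrative example (editor's addition, not part of lnd) ---
// How the strict and lenient variants above differ for a channel where only
// one edge has a recent update (assuming the default 14-day prune expiry):
//
//	recent := time.Now()
//	stale := time.Now().Add(-15 * 24 * time.Hour)
//
//	// StrictZombiePruning == true:  IsZombieChannel(recent, stale) == true
//	// StrictZombiePruning == false: IsZombieChannel(recent, stale) == false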

// pruneZombieChans is a method that will be called periodically to prune out
// any "zombie" channels. We consider channels zombies if *both* edges haven't
// been updated since our zombie horizon. If AssumeChannelValid is present,
// we'll also consider channels zombies if *both* edges are disabled. This
// usually signals that a channel has been closed on-chain. We do this
// periodically to keep a healthy, lively routing table.
func (r *ChannelRouter) pruneZombieChans() error {
	chansToPrune := make(map[uint64]struct{})
	chanExpiry := r.cfg.ChannelPruneExpiry

	log.Infof("Examining channel graph for zombie channels")

	// A helper method to detect if the channel belongs to this node.
	isSelfChannelEdge := func(info *models.ChannelEdgeInfo) bool {
		return info.NodeKey1Bytes == r.selfNode.PubKeyBytes ||
			info.NodeKey2Bytes == r.selfNode.PubKeyBytes
	}

	// First, we'll collect all the channels which are eligible for garbage
	// collection due to being zombies.
	filterPruneChans := func(info *models.ChannelEdgeInfo,
		e1, e2 *models.ChannelEdgePolicy) error {

		// Exit early in case this channel is already marked to be
		// pruned.
		_, markedToPrune := chansToPrune[info.ChannelID]
		if markedToPrune {
			return nil
		}

		// We'll ensure that we don't attempt to prune our *own*
		// channels from the graph, as in any case this should be
		// re-advertised by the sub-system above us.
		if isSelfChannelEdge(info) {
			return nil
		}

		e1Zombie, e2Zombie, isZombieChan := r.isZombieChannel(e1, e2)

		if e1Zombie {
			log.Tracef("Node1 pubkey=%x of chan_id=%v is zombie",
				info.NodeKey1Bytes, info.ChannelID)
		}

		if e2Zombie {
			log.Tracef("Node2 pubkey=%x of chan_id=%v is zombie",
				info.NodeKey2Bytes, info.ChannelID)
		}

		// If either edge hasn't been updated for a period of
		// chanExpiry, then we'll mark the channel itself as eligible
		// for graph pruning.
		if !isZombieChan {
			return nil
		}

		log.Debugf("ChannelID(%v) is a zombie, collecting to prune",
			info.ChannelID)

		// TODO(roasbeef): add ability to delete single directional edge
		chansToPrune[info.ChannelID] = struct{}{}

		return nil
	}

	// If AssumeChannelValid is present we'll look at the disabled bit for
	// both edges. If they're both disabled, then we can interpret this as
	// the channel being closed and can prune it from our graph.
	if r.cfg.AssumeChannelValid {
		disabledChanIDs, err := r.cfg.Graph.DisabledChannelIDs()
		if err != nil {
			return fmt.Errorf("unable to get disabled channels "+
				"ids chans: %v", err)
		}

		disabledEdges, err := r.cfg.Graph.FetchChanInfos(
			nil, disabledChanIDs,
		)
		if err != nil {
			return fmt.Errorf("unable to fetch disabled channels "+
				"edges chans: %v", err)
		}

		// Ensuring we won't prune our own channel from the graph.
		for _, disabledEdge := range disabledEdges {
			if !isSelfChannelEdge(disabledEdge.Info) {
				chansToPrune[disabledEdge.Info.ChannelID] =
					struct{}{}
			}
		}
	}

	startTime := time.Unix(0, 0)
	endTime := time.Now().Add(-1 * chanExpiry)
	oldEdges, err := r.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime)
	if err != nil {
		return fmt.Errorf("unable to fetch expired channel updates "+
			"chans: %v", err)
	}

	for _, u := range oldEdges {
		err = filterPruneChans(u.Info, u.Policy1, u.Policy2)
		if err != nil {
			// NOTE: %v is used instead of %w since the %w verb is
			// only meaningful within fmt.Errorf.
			log.Warnf("Filter pruning channels: %v", err)
		}
	}

	log.Infof("Pruning %v zombie channels", len(chansToPrune))
	if len(chansToPrune) == 0 {
		return nil
	}

	// With the set of zombie-like channels obtained, we'll do another pass
	// to delete them from the channel graph.
	toPrune := make([]uint64, 0, len(chansToPrune))
	for chanID := range chansToPrune {
		toPrune = append(toPrune, chanID)
		log.Tracef("Pruning zombie channel with ChannelID(%v)", chanID)
	}
	err = r.cfg.Graph.DeleteChannelEdges(
		r.cfg.StrictZombiePruning, true, toPrune...,
	)
	if err != nil {
		return fmt.Errorf("unable to delete zombie channels: %w", err)
	}

	// With the channels pruned, we'll also attempt to prune any nodes that
	// were a part of them.
	err = r.cfg.Graph.PruneGraphNodes()
	if err != nil && !errors.Is(err, channeldb.ErrGraphNodesNotFound) {
		return fmt.Errorf("unable to prune graph nodes: %w", err)
	}

	return nil
}
1097

1098
// handleNetworkUpdate is responsible for processing the update message and
1099
// notifies topology changes, if any.
1100
//
1101
// NOTE: must be run inside goroutine.
1102
func (r *ChannelRouter) handleNetworkUpdate(vb *ValidationBarrier,
1103
        update *routingMsg) {
4✔
1104

4✔
1105
        defer r.wg.Done()
4✔
1106
        defer vb.CompleteJob()
4✔
1107

4✔
1108
        // If this message has an existing dependency, then we'll wait until
4✔
1109
        // that has been fully validated before we proceed.
4✔
1110
        err := vb.WaitForDependants(update.msg)
4✔
1111
        if err != nil {
4✔
1112
                switch {
×
1113
                case IsError(err, ErrVBarrierShuttingDown):
×
1114
                        update.err <- err
×
1115

1116
                case IsError(err, ErrParentValidationFailed):
×
1117
                        update.err <- newErrf(ErrIgnored, err.Error())
×
1118

1119
                default:
×
1120
                        log.Warnf("unexpected error during validation "+
×
1121
                                "barrier shutdown: %v", err)
×
1122
                        update.err <- err
×
1123
                }
1124

1125
                return
×
1126
        }
1127

1128
        // Process the routing update to determine if this is either a new
1129
        // update from our PoV or an update to a prior vertex/edge we
1130
        // previously accepted.
1131
        err = r.processUpdate(update.msg, update.op...)
4✔
1132
        update.err <- err
4✔
1133

4✔
1134
        // If this message had any dependencies, then we can now signal them to
4✔
1135
        // continue.
4✔
1136
        allowDependents := err == nil || IsError(err, ErrIgnored, ErrOutdated)
4✔
1137
        vb.SignalDependants(update.msg, allowDependents)
4✔
1138

4✔
1139
        // If the error is not nil here, there's no need to send topology
4✔
1140
        // change.
4✔
1141
        if err != nil {
8✔
1142
                // We now decide to log an error or not. If allowDependents is
4✔
1143
                // false, it means there is an error and the error is neither
4✔
1144
                // ErrIgnored nor ErrOutdated. In this case, we'll log an error.
4✔
1145
                // Otherwise, we'll add debug log only.
4✔
1146
                if allowDependents {
8✔
1147
                        log.Debugf("process network updates got: %v", err)
4✔
1148
                } else {
4✔
1149
                        log.Errorf("process network updates got: %v", err)
×
1150
                }
×
1151

1152
                return
4✔
1153
        }
1154

1155
        // Otherwise, we'll send off a new notification for the newly accepted
1156
        // update, if any.
1157
        topChange := &TopologyChange{}
4✔
1158
        err = addToTopologyChange(r.cfg.Graph, topChange, update.msg)
4✔
1159
        if err != nil {
4✔
1160
                log.Errorf("unable to update topology change notification: %v",
×
1161
                        err)
×
1162
                return
×
1163
        }
×
1164

1165
        if !topChange.isEmpty() {
8✔
1166
                r.notifyTopologyChange(topChange)
4✔
1167
        }
4✔
1168
}
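
The dependency gating above can be hard to picture from the barrier API alone. Below is a toy, self-contained sketch of the same idea using plain channels: a dependent job blocks until its parent signals completion. This models the WaitForDependants/SignalDependants flow, not lnd's actual ValidationBarrier implementation.

package main

import (
	"fmt"
	"sync"
)

func main() {
	parentDone := make(chan struct{})
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		// WaitForDependants equivalent: block until the parent
		// update has been fully validated.
		<-parentDone
		fmt.Println("child update processed after parent")
	}()

	fmt.Println("parent update validated")
	// SignalDependants equivalent: unblock everything waiting on us.
	close(parentDone)
	wg.Wait()
}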
1169

1170
// networkHandler is the primary goroutine for the ChannelRouter. The roles of
1171
// this goroutine include answering queries related to the state of the
1172
// network, pruning the graph on new block notification, applying network
1173
// updates, and registering new topology clients.
1174
//
1175
// NOTE: This MUST be run as a goroutine.
1176
func (r *ChannelRouter) networkHandler() {
4✔
1177
        defer r.wg.Done()
4✔
1178

4✔
1179
        graphPruneTicker := time.NewTicker(r.cfg.GraphPruneInterval)
4✔
1180
        defer graphPruneTicker.Stop()
4✔
1181

4✔
1182
        defer r.statTicker.Stop()
4✔
1183

4✔
1184
        r.stats.Reset()
4✔
1185

4✔
1186
        // We'll use this validation barrier to ensure that we process all jobs
4✔
1187
        // in the proper order during parallel validation.
4✔
1188
        //
4✔
1189
        // NOTE: For AssumeChannelValid, we bump up the maximum number of
4✔
1190
        // concurrent validation requests since there are no blocks being
4✔
1191
        // fetched. This significantly increases the performance of IGD for
4✔
1192
        // neutrino nodes.
4✔
1193
        //
4✔
1194
        // However, we dial back to use a multiple of the number of cores when
4✔
1195
        // fully validating, to avoid fetching up to 1000 blocks from the
4✔
1196
        // backend. On bitcoind, this will empirically cause massive latency
4✔
1197
        // spikes when executing this many concurrent RPC calls. Critical
4✔
1198
        // subsystems or basic rpc calls that rely on calls such as GetBestBlock
4✔
1199
        // will hang due to excessive load.
4✔
1200
        //
4✔
1201
        // See https://github.com/lightningnetwork/lnd/issues/4892.
4✔
1202
        var validationBarrier *ValidationBarrier
4✔
1203
        if r.cfg.AssumeChannelValid {
4✔
1204
                validationBarrier = NewValidationBarrier(1000, r.quit)
×
1205
        } else {
4✔
1206
                validationBarrier = NewValidationBarrier(
4✔
1207
                        4*runtime.NumCPU(), r.quit,
4✔
1208
                )
4✔
1209
        }
4✔
1210

1211
        for {
8✔
1212

4✔
1213
                // If there are stats, resume the statTicker.
4✔
1214
                if !r.stats.Empty() {
8✔
1215
                        r.statTicker.Resume()
4✔
1216
                }
4✔
1217

1218
                select {
4✔
1219
                // A new fully validated network update has just arrived. As a
1220
                // result we'll modify the channel graph accordingly depending
1221
                // on the exact type of the message.
1222
                case update := <-r.networkUpdates:
4✔
1223
                        // We'll set up any dependants, and wait until a free
4✔
1224
                        // slot for this job opens up; this allows us to not
4✔
1225
                        // have thousands of goroutines active.
4✔
1226
                        validationBarrier.InitJobDependencies(update.msg)
4✔
1227

4✔
1228
                        r.wg.Add(1)
4✔
1229
                        go r.handleNetworkUpdate(validationBarrier, update)
4✔
1230

1231
                        // TODO(roasbeef): remove all unconnected vertexes
1232
                        // after N blocks pass with no corresponding
1233
                        // announcements.
1234

1235
                case chainUpdate, ok := <-r.staleBlocks:
3✔
1236
                        // If the channel has been closed, then this indicates
3✔
1237
                        // the daemon is shutting down, so we exit ourselves.
3✔
1238
                        if !ok {
3✔
1239
                                return
×
1240
                        }
×
1241

1242
                        // Since this block is stale, we update our best height
1243
                        // to the previous block.
1244
                        blockHeight := chainUpdate.Height
3✔
1245
                        atomic.StoreUint32(&r.bestHeight, blockHeight-1)
3✔
1246

3✔
1247
                        // Update the channel graph to reflect that this block
3✔
1248
                        // was disconnected.
3✔
1249
                        _, err := r.cfg.Graph.DisconnectBlockAtHeight(blockHeight)
3✔
1250
                        if err != nil {
3✔
1251
                                log.Errorf("unable to prune graph with stale "+
×
1252
                                        "block: %v", err)
×
1253
                                continue
×
1254
                        }
1255

1256
                        // TODO(halseth): notify client about the reorg?
1257

1258
                // A new block has arrived, so we can prune the channel graph
1259
                // of any channels which were closed in the block.
1260
                case chainUpdate, ok := <-r.newBlocks:
4✔
1261
                        // If the channel has been closed, then this indicates
4✔
1262
                        // the daemon is shutting down, so we exit ourselves.
4✔
1263
                        if !ok {
4✔
1264
                                return
×
1265
                        }
×
1266

1267
                        // We'll ensure that any new blocks received attach
1268
                        // directly to the end of our main chain. If not, then
1269
                        // we've somehow missed some blocks. Here we'll catch
1270
                        // up the chain with the latest blocks.
1271
                        currentHeight := atomic.LoadUint32(&r.bestHeight)
4✔
1272
                        switch {
4✔
1273
                        case chainUpdate.Height == currentHeight+1:
4✔
1274
                                err := r.updateGraphWithClosedChannels(
4✔
1275
                                        chainUpdate,
4✔
1276
                                )
4✔
1277
                                if err != nil {
4✔
1278
                                        log.Errorf("unable to prune graph "+
×
1279
                                                "with closed channels: %v", err)
×
1280
                                }
×
1281

1282
                        case chainUpdate.Height > currentHeight+1:
×
1283
                                log.Errorf("out of order block: expecting "+
×
1284
                                        "height=%v, got height=%v",
×
1285
                                        currentHeight+1, chainUpdate.Height)
×
1286

×
1287
                                err := r.getMissingBlocks(currentHeight, chainUpdate)
×
1288
                                if err != nil {
×
1289
                                        log.Errorf("unable to retrieve missing"+
×
1290
                                                "blocks: %v", err)
×
1291
                                }
×
1292

1293
                        case chainUpdate.Height < currentHeight+1:
×
1294
                                log.Errorf("out of order block: expecting "+
×
1295
                                        "height=%v, got height=%v",
×
1296
                                        currentHeight+1, chainUpdate.Height)
×
1297

×
1298
                                log.Infof("Skipping channel pruning since "+
×
1299
                                        "received block height %v was already"+
×
1300
                                        " processed.", chainUpdate.Height)
×
1301
                        }
1302

1303
                // A new notification client update has arrived. We're either
1304
                // gaining a new client, or cancelling notifications for an
1305
                // existing client.
1306
                case ntfnUpdate := <-r.ntfnClientUpdates:
4✔
1307
                        clientID := ntfnUpdate.clientID
4✔
1308

4✔
1309
                        if ntfnUpdate.cancel {
8✔
1310
                                client, ok := r.topologyClients.LoadAndDelete(
4✔
1311
                                        clientID,
4✔
1312
                                )
4✔
1313
                                if ok {
8✔
1314
                                        close(client.exit)
4✔
1315
                                        client.wg.Wait()
4✔
1316

4✔
1317
                                        close(client.ntfnChan)
4✔
1318
                                }
4✔
1319

1320
                                continue
4✔
1321
                        }
1322

1323
                        r.topologyClients.Store(clientID, &topologyClient{
4✔
1324
                                ntfnChan: ntfnUpdate.ntfnChan,
4✔
1325
                                exit:     make(chan struct{}),
4✔
1326
                        })
4✔
1327

1328
                // The graph prune ticker has ticked, so we'll examine the
1329
                // state of the known graph to filter out any zombie channels
1330
                // for pruning.
1331
                case <-graphPruneTicker.C:
×
1332
                        if err := r.pruneZombieChans(); err != nil {
×
1333
                                log.Errorf("Unable to prune zombies: %v", err)
×
1334
                        }
×
1335

1336
                // Log any stats if we've processed a non-empty number of
1337
                // channels, updates, or nodes. We'll only pause the ticker if
1338
                // the last window contained no updates to avoid resuming and
1339
                // pausing while consecutive windows contain new info.
1340
                case <-r.statTicker.Ticks():
4✔
1341
                        if !r.stats.Empty() {
8✔
1342
                                log.Infof(r.stats.String())
4✔
1343
                        } else {
6✔
1344
                                r.statTicker.Pause()
2✔
1345
                        }
2✔
1346
                        r.stats.Reset()
4✔
1347

1348
                // The router has been signalled to exit, so we exit our main
1349
                // loop so the wait group can be decremented.
1350
                case <-r.quit:
4✔
1351
                        return
4✔
1352
                }
1353
        }
1354
}
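
The validation-concurrency trade-off described above (1000 slots under AssumeChannelValid versus 4×NumCPU when fully validating) boils down to a bounded-worker pattern. A minimal sketch with a channel semaphore, where the job body is only a placeholder:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	// Cap in-flight jobs the same way the barrier size is chosen for
	// fully validating nodes.
	sem := make(chan struct{}, 4*runtime.NumCPU())

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		sem <- struct{}{} // acquire a validation slot
		go func(id int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			_ = id                   // placeholder: validate update
		}(i)
	}
	wg.Wait()
	fmt.Println("all updates validated")
}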
1355

1356
// getMissingBlocks walks through all missing blocks and updates the graph
1357
// with closed channels accordingly.
1358
func (r *ChannelRouter) getMissingBlocks(currentHeight uint32,
1359
        chainUpdate *chainview.FilteredBlock) error {
×
1360

×
1361
        outdatedHash, err := r.cfg.Chain.GetBlockHash(int64(currentHeight))
×
1362
        if err != nil {
×
1363
                return err
×
1364
        }
×
1365

1366
        outdatedBlock := &chainntnfs.BlockEpoch{
×
1367
                Height: int32(currentHeight),
×
1368
                Hash:   outdatedHash,
×
1369
        }
×
1370

×
1371
        epochClient, err := r.cfg.Notifier.RegisterBlockEpochNtfn(
×
1372
                outdatedBlock,
×
1373
        )
×
1374
        if err != nil {
×
1375
                return err
×
1376
        }
×
1377
        defer epochClient.Cancel()
×
1378

×
1379
        blockDifference := int(chainUpdate.Height - currentHeight)
×
1380

×
1381
        // We'll walk through all the outdated blocks and make sure we're able
×
1382
        // to update the graph with any closed channels from them.
×
1383
        for i := 0; i < blockDifference; i++ {
×
1384
                var (
×
1385
                        missingBlock *chainntnfs.BlockEpoch
×
1386
                        ok           bool
×
1387
                )
×
1388

×
1389
                select {
×
1390
                case missingBlock, ok = <-epochClient.Epochs:
×
1391
                        if !ok {
×
1392
                                return nil
×
1393
                        }
×
1394

1395
                case <-r.quit:
×
1396
                        return nil
×
1397
                }
1398

1399
                filteredBlock, err := r.cfg.ChainView.FilterBlock(
×
1400
                        missingBlock.Hash,
×
1401
                )
×
1402
                if err != nil {
×
1403
                        return err
×
1404
                }
×
1405

1406
                err = r.updateGraphWithClosedChannels(
×
1407
                        filteredBlock,
×
1408
                )
×
1409
                if err != nil {
×
1410
                        return err
×
1411
                }
×
1412
        }
1413

1414
        return nil
×
1415
}
1416

1417
// updateGraphWithClosedChannels prunes the channel graph of closed channels
1418
// that are no longer needed.
1419
func (r *ChannelRouter) updateGraphWithClosedChannels(
1420
        chainUpdate *chainview.FilteredBlock) error {
4✔
1421

4✔
1422
        // Once a new block arrives, we update our running track of the height
4✔
1423
        // of the chain tip.
4✔
1424
        blockHeight := chainUpdate.Height
4✔
1425

4✔
1426
        atomic.StoreUint32(&r.bestHeight, blockHeight)
4✔
1427
        log.Infof("Pruning channel graph using block %v (height=%v)",
4✔
1428
                chainUpdate.Hash, blockHeight)
4✔
1429

4✔
1430
        // We're only interested in all prior outputs that have been spent in
4✔
1431
        // the block, so collate all the referenced previous outpoints within
4✔
1432
        // each tx and input.
4✔
1433
        var spentOutputs []*wire.OutPoint
4✔
1434
        for _, tx := range chainUpdate.Transactions {
8✔
1435
                for _, txIn := range tx.TxIn {
8✔
1436
                        spentOutputs = append(spentOutputs,
4✔
1437
                                &txIn.PreviousOutPoint)
4✔
1438
                }
4✔
1439
        }
1440

1441
        // With the spent outputs gathered, attempt to prune the channel graph,
1442
        // also passing in the hash+height of the block being pruned so the
1443
        // prune tip can be updated.
1444
        chansClosed, err := r.cfg.Graph.PruneGraph(spentOutputs,
4✔
1445
                &chainUpdate.Hash, chainUpdate.Height)
4✔
1446
        if err != nil {
4✔
1447
                log.Errorf("unable to prune routing table: %v", err)
×
1448
                return err
×
1449
        }
×
1450

1451
        log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash,
4✔
1452
                blockHeight, len(chansClosed))
4✔
1453

4✔
1454
        if len(chansClosed) == 0 {
8✔
1455
                return err
4✔
1456
        }
4✔
1457

1458
        // Notify all currently registered clients of the newly closed channels.
1459
        closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
4✔
1460
        r.notifyTopologyChange(&TopologyChange{
4✔
1461
                ClosedChannels: closeSummaries,
4✔
1462
        })
4✔
1463

4✔
1464
        return nil
4✔
1465
}
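
The outpoint collation step above is simple enough to demonstrate in isolation. A runnable sketch using only btcd's wire package, which this file already imports; spentOutpoints is our name for the helper, not an lnd function:

package main

import (
	"fmt"

	"github.com/btcsuite/btcd/wire"
)

// spentOutpoints walks every transaction input in a block and collects
// the previous outpoints, so the graph can be pruned of channels whose
// funding outputs were spent.
func spentOutpoints(txs []*wire.MsgTx) []*wire.OutPoint {
	var spent []*wire.OutPoint
	for _, tx := range txs {
		for i := range tx.TxIn {
			spent = append(spent, &tx.TxIn[i].PreviousOutPoint)
		}
	}
	return spent
}

func main() {
	tx := wire.NewMsgTx(wire.TxVersion)
	tx.AddTxIn(wire.NewTxIn(&wire.OutPoint{Index: 0}, nil, nil))
	fmt.Println(len(spentOutpoints([]*wire.MsgTx{tx}))) // 1
}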
1466

1467
// assertNodeAnnFreshness returns a non-nil error if we have an announcement in
1468
// the database for the passed node with a timestamp newer than the passed
1469
// timestamp. ErrIgnored will be returned if we don't know the node, and
1470
// ErrOutdated will be returned if our stored timestamp isn't older than the new
1471
// timestamp.
1472
func (r *ChannelRouter) assertNodeAnnFreshness(node route.Vertex,
1473
        msgTimestamp time.Time) error {
4✔
1474

4✔
1475
        // If we are not already aware of this node, it means that we don't
4✔
1476
        // know about any channel using this node. To avoid a DoS attack by
4✔
1477
        // node announcements, we will ignore such nodes. If we do know about
4✔
1478
        // this node, check that this update brings info newer than what we
4✔
1479
        // already have.
4✔
1480
        lastUpdate, exists, err := r.cfg.Graph.HasLightningNode(node)
4✔
1481
        if err != nil {
4✔
1482
                return errors.Errorf("unable to query for the "+
×
1483
                        "existence of node: %v", err)
×
1484
        }
×
1485
        if !exists {
8✔
1486
                return newErrf(ErrIgnored, "Ignoring node announcement"+
4✔
1487
                        " for node not found in channel graph (%x)",
4✔
1488
                        node[:])
4✔
1489
        }
4✔
1490

1491
        // If we've reached this point then we're aware of the vertex being
1492
        // advertised. So we now check if the new message has a newer timestamp;
1493
        // if not then we won't accept the new data as it would override newer
1494
        // data.
1495
        if !lastUpdate.Before(msgTimestamp) {
8✔
1496
                return newErrf(ErrOutdated, "Ignoring outdated "+
4✔
1497
                        "announcement for %x", node[:])
4✔
1498
        }
4✔
1499

1500
        return nil
4✔
1501
}
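
The freshness rule reduces to a single time.Before comparison, with the subtle point that an equal timestamp counts as stale. A tiny sketch of that comparison:

package main

import (
	"fmt"
	"time"
)

// isFresh reports whether an announcement's timestamp is strictly newer
// than the one we already have stored.
func isFresh(lastUpdate, msgTimestamp time.Time) bool {
	return lastUpdate.Before(msgTimestamp)
}

func main() {
	last := time.Now()
	fmt.Println(isFresh(last, last))                  // false: equal is stale
	fmt.Println(isFresh(last, last.Add(time.Second))) // true: strictly newer
}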
1502

1503
// addZombieEdge adds a channel that failed complete validation into the zombie
1504
// index, so we can avoid having to re-validate it in the future.
1505
func (r *ChannelRouter) addZombieEdge(chanID uint64) error {
×
1506
        // If the edge fails validation we'll mark the edge itself as a zombie,
×
1507
        // so we don't continue to request it. We use the "zero key" for both
×
1508
        // node pubkeys so this edge can't be resurrected.
×
1509
        var zeroKey [33]byte
×
1510
        err := r.cfg.Graph.MarkEdgeZombie(chanID, zeroKey, zeroKey)
×
1511
        if err != nil {
×
1512
                return fmt.Errorf("unable to mark spent chan(id=%v) as a "+
×
1513
                        "zombie: %w", chanID, err)
×
1514
        }
×
1515

1516
        return nil
×
1517
}
1518

1519
// makeFundingScript is used to make the funding script for both segwit v0 and
1520
// segwit v1 (taproot) channels.
1521
//
1522
// TODO(roasbeef): export and use elsewhere?
1523
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte,
1524
        chanFeatures []byte) ([]byte, error) {
4✔
1525

4✔
1526
        legacyFundingScript := func() ([]byte, error) {
8✔
1527
                witnessScript, err := input.GenMultiSigScript(
4✔
1528
                        bitcoinKey1, bitcoinKey2,
4✔
1529
                )
4✔
1530
                if err != nil {
4✔
1531
                        return nil, err
×
1532
                }
×
1533
                pkScript, err := input.WitnessScriptHash(witnessScript)
4✔
1534
                if err != nil {
4✔
1535
                        return nil, err
×
1536
                }
×
1537

1538
                return pkScript, nil
4✔
1539
        }
1540

1541
        if len(chanFeatures) == 0 {
4✔
1542
                return legacyFundingScript()
×
1543
        }
×
1544

1545
        // In order to make the correct funding script, we'll need to parse the
1546
        // chanFeatures bytes into a feature vector we can interact with.
1547
        rawFeatures := lnwire.NewRawFeatureVector()
4✔
1548
        err := rawFeatures.Decode(bytes.NewReader(chanFeatures))
4✔
1549
        if err != nil {
4✔
1550
                return nil, fmt.Errorf("unable to parse chan feature "+
×
1551
                        "bits: %w", err)
×
1552
        }
×
1553

1554
        chanFeatureBits := lnwire.NewFeatureVector(
4✔
1555
                rawFeatures, lnwire.Features,
4✔
1556
        )
4✔
1557
        if chanFeatureBits.HasFeature(
4✔
1558
                lnwire.SimpleTaprootChannelsOptionalStaging,
4✔
1559
        ) {
8✔
1560

4✔
1561
                pubKey1, err := btcec.ParsePubKey(bitcoinKey1)
4✔
1562
                if err != nil {
4✔
1563
                        return nil, err
×
1564
                }
×
1565
                pubKey2, err := btcec.ParsePubKey(bitcoinKey2)
4✔
1566
                if err != nil {
4✔
1567
                        return nil, err
×
1568
                }
×
1569

1570
                fundingScript, _, err := input.GenTaprootFundingScript(
4✔
1571
                        pubKey1, pubKey2, 0,
4✔
1572
                )
4✔
1573
                if err != nil {
4✔
1574
                        return nil, err
×
1575
                }
×
1576

1577
                return fundingScript, nil
4✔
1578
        }
1579

1580
        return legacyFundingScript()
4✔
1581
}
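
The branch structure of makeFundingScript is easy to lose in the error handling. Below is a stripped-down sketch of just the dispatch: empty feature bytes or no taproot bit means a legacy segwit v0 multisig output. The feature-bit value and hasFeature helper are illustrative stand-ins, not lnwire's real API.

package main

import "fmt"

// simpleTaprootBit is an illustrative feature-bit value only.
const simpleTaprootBit = 180

// hasFeature is a stand-in for lnwire.FeatureVector.HasFeature.
func hasFeature(bits map[int]bool, bit int) bool { return bits[bit] }

func fundingScriptKind(chanFeatures map[int]bool) string {
	if len(chanFeatures) == 0 {
		return "p2wsh-multisig" // legacy segwit v0 funding output
	}
	if hasFeature(chanFeatures, simpleTaprootBit) {
		return "p2tr-musig2" // segwit v1 (taproot) funding output
	}
	return "p2wsh-multisig"
}

func main() {
	fmt.Println(fundingScriptKind(nil))
	fmt.Println(fundingScriptKind(map[int]bool{simpleTaprootBit: true}))
}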
1582

1583
// processUpdate processes a new authenticated channel/edge, node, or
1584
// channel/edge policy network update. If the update didn't affect the internal
1585
// state of the graph due to either being out of date, invalid, or redundant,
1586
// then an error is returned.
1587
func (r *ChannelRouter) processUpdate(msg interface{},
1588
        op ...batch.SchedulerOption) error {
4✔
1589

4✔
1590
        switch msg := msg.(type) {
4✔
1591
        case *channeldb.LightningNode:
4✔
1592
                // Before we add the node to the database, we'll check to see
4✔
1593
                // if the announcement is "fresh" or not. If it isn't, then
4✔
1594
                // we'll return an error.
4✔
1595
                err := r.assertNodeAnnFreshness(msg.PubKeyBytes, msg.LastUpdate)
4✔
1596
                if err != nil {
8✔
1597
                        return err
4✔
1598
                }
4✔
1599

1600
                if err := r.cfg.Graph.AddLightningNode(msg, op...); err != nil {
4✔
1601
                        return errors.Errorf("unable to add node %x to the "+
×
1602
                                "graph: %v", msg.PubKeyBytes, err)
×
1603
                }
×
1604

1605
                log.Tracef("Updated vertex data for node=%x", msg.PubKeyBytes)
4✔
1606
                r.stats.incNumNodeUpdates()
4✔
1607

1608
        case *models.ChannelEdgeInfo:
4✔
1609
                log.Debugf("Received ChannelEdgeInfo for channel %v",
4✔
1610
                        msg.ChannelID)
4✔
1611

4✔
1612
                // Prior to processing the announcement we first check if we
4✔
1613
                // already know of this channel, if so, then we can exit early.
4✔
1614
                _, _, exists, isZombie, err := r.cfg.Graph.HasChannelEdge(
4✔
1615
                        msg.ChannelID,
4✔
1616
                )
4✔
1617
                if err != nil && !errors.Is(
4✔
1618
                        err, channeldb.ErrGraphNoEdgesFound,
4✔
1619
                ) {
4✔
1620

×
1621
                        return errors.Errorf("unable to check for edge "+
×
1622
                                "existence: %v", err)
×
1623
                }
×
1624
                if isZombie {
4✔
1625
                        return newErrf(ErrIgnored, "ignoring msg for zombie "+
×
1626
                                "chan_id=%v", msg.ChannelID)
×
1627
                }
×
1628
                if exists {
8✔
1629
                        return newErrf(ErrIgnored, "ignoring msg for known "+
4✔
1630
                                "chan_id=%v", msg.ChannelID)
4✔
1631
                }
4✔
1632

1633
                // If AssumeChannelValid is present, then we are unable to
1634
                // perform any of the expensive checks below, so we'll
1635
                // short-circuit our path straight to adding the edge to our
1636
                // graph. If the passed ShortChannelID is an alias, then we'll
1637
                // skip validation as it will not map to a legitimate tx. This
1638
                // is not a DoS vector as only we can add an alias
1639
                // ChannelAnnouncement from the gossiper.
1640
                scid := lnwire.NewShortChanIDFromInt(msg.ChannelID)
4✔
1641
                if r.cfg.AssumeChannelValid || r.cfg.IsAlias(scid) {
8✔
1642
                        if err := r.cfg.Graph.AddChannelEdge(msg, op...); err != nil {
4✔
1643
                                return fmt.Errorf("unable to add edge: %w", err)
×
1644
                        }
×
1645
                        log.Tracef("New channel discovered! Link "+
4✔
1646
                                "connects %x and %x with ChannelID(%v)",
4✔
1647
                                msg.NodeKey1Bytes, msg.NodeKey2Bytes,
4✔
1648
                                msg.ChannelID)
4✔
1649
                        r.stats.incNumEdgesDiscovered()
4✔
1650

4✔
1651
                        break
4✔
1652
                }
1653

1654
                // Before we can add the channel to the channel graph, we need
1655
                // to obtain the full funding outpoint that's encoded within
1656
                // the channel ID.
1657
                channelID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
4✔
1658
                fundingTx, err := r.fetchFundingTxWrapper(&channelID)
4✔
1659
                if err != nil {
4✔
1660
                        // In order to ensure we don't erroneously mark a
×
1661
                        // channel as a zombie due to an RPC failure, we'll
×
1662
                        // attempt to string match for the relevant errors.
×
1663
                        //
×
1664
                        // * btcd:
×
1665
                        //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
×
1666
                        //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
×
1667
                        // * bitcoind:
×
1668
                        //    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
×
1669
                        //     * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
×
1670
                        switch {
×
1671
                        case strings.Contains(err.Error(), "not found"):
×
1672
                                fallthrough
×
1673

1674
                        case strings.Contains(err.Error(), "out of range"):
×
1675
                                // If the funding transaction isn't found at
×
1676
                                // all, then we'll mark the edge itself as a
×
1677
                                // zombie, so we don't continue to request it.
×
1678
                                // We use the "zero key" for both node pubkeys
×
1679
                                // so this edge can't be resurrected.
×
1680
                                zErr := r.addZombieEdge(msg.ChannelID)
×
1681
                                if zErr != nil {
×
1682
                                        return zErr
×
1683
                                }
×
1684

1685
                        default:
×
1686
                        }
1687

1688
                        return newErrf(ErrNoFundingTransaction, "unable to "+
×
1689
                                "locate funding tx: %v", err)
×
1690
                }
1691

1692
                // Recreate the witness output to be sure that the bitcoin
1693
                // keys and channel value declared in the channel edge
1694
                // correspond to reality.
1695
                fundingPkScript, err := makeFundingScript(
4✔
1696
                        msg.BitcoinKey1Bytes[:], msg.BitcoinKey2Bytes[:],
4✔
1697
                        msg.Features,
4✔
1698
                )
4✔
1699
                if err != nil {
4✔
1700
                        return err
×
1701
                }
×
1702

1703
                // Next we'll validate that this channel is actually
1704
                // well-formed. If this check fails, then this channel either
1705
                // doesn't exist, or isn't the one that was meant to be created
1706
                // according to the passed channel proofs.
1707
                fundingPoint, err := chanvalidate.Validate(&chanvalidate.Context{
4✔
1708
                        Locator: &chanvalidate.ShortChanIDChanLocator{
4✔
1709
                                ID: channelID,
4✔
1710
                        },
4✔
1711
                        MultiSigPkScript: fundingPkScript,
4✔
1712
                        FundingTx:        fundingTx,
4✔
1713
                })
4✔
1714
                if err != nil {
4✔
1715
                        // Mark the edge as a zombie, so we won't try to
×
1716
                        // re-validate it on start up.
×
1717
                        if err := r.addZombieEdge(msg.ChannelID); err != nil {
×
1718
                                return err
×
1719
                        }
×
1720

1721
                        return newErrf(ErrInvalidFundingOutput, "output "+
×
1722
                                "failed validation: %w", err)
×
1723
                }
1724

1725
                // Now that we have the funding outpoint of the channel, ensure
1726
                // that it hasn't yet been spent. If so, then this channel has
1727
                // been closed, so we'll ignore it.
1728
                chanUtxo, err := r.cfg.Chain.GetUtxo(
4✔
1729
                        fundingPoint, fundingPkScript, channelID.BlockHeight,
4✔
1730
                        r.quit,
4✔
1731
                )
4✔
1732
                if err != nil {
4✔
1733
                        if errors.Is(err, btcwallet.ErrOutputSpent) {
×
1734
                                zErr := r.addZombieEdge(msg.ChannelID)
×
1735
                                if zErr != nil {
×
1736
                                        return zErr
×
1737
                                }
×
1738
                        }
1739

1740
                        return newErrf(ErrChannelSpent, "unable to fetch utxo "+
×
1741
                                "for chan_id=%v, chan_point=%v: %v",
×
1742
                                msg.ChannelID, fundingPoint, err)
×
1743
                }
1744

1745
                // TODO(roasbeef): this is a hack, needs to be removed
1746
                // after commitment fees are dynamic.
1747
                msg.Capacity = btcutil.Amount(chanUtxo.Value)
4✔
1748
                msg.ChannelPoint = *fundingPoint
4✔
1749
                if err := r.cfg.Graph.AddChannelEdge(msg, op...); err != nil {
4✔
1750
                        return errors.Errorf("unable to add edge: %v", err)
×
1751
                }
×
1752

1753
                log.Debugf("New channel discovered! Link "+
4✔
1754
                        "connects %x and %x with ChannelPoint(%v): "+
4✔
1755
                        "chan_id=%v, capacity=%v",
4✔
1756
                        msg.NodeKey1Bytes, msg.NodeKey2Bytes,
4✔
1757
                        fundingPoint, msg.ChannelID, msg.Capacity)
4✔
1758
                r.stats.incNumEdgesDiscovered()
4✔
1759

4✔
1760
                // As a new edge has been added to the channel graph, we'll
4✔
1761
                // update the current UTXO filter within our active
4✔
1762
                // FilteredChainView, so we are notified if/when this channel is
4✔
1763
                // closed.
4✔
1764
                filterUpdate := []channeldb.EdgePoint{
4✔
1765
                        {
4✔
1766
                                FundingPkScript: fundingPkScript,
4✔
1767
                                OutPoint:        *fundingPoint,
4✔
1768
                        },
4✔
1769
                }
4✔
1770
                err = r.cfg.ChainView.UpdateFilter(
4✔
1771
                        filterUpdate, atomic.LoadUint32(&r.bestHeight),
4✔
1772
                )
4✔
1773
                if err != nil {
4✔
1774
                        return errors.Errorf("unable to update chain "+
×
1775
                                "view: %v", err)
×
1776
                }
×
1777

1778
        case *models.ChannelEdgePolicy:
4✔
1779
                log.Debugf("Received ChannelEdgePolicy for channel %v",
4✔
1780
                        msg.ChannelID)
4✔
1781

4✔
1782
                // We make sure to hold the mutex for this channel ID,
4✔
1783
                // such that no other goroutine is concurrently doing
4✔
1784
                // database accesses for the same channel ID.
4✔
1785
                r.channelEdgeMtx.Lock(msg.ChannelID)
4✔
1786
                defer r.channelEdgeMtx.Unlock(msg.ChannelID)
4✔
1787

4✔
1788
                edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
4✔
1789
                        r.cfg.Graph.HasChannelEdge(msg.ChannelID)
4✔
1790
                if err != nil && !errors.Is(
4✔
1791
                        err, channeldb.ErrGraphNoEdgesFound,
4✔
1792
                ) {
4✔
1793

×
1794
                        return errors.Errorf("unable to check for edge "+
×
1795
                                "existence: %v", err)
×
1796

×
1797
                }
×
1798

1799
                // If the channel is marked as a zombie in our database, and
1800
                // we consider this a stale update, then we should not apply the
1801
                // policy.
1802
                isStaleUpdate := time.Since(msg.LastUpdate) > r.cfg.ChannelPruneExpiry
4✔
1803
                if isZombie && isStaleUpdate {
4✔
1804
                        return newErrf(ErrIgnored, "ignoring stale update "+
×
1805
                                "(flags=%v|%v) for zombie chan_id=%v",
×
1806
                                msg.MessageFlags, msg.ChannelFlags,
×
1807
                                msg.ChannelID)
×
1808
                }
×
1809

1810
                // If the channel doesn't exist in our database, we cannot
1811
                // apply the updated policy.
1812
                if !exists {
4✔
1813
                        return newErrf(ErrIgnored, "ignoring update "+
×
1814
                                "(flags=%v|%v) for unknown chan_id=%v",
×
1815
                                msg.MessageFlags, msg.ChannelFlags,
×
1816
                                msg.ChannelID)
×
1817
                }
×
1818

1819
                // As edges are directional, each node has a unique policy for
1820
                // the direction of the edge it controls. Therefore, we first
1821
                // check if we already have the most up-to-date information for
1822
                // that edge. If this message has a timestamp that isn't
1823
                // strictly newer than what we already know, we can exit early.
1824
                switch {
4✔
1825

1826
                // A flag set of 0 indicates this is an announcement for the
1827
                // "first" node in the channel.
1828
                case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0:
4✔
1829

4✔
1830
                        // Ignore outdated message.
4✔
1831
                        if !edge1Timestamp.Before(msg.LastUpdate) {
8✔
1832
                                return newErrf(ErrOutdated, "Ignoring "+
4✔
1833
                                        "outdated update (flags=%v|%v) for "+
4✔
1834
                                        "known chan_id=%v", msg.MessageFlags,
4✔
1835
                                        msg.ChannelFlags, msg.ChannelID)
4✔
1836
                        }
4✔
1837

1838
                // Similarly, a flag set of 1 indicates this is an announcement
1839
                // for the "second" node in the channel.
1840
                case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1:
4✔
1841

4✔
1842
                        // Ignore outdated message.
4✔
1843
                        if !edge2Timestamp.Before(msg.LastUpdate) {
8✔
1844
                                return newErrf(ErrOutdated, "Ignoring "+
4✔
1845
                                        "outdated update (flags=%v|%v) for "+
4✔
1846
                                        "known chan_id=%v", msg.MessageFlags,
4✔
1847
                                        msg.ChannelFlags, msg.ChannelID)
4✔
1848
                        }
4✔
1849
                }
1850

1851
                // Now that we know this isn't a stale update, we'll apply the
1852
                // new edge policy to the proper directional edge within the
1853
                // channel graph.
1854
                if err = r.cfg.Graph.UpdateEdgePolicy(msg, op...); err != nil {
4✔
1855
                        err := errors.Errorf("unable to add channel: %v", err)
×
1856
                        log.Error(err)
×
1857
                        return err
×
1858
                }
×
1859

1860
                log.Tracef("New channel update applied: %v",
4✔
1861
                        newLogClosure(func() string { return spew.Sdump(msg) }))
4✔
1862
                r.stats.incNumChannelUpdates()
4✔
1863

1864
        default:
×
1865
                return errors.Errorf("wrong routing update message type")
×
1866
        }
1867

1868
        return nil
4✔
1869
}
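
At its core, processUpdate is a type switch over the incoming message. A self-contained sketch of that dispatch shape, with local stand-in types instead of channeldb.LightningNode and models.ChannelEdgeInfo:

package main

import "fmt"

// nodeAnn and chanAnn are local stand-ins for the real message types.
type nodeAnn struct{ alias string }
type chanAnn struct{ chanID uint64 }

func process(msg interface{}) error {
	switch msg := msg.(type) {
	case *nodeAnn:
		fmt.Println("node update:", msg.alias)
	case *chanAnn:
		fmt.Println("channel update:", msg.chanID)
	default:
		return fmt.Errorf("wrong routing update message type")
	}
	return nil
}

func main() {
	_ = process(&nodeAnn{alias: "carol"})
	_ = process(&chanAnn{chanID: 42})
	fmt.Println(process("bogus")) // wrong routing update message type
}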
1870

1871
// fetchFundingTxWrapper is a wrapper around fetchFundingTx, except that it
1872
// will exit if the router has stopped.
1873
func (r *ChannelRouter) fetchFundingTxWrapper(chanID *lnwire.ShortChannelID) (
1874
        *wire.MsgTx, error) {
4✔
1875

4✔
1876
        txChan := make(chan *wire.MsgTx, 1)
4✔
1877
        errChan := make(chan error, 1)
4✔
1878

4✔
1879
        go func() {
8✔
1880
                tx, err := r.fetchFundingTx(chanID)
4✔
1881
                if err != nil {
4✔
1882
                        errChan <- err
×
1883
                        return
×
1884
                }
×
1885

1886
                txChan <- tx
4✔
1887
        }()
1888

1889
        select {
4✔
1890
        case tx := <-txChan:
4✔
1891
                return tx, nil
4✔
1892

1893
        case err := <-errChan:
×
1894
                return nil, err
×
1895

1896
        case <-r.quit:
×
1897
                return nil, ErrRouterShuttingDown
×
1898
        }
1899
}
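
This cancelable-blocking-call shape (spawn the slow fetch, then select on the result, the error, or quit) recurs throughout lnd. A minimal generic sketch, where slowFetch is a placeholder for the chain-backend call:

package main

import (
	"errors"
	"fmt"
	"time"
)

// slowFetch is a placeholder for the blocking chain-backend lookup.
func slowFetch() (string, error) {
	time.Sleep(10 * time.Millisecond)
	return "funding-tx", nil
}

// fetchWithQuit runs slowFetch in a goroutine and races the result
// against a quit channel, so shutdown is never blocked by the backend.
func fetchWithQuit(quit <-chan struct{}) (string, error) {
	resChan := make(chan string, 1)
	errChan := make(chan error, 1)
	go func() {
		res, err := slowFetch()
		if err != nil {
			errChan <- err
			return
		}
		resChan <- res
	}()

	select {
	case res := <-resChan:
		return res, nil
	case err := <-errChan:
		return "", err
	case <-quit:
		return "", errors.New("shutting down")
	}
}

func main() {
	fmt.Println(fetchWithQuit(make(chan struct{})))
}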
1900

1901
// fetchFundingTx returns the funding transaction identified by the passed
1902
// short channel ID.
1903
//
1904
// TODO(roasbeef): replace with call to GetBlockTransaction? (would allow to
1905
// later use getblocktxn)
1906
func (r *ChannelRouter) fetchFundingTx(
1907
        chanID *lnwire.ShortChannelID) (*wire.MsgTx, error) {
4✔
1908

4✔
1909
        // First fetch the block hash by the block number encoded, then use
4✔
1910
        // that hash to fetch the block itself.
4✔
1911
        blockNum := int64(chanID.BlockHeight)
4✔
1912
        blockHash, err := r.cfg.Chain.GetBlockHash(blockNum)
4✔
1913
        if err != nil {
4✔
1914
                return nil, err
×
1915
        }
×
1916
        fundingBlock, err := r.cfg.Chain.GetBlock(blockHash)
4✔
1917
        if err != nil {
4✔
1918
                return nil, err
×
1919
        }
×
1920

1921
        // As a sanity check, ensure that the advertised transaction index is
1922
        // within the bounds of the total number of transactions within a
1923
        // block.
1924
        numTxns := uint32(len(fundingBlock.Transactions))
4✔
1925
        if chanID.TxIndex > numTxns-1 {
4✔
1926
                return nil, fmt.Errorf("tx_index=#%v "+
×
1927
                        "is out of range (max_index=%v), network_chan_id=%v",
×
1928
                        chanID.TxIndex, numTxns-1, chanID)
×
1929
        }
×
1930

1931
        return fundingBlock.Transactions[chanID.TxIndex].Copy(), nil
4✔
1932
}
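
fetchFundingTx works because a BOLT 7 short channel ID packs the block height (3 bytes), transaction index (3 bytes), and output index (2 bytes) into one uint64. A runnable sketch of that decomposition; the function and field names here are ours, not lnwire's:

package main

import "fmt"

// decodeShortChanID unpacks the three components of a BOLT 7 short
// channel ID.
func decodeShortChanID(scid uint64) (height, txIndex uint32, outIndex uint16) {
	height = uint32(scid >> 40)
	txIndex = uint32(scid>>16) & 0xFFFFFF
	outIndex = uint16(scid)
	return
}

func main() {
	// Height 500000, tx index 1026, output 1, packed per BOLT 7.
	scid := uint64(500000)<<40 | uint64(1026)<<16 | 1
	fmt.Println(decodeShortChanID(scid)) // 500000 1026 1
}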
1933

1934
// routingMsg couples a routing-related topology update to the
1935
// error channel.
1936
type routingMsg struct {
1937
        msg interface{}
1938
        op  []batch.SchedulerOption
1939
        err chan error
1940
}
1941

1942
// RouteRequest contains the parameters for a pathfinding request. It may
1943
// describe a request to make a regular payment or one to a blinded path
1944
// (indicated by a non-nil BlindedPayment field).
1945
type RouteRequest struct {
1946
        // Source is the node that the path originates from.
1947
        Source route.Vertex
1948

1949
        // Target is the node that the path terminates at. If the route
1950
        // includes a blinded path, target will be the blinded node id of the
1951
        // final hop in the blinded route.
1952
        Target route.Vertex
1953

1954
        // Amount is the amount in millisatoshis to be delivered to the target
1955
        // node.
1956
        Amount lnwire.MilliSatoshi
1957

1958
        // TimePreference expresses the caller's time preference for
1959
        // pathfinding.
1960
        TimePreference float64
1961

1962
        // Restrictions provides a set of additional Restrictions that the
1963
        // route must adhere to.
1964
        Restrictions *RestrictParams
1965

1966
        // CustomRecords is a set of custom tlv records to include for the
1967
        // final hop.
1968
        CustomRecords record.CustomSet
1969

1970
        // RouteHints contains an additional set of edges to include in our
1971
        // view of the graph. This may either be a set of hints for private
1972
        // channels or a "virtual" hop hint that represents a blinded route.
1973
        RouteHints RouteHints
1974

1975
        // FinalExpiry is the cltv delta for the final hop. If paying to a
1976
        // blinded path, this value is a duplicate of the delta provided
1977
        // in blinded payment.
1978
        FinalExpiry uint16
1979

1980
        // BlindedPayment contains an optional blinded path and parameters
1981
        // used to reach a target node via a blinded path. This field is
1982
        // mutually exclusive with the Target field.
1983
        BlindedPayment *BlindedPayment
1984
}
1985

1986
// RouteHints is an alias type for a set of route hints, with the source node
1987
// as the map's key and the details of the hint(s) in the edge policy.
1988
type RouteHints map[route.Vertex][]AdditionalEdge
1989

1990
// NewRouteRequest produces a new route request for a regular payment or one
1991
// to a blinded route, validating that the target, routeHints and finalExpiry
1992
// parameters are mutually exclusive with the blindedPayment parameter (which
1993
// contains these values for blinded payments).
1994
func NewRouteRequest(source route.Vertex, target *route.Vertex,
1995
        amount lnwire.MilliSatoshi, timePref float64,
1996
        restrictions *RestrictParams, customRecords record.CustomSet,
1997
        routeHints RouteHints, blindedPayment *BlindedPayment,
1998
        finalExpiry uint16) (*RouteRequest, error) {
4✔
1999

4✔
2000
        var (
4✔
2001
                // Assume that we're starting off with a regular payment.
4✔
2002
                requestHints  = routeHints
4✔
2003
                requestExpiry = finalExpiry
4✔
2004
        )
4✔
2005

4✔
2006
        if blindedPayment != nil {
4✔
2007
                if err := blindedPayment.Validate(); err != nil {
8✔
2008
                        return nil, fmt.Errorf("invalid blinded payment: %w",
4✔
2009
                                err)
×
2010
                }
×
2011

×
2012
                introVertex := route.NewVertex(
2013
                        blindedPayment.BlindedPath.IntroductionPoint,
4✔
2014
                )
4✔
2015
                if source == introVertex {
4✔
2016
                        return nil, ErrSelfIntro
4✔
2017
                }
×
2018

×
2019
                // Check that the values for a clear path have not been set,
2020
                // as this is an ambiguous signal from the caller.
2021
                if routeHints != nil {
2022
                        return nil, ErrHintsAndBlinded
4✔
2023
                }
×
2024

×
2025
                if finalExpiry != 0 {
2026
                        return nil, ErrExpiryAndBlinded
4✔
2027
                }
×
2028

×
2029
                // If we have a blinded path with 1 hop, the cltv expiry
2030
                // will not be included in any hop hints (since we're just
2031
                // sending to the introduction node and need no blinded hints).
2032
                // In this case, we include it to make sure that the final
2033
                // cltv delta is accounted for (since it's part of the blinded
2034
                // delta). In the case of a multi-hop route, we set our final
2035
                // cltv to zero, since it's going to be accounted for in the
2036
                // delta for our hints.
2037
                if len(blindedPayment.BlindedPath.BlindedHops) == 1 {
2038
                        requestExpiry = blindedPayment.CltvExpiryDelta
8✔
2039
                }
4✔
2040

4✔
2041
                requestHints = blindedPayment.toRouteHints()
2042
        }
4✔
2043

4✔
2044
        requestTarget, err := getTargetNode(target, blindedPayment)
×
2045
        if err != nil {
×
2046
                return nil, err
2047
        }
2048

4✔
2049
        return &RouteRequest{
4✔
2050
                Source:         source,
×
2051
                Target:         requestTarget,
×
2052
                Amount:         amount,
2053
                TimePreference: timePref,
4✔
2054
                Restrictions:   restrictions,
4✔
2055
                CustomRecords:  customRecords,
4✔
2056
                RouteHints:     requestHints,
4✔
2057
                FinalExpiry:    requestExpiry,
4✔
2058
                BlindedPayment: blindedPayment,
4✔
2059
        }, nil
4✔
2060
}
4✔
2061

4✔
2062
func getTargetNode(target *route.Vertex, blindedPayment *BlindedPayment) (
4✔
2063
        route.Vertex, error) {
4✔
2064

2065
        var (
2066
                blinded   = blindedPayment != nil
2067
                targetSet = target != nil
4✔
2068
        )
4✔
2069

4✔
2070
        switch {
4✔
2071
        case blinded && targetSet:
4✔
2072
                return route.Vertex{}, ErrTargetAndBlinded
4✔
2073

4✔
2074
        case blinded:
4✔
2075
                // If we're dealing with an edge-case blinded path that just
×
2076
                // has an introduction node (first hop expected to be the intro
×
2077
                // hop), then we return the unblinded introduction node as our
2078
                // target.
4✔
2079
                hops := blindedPayment.BlindedPath.BlindedHops
4✔
2080
                if len(hops) == 1 {
4✔
2081
                        return route.NewVertex(
4✔
2082
                                blindedPayment.BlindedPath.IntroductionPoint,
4✔
2083
                        ), nil
4✔
2084
                }
8✔
2085

4✔
2086
                return route.NewVertex(hops[len(hops)-1].BlindedNodePub), nil
4✔
2087

4✔
2088
        case targetSet:
4✔
2089
                return *target, nil
2090

4✔
2091
        default:
2092
                return route.Vertex{}, ErrNoTarget
4✔
2093
        }
4✔
2094
}
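
The target-selection cases above can be restated compactly. A sketch with strings standing in for route.Vertex values; the one-hop rule mirrors the introduction-node edge case handled above:

package main

import (
	"errors"
	"fmt"
)

// targetNode mirrors getTargetNode's case analysis.
func targetNode(target string, blindedHops []string, intro string) (string, error) {
	blinded := len(blindedHops) > 0
	switch {
	case blinded && target != "":
		return "", errors.New("target and blinded path are mutually exclusive")

	case blinded && len(blindedHops) == 1:
		// Edge case: a path with only an introduction node is paid
		// directly, so the unblinded intro key is the target.
		return intro, nil

	case blinded:
		return blindedHops[len(blindedHops)-1], nil

	case target != "":
		return target, nil

	default:
		return "", errors.New("no target")
	}
}

func main() {
	fmt.Println(targetNode("", []string{"b1", "b2"}, "intro")) // b2 <nil>
	fmt.Println(targetNode("", []string{"b1"}, "intro"))       // intro <nil>
}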
2095

×
2096
// blindedPath returns the request's blinded path, which is set if the payment
×
2097
// is to a blinded route.
2098
func (r *RouteRequest) blindedPath() *sphinx.BlindedPath {
2099
        if r.BlindedPayment == nil {
2100
                return nil
2101
        }
2102

4✔
2103
        return r.BlindedPayment.BlindedPath
8✔
2104
}
4✔
2105

4✔
2106
// FindRoute attempts to query the ChannelRouter for the optimum path to a
2107
// particular target destination to which it is able to send `amt` after
4✔
2108
// factoring in channel capacities and cumulative fees along the route.
2109
func (r *ChannelRouter) FindRoute(req *RouteRequest) (*route.Route, float64,
2110
        error) {
2111

2112
        log.Debugf("Searching for path to %v, sending %v", req.Target,
2113
                req.Amount)
2114

4✔
2115
        // We'll attempt to obtain a set of bandwidth hints that can help us
4✔
2116
        // eliminate certain routes early on in the path finding process.
4✔
2117
        bandwidthHints, err := newBandwidthManager(
4✔
2118
                r.cachedGraph, r.selfNode.PubKeyBytes, r.cfg.GetLink,
4✔
2119
        )
4✔
2120
        if err != nil {
4✔
2121
                return nil, 0, err
4✔
2122
        }
4✔
2123

4✔
2124
        // We'll fetch the current block height, so we can properly calculate
4✔
2125
        // the required HTLC time locks within the route.
×
2126
        _, currentHeight, err := r.cfg.Chain.GetBestBlock()
×
2127
        if err != nil {
2128
                return nil, 0, err
2129
        }
2130

4✔
2131
        // Now that we know the destination is reachable within the graph, we'll
4✔
2132
        // execute our path finding algorithm.
×
2133
        finalHtlcExpiry := currentHeight + int32(req.FinalExpiry)
×
2134

2135
        // Validate time preference.
2136
        timePref := req.TimePreference
2137
        if timePref < -1 || timePref > 1 {
4✔
2138
                return nil, 0, errors.New("time preference out of range")
4✔
2139
        }
4✔
2140

4✔
2141
        path, probability, err := findPath(
4✔
2142
                &graphParams{
×
2143
                        additionalEdges: req.RouteHints,
×
2144
                        bandwidthHints:  bandwidthHints,
2145
                        graph:           r.cachedGraph,
4✔
2146
                },
4✔
2147
                req.Restrictions, &r.cfg.PathFindingConfig, req.Source,
4✔
2148
                req.Target, req.Amount, req.TimePreference, finalHtlcExpiry,
4✔
2149
        )
4✔
2150
        if err != nil {
4✔
2151
                return nil, 0, err
4✔
2152
        }
4✔
2153

4✔
2154
        // Create the route with absolute time lock values.
8✔
2155
        route, err := newRoute(
4✔
2156
                req.Source, path, uint32(currentHeight),
4✔
2157
                finalHopParams{
2158
                        amt:       req.Amount,
2159
                        totalAmt:  req.Amount,
4✔
2160
                        cltvDelta: req.FinalExpiry,
4✔
2161
                        records:   req.CustomRecords,
4✔
2162
                }, req.blindedPath(),
4✔
2163
        )
4✔
2164
        if err != nil {
4✔
2165
                return nil, 0, err
4✔
2166
        }
4✔
2167

4✔
2168
        go log.Tracef("Obtained path to send %v to %x: %v",
4✔
2169
                req.Amount, req.Target, newLogClosure(func() string {
×
2170
                        return spew.Sdump(route)
×
2171
                }),
2172
        )
4✔
2173

4✔
2174
        return route, probability, nil
×
2175
}
×
2176

2177
// generateNewSessionKey generates a new ephemeral private key to be used for a
2178
// payment attempt.
4✔
2179
func generateNewSessionKey() (*btcec.PrivateKey, error) {
2180
        // Generate a new random session key to ensure that we don't trigger
2181
        // any replay.
2182
        //
2183
        // TODO(roasbeef): add more sources of randomness?
4✔
2184
        return btcec.NewPrivateKey()
4✔
2185
}
4✔
2186

4✔
2187
// generateSphinxPacket generates then encodes a sphinx packet which encodes
4✔
2188
// the onion route specified by the passed layer 3 route. The blob returned
4✔
2189
// from this function can immediately be included within an HTLC add packet to
4✔
2190
// be sent to the first hop within the route.
2191
func generateSphinxPacket(rt *route.Route, paymentHash []byte,
2192
        sessionKey *btcec.PrivateKey) ([]byte, *sphinx.Circuit, error) {
2193

2194
        // Now that we know we have an actual route, we'll map the route into a
2195
        // sphinx payment path which includes per-hop payloads for each hop
2196
        // that give each node within the route the necessary information
4✔
2197
        // (fees, CLTV value, etc.) to properly forward the payment.
4✔
2198
        sphinxPath, err := rt.ToSphinxPath()
4✔
2199
        if err != nil {
4✔
2200
                return nil, nil, err
4✔
2201
        }
4✔
2202

4✔
2203
        log.Tracef("Constructed per-hop payloads for payment_hash=%x: %v",
4✔
2204
                paymentHash[:], newLogClosure(func() string {
×
2205
                        path := make(
×
2206
                                []sphinx.OnionHop, sphinxPath.TrueRouteLength(),
2207
                        )
4✔
2208
                        for i := range path {
4✔
2209
                                hopCopy := sphinxPath[i]
×
2210
                                path[i] = hopCopy
×
2211
                        }
×
2212
                        return spew.Sdump(path)
×
2213
                }),
×
2214
        )
×
2215

×
2216
        // Next generate the onion routing packet which allows us to perform
×
2217
        // privacy preserving source routing across the network.
2218
        sphinxPacket, err := sphinx.NewOnionPacket(
2219
                sphinxPath, sessionKey, paymentHash,
2220
                sphinx.DeterministicPacketFiller,
2221
        )
2222
        if err != nil {
4✔
2223
                return nil, nil, err
4✔
2224
        }
4✔
2225

4✔
2226
        // Finally, encode Sphinx packet using its wire representation to be
4✔
2227
        // included within the HTLC add packet.
×
2228
        var onionBlob bytes.Buffer
×
2229
        if err := sphinxPacket.Encode(&onionBlob); err != nil {
2230
                return nil, nil, err
2231
        }
2232

4✔
2233
        log.Tracef("Generated sphinx packet: %v",
4✔
2234
                newLogClosure(func() string {
×
2235
                        // We make a copy of the ephemeral key and unset the
×
2236
                        // internal curve here in order to keep the logs from
2237
                        // getting noisy.
4✔
2238
                        key := *sphinxPacket.EphemeralKey
4✔
2239
                        packetCopy := *sphinxPacket
×
2240
                        packetCopy.EphemeralKey = &key
×
2241
                        return spew.Sdump(packetCopy)
×
2242
                }),
×
2243
        )
×
2244

×
2245
        return onionBlob.Bytes(), &sphinx.Circuit{
×
2246
                SessionKey:  sessionKey,
×
2247
                PaymentPath: sphinxPath.NodeKeys(),
2248
        }, nil
2249
}
4✔
2250

4✔
2251
// LightningPayment describes a payment to be sent through the network to the
4✔
2252
// final destination.
4✔
2253
type LightningPayment struct {
2254
        // Target is the node in which the payment should be routed towards.
2255
        Target route.Vertex
2256

2257
        // Amount is the value of the payment to send through the network in
2258
        // milli-satoshis.
2259
        Amount lnwire.MilliSatoshi
2260

2261
        // FeeLimit is the maximum fee in millisatoshis that the payment should
2262
        // accept when sending it through the network. The payment will fail
2263
        // if there isn't a route with lower fees than this limit.
2264
        FeeLimit lnwire.MilliSatoshi
2265

2266
        // CltvLimit is the maximum time lock that is allowed for attempts to
2267
        // complete this payment.
2268
        CltvLimit uint32
2269

2270
        // paymentHash is the r-hash value to use within the HTLC extended to
2271
        // the first hop. This won't be set for AMP payments.
2272
        paymentHash *lntypes.Hash
2273

2274
        // amp is an optional field that is set if and only if this is an AMP
2275
        // payment.
2276
        amp *AMPOptions
2277

2278
        // FinalCLTVDelta is the CTLV expiry delta to use for the _final_ hop
2279
        // in the route. This means that the final hop will have a CLTV delta
2280
        // of at least: currentHeight + FinalCLTVDelta.
2281
        FinalCLTVDelta uint16
2282

2283
        // PayAttemptTimeout is a timeout value that we'll use to determine
2284
        // when we should abandon the payment attempt after consecutive
2285
        // payment failures. This prevents us from attempting to send a payment
2286
        // indefinitely. A zero value means the payment will never time out.
2287
        //
2288
        // TODO(halseth): make wallclock time to allow resume after startup.
2289
        PayAttemptTimeout time.Duration
2290

2291
        // RouteHints represents the different routing hints that can be used to
2292
        // assist a payment in reaching its destination successfully. These
2293
        // hints will act as intermediate hops along the route.
2294
        //
2295
        // NOTE: This is optional unless required by the payment. When providing
2296
        // multiple routes, ensure the hop hints within each route are chained
2297
        // together and sorted in forward order in order to reach the
2298
        // destination successfully.
2299
        RouteHints [][]zpay32.HopHint
2300

2301
        // OutgoingChannelIDs is the list of channels that are allowed for the
2302
        // first hop. If nil, any channel may be used.
2303
        OutgoingChannelIDs []uint64
2304

2305
        // LastHop is the pubkey of the last node before the final destination
2306
        // is reached. If nil, any node may be used.
2307
        LastHop *route.Vertex
2308

2309
        // DestFeatures specifies the set of features we assume the final node
2310
        // has for pathfinding. Typically, these will be taken directly from an
2311
        // invoice, but they can also be manually supplied or assumed by the
2312
        // sender. If a nil feature vector is provided, the router will try to
2313
        // fall back to the graph in order to load a feature vector for a node
2314
        // in the public graph.
2315
        DestFeatures *lnwire.FeatureVector
2316

2317
        // PaymentAddr is the payment address specified by the receiver. This
2318
        // field should be a random 32-byte nonce presented in the receiver's
2319
        // invoice to prevent probing of the destination.
2320
        PaymentAddr *[32]byte
2321

2322
        // PaymentRequest is an optional payment request that this payment is
2323
        // attempting to complete.
2324
        PaymentRequest []byte
2325

2326
        // DestCustomRecords are TLV records that are to be sent to the final
2327
        // hop in the new onion payload format. If the destination does not
2328
        // understand this new onion payload format, then the payment will
2329
        // fail.
2330
        DestCustomRecords record.CustomSet
2331

2332
        // MaxParts is the maximum number of partial payments that may be used
2333
        // to complete the full amount.
2334
        MaxParts uint32
2335

2336
        // MaxShardAmt is the largest shard that we'll attempt to split using.
2337
        // If this field is set, and we need to split, rather than attempting
2338
        // half of the original payment amount, we'll use this value if half
2339
        // the payment amount is greater than it.
2340
        //
2341
        // NOTE: This field is _optional_.
2342
        MaxShardAmt *lnwire.MilliSatoshi
2343

2344
        // TimePref is the time preference for this payment. Set to -1 to
2345
        // optimize for fees only, to 1 to optimize for reliability only or a
2346
        // value in between for a mix.
2347
        TimePref float64
2348

2349
        // Metadata is additional data that is sent along with the payment to
2350
        // the payee.
2351
        Metadata []byte
2352
}
// AMPOptions houses information that must be known in order to send an AMP
// payment.
type AMPOptions struct {
        SetID     [32]byte
        RootShare [32]byte
}

// SetPaymentHash sets the given hash as the payment's overall hash. This
// should only be used for non-AMP payments.
func (l *LightningPayment) SetPaymentHash(hash lntypes.Hash) error {
        if l.amp != nil {
                return fmt.Errorf("cannot set payment hash for AMP payment")
        }

        l.paymentHash = &hash
        return nil
}

// SetAMP sets the given AMP options for the payment.
func (l *LightningPayment) SetAMP(amp *AMPOptions) error {
        if l.paymentHash != nil {
                return fmt.Errorf("cannot set amp options for payment " +
                        "with payment hash")
        }

        l.amp = amp
        return nil
}

// Identifier returns a 32-byte slice that uniquely identifies this single
// payment. For non-AMP payments this will be the payment hash, for AMP
// payments this will be the used SetID.
func (l *LightningPayment) Identifier() [32]byte {
        if l.amp != nil {
                return l.amp.SetID
        }

        return *l.paymentHash
}
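
// The snippet below is an illustrative sketch (not part of the upstream
// file) of how the struct and setters above fit together: a payment is
// either hash-based or AMP, never both, which is why SetPaymentHash and
// SetAMP reject each other. The variables target, amtMsat and hash are
// assumed to be in scope:
//
//	payment := &LightningPayment{
//		Target:         target,
//		Amount:         amtMsat,
//		FeeLimit:       amtMsat / 100, // Cap fees at ~1% of the amount.
//		FinalCLTVDelta: 40,
//	}
//	if err := payment.SetPaymentHash(hash); err != nil {
//		return err
//	}
//	// Calling payment.SetAMP(...) at this point would return an error,
//	// and Identifier() now yields the payment hash.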
4✔
2393

4✔
2394
// SendPayment attempts to send a payment as described within the passed
2395
// LightningPayment. This function is blocking and will return either: when the
4✔
2396
// payment is successful, or all candidates routes have been attempted and
2397
// resulted in a failed payment. If the payment succeeds, then a non-nil Route
2398
// will be returned which describes the path the successful payment traversed
2399
// within the network to reach the destination. Additionally, the payment
2400
// preimage will also be returned.
2401
func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte,
        *route.Route, error) {

        paySession, shardTracker, err := r.PreparePayment(payment)
        if err != nil {
                return [32]byte{}, nil, err
        }

        log.Tracef("Dispatching SendPayment for lightning payment: %v",
                spewPayment(payment))

        return r.sendPayment(
                context.Background(), payment.FeeLimit, payment.Identifier(),
                payment.PayAttemptTimeout, paySession, shardTracker,
        )
}

// SendPaymentAsync is the non-blocking version of SendPayment. The payment
// result needs to be retrieved via the control tower.
func (r *ChannelRouter) SendPaymentAsync(ctx context.Context,
        payment *LightningPayment, ps PaymentSession, st shards.ShardTracker) {

        // Since this is the first time this payment is being made, we pass nil
        // for the existing attempt.
        r.wg.Add(1)
        go func() {
                defer r.wg.Done()

                log.Tracef("Dispatching SendPayment for lightning payment: %v",
                        spewPayment(payment))

                _, _, err := r.sendPayment(
                        ctx, payment.FeeLimit, payment.Identifier(),
                        payment.PayAttemptTimeout, ps, st,
                )
                if err != nil {
                        log.Errorf("Payment %x failed: %v",
                                payment.Identifier(), err)
                }
        }()
}
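
// An illustrative usage sketch (not from the upstream file), assuming a
// started *ChannelRouter named router and a payment built as in the earlier
// example. SendPayment blocks until the payment settles or fails:
//
//	preimage, rt, err := router.SendPayment(payment)
//	if err != nil {
//		// All candidate routes failed, or PayAttemptTimeout expired.
//		return err
//	}
//	log.Infof("Paid via %d hop(s), preimage=%x", len(rt.Hops), preimage)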
// spewPayment returns a log closure that provides a spewed string
// representation of the passed payment.
func spewPayment(payment *LightningPayment) logClosure {
        return newLogClosure(func() string {
                // Make a copy of the payment with a nilled Curve
                // before spewing.
                var routeHints [][]zpay32.HopHint
                for _, routeHint := range payment.RouteHints {
                        var hopHints []zpay32.HopHint
                        for _, hopHint := range routeHint {
                                h := hopHint.Copy()
                                hopHints = append(hopHints, h)
                        }
                        routeHints = append(routeHints, hopHints)
                }
                p := *payment
                p.RouteHints = routeHints
                return spew.Sdump(p)
        })
}

// PreparePayment creates the payment session and registers the payment with the
// control tower.
func (r *ChannelRouter) PreparePayment(payment *LightningPayment) (
        PaymentSession, shards.ShardTracker, error) {

        // Before starting the HTLC routing attempt, we'll create a fresh
        // payment session which will report our errors back to mission
        // control.
        paySession, err := r.cfg.SessionSource.NewPaymentSession(payment)
        if err != nil {
                return nil, nil, err
        }

        // Record this payment hash with the ControlTower, ensuring it is not
        // already in-flight.
        //
        // TODO(roasbeef): store records as part of creation info?
        info := &channeldb.PaymentCreationInfo{
                PaymentIdentifier: payment.Identifier(),
                Value:             payment.Amount,
                CreationTime:      r.cfg.Clock.Now(),
                PaymentRequest:    payment.PaymentRequest,
        }

        // Create a new ShardTracker that we'll use during the life cycle of
        // this payment.
        var shardTracker shards.ShardTracker
        switch {
        // If this is an AMP payment, we'll use the AMP shard tracker.
        case payment.amp != nil:
                shardTracker = amp.NewShardTracker(
                        payment.amp.RootShare, payment.amp.SetID,
                        *payment.PaymentAddr, payment.Amount,
                )

        // Otherwise we'll use the simple tracker that will map each attempt to
        // the same payment hash.
        default:
                shardTracker = shards.NewSimpleShardTracker(
                        payment.Identifier(), nil,
                )
        }

        err = r.cfg.Control.InitPayment(payment.Identifier(), info)
        if err != nil {
                return nil, nil, err
        }

        return paySession, shardTracker, nil
}

// SendToRoute sends a payment using the provided route and fails the payment
// when an error is returned from the attempt.
func (r *ChannelRouter) SendToRoute(htlcHash lntypes.Hash,
        rt *route.Route) (*channeldb.HTLCAttempt, error) {

        return r.sendToRoute(htlcHash, rt, false)
}

// SendToRouteSkipTempErr sends a payment using the provided route and fails
// the payment ONLY when a terminal error is returned from the attempt.
func (r *ChannelRouter) SendToRouteSkipTempErr(htlcHash lntypes.Hash,
        rt *route.Route) (*channeldb.HTLCAttempt, error) {

        return r.sendToRoute(htlcHash, rt, true)
}

// sendToRoute attempts to send a payment with the given hash through the
// provided route. This function is blocking and will return the attempt
// information as it is stored in the database. For a successful htlc, this
// information will contain the preimage. If an error occurs after the attempt
// was initiated, both return values will be non-nil. If skipTempErr is true,
// the payment won't be failed unless a terminal error has occurred.
func (r *ChannelRouter) sendToRoute(htlcHash lntypes.Hash, rt *route.Route,
        skipTempErr bool) (*channeldb.HTLCAttempt, error) {

        // Calculate amount paid to receiver.
        amt := rt.ReceiverAmt()

        // If this is meant as an MP payment shard, we set the amount for the
        // creation info to the total amount of the payment.
        finalHop := rt.Hops[len(rt.Hops)-1]
        mpp := finalHop.MPP
        if mpp != nil {
                amt = mpp.TotalMsat()
        }

        // For non-MPP, there's no such thing as a temp error, as there's only
        // one HTLC attempt being made. When this HTLC is failed, the payment
        // is failed and hence cannot be retried.
        if skipTempErr && mpp == nil {
                return nil, ErrSkipTempErr
        }

        // For non-AMP payments the overall payment identifier will be the same
        // hash as used for this HTLC.
        paymentIdentifier := htlcHash

        // For AMP-payments, we'll use the setID as the unique ID for the
        // overall payment.
        amp := finalHop.AMP
        if amp != nil {
                paymentIdentifier = amp.SetID()
        }

        // Record this payment hash with the ControlTower, ensuring it is not
        // already in-flight.
        info := &channeldb.PaymentCreationInfo{
                PaymentIdentifier: paymentIdentifier,
                Value:             amt,
                CreationTime:      r.cfg.Clock.Now(),
                PaymentRequest:    nil,
        }

        err := r.cfg.Control.InitPayment(paymentIdentifier, info)
        switch {
        // If this is an MPP attempt and the hash is already registered with
        // the database, we can go on to launch the shard.
        case mpp != nil && errors.Is(err, channeldb.ErrPaymentInFlight):
        case mpp != nil && errors.Is(err, channeldb.ErrPaymentExists):

        // Any other error is not tolerated.
        case err != nil:
                return nil, err
        }

        log.Tracef("Dispatching SendToRoute for HTLC hash %v: %v",
                htlcHash, newLogClosure(func() string {
                        return spew.Sdump(rt)
                }),
        )

        // Since the HTLC hashes and preimages are specified manually over the
        // RPC for SendToRoute requests, we don't have to worry about creating
        // a ShardTracker that can generate hashes for AMP payments. Instead, we
        // create a simple tracker that can just return the hash for the single
        // shard we'll now launch.
        shardTracker := shards.NewSimpleShardTracker(htlcHash, nil)

        // Create a payment lifecycle using the given route with,
        // - zero fee limit as we are not requesting routes.
        // - nil payment session (since we already have a route).
        // - no payment timeout.
        // - no current block height.
        p := newPaymentLifecycle(r, 0, paymentIdentifier, nil, shardTracker, 0)

        // We found a route to try, create a new HTLC attempt to try.
        //
        // NOTE: we use a zero `remainingAmt` here to simulate the same effect
        // as setting the lastShard to false, which was used by the previous
        // implementation.
        attempt, err := p.registerAttempt(rt, 0)
        if err != nil {
                return nil, err
        }

        // Once the attempt is created, send it to the htlcswitch. Notice that
        // the `err` returned here has already been processed by
        // `handleSwitchErr`, which means if there's a terminal failure, the
        // payment has been failed.
        result, err := p.sendAttempt(attempt)
        if err != nil {
                return nil, err
        }

        // We now look up the payment to see if it's already failed.
        payment, err := p.router.cfg.Control.FetchPayment(p.identifier)
        if err != nil {
                return result.attempt, err
        }

        // Exit if the above error has caused the payment to be failed; we also
        // return the error from sending the attempt to mimic the old behavior
        // of this method.
        _, failedReason := payment.TerminalInfo()
        if failedReason != nil {
                return result.attempt, result.err
        }

        // Since for SendToRoute we won't retry in case the shard fails, we'll
        // mark the payment failed with the control tower immediately if
        // skipTempErr is false.
        reason := channeldb.FailureReasonError

        // If we failed to send the HTLC, we need to further decide if we want
        // to fail the payment.
        if result.err != nil {
                // If skipTempErr, we'll return the attempt and the temp error.
                if skipTempErr {
                        return result.attempt, result.err
                }

                // Otherwise we need to fail the payment.
                err := r.cfg.Control.FailPayment(paymentIdentifier, reason)
                if err != nil {
                        return nil, err
                }

                return result.attempt, result.err
        }

        // The attempt was successfully sent, wait for the result to be
        // available.
        result, err = p.collectResult(attempt)
        if err != nil {
                return nil, err
        }

        // We got a successful result.
        if result.err == nil {
                return result.attempt, nil
        }

        // An error was returned from collecting the result, so we'll mark the
        // payment as failed if we don't skip temp errors.
        if !skipTempErr {
                err := r.cfg.Control.FailPayment(paymentIdentifier, reason)
                if err != nil {
                        return nil, err
                }
        }

        return result.attempt, result.err
}
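
// An illustrative sketch (not from the upstream file) of the two public
// wrappers above, assuming router, a precomputed route rt, and its htlcHash
// are in scope. SendToRoute fails the payment on any attempt error, while
// SendToRouteSkipTempErr only fails it on terminal errors, which is the
// variant multi-part senders typically want:
//
//	attempt, err := router.SendToRouteSkipTempErr(htlcHash, rt)
//	if err != nil {
//		// A temporary failure leaves the payment open, so another
//		// attempt can be made with a fresh route.
//		return err
//	}
//	if attempt.Settle != nil {
//		log.Infof("HTLC settled, preimage=%x",
//			attempt.Settle.Preimage)
//	}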

// sendPayment attempts to send a payment to the passed payment hash. This
// function is blocking and will return either when the payment is successful,
// or when all candidate routes have been attempted and have resulted in a
// failed payment. If the payment succeeds, then a non-nil Route will be
// returned which describes the path the successful payment traversed within
// the network to reach the destination. Additionally, the payment preimage
// will also be returned.
//
// This method relies on the ControlTower's internal payment state machine to
// carry out its execution. After restarts, it is safe, and assumed, that the
// router will call this method for every payment still in-flight according to
// the ControlTower.
func (r *ChannelRouter) sendPayment(ctx context.Context,
        feeLimit lnwire.MilliSatoshi, identifier lntypes.Hash,
        paymentAttemptTimeout time.Duration, paySession PaymentSession,
        shardTracker shards.ShardTracker) ([32]byte, *route.Route, error) {

        // If the user provides a timeout, we will additionally wrap the context
        // in a deadline.
        cancel := func() {}
        if paymentAttemptTimeout > 0 {
                ctx, cancel = context.WithTimeout(ctx, paymentAttemptTimeout)
        }

        // Since resumePayment is a blocking call, we'll cancel this
        // context if the payment completes before the optional
        // deadline.
        defer cancel()

        // We'll also fetch the current block height, so we can properly
        // calculate the required HTLC time locks within the route.
        _, currentHeight, err := r.cfg.Chain.GetBestBlock()
        if err != nil {
                return [32]byte{}, nil, err
        }

        // Now set up a paymentLifecycle struct with these params, such that we
        // can resume the payment from the current state.
        p := newPaymentLifecycle(
                r, feeLimit, identifier, paySession, shardTracker,
                currentHeight,
        )

        return p.resumePayment(ctx)
}
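
// A minimal, self-contained illustration (not from the upstream file) of
// the deadline-wrapping pattern used above: a zero timeout leaves the
// context untouched, while a positive timeout bounds the blocking call.
//
//	func withOptionalTimeout(ctx context.Context, timeout time.Duration,
//		blockingCall func(context.Context) error) error {
//
//		cancel := func() {}
//		if timeout > 0 {
//			ctx, cancel = context.WithTimeout(ctx, timeout)
//		}
//		defer cancel()
//
//		return blockingCall(ctx)
//	}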
// extractChannelUpdate examines the error and extracts the channel update.
func (r *ChannelRouter) extractChannelUpdate(
        failure lnwire.FailureMessage) *lnwire.ChannelUpdate {

        var update *lnwire.ChannelUpdate
        switch onionErr := failure.(type) {
        case *lnwire.FailExpiryTooSoon:
                update = &onionErr.Update
        case *lnwire.FailAmountBelowMinimum:
                update = &onionErr.Update
        case *lnwire.FailFeeInsufficient:
                update = &onionErr.Update
        case *lnwire.FailIncorrectCltvExpiry:
                update = &onionErr.Update
        case *lnwire.FailChannelDisabled:
                update = &onionErr.Update
        case *lnwire.FailTemporaryChannelFailure:
                update = onionErr.Update
        }

        return update
}

// applyChannelUpdate validates a channel update and, if valid, applies it to
// the database. It returns a bool indicating whether the update was
// successful.
func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate) bool {
        ch, _, _, err := r.GetChannelByID(msg.ShortChannelID)
        if err != nil {
                log.Errorf("Unable to retrieve channel by id: %v", err)
                return false
        }

        var pubKey *btcec.PublicKey

        switch msg.ChannelFlags & lnwire.ChanUpdateDirection {
        case 0:
                pubKey, _ = ch.NodeKey1()

        case 1:
                pubKey, _ = ch.NodeKey2()
        }

        // Exit early if the pubkey cannot be decided.
        if pubKey == nil {
                log.Errorf("Unable to decide pubkey with ChannelFlags=%v",
                        msg.ChannelFlags)
                return false
        }

        err = ValidateChannelUpdateAnn(pubKey, ch.Capacity, msg)
        if err != nil {
                log.Errorf("Unable to validate channel update: %v", err)
                return false
        }

        err = r.UpdateEdge(&models.ChannelEdgePolicy{
                SigBytes:                  msg.Signature.ToSignatureBytes(),
                ChannelID:                 msg.ShortChannelID.ToUint64(),
                LastUpdate:                time.Unix(int64(msg.Timestamp), 0),
                MessageFlags:              msg.MessageFlags,
                ChannelFlags:              msg.ChannelFlags,
                TimeLockDelta:             msg.TimeLockDelta,
                MinHTLC:                   msg.HtlcMinimumMsat,
                MaxHTLC:                   msg.HtlcMaximumMsat,
                FeeBaseMSat:               lnwire.MilliSatoshi(msg.BaseFee),
                FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate),
                ExtraOpaqueData:           msg.ExtraOpaqueData,
        })
        if err != nil && !IsError(err, ErrIgnored, ErrOutdated) {
                log.Errorf("Unable to apply channel update: %v", err)
                return false
        }

        return true
}
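
// An illustrative sketch (not from the upstream file) of the direction bit
// handling above: bit 0 of ChannelFlags selects which of the two channel
// endpoints authored the update, so the matching node key must be used for
// signature validation.
//
//	direction := msg.ChannelFlags & lnwire.ChanUpdateDirection
//	if direction == 0 {
//		// Update signed by the first node (NodeKey1).
//	} else {
//		// Update signed by the second node (NodeKey2).
//	}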
// AddNode is used to add information about a node to the router database. If
// the node with this pubkey is not present in an existing channel, it will
// be ignored.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) AddNode(node *channeldb.LightningNode,
        op ...batch.SchedulerOption) error {

        rMsg := &routingMsg{
                msg: node,
                op:  op,
                err: make(chan error, 1),
        }

        select {
        case r.networkUpdates <- rMsg:
                select {
                case err := <-rMsg.err:
                        return err
                case <-r.quit:
                        return ErrRouterShuttingDown
                }
        case <-r.quit:
                return ErrRouterShuttingDown
        }
}

// AddEdge is used to add an edge/channel to the topology of the router. After
// all information about the channel has been gathered, this edge/channel
// might be used in the construction of a payment path.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) AddEdge(edge *models.ChannelEdgeInfo,
        op ...batch.SchedulerOption) error {

        rMsg := &routingMsg{
                msg: edge,
                op:  op,
                err: make(chan error, 1),
        }

        select {
        case r.networkUpdates <- rMsg:
                select {
                case err := <-rMsg.err:
                        return err
                case <-r.quit:
                        return ErrRouterShuttingDown
                }
        case <-r.quit:
                return ErrRouterShuttingDown
        }
}

// UpdateEdge is used to update edge information; without this message, the
// edge is considered not fully constructed.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) UpdateEdge(update *models.ChannelEdgePolicy,
        op ...batch.SchedulerOption) error {

        rMsg := &routingMsg{
                msg: update,
                op:  op,
                err: make(chan error, 1),
        }

        select {
        case r.networkUpdates <- rMsg:
                select {
                case err := <-rMsg.err:
                        return err
                case <-r.quit:
                        return ErrRouterShuttingDown
                }
        case <-r.quit:
                return ErrRouterShuttingDown
        }
}

// CurrentBlockHeight returns the block height from POV of the router subsystem.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) CurrentBlockHeight() (uint32, error) {
        _, height, err := r.cfg.Chain.GetBestBlock()
        return uint32(height), err
}

// SyncedHeight returns the block height to which the router subsystem is
// currently synced. This can differ from the above chain height if the
// goroutine responsible for processing the blocks isn't yet up to speed.
func (r *ChannelRouter) SyncedHeight() uint32 {
        return atomic.LoadUint32(&r.bestHeight)
}

// GetChannelByID returns the channel by the channel id.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) GetChannelByID(chanID lnwire.ShortChannelID) (
        *models.ChannelEdgeInfo,
        *models.ChannelEdgePolicy,
        *models.ChannelEdgePolicy, error) {

        return r.cfg.Graph.FetchChannelEdgesByID(chanID.ToUint64())
}

// FetchLightningNode attempts to look up a target node by its identity public
// key. channeldb.ErrGraphNodeNotFound is returned if the node doesn't exist
// within the graph.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) FetchLightningNode(
        node route.Vertex) (*channeldb.LightningNode, error) {

        return r.cfg.Graph.FetchLightningNode(nil, node)
}

// ForEachNode is used to iterate over every node in router topology.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) ForEachNode(
        cb func(*channeldb.LightningNode) error) error {

        return r.cfg.Graph.ForEachNode(
                func(_ kvdb.RTx, n *channeldb.LightningNode) error {
                        return cb(n)
                })
}

// ForAllOutgoingChannels is used to iterate over all outgoing channels owned by
// the router.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) ForAllOutgoingChannels(cb func(kvdb.RTx,
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy) error) error {

        return r.cfg.Graph.ForEachNodeChannel(nil, r.selfNode.PubKeyBytes,
                func(tx kvdb.RTx, c *models.ChannelEdgeInfo,
                        e *models.ChannelEdgePolicy,
                        _ *models.ChannelEdgePolicy) error {

                        if e == nil {
                                return fmt.Errorf("channel from self node " +
                                        "has no policy")
                        }

                        return cb(tx, c, e)
                },
        )
}

// AddProof updates the channel edge info with proof which is needed to
// properly announce the edge to the rest of the network.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) AddProof(chanID lnwire.ShortChannelID,
        proof *models.ChannelAuthProof) error {

        info, _, _, err := r.cfg.Graph.FetchChannelEdgesByID(chanID.ToUint64())
        if err != nil {
                return err
        }

        info.AuthProof = proof
        return r.cfg.Graph.UpdateChannelEdge(info)
}

// IsStaleNode returns true if the graph source has a node announcement for the
// target node with a more recent timestamp.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) IsStaleNode(node route.Vertex,
        timestamp time.Time) bool {

        // If our attempt to assert that the node announcement is fresh fails,
        // then we know that this is actually a stale announcement.
        err := r.assertNodeAnnFreshness(node, timestamp)
        if err != nil {
                log.Debugf("Checking stale node %x got %v", node, err)
                return true
        }

        return false
}

// IsPublicNode determines whether the given vertex is seen as a public node in
// the graph from the graph's source node's point of view.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) IsPublicNode(node route.Vertex) (bool, error) {
        return r.cfg.Graph.IsPublicNode(node)
}

// IsKnownEdge returns true if the graph source already knows of the passed
// channel ID either as a live or zombie edge.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
        _, _, exists, isZombie, _ := r.cfg.Graph.HasChannelEdge(
                chanID.ToUint64(),
        )
        return exists || isZombie
}

// IsStaleEdgePolicy returns true if the graph source has a channel edge for
// the passed channel ID (and flags) that have a more recent timestamp.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
        timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {

        edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
                r.cfg.Graph.HasChannelEdge(chanID.ToUint64())
        if err != nil {
                log.Debugf("Check stale edge policy got error: %v", err)
                return false
        }

        // If we know of the edge as a zombie, then we'll make some additional
        // checks to determine if the new policy is fresh.
        if isZombie {
                // When running with AssumeChannelValid, we also prune channels
                // if both of their edges are disabled. We'll mark the new
                // policy as stale if it remains disabled.
                if r.cfg.AssumeChannelValid {
                        isDisabled := flags&lnwire.ChanUpdateDisabled ==
                                lnwire.ChanUpdateDisabled
                        if isDisabled {
                                return true
                        }
                }

                // Otherwise, we'll fall back to our usual ChannelPruneExpiry.
                return time.Since(timestamp) > r.cfg.ChannelPruneExpiry
        }

        // If we don't know of the edge, then it means it's fresh (thus not
        // stale).
        if !exists {
                return false
        }

        // As edges are directional, each node has a unique policy for the
        // direction of the edge they control. Therefore, we first check if we
        // already have the most up-to-date information for that edge. If so,
        // then we can exit early.
        switch {
        // A flag set of 0 indicates this is an announcement for the "first"
        // node in the channel.
        case flags&lnwire.ChanUpdateDirection == 0:
                return !edge1Timestamp.Before(timestamp)

        // Similarly, a flag set of 1 indicates this is an announcement for the
        // "second" node in the channel.
        case flags&lnwire.ChanUpdateDirection == 1:
                return !edge2Timestamp.Before(timestamp)
        }

        return false
}

// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) MarkEdgeLive(chanID lnwire.ShortChannelID) error {
        return r.cfg.Graph.MarkEdgeLive(chanID.ToUint64())
}

// ErrNoChannel is returned when a route cannot be built because there are no
// channels that satisfy all requirements.
type ErrNoChannel struct {
        position int
        fromNode route.Vertex
}

// Error returns a human-readable string describing the error.
func (e ErrNoChannel) Error() string {
        return fmt.Sprintf("no matching outgoing channel available for "+
                "node %v (%v)", e.position, e.fromNode)
}

// BuildRoute returns a fully specified route based on a list of pubkeys. If
// amount is nil, the minimum routable amount is used. To force a specific
// outgoing channel, use the outgoingChan parameter.
func (r *ChannelRouter) BuildRoute(amt *lnwire.MilliSatoshi,
        hops []route.Vertex, outgoingChan *uint64,
        finalCltvDelta int32, payAddr *[32]byte) (*route.Route, error) {

        log.Tracef("BuildRoute called: hopsCount=%v, amt=%v",
                len(hops), amt)

        var outgoingChans map[uint64]struct{}
        if outgoingChan != nil {
                outgoingChans = map[uint64]struct{}{
                        *outgoingChan: {},
                }
        }

        // If no amount is specified, we need to build a route for the minimum
        // amount that this route can carry.
        useMinAmt := amt == nil

        var runningAmt lnwire.MilliSatoshi
        if useMinAmt {
                // For minimum amount routes, aim to deliver at least 1 msat to
                // the destination. There are nodes in the wild that have a
                // min_htlc channel policy of zero, which could lead to a zero
                // amount payment being made.
                runningAmt = 1
        } else {
                // If an amount is specified, we need to build a route that
                // delivers exactly this amount to the final destination.
                runningAmt = *amt
        }

        // We'll attempt to obtain a set of bandwidth hints that helps us select
        // the best outgoing channel to use in case no outgoing channel is set.
        bandwidthHints, err := newBandwidthManager(
                r.cachedGraph, r.selfNode.PubKeyBytes, r.cfg.GetLink,
        )
        if err != nil {
                return nil, err
        }

        // Fetch the current block height outside the routing transaction, to
        // prevent the rpc call blocking the database.
        _, height, err := r.cfg.Chain.GetBestBlock()
        if err != nil {
                return nil, err
        }

        sourceNode := r.selfNode.PubKeyBytes
        unifiers, senderAmt, err := getRouteUnifiers(
                sourceNode, hops, useMinAmt, runningAmt, outgoingChans,
                r.cachedGraph, bandwidthHints,
        )
        if err != nil {
                return nil, err
        }

        pathEdges, receiverAmt, err := getPathEdges(
                sourceNode, senderAmt, unifiers, bandwidthHints, hops,
        )
        if err != nil {
                return nil, err
        }

        // Build and return the final route.
        return newRoute(
                sourceNode, pathEdges, uint32(height),
                finalHopParams{
                        amt:         receiverAmt,
                        totalAmt:    receiverAmt,
                        cltvDelta:   uint16(finalCltvDelta),
                        records:     nil,
                        paymentAddr: payAddr,
                }, nil,
        )
}
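
// An illustrative usage sketch (not from the upstream file), assuming router
// and two hop pubkeys hop1, hop2 of type route.Vertex are in scope. Passing
// a nil amount asks BuildRoute to derive the minimum routable amount:
//
//	rt, err := router.BuildRoute(
//		nil, []route.Vertex{hop1, hop2}, nil, 40, nil,
//	)
//	if err != nil {
//		return err
//	}
//	log.Infof("Route delivers %v to %v", rt.ReceiverAmt(), hop2)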
// getRouteUnifiers returns a list of edge unifiers for the given route.
func getRouteUnifiers(source route.Vertex, hops []route.Vertex,
        useMinAmt bool, runningAmt lnwire.MilliSatoshi,
        outgoingChans map[uint64]struct{}, graph routingGraph,
        bandwidthHints *bandwidthManager) ([]*edgeUnifier, lnwire.MilliSatoshi,
        error) {

        // Allocate a list that will contain the edge unifiers for this route.
        unifiers := make([]*edgeUnifier, len(hops))

        // Traverse hops backwards to accumulate fees in the running amounts.
        for i := len(hops) - 1; i >= 0; i-- {
                toNode := hops[i]

                var fromNode route.Vertex
                if i == 0 {
                        fromNode = source
                } else {
                        fromNode = hops[i-1]
                }

                localChan := i == 0

                // Build unified policies for this hop based on the channels
                // known in the graph. Don't use inbound fees.
                //
                // TODO: Add inbound fees support for BuildRoute.
                u := newNodeEdgeUnifier(
                        source, toNode, false, outgoingChans,
                )

                err := u.addGraphPolicies(graph)
                if err != nil {
                        return nil, 0, err
                }

                // Exit if there are no channels.
                edgeUnifier, ok := u.edgeUnifiers[fromNode]
                if !ok {
                        log.Errorf("Cannot find policy for node %v", fromNode)
                        return nil, 0, ErrNoChannel{
                                fromNode: fromNode,
                                position: i,
                        }
                }

                // If using min amt, increase amt if needed.
                if useMinAmt {
                        min := edgeUnifier.minAmt()
                        if min > runningAmt {
                                runningAmt = min
                        }
                }

                // Get an edge for the specific amount that we want to forward.
                edge := edgeUnifier.getEdge(runningAmt, bandwidthHints, 0)
                if edge == nil {
                        log.Errorf("Cannot find policy with amt=%v for node %v",
                                runningAmt, fromNode)

                        return nil, 0, ErrNoChannel{
                                fromNode: fromNode,
                                position: i,
                        }
                }

                // Add fee for this hop.
                if !localChan {
                        runningAmt += edge.policy.ComputeFee(runningAmt)
                }

                log.Tracef("Select channel %v at position %v",
                        edge.policy.ChannelID, i)

                unifiers[i] = edgeUnifier
        }

        return unifiers, runningAmt, nil
}
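
// A worked example (illustrative, not from the upstream file) of the
// backward pass above: say the route is source -> A -> B, delivering
// 1,000,000 msat to B, and A charges a base fee of 1,000 msat plus a
// proportional fee of 0.1% for forwarding. Traversing backwards from B:
//
//	hop B (i = 1): runningAmt = 1,000,000 msat, the amount A must
//	    forward. This hop is not the local channel, so A's fee is added:
//	    1,000,000 + 1,000 + 1,000,000*0.001 = 1,002,000 msat.
//	hop A (i = 0): runningAmt = 1,002,000 msat, the amount the source
//	    must send. The first hop is local, so no fee is added.
//
// The returned sender amount is therefore 1,002,000 msat.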
// getPathEdges returns the edges that make up the path and the total amount,
// including fees, to send the payment.
func getPathEdges(source route.Vertex, receiverAmt lnwire.MilliSatoshi,
        unifiers []*edgeUnifier, bandwidthHints *bandwidthManager,
        hops []route.Vertex) ([]*unifiedEdge,
        lnwire.MilliSatoshi, error) {

        // Now that we arrived at the start of the route and found out the route
        // total amount, we make a forward pass. Because the amount may have
        // been increased in the backward pass, fees need to be recalculated and
        // amount ranges re-checked.
        var pathEdges []*unifiedEdge
        for i, unifier := range unifiers {
                edge := unifier.getEdge(receiverAmt, bandwidthHints, 0)
                if edge == nil {
                        fromNode := source
                        if i > 0 {
                                fromNode = hops[i-1]
                        }

                        return nil, 0, ErrNoChannel{
                                fromNode: fromNode,
                                position: i,
                        }
                }

                if i > 0 {
                        // Decrease the amount to send while going forward.
                        receiverAmt -= edge.policy.ComputeFeeFromIncoming(
                                receiverAmt,
                        )
                }

                pathEdges = append(pathEdges, edge)
        }

        return pathEdges, receiverAmt, nil
}
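
// Continuing the worked example (illustrative, not from the upstream file):
// the forward pass starts from the sender amount of 1,002,000 msat. At hop A
// (i = 0) the full amount is placed on the local channel; at hop B (i = 1)
// A's fee is recomputed from the incoming amount and subtracted, leaving
// 1,000,000 msat delivered to B. Only if the backward pass bumped the amount
// to satisfy a min_htlc constraint would the delivered amount differ from
// the one originally requested.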