
lightningnetwork / lnd / build 9915780197 (push via github, web-flow)

13 Jul 2024 12:30AM UTC coverage: 49.268% (-9.1%) from 58.413%

Merge pull request #8653 from ProofOfKeags/fn-prim
DynComms [0/n]: `fn` package additions

92837 of 188433 relevant lines covered (49.27%)
1.55 hits per line

Source file: /routing/router.go (64.75% covered)

package routing

import (
        "bytes"
        "context"
        "fmt"
        "math"
        "runtime"
        "strings"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcec/v2"
        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/wire"
        "github.com/davecgh/go-spew/spew"
        "github.com/go-errors/errors"
        sphinx "github.com/lightningnetwork/lightning-onion"
        "github.com/lightningnetwork/lnd/amp"
        "github.com/lightningnetwork/lnd/batch"
        "github.com/lightningnetwork/lnd/chainntnfs"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/channeldb/models"
        "github.com/lightningnetwork/lnd/clock"
        "github.com/lightningnetwork/lnd/fn"
        "github.com/lightningnetwork/lnd/htlcswitch"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/kvdb"
        "github.com/lightningnetwork/lnd/lntypes"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chanvalidate"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/multimutex"
        "github.com/lightningnetwork/lnd/record"
        "github.com/lightningnetwork/lnd/routing/chainview"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/routing/shards"
        "github.com/lightningnetwork/lnd/ticker"
        "github.com/lightningnetwork/lnd/zpay32"
)

const (
        // DefaultPayAttemptTimeout is the default payment attempt timeout. The
        // payment attempt timeout defines the duration after which we stop
        // trying more routes for a payment.
        DefaultPayAttemptTimeout = time.Second * 60

        // DefaultChannelPruneExpiry is the default duration used to determine
        // if a channel should be pruned or not.
        DefaultChannelPruneExpiry = time.Hour * 24 * 14

        // DefaultFirstTimePruneDelay is the time we'll wait after startup
        // before attempting to prune the graph for zombie channels. We don't
        // do it immediately after startup to allow lnd to start up without
        // getting blocked by this job.
        DefaultFirstTimePruneDelay = 30 * time.Second

        // defaultStatInterval governs how often the router will log non-empty
        // stats related to processing new channels, updates, or node
        // announcements.
        defaultStatInterval = time.Minute

        // MinCLTVDelta is the minimum CLTV value accepted by LND for all
        // timelock deltas. This includes both forwarding CLTV deltas set on
        // channel updates, as well as final CLTV deltas used to create BOLT 11
        // payment requests.
        //
        // NOTE: For payment requests, BOLT 11 stipulates that a final CLTV
        // delta of 9 should be used when no value is decoded. This however
        // leads to inflexibility in upgrading this default parameter, since it
        // can create inconsistencies around the assumed value between sender
        // and receiver. Specifically, if the receiver assumes a higher value
        // than the sender, the receiver will always see the received HTLCs as
        // invalid due to their timelock not meeting the required delta.
        //
        // We skirt this by always setting an explicit CLTV delta when creating
        // invoices. This allows LND nodes to freely update the minimum without
        // creating incompatibilities during the upgrade process. For some time
        // LND has used an explicit default final CLTV delta of 40 blocks for
        // bitcoin, though we now clamp the lower end of this
        // range for user-chosen deltas to 18 blocks to be conservative.
        MinCLTVDelta = 18

        // MaxCLTVDelta is the maximum CLTV value accepted by LND for all
        // timelock deltas.
        MaxCLTVDelta = math.MaxUint16
)
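
// Illustrative sketch (not part of the original file): validating a
// user-chosen timelock delta against the bounds above. The helper name is
// hypothetical.
func validateCLTVDelta(delta uint32) error {
        if delta < MinCLTVDelta || delta > MaxCLTVDelta {
                return fmt.Errorf("cltv delta %d outside of range [%d, %d]",
                        delta, MinCLTVDelta, MaxCLTVDelta)
        }

        return nil
}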

var (
        // ErrRouterShuttingDown is returned if the router is in the process of
        // shutting down.
        ErrRouterShuttingDown = fmt.Errorf("router shutting down")

        // ErrSelfIntro is a failure returned when the source node of a
        // route request is also the introduction node. This is not yet
        // supported because LND does not support blinded forwarding.
        ErrSelfIntro = errors.New("introduction point as own node not " +
                "supported")

        // ErrHintsAndBlinded is returned if a route request has both
        // bolt 11 route hints and a blinded path set.
        ErrHintsAndBlinded = errors.New("bolt 11 route hints and blinded " +
                "paths are mutually exclusive")

        // ErrExpiryAndBlinded is returned if a final cltv and a blinded path
        // are provided, as the cltv should be provided within the blinded
        // path.
        ErrExpiryAndBlinded = errors.New("final cltv delta and blinded " +
                "paths are mutually exclusive")

        // ErrTargetAndBlinded is returned if a target destination and a
        // blinded path are both set (as the target is inferred from the
        // blinded path).
        ErrTargetAndBlinded = errors.New("target node and blinded paths " +
                "are mutually exclusive")

        // ErrNoTarget is returned when the target node for a route is not
        // provided by either a blinded route or a cleartext pubkey.
        ErrNoTarget = errors.New("destination not set in target or blinded " +
                "path")

        // ErrSkipTempErr is returned when a non-MPP payment is made yet the
        // skipTempErr flag is set.
        ErrSkipTempErr = errors.New("cannot skip temp error for non-MPP")
)
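
// Illustrative sketch (not part of the original file): callers assembling a
// route request can match the sentinel errors above with errors.Is. The
// helper name is hypothetical.
func isMutualExclusionErr(err error) bool {
        return errors.Is(err, ErrHintsAndBlinded) ||
                errors.Is(err, ErrExpiryAndBlinded) ||
                errors.Is(err, ErrTargetAndBlinded)
}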

// ChannelGraphSource represents the source of information about the topology
// of the lightning network. It's responsible for the addition of nodes, edges,
// applying edge updates, and returning the current block height with which the
// topology is synchronized.
type ChannelGraphSource interface {
        // AddNode is used to add information about a node to the router
        // database. If the node with this pubkey is not present in an existing
        // channel, it will be ignored.
        AddNode(node *channeldb.LightningNode,
                op ...batch.SchedulerOption) error

        // AddEdge is used to add an edge/channel to the topology of the
        // router. Once all information about the channel has been gathered,
        // this edge/channel might be used in the construction of a payment
        // path.
        AddEdge(edge *models.ChannelEdgeInfo,
                op ...batch.SchedulerOption) error

        // AddProof updates the channel edge info with proof which is needed to
        // properly announce the edge to the rest of the network.
        AddProof(chanID lnwire.ShortChannelID,
                proof *models.ChannelAuthProof) error

        // UpdateEdge is used to update edge information; without this message
        // the edge is considered not fully constructed.
        UpdateEdge(policy *models.ChannelEdgePolicy,
                op ...batch.SchedulerOption) error

        // IsStaleNode returns true if the graph source has a node announcement
        // for the target node with a more recent timestamp. This method will
        // also return true if we don't have an active channel announcement for
        // the target node.
        IsStaleNode(node route.Vertex, timestamp time.Time) bool

        // IsPublicNode determines whether the given vertex is seen as a public
        // node in the graph from the graph's source node's point of view.
        IsPublicNode(node route.Vertex) (bool, error)

        // IsKnownEdge returns true if the graph source already knows of the
        // passed channel ID either as a live or zombie edge.
        IsKnownEdge(chanID lnwire.ShortChannelID) bool

        // IsStaleEdgePolicy returns true if the graph source has a channel
        // edge for the passed channel ID (and flags) that has a more recent
        // timestamp.
        IsStaleEdgePolicy(chanID lnwire.ShortChannelID, timestamp time.Time,
                flags lnwire.ChanUpdateChanFlags) bool

        // MarkEdgeLive clears an edge from our zombie index, deeming it as
        // live.
        MarkEdgeLive(chanID lnwire.ShortChannelID) error

        // ForAllOutgoingChannels is used to iterate over all channels
        // emanating from the "source" node which is the center of the
        // star-graph.
        ForAllOutgoingChannels(cb func(tx kvdb.RTx,
                c *models.ChannelEdgeInfo,
                e *models.ChannelEdgePolicy) error) error

        // CurrentBlockHeight returns the block height from POV of the router
        // subsystem.
        CurrentBlockHeight() (uint32, error)

        // GetChannelByID returns the channel by the channel id.
        GetChannelByID(chanID lnwire.ShortChannelID) (
                *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
                *models.ChannelEdgePolicy, error)

        // FetchLightningNode attempts to look up a target node by its identity
        // public key. channeldb.ErrGraphNodeNotFound is returned if the node
        // doesn't exist within the graph.
        FetchLightningNode(route.Vertex) (*channeldb.LightningNode, error)

        // ForEachNode is used to iterate over every node in the known graph.
        ForEachNode(func(node *channeldb.LightningNode) error) error
}
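
// Illustrative sketch (not part of the original file): summing the capacity
// of all channels emanating from the source node via ForAllOutgoingChannels,
// assuming the Capacity field on models.ChannelEdgeInfo. The function name
// is hypothetical.
func totalOutgoingCapacity(src ChannelGraphSource) (btcutil.Amount, error) {
        var total btcutil.Amount
        err := src.ForAllOutgoingChannels(func(_ kvdb.RTx,
                c *models.ChannelEdgeInfo,
                _ *models.ChannelEdgePolicy) error {

                total += c.Capacity

                return nil
        })

        return total, err
}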

// PaymentAttemptDispatcher is used by the router to send payment attempts onto
// the network, and receive their results.
type PaymentAttemptDispatcher interface {
        // SendHTLC is a function that directs a link-layer switch to
        // forward a fully encoded payment to the first hop in the route
        // denoted by its public key. A non-nil error is to be returned if the
        // payment was unsuccessful.
        SendHTLC(firstHop lnwire.ShortChannelID,
                attemptID uint64,
                htlcAdd *lnwire.UpdateAddHTLC) error

        // GetAttemptResult returns the result of the payment attempt with
        // the given attemptID. The paymentHash should be set to the payment's
        // overall hash, or in case of AMP payments the payment's unique
        // identifier.
        //
        // The method returns a channel where the payment result will be sent
        // when available, or an error is encountered during forwarding. When a
        // result is received on the channel, the HTLC is guaranteed to no
        // longer be in flight. The switch shutting down is signaled by
        // closing the channel. If the attemptID is unknown,
        // ErrPaymentIDNotFound will be returned.
        GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
                deobfuscator htlcswitch.ErrorDecrypter) (
                <-chan *htlcswitch.PaymentResult, error)

        // CleanStore calls the underlying result store, telling it is safe to
        // delete all entries except the ones in the keepPids map. This should
        // be called periodically to let the switch clean up payment results
        // that we have handled.
        // NOTE: New payment attempts MUST NOT be made after the keepPids map
        // has been created and this method has returned.
        CleanStore(keepPids map[uint64]struct{}) error
}
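
// Illustrative sketch (not part of the original file): blocking until the
// switch delivers the result for a single attempt. The function name is
// hypothetical; a closed result channel signals switch shutdown, per the
// GetAttemptResult contract above.
func waitForAttempt(p PaymentAttemptDispatcher, attemptID uint64,
        hash lntypes.Hash, decrypter htlcswitch.ErrorDecrypter) (
        *htlcswitch.PaymentResult, error) {

        resultChan, err := p.GetAttemptResult(attemptID, hash, decrypter)
        if err != nil {
                return nil, err
        }

        result, ok := <-resultChan
        if !ok {
                return nil, fmt.Errorf("switch shutting down")
        }

        // The HTLC is guaranteed to no longer be in flight at this point.
        return result, nil
}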
240

241
// PaymentSessionSource is an interface that defines a source for the router to
242
// retrieve new payment sessions.
243
type PaymentSessionSource interface {
244
        // NewPaymentSession creates a new payment session that will produce
245
        // routes to the given target. An optional set of routing hints can be
246
        // provided in order to populate additional edges to explore when
247
        // finding a path to the payment's destination.
248
        NewPaymentSession(p *LightningPayment) (PaymentSession, error)
249

250
        // NewPaymentSessionEmpty creates a new paymentSession instance that is
251
        // empty, and will be exhausted immediately. Used for failure reporting
252
        // to missioncontrol for resumed payment we don't want to make more
253
        // attempts for.
254
        NewPaymentSessionEmpty() PaymentSession
255
}
256

257
// MissionController is an interface that exposes failure reporting and
258
// probability estimation.
259
type MissionController interface {
260
        // ReportPaymentFail reports a failed payment to mission control as
261
        // input for future probability estimates. It returns a bool indicating
262
        // whether this error is a final error and no further payment attempts
263
        // need to be made.
264
        ReportPaymentFail(attemptID uint64, rt *route.Route,
265
                failureSourceIdx *int, failure lnwire.FailureMessage) (
266
                *channeldb.FailureReason, error)
267

268
        // ReportPaymentSuccess reports a successful payment to mission control
269
        // as input for future probability estimates.
270
        ReportPaymentSuccess(attemptID uint64, rt *route.Route) error
271

272
        // GetProbability is expected to return the success probability of a
273
        // payment from fromNode along edge.
274
        GetProbability(fromNode, toNode route.Vertex,
275
                amt lnwire.MilliSatoshi, capacity btcutil.Amount) float64
276
}

// FeeSchema is the set fee configuration for a Lightning Node on the network.
// Using the coefficients described within the schema, the required fee to
// forward outgoing payments can be derived.
type FeeSchema struct {
        // BaseFee is the base amount of milli-satoshis that will be charged
        // for ANY payment forwarded.
        BaseFee lnwire.MilliSatoshi

        // FeeRate is the rate that will be charged for forwarding payments.
        // This value should be interpreted as the numerator for a fraction
        // (fixed point arithmetic) whose denominator is 1 million. As a result
        // the effective fee rate charged per mSAT will be: (amount *
        // FeeRate/1,000,000).
        FeeRate uint32

        // InboundFee is the inbound fee schedule that applies to forwards
        // coming in through a channel to which this FeeSchema pertains.
        InboundFee fn.Option[models.InboundFee]
}
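
// Illustrative sketch (not part of the original file): deriving the fee a
// FeeSchema implies for a forwarded amount, following the fixed-point
// convention documented above. The helper name and values are hypothetical:
// BaseFee=1000 mSAT and FeeRate=500 ppm yield 1000 + 2,000,000*500/1,000,000
// = 2000 mSAT for a 2,000,000 mSAT forward.
func forwardingFee(s FeeSchema, amt lnwire.MilliSatoshi) lnwire.MilliSatoshi {
        proportional := amt * lnwire.MilliSatoshi(s.FeeRate) / 1_000_000

        return s.BaseFee + proportional
}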

// ChannelPolicy holds the parameters that determine the policy we enforce
// when forwarding payments on a channel. These parameters are communicated
// to the rest of the network in ChannelUpdate messages.
type ChannelPolicy struct {
        // FeeSchema holds the fee configuration for a channel.
        FeeSchema

        // TimeLockDelta is the required HTLC timelock delta to be used
        // when forwarding payments.
        TimeLockDelta uint32

        // MaxHTLC is the maximum HTLC size including fees we are allowed to
        // forward over this channel.
        MaxHTLC lnwire.MilliSatoshi

        // MinHTLC is the minimum HTLC size including fees we are allowed to
        // forward over this channel.
        MinHTLC *lnwire.MilliSatoshi
}
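
// Illustrative sketch (not part of the original file): populating a
// ChannelPolicy, using fn.Some from the fn package to set the optional
// inbound fee (assuming the Base field on models.InboundFee). All values
// are hypothetical.
var examplePolicy = ChannelPolicy{
        FeeSchema: FeeSchema{
                BaseFee: 1000, // 1 sat base fee, expressed in mSAT.
                FeeRate: 500,  // 500 parts per million.
                InboundFee: fn.Some(models.InboundFee{
                        Base: -100, // Inbound discount of 100 mSAT.
                }),
        },
        TimeLockDelta: 80,
        MaxHTLC:       lnwire.MilliSatoshi(100_000_000),
}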

// Config defines the configuration for the ChannelRouter. ALL elements within
// the configuration MUST be non-nil for the ChannelRouter to carry out its
// duties.
type Config struct {
        // Graph is the channel graph that the ChannelRouter will use to gather
        // metrics from and also to carry out path finding queries.
        // TODO(roasbeef): make into an interface
        Graph *channeldb.ChannelGraph

        // Chain is the router's source to the most up-to-date blockchain data.
        // All incoming advertised channels will be checked against the chain
        // to ensure that the channels advertised are still open.
        Chain lnwallet.BlockChainIO

        // ChainView is an instance of a FilteredChainView which is used to
        // watch the sub-set of the UTXO set (the set of active channels) that
        // we need in order to properly maintain the channel graph.
        ChainView chainview.FilteredChainView

        // Notifier is a reference to the ChainNotifier, used to grab
        // the latest blocks if the router is missing any.
        Notifier chainntnfs.ChainNotifier

        // Payer is an instance of a PaymentAttemptDispatcher and is used by
        // the router to send payment attempts onto the network, and receive
        // their results.
        Payer PaymentAttemptDispatcher

        // Control keeps track of the status of ongoing payments, ensuring we
        // can properly resume them across restarts.
        Control ControlTower

        // MissionControl is a shared memory of sorts that executions of
        // payment path finding use in order to remember which vertexes/edges
        // were pruned from prior attempts. During SendPayment execution,
        // errors sent by nodes are mapped into a vertex or edge to be pruned.
        // Each run will then take into account this set of pruned
        // vertexes/edges to reduce route failure and pass on graph information
        // gained to the next execution.
        MissionControl MissionController

        // SessionSource defines a source for the router to retrieve new
        // payment sessions.
        SessionSource PaymentSessionSource

        // ChannelPruneExpiry is the duration used to determine if a channel
        // should be pruned or not. If the delta between now and when the
        // channel was last updated is greater than ChannelPruneExpiry, then
        // the channel is marked as a zombie channel eligible for pruning.
        ChannelPruneExpiry time.Duration

        // GraphPruneInterval is used as an interval to determine how often we
        // should examine the channel graph to garbage collect zombie channels.
        GraphPruneInterval time.Duration

        // FirstTimePruneDelay is the time we'll wait after startup before
        // attempting to prune the graph for zombie channels. We don't do it
        // immediately after startup to allow lnd to start up without getting
        // blocked by this job.
        FirstTimePruneDelay time.Duration

        // GetLink is a method that allows the router to query the lower
        // link layer to determine the up-to-date available bandwidth at a
        // prospective link to be traversed. If the link isn't available, then
        // a value of zero should be returned. Otherwise, the current up-to-
        // date knowledge of the available bandwidth of the link should be
        // returned.
        GetLink getLinkQuery

        // NextPaymentID is a method that guarantees to return a new, unique ID
        // each time it is called. This is used by the router to generate a
        // unique payment ID for each payment it attempts to send, such that
        // the switch can properly handle the HTLC.
        NextPaymentID func() (uint64, error)

        // AssumeChannelValid toggles whether the router will check for
        // spentness of channel outpoints. For neutrino, this saves long
        // rescans from blocking initial usage of the daemon.
        AssumeChannelValid bool

        // PathFindingConfig defines global path finding parameters.
        PathFindingConfig PathFindingConfig

        // Clock is a mockable time provider.
        Clock clock.Clock

        // StrictZombiePruning determines if we attempt to prune zombie
        // channels according to a stricter criteria. If true, then we'll prune
        // a channel if only *one* of the edges is considered a zombie.
        // Otherwise, we'll only prune the channel when both edges have a very
        // dated last update.
        StrictZombiePruning bool

        // IsAlias returns whether a passed ShortChannelID is an alias. This is
        // only used for our local channels.
        IsAlias func(scid lnwire.ShortChannelID) bool
}

// EdgeLocator is a struct used to identify a specific edge.
type EdgeLocator struct {
        // ChannelID is the channel of this edge.
        ChannelID uint64

        // Direction takes the value of 0 or 1 and is identical in definition
        // to the channel direction flag. A value of 0 means the direction from
        // the lower node pubkey to the higher.
        Direction uint8
}

// String returns a human-readable version of the edgeLocator values.
func (e *EdgeLocator) String() string {
        return fmt.Sprintf("%v:%v", e.ChannelID, e.Direction)
}

// ChannelRouter is the layer 3 router within the Lightning stack. Below the
// ChannelRouter is the HtlcSwitch, and below that is the Bitcoin blockchain
// itself. The primary role of the ChannelRouter is to respond to queries for
// potential routes that can support a payment amount, and also general graph
// reachability questions. The router will prune the channel graph
// automatically as new blocks are discovered which spend certain known funding
// outpoints, thereby closing their respective channels.
type ChannelRouter struct {
        ntfnClientCounter uint64 // To be used atomically.

        started uint32 // To be used atomically.
        stopped uint32 // To be used atomically.

        bestHeight uint32 // To be used atomically.

        // cfg is a copy of the configuration struct that the ChannelRouter was
        // initialized with.
        cfg *Config

        // selfNode is the center of the star-graph centered around the
        // ChannelRouter. The ChannelRouter uses this node as a starting point
        // when doing any path finding.
        selfNode *channeldb.LightningNode

        // cachedGraph is an instance of routingGraph that caches the source
        // node as well as the channel graph itself in memory.
        cachedGraph routingGraph

        // newBlocks is a channel in which new blocks connected to the end of
        // the main chain are sent over, and blocks updated after a call to
        // UpdateFilter.
        newBlocks <-chan *chainview.FilteredBlock

        // staleBlocks is a channel in which blocks disconnected from the end
        // of our currently known best chain are sent over.
        staleBlocks <-chan *chainview.FilteredBlock

        // networkUpdates is a channel that carries new topology update
        // messages from outside the ChannelRouter to be processed by the
        // networkHandler.
        networkUpdates chan *routingMsg

        // topologyClients maps a client's unique notification ID to a
        // topologyClient client that contains its notification dispatch
        // channel.
        topologyClients *lnutils.SyncMap[uint64, *topologyClient]

        // ntfnClientUpdates is a channel that's used to send new updates to
        // topology notification clients to the ChannelRouter. Updates either
        // add a new notification client, or cancel notifications for an
        // existing client.
        ntfnClientUpdates chan *topologyClientUpdate

        // channelEdgeMtx is a mutex we use to make sure we process only one
        // ChannelEdgePolicy at a time for a given channelID, to ensure
        // consistency between the various database accesses.
        channelEdgeMtx *multimutex.Mutex[uint64]

        // statTicker is a resumable ticker that logs the router's progress as
        // it discovers channels or receives updates.
        statTicker ticker.Ticker

        // stats tracks newly processed channels, updates, and node
        // announcements over a window of defaultStatInterval.
        stats *routerStats

        quit chan struct{}
        wg   sync.WaitGroup
}

// A compile time check to ensure ChannelRouter implements the
// ChannelGraphSource interface.
var _ ChannelGraphSource = (*ChannelRouter)(nil)

// New creates a new instance of the ChannelRouter with the specified
// configuration parameters. As part of initialization, if the router detects
// that the channel graph isn't fully in sync with the latest UTXO set (since
// the channel graph is a subset of the UTXO set), then the router will proceed
// to fully sync to the latest state of the UTXO set.
func New(cfg Config) (*ChannelRouter, error) {
        selfNode, err := cfg.Graph.SourceNode()
        if err != nil {
                return nil, err
        }

        r := &ChannelRouter{
                cfg: &cfg,
                cachedGraph: &CachedGraph{
                        graph:  cfg.Graph,
                        source: selfNode.PubKeyBytes,
                },
                networkUpdates:    make(chan *routingMsg),
                topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
                ntfnClientUpdates: make(chan *topologyClientUpdate),
                channelEdgeMtx:    multimutex.NewMutex[uint64](),
                selfNode:          selfNode,
                statTicker:        ticker.New(defaultStatInterval),
                stats:             new(routerStats),
                quit:              make(chan struct{}),
        }

        return r, nil
}
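
// Illustrative sketch (not part of the original file): the expected
// lifecycle of a ChannelRouter. The function name is hypothetical, and cfg
// is assumed to be fully populated as the Config docs above require.
func runRouter(cfg Config) error {
        router, err := New(cfg)
        if err != nil {
                return err
        }

        if err := router.Start(); err != nil {
                return err
        }

        // Stop blocks until all router goroutines have exited.
        defer func() {
                if err := router.Stop(); err != nil {
                        log.Errorf("unable to stop router: %v", err)
                }
        }()

        // ... drive payments and graph queries here ...

        return nil
}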

// Start launches all the goroutines the ChannelRouter requires to carry out
// its duties. If the router has already been started, then this method is a
// noop.
func (r *ChannelRouter) Start() error {
        if !atomic.CompareAndSwapUint32(&r.started, 0, 1) {
                return nil
        }

        log.Info("Channel Router starting")

        bestHash, bestHeight, err := r.cfg.Chain.GetBestBlock()
        if err != nil {
                return err
        }

        // If the graph has never been pruned, or hasn't fully been created
        // yet, then we don't treat this as an explicit error.
        if _, _, err := r.cfg.Graph.PruneTip(); err != nil {
                switch {
                case errors.Is(err, channeldb.ErrGraphNeverPruned):
                        fallthrough

                case errors.Is(err, channeldb.ErrGraphNotFound):
                        // If the graph has never been pruned, then we'll set
                        // the prune height to the current best height of the
                        // chain backend.
                        _, err = r.cfg.Graph.PruneGraph(
                                nil, bestHash, uint32(bestHeight),
                        )
                        if err != nil {
                                return err
                        }

                default:
                        return err
                }
        }

        // If AssumeChannelValid is present, then we won't rely on pruning
        // channels from the graph based on their spentness, but whether they
        // are considered zombies or not. We will start zombie pruning after a
        // small delay, to avoid slowing down startup of lnd.
        if r.cfg.AssumeChannelValid {
                time.AfterFunc(r.cfg.FirstTimePruneDelay, func() {
                        select {
                        case <-r.quit:
                                return
                        default:
                        }

                        log.Info("Initial zombie prune starting")
                        if err := r.pruneZombieChans(); err != nil {
                                log.Errorf("Unable to prune zombies: %v", err)
                        }
                })
        } else {
                // Otherwise, we'll use our filtered chain view to prune
                // channels as soon as they are detected as spent on-chain.
                if err := r.cfg.ChainView.Start(); err != nil {
                        return err
                }

                // Once the instance is active, we'll fetch the channel we'll
                // receive notifications over.
                r.newBlocks = r.cfg.ChainView.FilteredBlocks()
                r.staleBlocks = r.cfg.ChainView.DisconnectedBlocks()

                // Before we perform our manual block pruning, we'll construct
                // and apply a fresh chain filter to the active
                // FilteredChainView instance. We do this before, as otherwise
                // we may miss on-chain events as the filter hasn't properly
                // been applied.
                channelView, err := r.cfg.Graph.ChannelView()
                if err != nil && !errors.Is(
                        err, channeldb.ErrGraphNoEdgesFound,
                ) {

                        return err
                }

                log.Infof("Filtering chain using %v channels active",
                        len(channelView))

                if len(channelView) != 0 {
                        err = r.cfg.ChainView.UpdateFilter(
                                channelView, uint32(bestHeight),
                        )
                        if err != nil {
                                return err
                        }
                }

                // The graph pruning might have taken a while and there could
                // be new blocks available.
                _, bestHeight, err = r.cfg.Chain.GetBestBlock()
                if err != nil {
                        return err
                }
                r.bestHeight = uint32(bestHeight)

                // Before we begin normal operation of the router, we first
                // need to synchronize the channel graph to the latest state of
                // the UTXO set.
                if err := r.syncGraphWithChain(); err != nil {
                        return err
                }

                // Finally, before we proceed, we'll prune any unconnected
                // nodes from the graph in order to ensure we maintain a tight
                // graph of "useful" nodes.
                err = r.cfg.Graph.PruneGraphNodes()
                if err != nil && !errors.Is(
                        err, channeldb.ErrGraphNodesNotFound,
                ) {

                        return err
                }
        }

        // If any payments are still in flight, we resume, to make sure their
        // results are properly handled.
        payments, err := r.cfg.Control.FetchInFlightPayments()
        if err != nil {
                return err
        }

        // Before we restart existing payments and start accepting more
        // payments to be made, we clean the network result store of the
        // Switch. We do this here at startup to ensure no more payments can be
        // made concurrently, so we know the toKeep map will be up-to-date
        // until the cleaning has finished.
        toKeep := make(map[uint64]struct{})
        for _, p := range payments {
                for _, a := range p.HTLCs {
                        toKeep[a.AttemptID] = struct{}{}
                }
        }

        log.Debugf("Cleaning network result store.")
        if err := r.cfg.Payer.CleanStore(toKeep); err != nil {
                return err
        }

        for _, payment := range payments {
                log.Infof("Resuming payment %v", payment.Info.PaymentIdentifier)
                r.wg.Add(1)
                go func(payment *channeldb.MPPayment) {
                        defer r.wg.Done()

                        // Get the hashes used for the outstanding HTLCs.
                        htlcs := make(map[uint64]lntypes.Hash)
                        for _, a := range payment.HTLCs {
                                a := a

                                // We check whether the individual attempts
                                // have their HTLC hash set, if not we'll fall
                                // back to the overall payment hash.
                                hash := payment.Info.PaymentIdentifier
                                if a.Hash != nil {
                                        hash = *a.Hash
                                }

                                htlcs[a.AttemptID] = hash
                        }

                        // Since we are not supporting creating more shards
                        // after a restart (only receiving the result of the
                        // shards already outstanding), we create a simple
                        // shard tracker that will map the attempt IDs to
                        // hashes used for the HTLCs. This will be enough also
                        // for AMP payments, since we only need the hashes for
                        // the individual HTLCs to regenerate the circuits, and
                        // we don't currently persist the root share necessary
                        // to re-derive them.
                        shardTracker := shards.NewSimpleShardTracker(
                                payment.Info.PaymentIdentifier, htlcs,
                        )

                        // We create a dummy, empty payment session such that
                        // we won't make another payment attempt when the
                        // result for the in-flight attempt is received.
                        paySession := r.cfg.SessionSource.NewPaymentSessionEmpty()

                        // We pass in a non-timeout context, to indicate we
                        // don't need it to timeout. It will stop immediately
                        // after the existing attempt has finished anyway. We
                        // also set a zero fee limit, as no more routes should
                        // be tried.
                        noTimeout := time.Duration(0)
                        _, _, err := r.sendPayment(
                                context.Background(), 0,
                                payment.Info.PaymentIdentifier, noTimeout,
                                paySession, shardTracker,
                        )
                        if err != nil {
                                log.Errorf("Resuming payment %v failed: %v.",
                                        payment.Info.PaymentIdentifier, err)
                                return
                        }

                        log.Infof("Resumed payment %v completed.",
                                payment.Info.PaymentIdentifier)
                }(payment)
        }

        r.wg.Add(1)
        go r.networkHandler()

        return nil
}

// Stop signals the ChannelRouter to gracefully halt all routines. This method
// will *block* until all goroutines have exited. If the channel router has
// already stopped then this method will return immediately.
func (r *ChannelRouter) Stop() error {
        if !atomic.CompareAndSwapUint32(&r.stopped, 0, 1) {
                return nil
        }

        log.Info("Channel Router shutting down...")
        defer log.Debug("Channel Router shutdown complete")

        // Our filtered chain view could've only been started if
        // AssumeChannelValid isn't present.
        if !r.cfg.AssumeChannelValid {
                if err := r.cfg.ChainView.Stop(); err != nil {
                        return err
                }
        }

        close(r.quit)
        r.wg.Wait()

        return nil
}

// syncGraphWithChain attempts to synchronize the current channel graph with
// the latest UTXO set state. This process involves pruning from the channel
// graph any channels which have been closed by spending their funding output
// since we've been down.
func (r *ChannelRouter) syncGraphWithChain() error {
        // First, we'll need to check to see if we're already in sync with the
        // latest state of the UTXO set.
        bestHash, bestHeight, err := r.cfg.Chain.GetBestBlock()
        if err != nil {
                return err
        }
        r.bestHeight = uint32(bestHeight)

        pruneHash, pruneHeight, err := r.cfg.Graph.PruneTip()
        if err != nil {
                switch {
                // If the graph has never been pruned, or hasn't fully been
                // created yet, then we don't treat this as an explicit error.
                case errors.Is(err, channeldb.ErrGraphNeverPruned):
                case errors.Is(err, channeldb.ErrGraphNotFound):
                default:
                        return err
                }
        }

        log.Infof("Prune tip for Channel Graph: height=%v, hash=%v",
                pruneHeight, pruneHash)

        switch {

        // If the graph has never been pruned, then we can exit early as this
        // entails it's being created for the first time and hasn't seen any
        // block or created channels.
        case pruneHeight == 0 || pruneHash == nil:
                return nil

        // If the block hashes and heights match exactly, then we don't need to
        // prune the channel graph as we're already fully in sync.
        case bestHash.IsEqual(pruneHash) && uint32(bestHeight) == pruneHeight:
                return nil
        }

        // If the main chain blockhash at prune height is different from the
        // prune hash, this might indicate the database is on a stale branch.
        mainBlockHash, err := r.cfg.Chain.GetBlockHash(int64(pruneHeight))
        if err != nil {
                return err
        }

        // While we are on a stale branch of the chain, walk backwards to find
        // the first common block.
        for !pruneHash.IsEqual(mainBlockHash) {
                log.Infof("channel graph is stale. Disconnecting block %v "+
                        "(hash=%v)", pruneHeight, pruneHash)
                // Prune the graph for every channel that was opened at height
                // >= pruneHeight.
                _, err := r.cfg.Graph.DisconnectBlockAtHeight(pruneHeight)
                if err != nil {
                        return err
                }

                pruneHash, pruneHeight, err = r.cfg.Graph.PruneTip()
                if err != nil {
                        switch {
                        // If at this point the graph has never been pruned, we
                        // can exit as this entails we are back to the point
                        // where it hasn't seen any block or created channels,
                        // so there's nothing left to prune.
                        case errors.Is(err, channeldb.ErrGraphNeverPruned):
                                return nil

                        case errors.Is(err, channeldb.ErrGraphNotFound):
                                return nil

                        default:
                                return err
                        }
                }
                mainBlockHash, err = r.cfg.Chain.GetBlockHash(int64(pruneHeight))
                if err != nil {
                        return err
                }
        }

        log.Infof("Syncing channel graph from height=%v (hash=%v) to height=%v "+
                "(hash=%v)", pruneHeight, pruneHash, bestHeight, bestHash)

        // If we're not yet caught up, then we'll walk forward in the chain
        // pruning the channel graph with each new block that hasn't yet been
        // consumed by the channel graph.
        var spentOutputs []*wire.OutPoint
        for nextHeight := pruneHeight + 1; nextHeight <= uint32(bestHeight); nextHeight++ {
                // Break out of the rescan early if a shutdown has been
                // requested, otherwise long rescans will block the daemon from
                // shutting down promptly.
                select {
                case <-r.quit:
                        return ErrRouterShuttingDown
                default:
                }

                // Using the next height, request a manual block pruning from
                // the chainview for the particular block hash.
                log.Infof("Filtering block for closed channels, at height: %v",
                        int64(nextHeight))
                nextHash, err := r.cfg.Chain.GetBlockHash(int64(nextHeight))
                if err != nil {
                        return err
                }
                log.Tracef("Running block filter on block with hash: %v",
                        nextHash)
                filterBlock, err := r.cfg.ChainView.FilterBlock(nextHash)
                if err != nil {
                        return err
                }

                // We're only interested in all prior outputs that have been
                // spent in the block, so collate all the referenced previous
                // outpoints within each tx and input.
                for _, tx := range filterBlock.Transactions {
                        for _, txIn := range tx.TxIn {
                                spentOutputs = append(spentOutputs,
                                        &txIn.PreviousOutPoint)
                        }
                }
        }

        // With the spent outputs gathered, attempt to prune the channel graph,
        // also passing in the best hash+height so the prune tip can be updated.
        closedChans, err := r.cfg.Graph.PruneGraph(
                spentOutputs, bestHash, uint32(bestHeight),
        )
        if err != nil {
                return err
        }

        log.Infof("Graph pruning complete: %v channels were closed since "+
                "height %v", len(closedChans), pruneHeight)
        return nil
}

// isZombieChannel takes two edge policy updates and determines if the
// corresponding channel should be considered a zombie. The first boolean is
// true if the policy update from node 1 is considered a zombie, the second
// boolean is that of node 2, and the final boolean is true if the channel
// is considered a zombie.
func (r *ChannelRouter) isZombieChannel(e1,
        e2 *models.ChannelEdgePolicy) (bool, bool, bool) {

        chanExpiry := r.cfg.ChannelPruneExpiry

        e1Zombie := e1 == nil || time.Since(e1.LastUpdate) >= chanExpiry
        e2Zombie := e2 == nil || time.Since(e2.LastUpdate) >= chanExpiry

        var e1Time, e2Time time.Time
        if e1 != nil {
                e1Time = e1.LastUpdate
        }
        if e2 != nil {
                e2Time = e2.LastUpdate
        }

        return e1Zombie, e2Zombie, r.IsZombieChannel(e1Time, e2Time)
}

// IsZombieChannel takes the timestamps of the latest channel updates for a
// channel and returns true if the channel should be considered a zombie based
// on these timestamps.
func (r *ChannelRouter) IsZombieChannel(updateTime1,
        updateTime2 time.Time) bool {

        chanExpiry := r.cfg.ChannelPruneExpiry

        e1Zombie := updateTime1.IsZero() ||
                time.Since(updateTime1) >= chanExpiry

        e2Zombie := updateTime2.IsZero() ||
                time.Since(updateTime2) >= chanExpiry

        // If we're using strict zombie pruning, then a channel is only
        // considered live if both edges have a recent update we know of.
        if r.cfg.StrictZombiePruning {
                return e1Zombie || e2Zombie
        }

        // Otherwise, if we're using the less strict variant, then a channel is
        // considered live if either of the edges have a recent update.
        return e1Zombie && e2Zombie
}
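
// Illustrative sketch (not part of the original file): the semantics implied
// by IsZombieChannel. With StrictZombiePruning a single stale edge marks the
// channel a zombie; otherwise both edges must be stale. The function name
// and timestamps are hypothetical.
func exampleZombieSemantics(r *ChannelRouter) {
        fresh := time.Now()
        stale := time.Now().Add(-2 * r.cfg.ChannelPruneExpiry)

        // Reports true under strict pruning (one edge is stale), false under
        // lenient pruning (one edge is still fresh).
        log.Debugf("zombie=%v", r.IsZombieChannel(fresh, stale))
}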

// pruneZombieChans is a method that will be called periodically to prune out
// any "zombie" channels. We consider channels zombies if *both* edges haven't
// been updated since our zombie horizon. If AssumeChannelValid is present,
// we'll also consider channels zombies if *both* edges are disabled. This
// usually signals that a channel has been closed on-chain. We do this
// periodically to keep a healthy, lively routing table.
func (r *ChannelRouter) pruneZombieChans() error {
        chansToPrune := make(map[uint64]struct{})
        chanExpiry := r.cfg.ChannelPruneExpiry

        log.Infof("Examining channel graph for zombie channels")

        // A helper method to detect if the channel belongs to this node.
        isSelfChannelEdge := func(info *models.ChannelEdgeInfo) bool {
                return info.NodeKey1Bytes == r.selfNode.PubKeyBytes ||
                        info.NodeKey2Bytes == r.selfNode.PubKeyBytes
        }

        // First, we'll collect all the channels which are eligible for garbage
        // collection due to being zombies.
        filterPruneChans := func(info *models.ChannelEdgeInfo,
                e1, e2 *models.ChannelEdgePolicy) error {

                // Exit early in case this channel is already marked to be
                // pruned.
                _, markedToPrune := chansToPrune[info.ChannelID]
                if markedToPrune {
                        return nil
                }

                // We'll ensure that we don't attempt to prune our *own*
                // channels from the graph, as in any case this should be
                // re-advertised by the sub-system above us.
                if isSelfChannelEdge(info) {
                        return nil
                }

                e1Zombie, e2Zombie, isZombieChan := r.isZombieChannel(e1, e2)

                if e1Zombie {
                        log.Tracef("Node1 pubkey=%x of chan_id=%v is zombie",
                                info.NodeKey1Bytes, info.ChannelID)
                }

                if e2Zombie {
                        log.Tracef("Node2 pubkey=%x of chan_id=%v is zombie",
                                info.NodeKey2Bytes, info.ChannelID)
                }

                // If either edge hasn't been updated for a period of
                // chanExpiry, then we'll mark the channel itself as eligible
                // for graph pruning.
                if !isZombieChan {
                        return nil
                }

                log.Debugf("ChannelID(%v) is a zombie, collecting to prune",
                        info.ChannelID)

                // TODO(roasbeef): add ability to delete single directional edge
                chansToPrune[info.ChannelID] = struct{}{}

                return nil
        }

        // If AssumeChannelValid is present we'll look at the disabled bit for
        // both edges. If they're both disabled, then we can interpret this as
        // the channel being closed and can prune it from our graph.
        if r.cfg.AssumeChannelValid {
                disabledChanIDs, err := r.cfg.Graph.DisabledChannelIDs()
                if err != nil {
                        return fmt.Errorf("unable to get disabled channels "+
                                "ids chans: %v", err)
                }

                disabledEdges, err := r.cfg.Graph.FetchChanInfos(
                        nil, disabledChanIDs,
                )
                if err != nil {
                        return fmt.Errorf("unable to fetch disabled channels "+
                                "edges chans: %v", err)
                }

                // Ensuring we won't prune our own channel from the graph.
                for _, disabledEdge := range disabledEdges {
                        if !isSelfChannelEdge(disabledEdge.Info) {
                                chansToPrune[disabledEdge.Info.ChannelID] =
                                        struct{}{}
                        }
                }
        }

        startTime := time.Unix(0, 0)
        endTime := time.Now().Add(-1 * chanExpiry)
        oldEdges, err := r.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime)
        if err != nil {
                return fmt.Errorf("unable to fetch expired channel updates "+
                        "chans: %v", err)
        }

        for _, u := range oldEdges {
                err = filterPruneChans(u.Info, u.Policy1, u.Policy2)
                if err != nil {
                        log.Warnf("Filter pruning channels: %v\n", err)
                }
        }

        log.Infof("Pruning %v zombie channels", len(chansToPrune))
        if len(chansToPrune) == 0 {
                return nil
        }

        // With the set of zombie-like channels obtained, we'll do another pass
        // to delete them from the channel graph.
        toPrune := make([]uint64, 0, len(chansToPrune))
        for chanID := range chansToPrune {
                toPrune = append(toPrune, chanID)
                log.Tracef("Pruning zombie channel with ChannelID(%v)", chanID)
        }
        err = r.cfg.Graph.DeleteChannelEdges(
                r.cfg.StrictZombiePruning, true, toPrune...,
        )
        if err != nil {
                return fmt.Errorf("unable to delete zombie channels: %w", err)
        }

        // With the channels pruned, we'll also attempt to prune any nodes that
        // were a part of them.
        err = r.cfg.Graph.PruneGraphNodes()
        if err != nil && !errors.Is(err, channeldb.ErrGraphNodesNotFound) {
                return fmt.Errorf("unable to prune graph nodes: %w", err)
        }

        return nil
}

// handleNetworkUpdate is responsible for processing the update message and
// notifying of any resulting topology changes.
//
// NOTE: must be run inside goroutine.
func (r *ChannelRouter) handleNetworkUpdate(vb *ValidationBarrier,
        update *routingMsg) {

        defer r.wg.Done()
        defer vb.CompleteJob()

        // If this message has an existing dependency, then we'll wait until
        // that has been fully validated before we proceed.
        err := vb.WaitForDependants(update.msg)
        if err != nil {
                switch {
                case IsError(err, ErrVBarrierShuttingDown):
                        update.err <- err

                case IsError(err, ErrParentValidationFailed):
                        update.err <- newErrf(ErrIgnored, err.Error())

                default:
                        log.Warnf("unexpected error during validation "+
                                "barrier shutdown: %v", err)
                        update.err <- err
                }

                return
        }

        // Process the routing update to determine if this is either a new
        // update from our PoV or an update to a prior vertex/edge we
        // previously accepted.
        err = r.processUpdate(update.msg, update.op...)
        update.err <- err

        // If this message had any dependencies, then we can now signal them to
        // continue.
        allowDependents := err == nil || IsError(err, ErrIgnored, ErrOutdated)
        vb.SignalDependants(update.msg, allowDependents)

        // If the error is not nil here, there's no need to send topology
        // change.
        if err != nil {
                // We now decide to log an error or not. If allowDependents is
                // false, it means there is an error and the error is neither
                // ErrIgnored nor ErrOutdated. In this case, we'll log an error.
                // Otherwise, we'll add debug log only.
                if allowDependents {
                        log.Debugf("process network updates got: %v", err)
                } else {
                        log.Errorf("process network updates got: %v", err)
                }

                return
        }

        // Otherwise, we'll send off a new notification for the newly accepted
        // update, if any.
        topChange := &TopologyChange{}
        err = addToTopologyChange(r.cfg.Graph, topChange, update.msg)
        if err != nil {
                log.Errorf("unable to update topology change notification: %v",
                        err)
                return
        }

        if !topChange.isEmpty() {
                r.notifyTopologyChange(topChange)
        }
}
1169

1170
// networkHandler is the primary goroutine for the ChannelRouter. The roles of
// this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification, applying network
// updates, and registering new topology clients.
//
// NOTE: This MUST be run as a goroutine.
func (r *ChannelRouter) networkHandler() {
        defer r.wg.Done()

        graphPruneTicker := time.NewTicker(r.cfg.GraphPruneInterval)
        defer graphPruneTicker.Stop()

        defer r.statTicker.Stop()

        r.stats.Reset()

        // We'll use this validation barrier to ensure that we process all jobs
        // in the proper order during parallel validation.
        //
        // NOTE: For AssumeChannelValid, we bump up the maximum number of
        // concurrent validation requests since there are no blocks being
        // fetched. This significantly increases the performance of IGD for
        // neutrino nodes.
        //
        // However, we dial back to use a multiple of the number of cores when
        // fully validating, to avoid fetching up to 1000 blocks from the
        // backend. On bitcoind, this will empirically cause massive latency
        // spikes when executing this many concurrent RPC calls. Critical
        // subsystems or basic rpc calls that rely on calls such as GetBestBlock
        // will hang due to excessive load.
        //
        // See https://github.com/lightningnetwork/lnd/issues/4892.
        var validationBarrier *ValidationBarrier
        if r.cfg.AssumeChannelValid {
                validationBarrier = NewValidationBarrier(1000, r.quit)
        } else {
                validationBarrier = NewValidationBarrier(
                        4*runtime.NumCPU(), r.quit,
                )
        }

        for {
                // If there are stats, resume the statTicker.
                if !r.stats.Empty() {
                        r.statTicker.Resume()
                }

                select {
                // A new fully validated network update has just arrived. As a
                // result we'll modify the channel graph accordingly depending
                // on the exact type of the message.
                case update := <-r.networkUpdates:
                        // We'll set up any dependants, and wait until a free
                        // slot for this job opens up, this allows us to not
                        // have thousands of goroutines active.
                        validationBarrier.InitJobDependencies(update.msg)

                        r.wg.Add(1)
                        go r.handleNetworkUpdate(validationBarrier, update)

                        // TODO(roasbeef): remove all unconnected vertexes
                        // after N blocks pass with no corresponding
                        // announcements.

                case chainUpdate, ok := <-r.staleBlocks:
                        // If the channel has been closed, then this indicates
                        // the daemon is shutting down, so we exit ourselves.
                        if !ok {
                                return
                        }

                        // Since this block is stale, we update our best height
                        // to the previous block.
                        blockHeight := chainUpdate.Height
                        atomic.StoreUint32(&r.bestHeight, blockHeight-1)

                        // Update the channel graph to reflect that this block
                        // was disconnected.
                        _, err := r.cfg.Graph.DisconnectBlockAtHeight(blockHeight)
                        if err != nil {
                                log.Errorf("unable to prune graph with stale "+
                                        "block: %v", err)
                                continue
                        }

                        // TODO(halseth): notify client about the reorg?

                // A new block has arrived, so we can prune the channel graph
                // of any channels which were closed in the block.
                case chainUpdate, ok := <-r.newBlocks:
                        // If the channel has been closed, then this indicates
                        // the daemon is shutting down, so we exit ourselves.
                        if !ok {
                                return
                        }

                        // We'll ensure that any new blocks received attach
                        // directly to the end of our main chain. If not, then
                        // we've somehow missed some blocks. Here we'll catch
                        // up the chain with the latest blocks.
                        currentHeight := atomic.LoadUint32(&r.bestHeight)
                        switch {
                        case chainUpdate.Height == currentHeight+1:
                                err := r.updateGraphWithClosedChannels(
                                        chainUpdate,
                                )
                                if err != nil {
                                        log.Errorf("unable to prune graph "+
                                                "with closed channels: %v", err)
                                }

                        case chainUpdate.Height > currentHeight+1:
                                log.Errorf("out of order block: expecting "+
                                        "height=%v, got height=%v",
                                        currentHeight+1, chainUpdate.Height)

                                err := r.getMissingBlocks(currentHeight, chainUpdate)
                                if err != nil {
                                        log.Errorf("unable to retrieve "+
                                                "missing blocks: %v", err)
                                }

                        case chainUpdate.Height < currentHeight+1:
                                log.Errorf("out of order block: expecting "+
                                        "height=%v, got height=%v",
                                        currentHeight+1, chainUpdate.Height)

                                log.Infof("Skipping channel pruning since "+
                                        "received block height %v was already"+
                                        " processed.", chainUpdate.Height)
                        }

                // A new notification client update has arrived. We're either
                // gaining a new client, or cancelling notifications for an
                // existing client.
                case ntfnUpdate := <-r.ntfnClientUpdates:
                        clientID := ntfnUpdate.clientID

                        if ntfnUpdate.cancel {
                                client, ok := r.topologyClients.LoadAndDelete(
                                        clientID,
                                )
                                if ok {
                                        close(client.exit)
                                        client.wg.Wait()

                                        close(client.ntfnChan)
                                }

                                continue
                        }

                        r.topologyClients.Store(clientID, &topologyClient{
                                ntfnChan: ntfnUpdate.ntfnChan,
                                exit:     make(chan struct{}),
                        })

                // The graph prune ticker has ticked, so we'll examine the
                // state of the known graph to filter out any zombie channels
                // for pruning.
                case <-graphPruneTicker.C:
                        if err := r.pruneZombieChans(); err != nil {
                                log.Errorf("Unable to prune zombies: %v", err)
                        }

                // Log any stats if we've processed a non-empty number of
                // channels, updates, or nodes. We'll only pause the ticker if
                // the last window contained no updates to avoid resuming and
                // pausing while consecutive windows contain new info.
                case <-r.statTicker.Ticks():
                        if !r.stats.Empty() {
                                log.Infof(r.stats.String())
                        } else {
                                r.statTicker.Pause()
                        }
                        r.stats.Reset()

                // The router has been signalled to exit, so we exit our main
                // loop so the wait group can be decremented.
                case <-r.quit:
                        return
                }
        }
}

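// Every blocking select in the main loop above includes a case on r.quit, so
// closing that channel unblocks the goroutine wherever it is parked; this is
// lnd's standard shutdown idiom. The function below is a minimal,
// self-contained sketch of the same pattern with illustrative names; it is
// not part of the router's API.
func exampleQuitPattern(work <-chan struct{}, quit <-chan struct{}) {
        for {
                select {
                case <-work:
                        // Handle one unit of work, then loop again.

                case <-quit:
                        // The owner closed quit: exit so any wait group
                        // tracking this goroutine can be decremented.
                        return
                }
        }
}
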
// getMissingBlocks walks through all missing blocks and updates the graph
// with any channels that were closed in them.
func (r *ChannelRouter) getMissingBlocks(currentHeight uint32,
        chainUpdate *chainview.FilteredBlock) error {

        outdatedHash, err := r.cfg.Chain.GetBlockHash(int64(currentHeight))
        if err != nil {
                return err
        }

        outdatedBlock := &chainntnfs.BlockEpoch{
                Height: int32(currentHeight),
                Hash:   outdatedHash,
        }

        epochClient, err := r.cfg.Notifier.RegisterBlockEpochNtfn(
                outdatedBlock,
        )
        if err != nil {
                return err
        }
        defer epochClient.Cancel()

        blockDifference := int(chainUpdate.Height - currentHeight)

        // We'll walk through all the outdated blocks and make sure we're able
        // to update the graph with any closed channels from them.
        for i := 0; i < blockDifference; i++ {
                var (
                        missingBlock *chainntnfs.BlockEpoch
                        ok           bool
                )

                select {
                case missingBlock, ok = <-epochClient.Epochs:
                        if !ok {
                                return nil
                        }

                case <-r.quit:
                        return nil
                }

                filteredBlock, err := r.cfg.ChainView.FilterBlock(
                        missingBlock.Hash,
                )
                if err != nil {
                        return err
                }

                err = r.updateGraphWithClosedChannels(
                        filteredBlock,
                )
                if err != nil {
                        return err
                }
        }

        return nil
}

// updateGraphWithClosedChannels prunes the channel graph of closed channels
// that are no longer needed.
func (r *ChannelRouter) updateGraphWithClosedChannels(
        chainUpdate *chainview.FilteredBlock) error {

        // Once a new block arrives, we update our running track of the height
        // of the chain tip.
        blockHeight := chainUpdate.Height

        atomic.StoreUint32(&r.bestHeight, blockHeight)
        log.Infof("Pruning channel graph using block %v (height=%v)",
                chainUpdate.Hash, blockHeight)

        // We're only interested in all prior outputs that have been spent in
        // the block, so collate all the referenced previous outpoints within
        // each tx and input.
        var spentOutputs []*wire.OutPoint
        for _, tx := range chainUpdate.Transactions {
                for _, txIn := range tx.TxIn {
                        spentOutputs = append(spentOutputs,
                                &txIn.PreviousOutPoint)
                }
        }

        // With the spent outputs gathered, attempt to prune the channel graph,
        // also passing in the hash+height of the block being pruned so the
        // prune tip can be updated.
        chansClosed, err := r.cfg.Graph.PruneGraph(spentOutputs,
                &chainUpdate.Hash, chainUpdate.Height)
        if err != nil {
                log.Errorf("unable to prune routing table: %v", err)
                return err
        }

        log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash,
                blockHeight, len(chansClosed))

        if len(chansClosed) == 0 {
                return err
        }

        // Notify all currently registered clients of the newly closed
        // channels.
        closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
        r.notifyTopologyChange(&TopologyChange{
                ClosedChannels: closeSummaries,
        })

        return nil
}

// assertNodeAnnFreshness returns a non-nil error if we have an announcement in
// the database for the passed node with a timestamp newer than the passed
// timestamp. ErrIgnored will be returned if we don't already have the node,
// and ErrOutdated will be returned if we have a timestamp that's after the new
// timestamp.
func (r *ChannelRouter) assertNodeAnnFreshness(node route.Vertex,
        msgTimestamp time.Time) error {

        // If we are not already aware of this node, it means that we don't
        // know about any channel using this node. To avoid a DoS attack by
        // node announcements, we will ignore such nodes. If we do know about
        // this node, check that this update brings info newer than what we
        // already have.
        lastUpdate, exists, err := r.cfg.Graph.HasLightningNode(node)
        if err != nil {
                return errors.Errorf("unable to query for the "+
                        "existence of node: %v", err)
        }
        if !exists {
                return newErrf(ErrIgnored, "Ignoring node announcement"+
                        " for node not found in channel graph (%x)",
                        node[:])
        }

        // If we've reached this point then we're aware of the vertex being
        // advertised. So we now check if the new message has a newer time
        // stamp; if not, then we won't accept the new data as it would
        // override newer data.
        if !lastUpdate.Before(msgTimestamp) {
                return newErrf(ErrOutdated, "Ignoring outdated "+
                        "announcement for %x", node[:])
        }

        return nil
}

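// Note that the freshness check above uses !lastUpdate.Before(msgTimestamp),
// so an announcement whose timestamp exactly equals the stored one is also
// rejected as ErrOutdated: only strictly newer timestamps overwrite stored
// data. A minimal sketch of the same comparison, with illustrative names:
func exampleFreshnessCheck(lastUpdate, msgTimestamp time.Time) error {
        // Equal or older timestamps never replace what we already have.
        if !lastUpdate.Before(msgTimestamp) {
                return fmt.Errorf("announcement is not strictly newer")
        }

        return nil
}
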
// addZombieEdge adds a channel that failed complete validation into the zombie
// index, so we can avoid having to re-validate it in the future.
func (r *ChannelRouter) addZombieEdge(chanID uint64) error {
        // If the edge fails validation we'll mark the edge itself as a zombie,
        // so we don't continue to request it. We use the "zero key" for both
        // node pubkeys so this edge can't be resurrected.
        var zeroKey [33]byte
        err := r.cfg.Graph.MarkEdgeZombie(chanID, zeroKey, zeroKey)
        if err != nil {
                return fmt.Errorf("unable to mark spent chan(id=%v) as a "+
                        "zombie: %w", chanID, err)
        }

        return nil
}

// makeFundingScript is used to make the funding script for both segwit v0 and
// segwit v1 (taproot) channels.
//
// TODO(roasbeef): export and use elsewhere?
func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte,
        chanFeatures []byte) ([]byte, error) {

        legacyFundingScript := func() ([]byte, error) {
                witnessScript, err := input.GenMultiSigScript(
                        bitcoinKey1, bitcoinKey2,
                )
                if err != nil {
                        return nil, err
                }
                pkScript, err := input.WitnessScriptHash(witnessScript)
                if err != nil {
                        return nil, err
                }

                return pkScript, nil
        }

        if len(chanFeatures) == 0 {
                return legacyFundingScript()
        }

        // In order to make the correct funding script, we'll need to parse the
        // chanFeatures bytes into a feature vector we can interact with.
        rawFeatures := lnwire.NewRawFeatureVector()
        err := rawFeatures.Decode(bytes.NewReader(chanFeatures))
        if err != nil {
                return nil, fmt.Errorf("unable to parse chan feature "+
                        "bits: %w", err)
        }

        chanFeatureBits := lnwire.NewFeatureVector(
                rawFeatures, lnwire.Features,
        )
        if chanFeatureBits.HasFeature(
                lnwire.SimpleTaprootChannelsOptionalStaging,
        ) {

                pubKey1, err := btcec.ParsePubKey(bitcoinKey1)
                if err != nil {
                        return nil, err
                }
                pubKey2, err := btcec.ParsePubKey(bitcoinKey2)
                if err != nil {
                        return nil, err
                }

                fundingScript, _, err := input.GenTaprootFundingScript(
                        pubKey1, pubKey2, 0,
                )
                if err != nil {
                        return nil, err
                }

                return fundingScript, nil
        }

        return legacyFundingScript()
}

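// A minimal usage sketch for makeFundingScript; the 33-byte compressed keys
// are placeholders. Passing empty chanFeatures selects the legacy P2WSH
// 2-of-2 output, while a feature vector advertising the taproot channel bit
// selects the taproot funding output instead.
func exampleMakeFundingScript(key1, key2 [33]byte) ([]byte, error) {
        // No feature bits => segwit v0 multisig funding script.
        return makeFundingScript(key1[:], key2[:], nil)
}
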
// processUpdate processes a new authenticated network update, which may be a
// node announcement, a channel edge, or a channel edge policy update. If the
// update didn't affect the internal state, due to it being either out of
// date, invalid, or redundant, then an error is returned.
func (r *ChannelRouter) processUpdate(msg interface{},
        op ...batch.SchedulerOption) error {

        switch msg := msg.(type) {
        case *channeldb.LightningNode:
                // Before we add the node to the database, we'll check to see
                // if the announcement is "fresh" or not. If it isn't, then
                // we'll return an error.
                err := r.assertNodeAnnFreshness(msg.PubKeyBytes, msg.LastUpdate)
                if err != nil {
                        return err
                }

                if err := r.cfg.Graph.AddLightningNode(msg, op...); err != nil {
                        return errors.Errorf("unable to add node %x to the "+
                                "graph: %v", msg.PubKeyBytes, err)
                }

                log.Tracef("Updated vertex data for node=%x", msg.PubKeyBytes)
                r.stats.incNumNodeUpdates()

        case *models.ChannelEdgeInfo:
                log.Debugf("Received ChannelEdgeInfo for channel %v",
                        msg.ChannelID)

                // Prior to processing the announcement we first check if we
                // already know of this channel, if so, then we can exit early.
                _, _, exists, isZombie, err := r.cfg.Graph.HasChannelEdge(
                        msg.ChannelID,
                )
                if err != nil && !errors.Is(
                        err, channeldb.ErrGraphNoEdgesFound,
                ) {

                        return errors.Errorf("unable to check for edge "+
                                "existence: %v", err)
                }
                if isZombie {
                        return newErrf(ErrIgnored, "ignoring msg for zombie "+
                                "chan_id=%v", msg.ChannelID)
                }
                if exists {
                        return newErrf(ErrIgnored, "ignoring msg for known "+
                                "chan_id=%v", msg.ChannelID)
                }

                // If AssumeChannelValid is present, then we are unable to
                // perform any of the expensive checks below, so we'll
                // short-circuit our path straight to adding the edge to our
                // graph. If the passed ShortChannelID is an alias, then we'll
                // skip validation as it will not map to a legitimate tx. This
                // is not a DoS vector as only we can add an alias
                // ChannelAnnouncement from the gossiper.
                scid := lnwire.NewShortChanIDFromInt(msg.ChannelID)
                if r.cfg.AssumeChannelValid || r.cfg.IsAlias(scid) {
                        if err := r.cfg.Graph.AddChannelEdge(msg, op...); err != nil {
                                return fmt.Errorf("unable to add edge: %w", err)
                        }
                        log.Tracef("New channel discovered! Link "+
                                "connects %x and %x with ChannelID(%v)",
                                msg.NodeKey1Bytes, msg.NodeKey2Bytes,
                                msg.ChannelID)
                        r.stats.incNumEdgesDiscovered()

                        break
                }

                // Before we can add the channel to the channel graph, we need
                // to obtain the full funding outpoint that's encoded within
                // the channel ID.
                channelID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
                fundingTx, err := r.fetchFundingTxWrapper(&channelID)
                if err != nil {
                        // In order to ensure we don't erroneously mark a
                        // channel as a zombie due to an RPC failure, we'll
                        // attempt to string match for the relevant errors.
                        //
                        // * btcd:
                        //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
                        //    * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
                        // * bitcoind:
                        //    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
                        //    * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
                        switch {
                        case strings.Contains(err.Error(), "not found"):
                                fallthrough

                        case strings.Contains(err.Error(), "out of range"):
                                // If the funding transaction isn't found at
                                // all, then we'll mark the edge itself as a
                                // zombie, so we don't continue to request it.
                                // We use the "zero key" for both node pubkeys
                                // so this edge can't be resurrected.
                                zErr := r.addZombieEdge(msg.ChannelID)
                                if zErr != nil {
                                        return zErr
                                }

                        default:
                        }

                        return newErrf(ErrNoFundingTransaction, "unable to "+
                                "locate funding tx: %v", err)
                }

                // Recreate the witness output to be sure that the bitcoin keys
                // and channel value declared in the channel edge correspond to
                // reality.
                fundingPkScript, err := makeFundingScript(
                        msg.BitcoinKey1Bytes[:], msg.BitcoinKey2Bytes[:],
                        msg.Features,
                )
                if err != nil {
                        return err
                }

                // Next we'll validate that this channel is actually
                // well-formed. If this check fails, then this channel either
                // doesn't exist, or isn't the one that was meant to be created
                // according to the passed channel proofs.
                fundingPoint, err := chanvalidate.Validate(&chanvalidate.Context{
                        Locator: &chanvalidate.ShortChanIDChanLocator{
                                ID: channelID,
                        },
                        MultiSigPkScript: fundingPkScript,
                        FundingTx:        fundingTx,
                })
                if err != nil {
                        // Mark the edge as a zombie, so we won't try to
                        // re-validate it on start up.
                        if err := r.addZombieEdge(msg.ChannelID); err != nil {
                                return err
                        }

                        return newErrf(ErrInvalidFundingOutput, "output "+
                                "failed validation: %w", err)
                }

                // Now that we have the funding outpoint of the channel, ensure
                // that it hasn't yet been spent. If so, then this channel has
                // been closed, so we'll ignore it.
                chanUtxo, err := r.cfg.Chain.GetUtxo(
                        fundingPoint, fundingPkScript, channelID.BlockHeight,
                        r.quit,
                )
                if err != nil {
                        if errors.Is(err, btcwallet.ErrOutputSpent) {
                                zErr := r.addZombieEdge(msg.ChannelID)
                                if zErr != nil {
                                        return zErr
                                }
                        }

                        return newErrf(ErrChannelSpent, "unable to fetch utxo "+
                                "for chan_id=%v, chan_point=%v: %v",
                                msg.ChannelID, fundingPoint, err)
                }

                // TODO(roasbeef): this is a hack, needs to be removed
                // after commitment fees are dynamic.
                msg.Capacity = btcutil.Amount(chanUtxo.Value)
                msg.ChannelPoint = *fundingPoint
                if err := r.cfg.Graph.AddChannelEdge(msg, op...); err != nil {
                        return errors.Errorf("unable to add edge: %v", err)
                }

                log.Debugf("New channel discovered! Link "+
                        "connects %x and %x with ChannelPoint(%v): "+
                        "chan_id=%v, capacity=%v",
                        msg.NodeKey1Bytes, msg.NodeKey2Bytes,
                        fundingPoint, msg.ChannelID, msg.Capacity)
                r.stats.incNumEdgesDiscovered()

                // As a new edge has been added to the channel graph, we'll
                // update the current UTXO filter within our active
                // FilteredChainView, so we are notified if/when this channel is
                // closed.
                filterUpdate := []channeldb.EdgePoint{
                        {
                                FundingPkScript: fundingPkScript,
                                OutPoint:        *fundingPoint,
                        },
                }
                err = r.cfg.ChainView.UpdateFilter(
                        filterUpdate, atomic.LoadUint32(&r.bestHeight),
                )
                if err != nil {
                        return errors.Errorf("unable to update chain "+
                                "view: %v", err)
                }

        case *models.ChannelEdgePolicy:
                log.Debugf("Received ChannelEdgePolicy for channel %v",
                        msg.ChannelID)

                // We make sure to hold the mutex for this channel ID,
                // such that no other goroutine is concurrently doing
                // database accesses for the same channel ID.
                r.channelEdgeMtx.Lock(msg.ChannelID)
                defer r.channelEdgeMtx.Unlock(msg.ChannelID)

                edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
                        r.cfg.Graph.HasChannelEdge(msg.ChannelID)
                if err != nil && !errors.Is(
                        err, channeldb.ErrGraphNoEdgesFound,
                ) {

                        return errors.Errorf("unable to check for edge "+
                                "existence: %v", err)
                }

                // If the channel is marked as a zombie in our database, and
                // we consider this a stale update, then we should not apply the
                // policy.
                isStaleUpdate := time.Since(msg.LastUpdate) > r.cfg.ChannelPruneExpiry
                if isZombie && isStaleUpdate {
                        return newErrf(ErrIgnored, "ignoring stale update "+
                                "(flags=%v|%v) for zombie chan_id=%v",
                                msg.MessageFlags, msg.ChannelFlags,
                                msg.ChannelID)
                }

                // If the channel doesn't exist in our database, we cannot
                // apply the updated policy.
                if !exists {
                        return newErrf(ErrIgnored, "ignoring update "+
                                "(flags=%v|%v) for unknown chan_id=%v",
                                msg.MessageFlags, msg.ChannelFlags,
                                msg.ChannelID)
                }

                // As edges are directional, each node has a unique policy for
                // the direction of the edge they control. Therefore, we first
                // check if we already have the most up-to-date information for
                // that edge. If this message has a timestamp not strictly
                // newer than what we already know of, we can exit early.
                switch {
                // A flag set of 0 indicates this is an announcement for the
                // "first" node in the channel.
                case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0:
                        // Ignore outdated message.
                        if !edge1Timestamp.Before(msg.LastUpdate) {
                                return newErrf(ErrOutdated, "Ignoring "+
                                        "outdated update (flags=%v|%v) for "+
                                        "known chan_id=%v", msg.MessageFlags,
                                        msg.ChannelFlags, msg.ChannelID)
                        }

                // Similarly, a flag set of 1 indicates this is an announcement
                // for the "second" node in the channel.
                case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1:
                        // Ignore outdated message.
                        if !edge2Timestamp.Before(msg.LastUpdate) {
                                return newErrf(ErrOutdated, "Ignoring "+
                                        "outdated update (flags=%v|%v) for "+
                                        "known chan_id=%v", msg.MessageFlags,
                                        msg.ChannelFlags, msg.ChannelID)
                        }
                }

                // Now that we know this isn't a stale update, we'll apply the
                // new edge policy to the proper directional edge within the
                // channel graph.
                if err = r.cfg.Graph.UpdateEdgePolicy(msg, op...); err != nil {
                        err := errors.Errorf("unable to add channel: %v", err)
                        log.Error(err)
                        return err
                }

                log.Tracef("New channel update applied: %v",
                        newLogClosure(func() string { return spew.Sdump(msg) }))
                r.stats.incNumChannelUpdates()

        default:
                return errors.Errorf("wrong routing update message type")
        }

        return nil
}

// fetchFundingTxWrapper is a wrapper around fetchFundingTx, except that it
// will exit if the router has stopped.
func (r *ChannelRouter) fetchFundingTxWrapper(chanID *lnwire.ShortChannelID) (
        *wire.MsgTx, error) {

        txChan := make(chan *wire.MsgTx, 1)
        errChan := make(chan error, 1)

        go func() {
                tx, err := r.fetchFundingTx(chanID)
                if err != nil {
                        errChan <- err
                        return
                }

                txChan <- tx
        }()

        select {
        case tx := <-txChan:
                return tx, nil

        case err := <-errChan:
                return nil, err

        case <-r.quit:
                return nil, ErrRouterShuttingDown
        }
}

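// Note that txChan and errChan above are buffered with capacity one. If the
// router shuts down while fetchFundingTx is still in flight, the select
// returns on r.quit, yet the spawned goroutine can still complete its single
// send without a receiver and then exit, rather than leaking while blocked on
// an unbuffered channel.
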
// fetchFundingTx returns the funding transaction identified by the passed
// short channel ID.
//
// TODO(roasbeef): replace with call to GetBlockTransaction? (would allow to
// later use getblocktxn)
func (r *ChannelRouter) fetchFundingTx(
        chanID *lnwire.ShortChannelID) (*wire.MsgTx, error) {

        // First fetch the block hash by the block number encoded, then use
        // that hash to fetch the block itself.
        blockNum := int64(chanID.BlockHeight)
        blockHash, err := r.cfg.Chain.GetBlockHash(blockNum)
        if err != nil {
                return nil, err
        }
        fundingBlock, err := r.cfg.Chain.GetBlock(blockHash)
        if err != nil {
                return nil, err
        }

        // As a sanity check, ensure that the advertised transaction index is
        // within the bounds of the total number of transactions within a
        // block.
        numTxns := uint32(len(fundingBlock.Transactions))
        if chanID.TxIndex > numTxns-1 {
                return nil, fmt.Errorf("tx_index=#%v "+
                        "is out of range (max_index=%v), network_chan_id=%v",
                        chanID.TxIndex, numTxns-1, chanID)
        }

        return fundingBlock.Transactions[chanID.TxIndex].Copy(), nil
}

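// A short channel ID packs the location of the funding transaction into a
// single uint64: the block height, the transaction index within that block,
// and the funding output index. The sketch below, using an illustrative
// value, shows the round trip behind the lookups performed by fetchFundingTx.
func exampleShortChannelID() {
        scid := lnwire.ShortChannelID{
                BlockHeight: 754321,
                TxIndex:     42,
                TxPosition:  1,
        }

        // ToUint64 yields the compact form stored in the channel graph, and
        // NewShortChanIDFromInt reverses it.
        compact := scid.ToUint64()
        roundTrip := lnwire.NewShortChanIDFromInt(compact)

        log.Debugf("scid=%v, compact=%d, round trip=%v", scid, compact,
                roundTrip)
}
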
// routingMsg couples a routing-related topology update with the error channel
// used to report the result of processing it.
type routingMsg struct {
        msg interface{}
        op  []batch.SchedulerOption
        err chan error
}

// RouteRequest contains the parameters for a pathfinding request. It may
// describe a request to make a regular payment or one to a blinded path
// (indicated by a non-nil BlindedPayment field).
type RouteRequest struct {
        // Source is the node that the path originates from.
        Source route.Vertex

        // Target is the node that the path terminates at. If the route
        // includes a blinded path, target will be the blinded node id of the
        // final hop in the blinded route.
        Target route.Vertex

        // Amount is the Amount in millisatoshis to be delivered to the target
        // node.
        Amount lnwire.MilliSatoshi

        // TimePreference expresses the caller's time preference for
        // pathfinding.
        TimePreference float64

        // Restrictions provides a set of additional Restrictions that the
        // route must adhere to.
        Restrictions *RestrictParams

        // CustomRecords is a set of custom tlv records to include for the
        // final hop.
        CustomRecords record.CustomSet

        // RouteHints contains an additional set of edges to include in our
        // view of the graph. This may either be a set of hints for private
        // channels or a "virtual" hop hint that represents a blinded route.
        RouteHints RouteHints

        // FinalExpiry is the cltv delta for the final hop. If paying to a
        // blinded path, this value is a duplicate of the delta provided
        // in blinded payment.
        FinalExpiry uint16

        // BlindedPayment contains an optional blinded path and parameters
        // used to reach a target node via a blinded path. This field is
        // mutually exclusive with the Target field.
        BlindedPayment *BlindedPayment
}

// RouteHints is an alias type for a set of route hints, with the source node
// as the map's key and the details of the hint(s) in the edge policy.
type RouteHints map[route.Vertex][]AdditionalEdge

// NewRouteRequest produces a new route request for a regular payment or one
// to a blinded route, validating that the target, routeHints and finalExpiry
// parameters are mutually exclusive with the blindedPayment parameter (which
// contains these values for blinded payments).
func NewRouteRequest(source route.Vertex, target *route.Vertex,
        amount lnwire.MilliSatoshi, timePref float64,
        restrictions *RestrictParams, customRecords record.CustomSet,
        routeHints RouteHints, blindedPayment *BlindedPayment,
        finalExpiry uint16) (*RouteRequest, error) {

        var (
                // Assume that we're starting off with a regular payment.
                requestHints  = routeHints
                requestExpiry = finalExpiry
                err           error
        )

        if blindedPayment != nil {
                if err := blindedPayment.Validate(); err != nil {
                        return nil, fmt.Errorf("invalid blinded payment: %w",
                                err)
                }

                introVertex := route.NewVertex(
                        blindedPayment.BlindedPath.IntroductionPoint,
                )
                if source == introVertex {
                        return nil, ErrSelfIntro
                }

                // Check that the values for a clear path have not been set,
                // as this is an ambiguous signal from the caller.
                if routeHints != nil {
                        return nil, ErrHintsAndBlinded
                }

                if finalExpiry != 0 {
                        return nil, ErrExpiryAndBlinded
                }

                // If we have a blinded path with 1 hop, the cltv expiry
                // will not be included in any hop hints (since we're just
                // sending to the introduction node and need no blinded hints).
                // In this case, we include it to make sure that the final
                // cltv delta is accounted for (since it's part of the blinded
                // delta). In the case of a multi-hop route, we set our final
                // cltv to zero, since it's going to be accounted for in the
                // delta for our hints.
                if len(blindedPayment.BlindedPath.BlindedHops) == 1 {
                        requestExpiry = blindedPayment.CltvExpiryDelta
                }

                requestHints, err = blindedPayment.toRouteHints()
                if err != nil {
                        return nil, err
                }
        }

        requestTarget, err := getTargetNode(target, blindedPayment)
        if err != nil {
                return nil, err
        }

        return &RouteRequest{
                Source:         source,
                Target:         requestTarget,
                Amount:         amount,
                TimePreference: timePref,
                Restrictions:   restrictions,
                CustomRecords:  customRecords,
                RouteHints:     requestHints,
                FinalExpiry:    requestExpiry,
                BlindedPayment: blindedPayment,
        }, nil
}

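// A minimal usage sketch for NewRouteRequest covering the regular, non
// blinded case; the amount, time preference and final expiry values are
// placeholders. For a payment to a blinded route, target, routeHints and
// finalExpiry would instead be left at their zero values and a non-nil
// *BlindedPayment supplied in their place.
func exampleNewRouteRequest(source, target route.Vertex,
        restrictions *RestrictParams) (*RouteRequest, error) {

        return NewRouteRequest(
                source, &target, lnwire.MilliSatoshi(100000), 0,
                restrictions, nil, nil, nil, 40,
        )
}
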
func getTargetNode(target *route.Vertex, blindedPayment *BlindedPayment) (
        route.Vertex, error) {

        var (
                blinded   = blindedPayment != nil
                targetSet = target != nil
        )

        switch {
        case blinded && targetSet:
                return route.Vertex{}, ErrTargetAndBlinded

        case blinded:
                // If we're dealing with an edge-case blinded path that just
                // has an introduction node (first hop expected to be the intro
                // hop), then we return the unblinded introduction node as our
                // target.
                hops := blindedPayment.BlindedPath.BlindedHops
                if len(hops) == 1 {
                        return route.NewVertex(
                                blindedPayment.BlindedPath.IntroductionPoint,
                        ), nil
                }

                return route.NewVertex(hops[len(hops)-1].BlindedNodePub), nil

        case targetSet:
                return *target, nil

        default:
                return route.Vertex{}, ErrNoTarget
        }
}

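// The switch above amounts to a small truth table: supplying both a concrete
// target and a blinded payment is ambiguous and rejected with
// ErrTargetAndBlinded; a blinded payment alone derives the target from the
// path itself (the introduction node for a single-hop path, otherwise the
// final blinded hop); a plain target is returned as-is; and supplying neither
// fails with ErrNoTarget.
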
// blindedPath returns the request's blinded path, which is set if the payment
// is to a blinded route.
func (r *RouteRequest) blindedPath() *sphinx.BlindedPath {
        if r.BlindedPayment == nil {
                return nil
        }

        return r.BlindedPayment.BlindedPath
}

// FindRoute attempts to query the ChannelRouter for the optimum path to a
// particular target destination to which it is able to send `amt` after
// factoring in channel capacities and cumulative fees along the route.
func (r *ChannelRouter) FindRoute(req *RouteRequest) (*route.Route, float64,
        error) {

        log.Debugf("Searching for path to %v, sending %v", req.Target,
                req.Amount)

        // We'll attempt to obtain a set of bandwidth hints that can help us
        // eliminate certain routes early on in the path finding process.
        bandwidthHints, err := newBandwidthManager(
                r.cachedGraph, r.selfNode.PubKeyBytes, r.cfg.GetLink,
        )
        if err != nil {
                return nil, 0, err
        }

        // We'll fetch the current block height, so we can properly calculate
        // the required HTLC time locks within the route.
        _, currentHeight, err := r.cfg.Chain.GetBestBlock()
        if err != nil {
                return nil, 0, err
        }

        // Now that we know the destination is reachable within the graph,
        // we'll execute our path finding algorithm.
        finalHtlcExpiry := currentHeight + int32(req.FinalExpiry)

        // Validate time preference.
        timePref := req.TimePreference
        if timePref < -1 || timePref > 1 {
                return nil, 0, errors.New("time preference out of range")
        }

        path, probability, err := findPath(
                &graphParams{
                        additionalEdges: req.RouteHints,
                        bandwidthHints:  bandwidthHints,
                        graph:           r.cachedGraph,
                },
                req.Restrictions, &r.cfg.PathFindingConfig, req.Source,
                req.Target, req.Amount, req.TimePreference, finalHtlcExpiry,
        )
        if err != nil {
                return nil, 0, err
        }

        // Create the route with absolute time lock values.
        route, err := newRoute(
                req.Source, path, uint32(currentHeight),
                finalHopParams{
                        amt:       req.Amount,
                        totalAmt:  req.Amount,
                        cltvDelta: req.FinalExpiry,
                        records:   req.CustomRecords,
                }, req.blindedPath(),
        )
        if err != nil {
                return nil, 0, err
        }

        go log.Tracef("Obtained path to send %v to %x: %v",
                req.Amount, req.Target, newLogClosure(func() string {
                        return spew.Sdump(route)
                }),
        )

        return route, probability, nil
}

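// The float64 returned by FindRoute is the success probability that
// pathfinding estimated for the chosen route, in the range [0, 1]; callers
// such as the QueryRoutes RPC surface it alongside the route itself.
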
// generateNewSessionKey generates a new ephemeral private key to be used for a
// payment attempt.
func generateNewSessionKey() (*btcec.PrivateKey, error) {
        // Generate a new random session key to ensure that we don't trigger
        // any replay.
        //
        // TODO(roasbeef): add more sources of randomness?
        return btcec.NewPrivateKey()
}

// generateSphinxPacket generates then encodes a sphinx packet which encodes
// the onion route specified by the passed layer 3 route. The blob returned
// from this function can immediately be included within an HTLC add packet to
// be sent to the first hop within the route.
func generateSphinxPacket(rt *route.Route, paymentHash []byte,
        sessionKey *btcec.PrivateKey) ([]byte, *sphinx.Circuit, error) {

        // Now that we know we have an actual route, we'll map the route into a
        // sphinx payment path which includes per-hop payloads for each hop
        // that give each node within the route the necessary information
        // (fees, CLTV value, etc.) to properly forward the payment.
        sphinxPath, err := rt.ToSphinxPath()
        if err != nil {
                return nil, nil, err
        }

        log.Tracef("Constructed per-hop payloads for payment_hash=%x: %v",
                paymentHash[:], newLogClosure(func() string {
                        path := make(
                                []sphinx.OnionHop, sphinxPath.TrueRouteLength(),
                        )
                        for i := range path {
                                hopCopy := sphinxPath[i]
                                path[i] = hopCopy
                        }
                        return spew.Sdump(path)
                }),
        )

        // Next generate the onion routing packet which allows us to perform
        // privacy preserving source routing across the network.
        sphinxPacket, err := sphinx.NewOnionPacket(
                sphinxPath, sessionKey, paymentHash,
                sphinx.DeterministicPacketFiller,
        )
        if err != nil {
                return nil, nil, err
        }

        // Finally, encode Sphinx packet using its wire representation to be
        // included within the HTLC add packet.
        var onionBlob bytes.Buffer
        if err := sphinxPacket.Encode(&onionBlob); err != nil {
                return nil, nil, err
        }

        log.Tracef("Generated sphinx packet: %v",
                newLogClosure(func() string {
                        // We make a copy of the ephemeral key and unset the
                        // internal curve here in order to keep the logs from
                        // getting noisy.
                        key := *sphinxPacket.EphemeralKey
                        packetCopy := *sphinxPacket
                        packetCopy.EphemeralKey = &key
                        return spew.Sdump(packetCopy)
                }),
        )

        return onionBlob.Bytes(), &sphinx.Circuit{
                SessionKey:  sessionKey,
                PaymentPath: sphinxPath.NodeKeys(),
        }, nil
}

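// A minimal usage sketch for generateSphinxPacket. The route and payment hash
// are assumed to come from a successful FindRoute call and the invoice being
// paid; the returned onion blob is what gets embedded in the outgoing HTLC
// add message, while the circuit is retained to decode errors returned along
// the route.
func exampleGenerateSphinxPacket(rt *route.Route,
        payHash lntypes.Hash) ([]byte, *sphinx.Circuit, error) {

        // Each attempt uses a fresh ephemeral session key.
        sessionKey, err := generateNewSessionKey()
        if err != nil {
                return nil, nil, err
        }

        return generateSphinxPacket(rt, payHash[:], sessionKey)
}
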
// LightningPayment describes a payment to be sent through the network to the
2256
// final destination.
2257
type LightningPayment struct {
2258
        // Target is the node in which the payment should be routed towards.
2259
        Target route.Vertex
2260

2261
        // Amount is the value of the payment to send through the network in
2262
        // milli-satoshis.
2263
        Amount lnwire.MilliSatoshi
2264

2265
        // FeeLimit is the maximum fee in millisatoshis that the payment should
2266
        // accept when sending it through the network. The payment will fail
2267
        // if there isn't a route with lower fees than this limit.
2268
        FeeLimit lnwire.MilliSatoshi
2269

2270
        // CltvLimit is the maximum time lock that is allowed for attempts to
2271
        // complete this payment.
2272
        CltvLimit uint32
2273

2274
        // paymentHash is the r-hash value to use within the HTLC extended to
2275
        // the first hop. This won't be set for AMP payments.
2276
        paymentHash *lntypes.Hash
2277

2278
        // amp is an optional field that is set if and only if this is am AMP
2279
        // payment.
2280
        amp *AMPOptions
2281

2282
        // FinalCLTVDelta is the CTLV expiry delta to use for the _final_ hop
2283
        // in the route. This means that the final hop will have a CLTV delta
2284
        // of at least: currentHeight + FinalCLTVDelta.
2285
        FinalCLTVDelta uint16
2286

2287
        // PayAttemptTimeout is a timeout value that we'll use to determine
2288
        // when we should should abandon the payment attempt after consecutive
2289
        // payment failure. This prevents us from attempting to send a payment
2290
        // indefinitely. A zero value means the payment will never time out.
2291
        //
2292
        // TODO(halseth): make wallclock time to allow resume after startup.
2293
        PayAttemptTimeout time.Duration
2294

2295
        // RouteHints represents the different routing hints that can be used to
2296
        // assist a payment in reaching its destination successfully. These
2297
        // hints will act as intermediate hops along the route.
2298
        //
2299
        // NOTE: This is optional unless required by the payment. When providing
2300
        // multiple routes, ensure the hop hints within each route are chained
2301
        // together and sorted in forward order in order to reach the
2302
        // destination successfully.
2303
        RouteHints [][]zpay32.HopHint
2304

2305
        // OutgoingChannelIDs is the list of channels that are allowed for the
2306
        // first hop. If nil, any channel may be used.
2307
        OutgoingChannelIDs []uint64
2308

2309
        // LastHop is the pubkey of the last node before the final destination
2310
        // is reached. If nil, any node may be used.
2311
        LastHop *route.Vertex
2312

2313
        // DestFeatures specifies the set of features we assume the final node
2314
        // has for pathfinding. Typically, these will be taken directly from an
2315
        // invoice, but they can also be manually supplied or assumed by the
2316
        // sender. If a nil feature vector is provided, the router will try to
2317
        // fall back to the graph in order to load a feature vector for a node
2318
        // in the public graph.
2319
        DestFeatures *lnwire.FeatureVector
2320

2321
        // PaymentAddr is the payment address specified by the receiver. This
2322
        // field should be a random 32-byte nonce presented in the receiver's
2323
        // invoice to prevent probing of the destination.
2324
        PaymentAddr *[32]byte
2325

2326
        // PaymentRequest is an optional payment request that this payment is
2327
        // attempting to complete.
2328
        PaymentRequest []byte
2329

2330
        // DestCustomRecords are TLV records that are to be sent to the final
2331
        // hop in the new onion payload format. If the destination does not
2332
        // understand this new onion payload format, then the payment will
2333
        // fail.
2334
        DestCustomRecords record.CustomSet
2335

2336
        // MaxParts is the maximum number of partial payments that may be used
2337
        // to complete the full amount.
2338
        MaxParts uint32
2339

2340
        // MaxShardAmt is the largest shard that we'll attempt to split using.
2341
        // If this field is set, and we need to split, rather than attempting
2342
        // half of the original payment amount, we'll use this value if half
2343
        // the payment amount is greater than it.
2344
        //
2345
        // NOTE: This field is _optional_.
2346
        MaxShardAmt *lnwire.MilliSatoshi
2347

2348
        // TimePref is the time preference for this payment. Set to -1 to
2349
        // optimize for fees only, to 1 to optimize for reliability only or a
2350
        // value in between for a mix.
2351
        TimePref float64
2352

2353
        // Metadata is additional data that is sent along with the payment to
2354
        // the payee.
2355
        Metadata []byte
2356
}
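
// Illustrative sketch (not part of lnd): populating a LightningPayment for a
// simple single-path payment. The target vertex and payment hash below are
// placeholders supplied by the caller.
//
//      payment := &LightningPayment{
//              Target:            target, // route.Vertex of the destination
//              Amount:            lnwire.MilliSatoshi(100_000), // 100 sat
//              FeeLimit:          lnwire.MilliSatoshi(1_000),
//              FinalCLTVDelta:    40,
//              PayAttemptTimeout: DefaultPayAttemptTimeout,
//      }
//      if err := payment.SetPaymentHash(payHash); err != nil {
//              // The hash conflicts with previously set AMP options.
//      }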

// AMPOptions houses information that must be known in order to send an AMP
// payment.
type AMPOptions struct {
        SetID     [32]byte
        RootShare [32]byte
}

// SetPaymentHash sets the given hash as the payment's overall hash. This
// should only be used for non-AMP payments.
func (l *LightningPayment) SetPaymentHash(hash lntypes.Hash) error {
        if l.amp != nil {
                return fmt.Errorf("cannot set payment hash for AMP payment")
        }

        l.paymentHash = &hash
        return nil
}

// SetAMP sets the given AMP options for the payment.
func (l *LightningPayment) SetAMP(amp *AMPOptions) error {
        if l.paymentHash != nil {
                return fmt.Errorf("cannot set amp options for payment " +
                        "with payment hash")
        }

        l.amp = amp
        return nil
}

// Identifier returns a 32-byte value that uniquely identifies this single
// payment. For non-AMP payments this will be the payment hash; for AMP
// payments this will be the used SetID.
func (l *LightningPayment) Identifier() [32]byte {
        if l.amp != nil {
                return l.amp.SetID
        }

        return *l.paymentHash
}
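
// Illustrative sketch: paymentHash and amp are mutually exclusive, so the
// setters above reject conflicting configurations. For an AMP payment the
// identifier is the SetID rather than a payment hash (setID, share, and hash
// are placeholders):
//
//      var p LightningPayment
//      _ = p.SetAMP(&AMPOptions{SetID: setID, RootShare: share})
//      id := p.Identifier()          // returns setID, not a payment hash
//      err := p.SetPaymentHash(hash) // fails: AMP options already set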

// SendPayment attempts to send a payment as described within the passed
// LightningPayment. This function is blocking and will return either when the
// payment is successful, or when all candidate routes have been attempted and
// have resulted in a failed payment. If the payment succeeds, then a non-nil
// Route will be returned which describes the path the successful payment
// traversed within the network to reach the destination. Additionally, the
// payment preimage will also be returned.
func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte,
        *route.Route, error) {

        paySession, shardTracker, err := r.PreparePayment(payment)
        if err != nil {
                return [32]byte{}, nil, err
        }

        log.Tracef("Dispatching SendPayment for lightning payment: %v",
                spewPayment(payment))

        return r.sendPayment(
                context.Background(), payment.FeeLimit, payment.Identifier(),
                payment.PayAttemptTimeout, paySession, shardTracker,
        )
}

// SendPaymentAsync is the non-blocking version of SendPayment. The payment
// result needs to be retrieved via the control tower.
func (r *ChannelRouter) SendPaymentAsync(ctx context.Context,
        payment *LightningPayment, ps PaymentSession, st shards.ShardTracker) {

        // Since this is the first time this payment is being made, we pass nil
        // for the existing attempt.
        r.wg.Add(1)
        go func() {
                defer r.wg.Done()

                log.Tracef("Dispatching SendPayment for lightning payment: %v",
                        spewPayment(payment))

                _, _, err := r.sendPayment(
                        ctx, payment.FeeLimit, payment.Identifier(),
                        payment.PayAttemptTimeout, ps, st,
                )
                if err != nil {
                        log.Errorf("Payment %x failed: %v",
                                payment.Identifier(), err)
                }
        }()
}
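
// Illustrative usage sketch (hypothetical caller code): dispatching a payment
// asynchronously. PreparePayment builds the session and shard tracker that
// SendPaymentAsync expects; the outcome is later read back via the control
// tower rather than from a return value.
//
//      ps, st, err := router.PreparePayment(payment)
//      if err != nil {
//              return err
//      }
//      router.SendPaymentAsync(ctx, payment, ps, st)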

// spewPayment returns a log closure that provides a spewed string
// representation of the passed payment.
func spewPayment(payment *LightningPayment) logClosure {
        return newLogClosure(func() string {
                // Make a copy of the payment with a nilled Curve
                // before spewing.
                var routeHints [][]zpay32.HopHint
                for _, routeHint := range payment.RouteHints {
                        var hopHints []zpay32.HopHint
                        for _, hopHint := range routeHint {
                                h := hopHint.Copy()
                                hopHints = append(hopHints, h)
                        }
                        routeHints = append(routeHints, hopHints)
                }
                p := *payment
                p.RouteHints = routeHints
                return spew.Sdump(p)
        })
}

// PreparePayment creates the payment session and registers the payment with
// the control tower.
func (r *ChannelRouter) PreparePayment(payment *LightningPayment) (
        PaymentSession, shards.ShardTracker, error) {

        // Before starting the HTLC routing attempt, we'll create a fresh
        // payment session which will report our errors back to mission
        // control.
        paySession, err := r.cfg.SessionSource.NewPaymentSession(payment)
        if err != nil {
                return nil, nil, err
        }

        // Record this payment hash with the ControlTower, ensuring it is not
        // already in-flight.
        //
        // TODO(roasbeef): store records as part of creation info?
        info := &channeldb.PaymentCreationInfo{
                PaymentIdentifier: payment.Identifier(),
                Value:             payment.Amount,
                CreationTime:      r.cfg.Clock.Now(),
                PaymentRequest:    payment.PaymentRequest,
        }

        // Create a new ShardTracker that we'll use during the life cycle of
        // this payment.
        var shardTracker shards.ShardTracker
        switch {
        // If this is an AMP payment, we'll use the AMP shard tracker.
        case payment.amp != nil:
                shardTracker = amp.NewShardTracker(
                        payment.amp.RootShare, payment.amp.SetID,
                        *payment.PaymentAddr, payment.Amount,
                )

        // Otherwise we'll use the simple tracker that will map each attempt to
        // the same payment hash.
        default:
                shardTracker = shards.NewSimpleShardTracker(
                        payment.Identifier(), nil,
                )
        }

        err = r.cfg.Control.InitPayment(payment.Identifier(), info)
        if err != nil {
                return nil, nil, err
        }

        return paySession, shardTracker, nil
}

// SendToRoute sends a payment using the provided route and fails the payment
// when an error is returned from the attempt.
func (r *ChannelRouter) SendToRoute(htlcHash lntypes.Hash,
        rt *route.Route) (*channeldb.HTLCAttempt, error) {

        return r.sendToRoute(htlcHash, rt, false)
}

// SendToRouteSkipTempErr sends a payment using the provided route and fails
// the payment ONLY when a terminal error is returned from the attempt.
func (r *ChannelRouter) SendToRouteSkipTempErr(htlcHash lntypes.Hash,
        rt *route.Route) (*channeldb.HTLCAttempt, error) {

        return r.sendToRoute(htlcHash, rt, true)
}

// sendToRoute attempts to send a payment with the given hash through the
// provided route. This function is blocking and will return the attempt
// information as it is stored in the database. For a successful htlc, this
// information will contain the preimage. If an error occurs after the attempt
// was initiated, both return values will be non-nil. If skipTempErr is true,
// the payment won't be failed unless a terminal error has occurred.
func (r *ChannelRouter) sendToRoute(htlcHash lntypes.Hash, rt *route.Route,
        skipTempErr bool) (*channeldb.HTLCAttempt, error) {

        // Calculate amount paid to receiver.
        amt := rt.ReceiverAmt()

        // If this is meant as an MP payment shard, we set the amount in the
        // creation info to the total amount of the payment.
        finalHop := rt.Hops[len(rt.Hops)-1]
        mpp := finalHop.MPP
        if mpp != nil {
                amt = mpp.TotalMsat()
        }

        // For non-MPP payments there's no such thing as a temp error, as only
        // one HTLC attempt is being made. When this HTLC fails, the payment
        // fails and cannot be retried.
        if skipTempErr && mpp == nil {
                return nil, ErrSkipTempErr
        }

        // For non-AMP payments the overall payment identifier will be the same
        // hash as used for this HTLC.
        paymentIdentifier := htlcHash

        // For AMP payments, we'll use the setID as the unique ID for the
        // overall payment.
        amp := finalHop.AMP
        if amp != nil {
                paymentIdentifier = amp.SetID()
        }

        // Record this payment hash with the ControlTower, ensuring it is not
        // already in-flight.
        info := &channeldb.PaymentCreationInfo{
                PaymentIdentifier: paymentIdentifier,
                Value:             amt,
                CreationTime:      r.cfg.Clock.Now(),
                PaymentRequest:    nil,
        }

        err := r.cfg.Control.InitPayment(paymentIdentifier, info)
        switch {
        // If this is an MPP attempt and the hash is already registered with
        // the database, we can go on to launch the shard.
        case mpp != nil && errors.Is(err, channeldb.ErrPaymentInFlight):
        case mpp != nil && errors.Is(err, channeldb.ErrPaymentExists):

        // Any other error is not tolerated.
        case err != nil:
                return nil, err
        }

        log.Tracef("Dispatching SendToRoute for HTLC hash %v: %v",
                htlcHash, newLogClosure(func() string {
                        return spew.Sdump(rt)
                }),
        )

        // Since the HTLC hashes and preimages are specified manually over the
        // RPC for SendToRoute requests, we don't have to worry about creating
        // a ShardTracker that can generate hashes for AMP payments. Instead, we
        // create a simple tracker that can just return the hash for the single
        // shard we'll now launch.
        shardTracker := shards.NewSimpleShardTracker(htlcHash, nil)

        // Create a payment lifecycle using the given route with:
        // - zero fee limit as we are not requesting routes.
        // - nil payment session (since we already have a route).
        // - no payment timeout.
        // - no current block height.
        p := newPaymentLifecycle(r, 0, paymentIdentifier, nil, shardTracker, 0)

        // We have a route to try, so create a new HTLC attempt.
        //
        // NOTE: we use zero `remainingAmt` here to simulate the same effect of
        // setting lastShard to false, which was used by the previous
        // implementation.
        attempt, err := p.registerAttempt(rt, 0)
        if err != nil {
                return nil, err
        }

        // Once the attempt is created, send it to the htlcswitch. Notice that
        // the `err` returned here has already been processed by
        // `handleSwitchErr`, which means if there's a terminal failure, the
        // payment has been failed.
        result, err := p.sendAttempt(attempt)
        if err != nil {
                return nil, err
        }

        // We now look up the payment to see if it's already failed.
        payment, err := p.router.cfg.Control.FetchPayment(p.identifier)
        if err != nil {
                return result.attempt, err
        }

        // Exit if the above error has caused the payment to be failed; we also
        // return the error from sending the attempt to mimic the old behavior
        // of this method.
        _, failedReason := payment.TerminalInfo()
        if failedReason != nil {
                return result.attempt, result.err
        }

        // Since for SendToRoute we won't retry in case the shard fails, we'll
        // mark the payment failed with the control tower immediately if
        // skipTempErr is false.
        reason := channeldb.FailureReasonError

        // If we failed to send the HTLC, we need to further decide if we want
        // to fail the payment.
        if result.err != nil {
                // If skipTempErr is set, we'll return the attempt and the temp
                // error.
                if skipTempErr {
                        return result.attempt, result.err
                }

                // Otherwise we need to fail the payment.
                err := r.cfg.Control.FailPayment(paymentIdentifier, reason)
                if err != nil {
                        return nil, err
                }

                return result.attempt, result.err
        }

        // The attempt was successfully sent, wait for the result to be
        // available.
        result, err = p.collectResult(attempt)
        if err != nil {
                return nil, err
        }

        // We got a successful result.
        if result.err == nil {
                return result.attempt, nil
        }

        // An error was returned from collecting the result, so we'll mark the
        // payment as failed if we don't skip temp errors.
        if !skipTempErr {
                err := r.cfg.Control.FailPayment(paymentIdentifier, reason)
                if err != nil {
                        return nil, err
                }
        }

        return result.attempt, result.err
}
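
// Illustrative sketch (hypothetical caller code): launching a pre-built MPP
// shard while tolerating temporary failures. rt and htlcHash are assumed to
// have been constructed by the caller, e.g. via BuildRoute.
//
//      attempt, err := router.SendToRouteSkipTempErr(htlcHash, rt)
//      if err != nil {
//              // Temporary failure: the overall payment is still active in
//              // the control tower, so another shard or route may be tried.
//      }
//
// With plain SendToRoute, the same non-terminal failure would immediately
// mark the whole payment as failed with FailureReasonError.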

// sendPayment attempts to send a payment to the passed payment hash. This
// function is blocking and will return either when the payment is successful,
// or when all candidate routes have been attempted and have resulted in a
// failed payment. If the payment succeeds, then a non-nil Route will be
// returned which describes the path the successful payment traversed within
// the network to reach the destination. Additionally, the payment preimage
// will also be returned.
//
// This method relies on the ControlTower's internal payment state machine to
// carry out its execution. After restarts, it is safe, and assumed, that the
// router will call this method for every payment still in-flight according to
// the ControlTower.
func (r *ChannelRouter) sendPayment(ctx context.Context,
        feeLimit lnwire.MilliSatoshi, identifier lntypes.Hash,
        paymentAttemptTimeout time.Duration, paySession PaymentSession,
        shardTracker shards.ShardTracker) ([32]byte, *route.Route, error) {

        // If the user provides a timeout, we will additionally wrap the
        // context in a deadline.
        cancel := func() {}
        if paymentAttemptTimeout > 0 {
                ctx, cancel = context.WithTimeout(ctx, paymentAttemptTimeout)
        }

        // Since resumePayment is a blocking call, we'll cancel this
        // context if the payment completes before the optional
        // deadline.
        defer cancel()

        // We'll also fetch the current block height, so we can properly
        // calculate the required HTLC time locks within the route.
        _, currentHeight, err := r.cfg.Chain.GetBestBlock()
        if err != nil {
                return [32]byte{}, nil, err
        }

        // Now set up a paymentLifecycle struct with these params, such that we
        // can resume the payment from the current state.
        p := newPaymentLifecycle(
                r, feeLimit, identifier, paySession, shardTracker,
                currentHeight,
        )

        return p.resumePayment(ctx)
}

// extractChannelUpdate examines the error and extracts the channel update.
func (r *ChannelRouter) extractChannelUpdate(
        failure lnwire.FailureMessage) *lnwire.ChannelUpdate {

        var update *lnwire.ChannelUpdate
        switch onionErr := failure.(type) {
        case *lnwire.FailExpiryTooSoon:
                update = &onionErr.Update
        case *lnwire.FailAmountBelowMinimum:
                update = &onionErr.Update
        case *lnwire.FailFeeInsufficient:
                update = &onionErr.Update
        case *lnwire.FailIncorrectCltvExpiry:
                update = &onionErr.Update
        case *lnwire.FailChannelDisabled:
                update = &onionErr.Update
        case *lnwire.FailTemporaryChannelFailure:
                update = onionErr.Update
        }

        return update
}

// applyChannelUpdate validates a channel update and if valid, applies it to the
// database. It returns a bool indicating whether the update was successful.
func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate) bool {
        ch, _, _, err := r.GetChannelByID(msg.ShortChannelID)
        if err != nil {
                log.Errorf("Unable to retrieve channel by id: %v", err)
                return false
        }

        var pubKey *btcec.PublicKey

        switch msg.ChannelFlags & lnwire.ChanUpdateDirection {
        case 0:
                pubKey, _ = ch.NodeKey1()

        case 1:
                pubKey, _ = ch.NodeKey2()
        }

        // Exit early if the pubkey cannot be decided.
        if pubKey == nil {
                log.Errorf("Unable to decide pubkey with ChannelFlags=%v",
                        msg.ChannelFlags)
                return false
        }

        err = ValidateChannelUpdateAnn(pubKey, ch.Capacity, msg)
        if err != nil {
                log.Errorf("Unable to validate channel update: %v", err)
                return false
        }

        err = r.UpdateEdge(&models.ChannelEdgePolicy{
                SigBytes:                  msg.Signature.ToSignatureBytes(),
                ChannelID:                 msg.ShortChannelID.ToUint64(),
                LastUpdate:                time.Unix(int64(msg.Timestamp), 0),
                MessageFlags:              msg.MessageFlags,
                ChannelFlags:              msg.ChannelFlags,
                TimeLockDelta:             msg.TimeLockDelta,
                MinHTLC:                   msg.HtlcMinimumMsat,
                MaxHTLC:                   msg.HtlcMaximumMsat,
                FeeBaseMSat:               lnwire.MilliSatoshi(msg.BaseFee),
                FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate),
                ExtraOpaqueData:           msg.ExtraOpaqueData,
        })
        if err != nil && !IsError(err, ErrIgnored, ErrOutdated) {
                log.Errorf("Unable to apply channel update: %v", err)
                return false
        }

        return true
}

// AddNode is used to add information about a node to the router database. If
// the node with this pubkey is not present in an existing channel, it will
// be ignored.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) AddNode(node *channeldb.LightningNode,
        op ...batch.SchedulerOption) error {

        rMsg := &routingMsg{
                msg: node,
                op:  op,
                err: make(chan error, 1),
        }

        select {
        case r.networkUpdates <- rMsg:
                select {
                case err := <-rMsg.err:
                        return err
                case <-r.quit:
                        return ErrRouterShuttingDown
                }
        case <-r.quit:
                return ErrRouterShuttingDown
        }
}

// AddEdge is used to add an edge/channel to the topology of the router. After
// all information about the channel has been gathered, this edge/channel might
// be used in the construction of a payment path.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) AddEdge(edge *models.ChannelEdgeInfo,
        op ...batch.SchedulerOption) error {

        rMsg := &routingMsg{
                msg: edge,
                op:  op,
                err: make(chan error, 1),
        }

        select {
        case r.networkUpdates <- rMsg:
                select {
                case err := <-rMsg.err:
                        return err
                case <-r.quit:
                        return ErrRouterShuttingDown
                }
        case <-r.quit:
                return ErrRouterShuttingDown
        }
}

// UpdateEdge is used to update edge information; without this message the edge
// is considered not fully constructed.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) UpdateEdge(update *models.ChannelEdgePolicy,
        op ...batch.SchedulerOption) error {

        rMsg := &routingMsg{
                msg: update,
                op:  op,
                err: make(chan error, 1),
        }

        select {
        case r.networkUpdates <- rMsg:
                select {
                case err := <-rMsg.err:
                        return err
                case <-r.quit:
                        return ErrRouterShuttingDown
                }
        case <-r.quit:
                return ErrRouterShuttingDown
        }
}
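
// AddNode, AddEdge and UpdateEdge all share the same request/response idiom:
// send a routingMsg into the router's main goroutine, then block on a
// buffered error channel, aborting on quit at either stage. A minimal,
// self-contained sketch of the pattern (hypothetical names, not lnd API):
//
//      type request struct {
//              err chan error // buffered with capacity 1
//      }
//
//      func submit(updates chan<- *request, quit <-chan struct{}) error {
//              req := &request{err: make(chan error, 1)}
//              select {
//              case updates <- req:
//                      select {
//                      case err := <-req.err:
//                              return err
//                      case <-quit:
//                              return errors.New("shutting down")
//                      }
//              case <-quit:
//                      return errors.New("shutting down")
//              }
//      }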

// CurrentBlockHeight returns the block height from the point of view of the
// router subsystem.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) CurrentBlockHeight() (uint32, error) {
        _, height, err := r.cfg.Chain.GetBestBlock()
        return uint32(height), err
}

// SyncedHeight returns the block height to which the router subsystem is
// currently synced. This can differ from the above chain height if the
// goroutine responsible for processing the blocks isn't yet up to speed.
func (r *ChannelRouter) SyncedHeight() uint32 {
        return atomic.LoadUint32(&r.bestHeight)
}

// GetChannelByID returns the channel by the channel id.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) GetChannelByID(chanID lnwire.ShortChannelID) (
        *models.ChannelEdgeInfo,
        *models.ChannelEdgePolicy,
        *models.ChannelEdgePolicy, error) {

        return r.cfg.Graph.FetchChannelEdgesByID(chanID.ToUint64())
}

// FetchLightningNode attempts to look up a target node by its identity public
// key. channeldb.ErrGraphNodeNotFound is returned if the node doesn't exist
// within the graph.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) FetchLightningNode(
        node route.Vertex) (*channeldb.LightningNode, error) {

        return r.cfg.Graph.FetchLightningNode(nil, node)
}

// ForEachNode is used to iterate over every node in the router topology.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) ForEachNode(
        cb func(*channeldb.LightningNode) error) error {

        return r.cfg.Graph.ForEachNode(
                func(_ kvdb.RTx, n *channeldb.LightningNode) error {
                        return cb(n)
                })
}

// ForAllOutgoingChannels is used to iterate over all outgoing channels owned by
// the router.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) ForAllOutgoingChannels(cb func(kvdb.RTx,
        *models.ChannelEdgeInfo, *models.ChannelEdgePolicy) error) error {

        return r.cfg.Graph.ForEachNodeChannel(nil, r.selfNode.PubKeyBytes,
                func(tx kvdb.RTx, c *models.ChannelEdgeInfo,
                        e *models.ChannelEdgePolicy,
                        _ *models.ChannelEdgePolicy) error {

                        if e == nil {
                                return fmt.Errorf("channel from self node " +
                                        "has no policy")
                        }

                        return cb(tx, c, e)
                },
        )
}

// AddProof updates the channel edge info with the proof that is needed to
// properly announce the edge to the rest of the network.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) AddProof(chanID lnwire.ShortChannelID,
        proof *models.ChannelAuthProof) error {

        info, _, _, err := r.cfg.Graph.FetchChannelEdgesByID(chanID.ToUint64())
        if err != nil {
                return err
        }

        info.AuthProof = proof
        return r.cfg.Graph.UpdateChannelEdge(info)
}

// IsStaleNode returns true if the graph source has a node announcement for the
// target node with a more recent timestamp.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) IsStaleNode(node route.Vertex,
        timestamp time.Time) bool {

        // If our attempt to assert that the node announcement is fresh fails,
        // then we know that this is actually a stale announcement.
        err := r.assertNodeAnnFreshness(node, timestamp)
        if err != nil {
                log.Debugf("Checking stale node %x got %v", node, err)
                return true
        }

        return false
}

// IsPublicNode determines whether the given vertex is seen as a public node in
// the graph from the graph's source node's point of view.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) IsPublicNode(node route.Vertex) (bool, error) {
        return r.cfg.Graph.IsPublicNode(node)
}

// IsKnownEdge returns true if the graph source already knows of the passed
// channel ID either as a live or zombie edge.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
        _, _, exists, isZombie, _ := r.cfg.Graph.HasChannelEdge(
                chanID.ToUint64(),
        )
        return exists || isZombie
}

// IsStaleEdgePolicy returns true if the graph source has a channel edge for
// the passed channel ID (and flags) that has a more recent timestamp.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
        timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {

        edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
                r.cfg.Graph.HasChannelEdge(chanID.ToUint64())
        if err != nil {
                log.Debugf("Check stale edge policy got error: %v", err)
                return false
        }

        // If we know of the edge as a zombie, then we'll make some additional
        // checks to determine if the new policy is fresh.
        if isZombie {
                // When running with AssumeChannelValid, we also prune channels
                // if both of their edges are disabled. We'll mark the new
                // policy as stale if it remains disabled.
                if r.cfg.AssumeChannelValid {
                        isDisabled := flags&lnwire.ChanUpdateDisabled ==
                                lnwire.ChanUpdateDisabled
                        if isDisabled {
                                return true
                        }
                }

                // Otherwise, we'll fall back to our usual ChannelPruneExpiry.
                return time.Since(timestamp) > r.cfg.ChannelPruneExpiry
        }

        // If we don't know of the edge, then it means it's fresh (thus not
        // stale).
        if !exists {
                return false
        }

        // As edges are directional, each edge node has a unique policy for the
        // direction of the edge it controls. Therefore, we first check if we
        // already have the most up-to-date information for that edge. If so,
        // then we can exit early.
        switch {
        // A flag set of 0 indicates this is an announcement for the "first"
        // node in the channel.
        case flags&lnwire.ChanUpdateDirection == 0:
                return !edge1Timestamp.Before(timestamp)

        // Similarly, a flag set of 1 indicates this is an announcement for the
        // "second" node in the channel.
        case flags&lnwire.ChanUpdateDirection == 1:
                return !edge2Timestamp.Before(timestamp)
        }

        return false
}
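
// Worked example (illustrative): suppose HasChannelEdge reports a known,
// non-zombie channel with edge1Timestamp = 10:00 and edge2Timestamp = 09:00.
// An update carrying flags&ChanUpdateDirection == 0 and timestamp 09:30 is
// stale, because edge1's 10:00 policy is not before 09:30; the same 09:30
// timestamp with direction 1 is fresh, because edge2's 09:00 policy predates
// it.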

// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) MarkEdgeLive(chanID lnwire.ShortChannelID) error {
        return r.cfg.Graph.MarkEdgeLive(chanID.ToUint64())
}

// ErrNoChannel is returned when a route cannot be built because there are no
// channels that satisfy all requirements.
type ErrNoChannel struct {
        position int
        fromNode route.Vertex
}

// Error returns a human-readable string describing the error.
func (e ErrNoChannel) Error() string {
        return fmt.Sprintf("no matching outgoing channel available for "+
                "node %v (%v)", e.position, e.fromNode)
}

// BuildRoute returns a fully specified route based on a list of pubkeys. If
// amount is nil, the minimum routable amount is used. To force a specific
// outgoing channel, use the outgoingChan parameter.
func (r *ChannelRouter) BuildRoute(amt *lnwire.MilliSatoshi,
        hops []route.Vertex, outgoingChan *uint64,
        finalCltvDelta int32, payAddr *[32]byte) (*route.Route, error) {

        log.Tracef("BuildRoute called: hopsCount=%v, amt=%v",
                len(hops), amt)

        var outgoingChans map[uint64]struct{}
        if outgoingChan != nil {
                outgoingChans = map[uint64]struct{}{
                        *outgoingChan: {},
                }
        }

        // If no amount is specified, we need to build a route for the minimum
        // amount that this route can carry.
        useMinAmt := amt == nil

        var runningAmt lnwire.MilliSatoshi
        if useMinAmt {
                // For minimum amount routes, aim to deliver at least 1 msat to
                // the destination. There are nodes in the wild that have a
                // min_htlc channel policy of zero, which could lead to a zero
                // amount payment being made.
                runningAmt = 1
        } else {
                // If an amount is specified, we need to build a route that
                // delivers exactly this amount to the final destination.
                runningAmt = *amt
        }

        // We'll attempt to obtain a set of bandwidth hints that help us select
        // the best outgoing channel to use in case no outgoing channel is set.
        bandwidthHints, err := newBandwidthManager(
                r.cachedGraph, r.selfNode.PubKeyBytes, r.cfg.GetLink,
        )
        if err != nil {
                return nil, err
        }

        // Fetch the current block height outside the routing transaction, to
        // prevent the RPC call from blocking the database.
        _, height, err := r.cfg.Chain.GetBestBlock()
        if err != nil {
                return nil, err
        }

        sourceNode := r.selfNode.PubKeyBytes
        unifiers, senderAmt, err := getRouteUnifiers(
                sourceNode, hops, useMinAmt, runningAmt, outgoingChans,
                r.cachedGraph, bandwidthHints,
        )
        if err != nil {
                return nil, err
        }

        pathEdges, receiverAmt, err := getPathEdges(
                sourceNode, senderAmt, unifiers, bandwidthHints, hops,
        )
        if err != nil {
                return nil, err
        }

        // Build and return the final route.
        return newRoute(
                sourceNode, pathEdges, uint32(height),
                finalHopParams{
                        amt:         receiverAmt,
                        totalAmt:    receiverAmt,
                        cltvDelta:   uint16(finalCltvDelta),
                        records:     nil,
                        paymentAddr: payAddr,
                }, nil,
        )
}
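
// Illustrative usage sketch (hypothetical caller code): building a route
// through two known hops for a fixed amount, where hopA and hopB are
// route.Vertex values and hopB is the destination.
//
//      amt := lnwire.MilliSatoshi(50_000)
//      rt, err := router.BuildRoute(
//              &amt, []route.Vertex{hopA, hopB}, nil, 40, payAddr,
//      )
//
// Passing a nil amount instead makes BuildRoute probe for the minimum amount
// the route can carry, starting from 1 msat and raising it to satisfy each
// hop's minimum HTLC policy.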

// getRouteUnifiers returns a list of edge unifiers for the given route.
func getRouteUnifiers(source route.Vertex, hops []route.Vertex,
        useMinAmt bool, runningAmt lnwire.MilliSatoshi,
        outgoingChans map[uint64]struct{}, graph routingGraph,
        bandwidthHints *bandwidthManager) ([]*edgeUnifier, lnwire.MilliSatoshi,
        error) {

        // Allocate a list that will contain the edge unifiers for this route.
        unifiers := make([]*edgeUnifier, len(hops))

        // Traverse hops backwards to accumulate fees in the running amounts.
        for i := len(hops) - 1; i >= 0; i-- {
                toNode := hops[i]

                var fromNode route.Vertex
                if i == 0 {
                        fromNode = source
                } else {
                        fromNode = hops[i-1]
                }

                localChan := i == 0

                // Build unified policies for this hop based on the channels
                // known in the graph. Don't use inbound fees.
                //
                // TODO: Add inbound fees support for BuildRoute.
                u := newNodeEdgeUnifier(
                        source, toNode, false, outgoingChans,
                )

                err := u.addGraphPolicies(graph)
                if err != nil {
                        return nil, 0, err
                }

                // Exit if there are no channels.
                edgeUnifier, ok := u.edgeUnifiers[fromNode]
                if !ok {
                        log.Errorf("Cannot find policy for node %v", fromNode)
                        return nil, 0, ErrNoChannel{
                                fromNode: fromNode,
                                position: i,
                        }
                }

                // If using min amt, increase amt if needed.
                if useMinAmt {
                        min := edgeUnifier.minAmt()
                        if min > runningAmt {
                                runningAmt = min
                        }
                }

                // Get an edge for the specific amount that we want to forward.
                edge := edgeUnifier.getEdge(runningAmt, bandwidthHints, 0)
                if edge == nil {
                        log.Errorf("Cannot find policy with amt=%v for node %v",
                                runningAmt, fromNode)

                        return nil, 0, ErrNoChannel{
                                fromNode: fromNode,
                                position: i,
                        }
                }

                // Add fee for this hop.
                if !localChan {
                        runningAmt += edge.policy.ComputeFee(runningAmt)
                }

                log.Tracef("Select channel %v at position %v",
                        edge.policy.ChannelID, i)

                unifiers[i] = edgeUnifier
        }

        return unifiers, runningAmt, nil
}

// getPathEdges returns the edges that make up the path and the total amount,
// including fees, to send the payment.
func getPathEdges(source route.Vertex, receiverAmt lnwire.MilliSatoshi,
        unifiers []*edgeUnifier, bandwidthHints *bandwidthManager,
        hops []route.Vertex) ([]*unifiedEdge,
        lnwire.MilliSatoshi, error) {

        // Now that we've arrived at the start of the route and determined the
        // total route amount, we make a forward pass. Because the amount may
        // have been increased in the backward pass, fees need to be
        // recalculated and amount ranges re-checked.
        var pathEdges []*unifiedEdge
        for i, unifier := range unifiers {
                edge := unifier.getEdge(receiverAmt, bandwidthHints, 0)
                if edge == nil {
                        fromNode := source
                        if i > 0 {
                                fromNode = hops[i-1]
                        }

                        return nil, 0, ErrNoChannel{
                                fromNode: fromNode,
                                position: i,
                        }
                }

                if i > 0 {
                        // Decrease the amount to send while going forward.
                        receiverAmt -= edge.policy.ComputeFeeFromIncoming(
                                receiverAmt,
                        )
                }

                pathEdges = append(pathEdges, edge)
        }

        return pathEdges, receiverAmt, nil
}
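
// Worked example (illustrative, using the BOLT #7 fee formula
// fee = base_fee_msat + amount_msat * fee_rate_ppm / 1,000,000): for a
// two-hop route delivering 100,000 msat, where the intermediate hop charges
// base_fee_msat = 1,000 and fee_rate_ppm = 100, the backward pass in
// getRouteUnifiers accumulates 1,000 + 100,000*100/1,000,000 = 1,010 msat of
// fees, so the sender amount becomes 101,010 msat. The forward pass in
// getPathEdges then subtracts the same fee via ComputeFeeFromIncoming
// (modulo rounding) to re-derive the 100,000 msat receiver amount.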