
lightningnetwork / lnd / 13587276956

28 Feb 2025 11:32AM UTC coverage: 58.857% (+7.8%) from 51.098%

Pull Request #9567 (github) by ellemouton
itest: make sure to not hit the natural ChannelUpdate rate limit

Channel Updates have a natural rate limit of one update per second because
the timestamp carried in the update is only accurate to the second. So we
need to ensure that the next update we send in the burst is at least one
second after the last one.
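
A minimal, self-contained sketch of the pacing this describes (the sendUpdate
helper is hypothetical and stands in for however the itest signs and
broadcasts a ChannelUpdate; this is not code from the PR diff):

        package main

        import "time"

        // sendUpdate is a hypothetical stand-in for signing and
        // broadcasting one ChannelUpdate in the burst.
        func sendUpdate(i int) {}

        func main() {
                const burstSize = 3
                for i := 0; i < burstSize; i++ {
                        sendUpdate(i)

                        // The ChannelUpdate timestamp is only accurate to
                        // the second, so wait at least one second before
                        // sending the next update in the burst.
                        time.Sleep(time.Second)
                }
        }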

136630 of 232137 relevant lines covered (58.86%)

19201.78 hits per line

Source File

/htlcswitch/link.go (77.92% of lines covered)

package htlcswitch

import (
        "bytes"
        "context"
        crand "crypto/rand"
        "crypto/sha256"
        "errors"
        "fmt"
        prand "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/wire"
        "github.com/btcsuite/btclog/v2"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/contractcourt"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/invoices"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lntypes"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/queue"
        "github.com/lightningnetwork/lnd/record"
        "github.com/lightningnetwork/lnd/ticker"
        "github.com/lightningnetwork/lnd/tlv"
)

func init() {
        prand.Seed(time.Now().UnixNano())
}

const (
        // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
        // the node accepts for forwarded payments. The value is relative to
        // the current block height. The reason to have a maximum is to
        // prevent funds getting locked up unreasonably long. Otherwise, an
        // attacker willing to lock its own funds too, could force the funds
        // of this node to be locked up for an indefinite (max int32) number
        // of blocks.
        //
        // The value 2016 corresponds to on average two weeks worth of blocks
        // and is based on the maximum number of hops (20), the default CLTV
        // delta (40), and some extra margin to account for the other
        // lightning implementations and past lnd versions which used to have
        // a default CLTV delta of 144.
        DefaultMaxOutgoingCltvExpiry = 2016

        // DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute

        // DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute

        // DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
        // a channel's commitment fee to be of its balance. This only applies
        // to the initiator of the channel.
        DefaultMaxLinkFeeAllocation float64 = 0.5
)

// ExpectedFee computes the expected fee for a given htlc amount. The value
// returned from this function is to be used as a sanity check when forwarding
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
// forwarding policy.
//
// TODO(roasbeef): also add in current available channel bandwidth, inverse
// func
func ExpectedFee(f models.ForwardingPolicy,
        htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {

        return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
}
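
// Illustration (editor's note, not part of the original file): FeeRate is a
// proportional fee expressed in millionths, as the division by 1,000,000
// above shows. With BaseFee = 1,000 msat and FeeRate = 200, forwarding a
// 1,000,000 msat HTLC is expected to earn
// 1000 + (1000000*200)/1000000 = 1,200 msat.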

// ChannelLinkConfig defines the configuration for the channel link. ALL
// elements within the configuration MUST be non-nil for channel link to carry
// out its duties.
type ChannelLinkConfig struct {
        // FwrdingPolicy is the initial forwarding policy to be used when
        // deciding whether to forward incoming HTLC's or not. This value
        // can be updated with subsequent calls to UpdateForwardingPolicy
        // targeted at a given ChannelLink concrete interface implementation.
        FwrdingPolicy models.ForwardingPolicy

        // Circuits provides restricted access to the switch's circuit map,
        // allowing the link to open and close circuits.
        Circuits CircuitModifier

        // BestHeight returns the best known height.
        BestHeight func() uint32

        // ForwardPackets attempts to forward the batch of htlcs through the
        // switch. The function returns an error in case it fails to send one
        // or more packets. The link's quit signal should be provided to allow
        // cancellation of forwarding during link shutdown.
        ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error

        // DecodeHopIterators facilitates batched decoding of HTLC Sphinx
        // onion blobs, which are then used to inform how to forward an HTLC.
        //
        // NOTE: This function assumes the same set of readers and preimages
        // are always presented for the same identifier.
        DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest) (
                []hop.DecodeHopIteratorResponse, error)

        // ExtractErrorEncrypter function is responsible for decoding the
        // HTLC Sphinx onion blob, and creating onion failure obfuscators.
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter

        // FetchLastChannelUpdate retrieves the latest routing policy for a
        // target channel. This channel will typically be the outgoing channel
        // specified when we receive an incoming HTLC. This will be used to
        // provide payment senders our latest policy when sending encrypted
        // error messages.
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
                *lnwire.ChannelUpdate1, error)

        // Peer is a lightning network node with which we have the channel
        // link opened.
        Peer lnpeer.Peer

        // Registry is a sub-system which is responsible for managing the
        // invoices in a thread-safe manner.
        Registry InvoiceDatabase

        // PreimageCache is a global witness beacon that houses any new
        // preimages discovered by other links. We'll use this to add new
        // witnesses that we discover which will notify any sub-systems
        // subscribed to new events.
        PreimageCache contractcourt.WitnessBeacon

        // OnChannelFailure is a function closure that we'll call if the
        // channel failed for some reason. Depending on the severity of the
        // error, the closure potentially must force close this channel and
        // disconnect the peer.
        //
        // NOTE: The method must return in order for the ChannelLink to be
        // able to shut down properly.
        OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
                LinkFailureError)

        // UpdateContractSignals is a function closure that we'll use to
        // update outside sub-systems with this channel's latest
        // ShortChannelID.
        UpdateContractSignals func(*contractcourt.ContractSignals) error

        // NotifyContractUpdate is a function closure that we'll use to update
        // the contractcourt and more specifically the ChannelArbitrator of
        // the latest channel state.
        NotifyContractUpdate func(*contractcourt.ContractUpdate) error

        // ChainEvents is an active subscription to the chain watcher for this
        // channel to be notified of any on-chain activity related to this
        // channel.
        ChainEvents *contractcourt.ChainEventSubscription

        // FeeEstimator is an instance of a live fee estimator which will be
        // used to dynamically regulate the current fee of the commitment
        // transaction to ensure timely confirmation.
        FeeEstimator chainfee.Estimator

        // HodlMask is a bitvector composed of hodl.Flags, specifying
        // breakpoints for HTLC forwarding internal to the switch.
        //
        // NOTE: This should only be used for testing.
        HodlMask hodl.Mask

        // SyncStates is used to indicate that we need to send the channel
        // reestablishment message to the remote peer. It should be done if
        // our clients have been restarted, or the remote peer has been
        // reconnected.
        SyncStates bool

        // BatchTicker is the ticker that determines the interval that we'll
        // use to check the batch to see if there are any updates we should
        // flush out. By batching updates into a single commit, we attempt to
        // increase throughput by maximizing the number of updates coalesced
        // into a single commit.
        BatchTicker ticker.Ticker

        // FwdPkgGCTicker is the ticker determining the frequency at which
        // garbage collection of forwarding packages occurs. We use a
        // time-based approach, as opposed to block epochs, as to not hinder
        // syncing.
        FwdPkgGCTicker ticker.Ticker

        // PendingCommitTicker is a ticker that allows the link to determine
        // if a locally initiated commitment dance gets stuck waiting for the
        // remote party to revoke.
        PendingCommitTicker ticker.Ticker

        // BatchSize is the max size of a batch of updates done to the link
        // before we do a state update.
        BatchSize uint32

        // UnsafeReplay will cause a link to replay the adds in its latest
        // commitment txn after the link is restarted. This should only be
        // used in testing, it is here to ensure the sphinx replay detection
        // on the receiving node is persistent.
        UnsafeReplay bool

        // MinUpdateTimeout represents the minimum interval in which a link
        // will propose to update its commitment fee rate. A random timeout
        // will be selected between this and MaxUpdateTimeout.
        MinUpdateTimeout time.Duration

        // MaxUpdateTimeout represents the maximum interval in which a link
        // will propose to update its commitment fee rate. A random timeout
        // will be selected between this and MinUpdateTimeout.
        MaxUpdateTimeout time.Duration

        // OutgoingCltvRejectDelta defines the number of blocks before expiry
        // of an htlc where we don't offer an htlc anymore. This should be at
        // least the outgoing broadcast delta, because in any case we don't
        // want to risk offering an htlc that triggers channel closure.
        OutgoingCltvRejectDelta uint32

        // TowerClient is an optional engine that manages the signing,
        // encrypting, and uploading of justice transactions to the daemon's
        // configured set of watchtowers for legacy channels.
        TowerClient TowerClient

        // MaxOutgoingCltvExpiry is the maximum outgoing timelock that the
        // link should accept for a forwarded HTLC. The value is relative to
        // the current block height.
        MaxOutgoingCltvExpiry uint32

        // MaxFeeAllocation is the highest allocation we'll allow a channel's
        // commitment fee to be of its balance. This only applies to the
        // initiator of the channel.
        MaxFeeAllocation float64

        // MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
        // the initiator for channels of the anchor type.
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight

        // NotifyActiveLink allows the link to tell the ChannelNotifier when a
        // link is first started.
        NotifyActiveLink func(wire.OutPoint)

        // NotifyActiveChannel allows the link to tell the ChannelNotifier
        // when a channel becomes active.
        NotifyActiveChannel func(wire.OutPoint)

        // NotifyInactiveChannel allows the switch to tell the ChannelNotifier
        // when channels become inactive.
        NotifyInactiveChannel func(wire.OutPoint)

        // NotifyInactiveLinkEvent allows the switch to tell the
        // ChannelNotifier when a channel link becomes inactive.
        NotifyInactiveLinkEvent func(wire.OutPoint)

        // HtlcNotifier is an instance of a htlcNotifier which we will pipe
        // htlc events through.
        HtlcNotifier htlcNotifier

        // FailAliasUpdate is a function used to fail an HTLC for an
        // option_scid_alias channel.
        FailAliasUpdate func(sid lnwire.ShortChannelID,
                incoming bool) *lnwire.ChannelUpdate1

        // GetAliases is used by the link and switch to fetch the set of
        // aliases for a given link.
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID

        // PreviouslySentShutdown is an optional value that is set if, at the
        // time of the link being started, persisted shutdown info was found
        // for the channel. This value being set means that we previously sent
        // a Shutdown message to our peer, and so we should do so again on
        // re-establish and should not allow any more HTLC adds on the
        // outgoing direction of the link.
        PreviouslySentShutdown fn.Option[lnwire.Shutdown]

        // DisallowRouteBlinding adds the option to disable forwarding
        // payments in blinded routes by failing back any blinding-related
        // payloads as if they were invalid.
        DisallowRouteBlinding bool

        // DisallowQuiescence is a flag that can be used to disable the
        // quiescence protocol.
        DisallowQuiescence bool

        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
        // restrict the flow of HTLCs and fee updates.
        MaxFeeExposure lnwire.MilliSatoshi

        // ShouldFwdExpEndorsement is a closure that indicates whether the
        // link should forward experimental endorsement signals.
        ShouldFwdExpEndorsement func() bool

        // AuxTrafficShaper is an optional auxiliary traffic shaper that can
        // be used to manage the bandwidth of the link.
        AuxTrafficShaper fn.Option[AuxTrafficShaper]
}

// channelLink is the service which drives a channel's commitment update
// state-machine. In the event that an HTLC needs to be propagated to another
// link, the forward handler from config is used which sends HTLC to the
// switch. Additionally, the link encapsulates the logic of commitment
// protocol message ordering and updates.
type channelLink struct {
        // The following fields are only meant to be used *atomically*.
        started       int32
        reestablished int32
        shutdown      int32

        // failed should be set to true in case a link error happens, making
        // sure we don't process any more updates.
        failed bool

        // keystoneBatch represents a volatile list of keystones that must be
        // written before attempting to sign the next commitment txn. These
        // represent all the HTLC's forwarded to the link from the switch.
        // Once we lock them into our outgoing commitment, then the circuit
        // has a keystone, and is fully opened.
        keystoneBatch []Keystone

        // openedCircuits is the set of all payment circuits that will be open
        // once we make our next commitment. After making the commitment we'll
        // ACK all these from our mailbox to ensure that they don't get
        // re-delivered if we reconnect.
        openedCircuits []CircuitKey

        // closedCircuits is the set of all payment circuits that will be
        // closed once we make our next commitment. After taking the
        // commitment we'll ACK all these to ensure that they don't get
        // re-delivered if we reconnect.
        closedCircuits []CircuitKey

        // channel is a lightning network channel to which we apply htlc
        // updates.
        channel *lnwallet.LightningChannel

        // cfg is a structure which carries all dependable fields/handlers
        // which may affect behaviour of the service.
        cfg ChannelLinkConfig

        // mailBox is the main interface between the outside world and the
        // link. All incoming messages will be sent over this mailBox.
        // Messages include new updates from our connected peer, and new
        // packets to be forwarded sent by the switch.
        mailBox MailBox

        // upstream is a channel that new messages sent from the remote peer
        // to the local peer will be sent across.
        upstream chan lnwire.Message

        // downstream is a channel in which new multi-hop HTLC's to be
        // forwarded will be sent across. Messages from this channel are sent
        // by the HTLC switch.
        downstream chan *htlcPacket

        // updateFeeTimer is the timer responsible for updating the link's
        // commitment fee every time it fires.
        updateFeeTimer *time.Timer

        // uncommittedPreimages stores a list of all preimages that have been
        // learned since receiving the last CommitSig from the remote peer.
        // The batch will be flushed just before accepting the subsequent
        // CommitSig or on shutdown to avoid doing a write for each preimage
        // received.
        uncommittedPreimages []lntypes.Preimage

        sync.RWMutex

        // hodlQueue is used to receive exit hop htlc resolutions from the
        // invoice registry.
        hodlQueue *queue.ConcurrentQueue

        // hodlMap stores related htlc data for a circuit key. It allows
        // resolving those htlcs when we receive a message on hodlQueue.
        hodlMap map[models.CircuitKey]hodlHtlc

        // log is a link-specific logging instance.
        log btclog.Logger

        // isOutgoingAddBlocked tracks whether the channelLink can send an
        // UpdateAddHTLC.
        isOutgoingAddBlocked atomic.Bool

        // isIncomingAddBlocked tracks whether the channelLink can receive an
        // UpdateAddHTLC.
        isIncomingAddBlocked atomic.Bool

        // flushHooks is a hookMap that is triggered when we reach a channel
        // state with no live HTLCs.
        flushHooks hookMap

        // outgoingCommitHooks is a hookMap that is triggered after we send
        // our next CommitSig.
        outgoingCommitHooks hookMap

        // incomingCommitHooks is a hookMap that is triggered after we receive
        // our next CommitSig.
        incomingCommitHooks hookMap

        // quiescer is the state machine that tracks where this channel is
        // with respect to the quiescence protocol.
        quiescer Quiescer

        // quiescenceReqs is a queue of requests to quiesce this link. The
        // members of the queue are send-only channels we should call back
        // with the result.
        quiescenceReqs chan StfuReq

        // cg is a helper that encapsulates a wait group and quit channel and
        // allows contexts that either block or cancel on those depending on
        // the use case.
        cg *fn.ContextGuard
}

// hookMap is a data structure that is used to track the hooks that need to
// be called in various parts of the channelLink's lifecycle.
//
// WARNING: NOT thread-safe.
type hookMap struct {
        // allocIdx keeps track of the next id we haven't yet allocated.
        allocIdx atomic.Uint64

        // transient is a map of hooks that are only called the next time
        // invoke is called. These hooks are deleted during invoke.
        transient map[uint64]func()

        // newTransients is a channel that we use to accept new hooks into the
        // hookMap.
        newTransients chan func()
}

// newHookMap initializes a new empty hookMap.
func newHookMap() hookMap {
        return hookMap{
                allocIdx:      atomic.Uint64{},
                transient:     make(map[uint64]func()),
                newTransients: make(chan func()),
        }
}

// alloc registers the supplied hook in the transient part of the hookMap and
// returns the id that was allocated for it.
func (m *hookMap) alloc(hook func()) uint64 {
        // We assume we never overflow a uint64. Seems OK.
        hookID := m.allocIdx.Add(1)
        if hookID == 0 {
                panic("hookMap allocIdx overflow")
        }
        m.transient[hookID] = hook

        return hookID
}

// invoke is used on a hook map to call all the registered hooks and then
// clear out the transient hooks so they are not called again.
func (m *hookMap) invoke() {
        for _, hook := range m.transient {
                hook()
        }

        m.transient = make(map[uint64]func())
}
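
// Illustration (editor's note, not part of the original file): a transient
// hook runs exactly once and is then cleared:
//
//	m := newHookMap()
//	m.alloc(func() { fmt.Println("flushed") })
//	m.invoke() // runs the hook and clears the map
//	m.invoke() // no-op: the transient hooks were already consumed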

// hodlHtlc contains htlc data that is required for resolution.
type hodlHtlc struct {
        add        lnwire.UpdateAddHTLC
        sourceRef  channeldb.AddRef
        obfuscator hop.ErrorEncrypter
}

// NewChannelLink creates a new instance of a ChannelLink given a
// configuration and active channel that will be used to verify/apply updates
// to.
func NewChannelLink(cfg ChannelLinkConfig,
        channel *lnwallet.LightningChannel) ChannelLink {

        logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())

        // If the max fee exposure isn't set, use the default.
        if cfg.MaxFeeExposure == 0 {
                cfg.MaxFeeExposure = DefaultMaxFeeExposure
        }

        var qsm Quiescer
        if !cfg.DisallowQuiescence {
                qsm = NewQuiescer(QuiescerCfg{
                        chanID: lnwire.NewChanIDFromOutPoint(
                                channel.ChannelPoint(),
                        ),
                        channelInitiator: channel.Initiator(),
                        sendMsg: func(s lnwire.Stfu) error {
                                return cfg.Peer.SendMessage(false, &s)
                        },
                        timeoutDuration: defaultQuiescenceTimeout,
                        onTimeout: func() {
                                cfg.Peer.Disconnect(ErrQuiescenceTimeout)
                        },
                })
        } else {
                qsm = &quiescerNoop{}
        }

        quiescenceReqs := make(
                chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1,
        )

        return &channelLink{
                cfg:                 cfg,
                channel:             channel,
                hodlMap:             make(map[models.CircuitKey]hodlHtlc),
                hodlQueue:           queue.NewConcurrentQueue(10),
                log:                 log.WithPrefix(logPrefix),
                flushHooks:          newHookMap(),
                outgoingCommitHooks: newHookMap(),
                incomingCommitHooks: newHookMap(),
                quiescer:            qsm,
                quiescenceReqs:      quiescenceReqs,
                cg:                  fn.NewContextGuard(),
        }
}

// A compile time check to ensure channelLink implements the ChannelLink
// interface.
var _ ChannelLink = (*channelLink)(nil)

// Start starts all helper goroutines required for the operation of the
// channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Start() error {
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
                err := fmt.Errorf("channel link(%v): already started", l)
                l.log.Warn("already started")
                return err
        }

        l.log.Info("starting")

        // If the config supplied a watchtower client, ensure the channel is
        // registered before trying to use it during operation.
        if l.cfg.TowerClient != nil {
                err := l.cfg.TowerClient.RegisterChannel(
                        l.ChanID(), l.channel.State().ChanType,
                )
                if err != nil {
                        return err
                }
        }

        l.mailBox.ResetMessages()
        l.hodlQueue.Start()

        // Before launching the htlcManager messages, revert any circuits that
        // were marked open in the switch's circuit map, but did not make it
        // into a commitment txn. We use the next local htlc index as the cut
        // off point, since all indexes below that are committed. This action
        // is only performed if the link's final short channel ID has been
        // assigned, otherwise we would try to trim the htlcs belonging to the
        // all-zero, hop.Source ID.
        if l.ShortChanID() != hop.Source {
                localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
                if err != nil {
                        return fmt.Errorf("unable to retrieve next local "+
                                "htlc index: %v", err)
                }

                // NOTE: This is automatically done by the switch when it
                // starts up, but is necessary to prevent inconsistencies in
                // the case that the link flaps. This is a result of a link's
                // life-cycle being shorter than that of the switch.
                chanID := l.ShortChanID()
                err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
                if err != nil {
                        return fmt.Errorf("unable to trim circuits above "+
                                "local htlc index %d: %v", localHtlcIndex, err)
                }

                // Since the link is live, before we start the link we'll
                // update the ChainArbitrator with the set of new channel
                // signals for this channel.
                //
                // TODO(roasbeef): split goroutines within channel arb to avoid
                go func() {
                        signals := &contractcourt.ContractSignals{
                                ShortChanID: l.channel.ShortChanID(),
                        }

                        err := l.cfg.UpdateContractSignals(signals)
                        if err != nil {
                                l.log.Errorf("unable to update signals")
                        }
                }()
        }

        l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())

        l.cg.WgAdd(1)
        go l.htlcManager(context.TODO())

        return nil
}

// Stop gracefully stops all active helper goroutines, then waits until
// they've exited.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stop() {
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
                l.log.Warn("already stopped")
                return
        }

        l.log.Info("stopping")

        // As the link is stopping, we are no longer interested in htlc
        // resolutions coming from the invoice registry.
        l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())

        if l.cfg.ChainEvents.Cancel != nil {
                l.cfg.ChainEvents.Cancel()
        }

        // Ensure the channel for the timer is drained.
        if l.updateFeeTimer != nil {
                if !l.updateFeeTimer.Stop() {
                        select {
                        case <-l.updateFeeTimer.C:
                        default:
                        }
                }
        }

        if l.hodlQueue != nil {
                l.hodlQueue.Stop()
        }

        l.cg.Quit()
        l.cg.WgWait()

        // Now that the htlcManager has completely exited, reset the packet
        // courier. This allows the mailbox to reevaluate any lingering Adds
        // that were delivered but didn't make it onto a commitment to be
        // failed back if the link is offline for an extended period of time.
        // The error is ignored since it can only fail when the daemon is
        // exiting.
        _ = l.mailBox.ResetPackets()

        // As a final precaution, we will attempt to flush any uncommitted
        // preimages to the preimage cache. The preimages should be
        // re-delivered after channel reestablishment, however this adds an
        // extra layer of protection in case the peer never returns. Without
        // this, we will be unable to settle any contracts depending on the
        // preimages even though we had learned them at some point.
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
        if err != nil {
                l.log.Errorf("unable to add preimages=%v to cache: %v",
                        l.uncommittedPreimages, err)
        }
}

// WaitForShutdown blocks until the link finishes shutting down, which
// includes termination of all dependent goroutines.
func (l *channelLink) WaitForShutdown() {
        l.cg.WgWait()
}

// EligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's
// if we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST NOT be called from the main event loop.
func (l *channelLink) EligibleToForward() bool {
        l.RLock()
        defer l.RUnlock()

        return l.eligibleToForward()
}

// eligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's
// if we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToForward() bool {
        return l.eligibleToUpdate() && !l.IsFlushing(Outgoing)
}

// eligibleToUpdate returns a bool indicating if the channel is able to
// update channel state. We're able to update channel state if we know the
// remote party's next revocation point. Otherwise, we can't initiate new
// channel state. We also require that the short channel ID not be the
// all-zero source ID, meaning that the channel has had its ID finalized.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToUpdate() bool {
        return l.channel.RemoteNextRevocation() != nil &&
                l.channel.ShortChanID() != hop.Source &&
                l.isReestablished() &&
                l.quiescer.CanSendUpdates()
}

// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns true if the state was changed and
// false if the desired state was already set before the method was called.
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Swap(false)
        }

        return l.isIncomingAddBlocked.Swap(false)
}

// DisableAdds sets the ChannelUpdateHandler state to disallow
// UpdateAddHtlc's in the specified direction. It returns true if the state
// was changed and false if the desired state was already set before the
// method was called.
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return !l.isOutgoingAddBlocked.Swap(true)
        }

        return !l.isIncomingAddBlocked.Swap(true)
}

// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction
// of the argument.
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Load()
        }

        return l.isIncomingAddBlocked.Load()
}
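
// Illustration (editor's note, not part of the original file): because these
// methods are built on atomic Swap, only the call that actually changes
// state reports true:
//
//	link.DisableAdds(Outgoing) // true: adds are now blocked
//	link.DisableAdds(Outgoing) // false: already blocked
//	link.IsFlushing(Outgoing)  // true while adds remain blocked
//	link.EnableAdds(Outgoing)  // true: state changed back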

// OnFlushedOnce adds a hook that will be called the next time the channel
// state reaches zero htlcs. This hook will only ever be called once. If the
// channel state already has zero htlcs, then this will be called
// immediately.
func (l *channelLink) OnFlushedOnce(hook func()) {
        select {
        case l.flushHooks.newTransients <- hook:
        case <-l.cg.Done():
        }
}

// OnCommitOnce adds a hook that will be called the next time a CommitSig
// message is sent in the argument's LinkDirection. This hook will only ever
// be called once. If no CommitSig is owed in the argument's LinkDirection,
// then the hook will be run immediately.
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
        var queue chan func()

        if direction == Outgoing {
                queue = l.outgoingCommitHooks.newTransients
        } else {
                queue = l.incomingCommitHooks.newTransients
        }

        select {
        case queue <- hook:
        case <-l.cg.Done():
        }
}

// InitStfu allows us to initiate quiescence on this link. It returns a
// receive only channel that will block until quiescence has been achieved,
// or definitively fails.
//
// This operation has been added to allow channels to be quiesced via RPC. It
// may be removed or reworked in the future as RPC initiated quiescence is a
// holdover until we have downstream protocols that use it.
func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
        req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]](
                fn.Unit{},
        )

        select {
        case l.quiescenceReqs <- req:
        case <-l.cg.Done():
                req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
        }

        return out
}

// isReestablished returns true if the link has successfully completed the
// channel reestablishment dance.
func (l *channelLink) isReestablished() bool {
        return atomic.LoadInt32(&l.reestablished) == 1
}

// markReestablished signals that the remote peer has successfully exchanged
// channel reestablish messages and that the channel is ready to process
// subsequent messages.
func (l *channelLink) markReestablished() {
        atomic.StoreInt32(&l.reestablished, 1)
}

// IsUnadvertised returns true if the underlying channel is unadvertised.
func (l *channelLink) IsUnadvertised() bool {
        state := l.channel.State()
        return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
}

// sampleNetworkFee samples the current fee rate on the network to get into
// the chain in a timely manner. The returned value is expressed in
// fee-per-kw, as this is the native rate used when computing the fee for
// commitment transactions, and the second-level HTLC transactions.
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
        // We'll first query for the sat/kw recommended to be confirmed within
        // 3 blocks.
        feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
        if err != nil {
                return 0, err
        }

        l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
                int64(feePerKw))

        return feePerKw, nil
}
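
// Illustration (editor's note, not part of the original file): sat/kw is
// satoshis per 1,000 weight units. Since one virtual byte equals four weight
// units, sat/vbyte = sat/kw * 4 / 1000, so a sampled rate of 250 sat/kw
// corresponds to 1 sat/vbyte.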

// shouldAdjustCommitFee returns true if we should update our commitment fee
// to match that of the network fee. We'll only update our commitment fee if
// the network fee is +/- 10% of our commitment fee or if our current
// commitment fee is below the minimum relay fee.
func shouldAdjustCommitFee(netFee, chanFee,
        minRelayFee chainfee.SatPerKWeight) bool {

        switch {
        // If the network fee is greater than our current commitment fee and
        // our current commitment fee is below the minimum relay fee then
        // we should switch to it no matter if it is less than a 10% increase.
        case netFee > chanFee && chanFee < minRelayFee:
                return true

        // If the network fee is greater than the commitment fee, then we'll
        // switch to it if it's at least 10% greater than the commit fee.
        case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
                return true

        // If the network fee is less than our commitment fee, then we'll
        // switch to it if it's at least 10% less than the commitment fee.
        case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
                return true

        // Otherwise, we won't modify our fee.
        default:
                return false
        }
}
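
// Illustration (editor's note, not part of the original file): with
// chanFee = 1000 sat/kw and minRelayFee = 253 sat/kw, a netFee of 1100
// (exactly +10%) or 900 (exactly -10%) triggers an update, while 1050 or
// 950 does not. Separately, any netFee above a chanFee that sits below
// minRelayFee triggers an update regardless of the 10% bands.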

// failCb is used to cut down on the argument verbosity.
type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage

// createFailureWithUpdate creates a ChannelUpdate when failing an incoming
// or outgoing HTLC. It may return a FailureMessage that references a
// channel's alias. If the channel does not have an alias, then the regular
// channel update from disk will be returned.
func (l *channelLink) createFailureWithUpdate(incoming bool,
        outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {

        // Determine which SCID to use in case we need to use aliases in the
        // ChannelUpdate.
        scid := outgoingScid
        if incoming {
                scid = l.ShortChanID()
        }

        // Try using the FailAliasUpdate function. If it returns nil, fall
        // back to the non-alias behavior.
        update := l.cfg.FailAliasUpdate(scid, incoming)
        if update == nil {
                // Fallback to the non-alias behavior.
                var err error
                update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
                if err != nil {
                        return &lnwire.FailTemporaryNodeFailure{}
                }
        }

        return cb(update)
}

// syncChanStates attempts to synchronize channel states with the remote
// party. This method is to be called upon reconnection after the initial
// funding flow. We'll compare our commitment chains with the remote party,
// and re-send either a dangling commit signature, a revocation, or both.
func (l *channelLink) syncChanStates(ctx context.Context) error {
        chanState := l.channel.State()

        l.log.Infof("Attempting to re-synchronize channel: %v", chanState)

        // First, we'll generate our ChanSync message to send to the other
        // side. Based on this message, the remote party will decide if they
        // need to retransmit any data or not.
        localChanSyncMsg, err := chanState.ChanSyncMsg()
        if err != nil {
                return fmt.Errorf("unable to generate chan sync message for "+
                        "ChannelPoint(%v)", l.channel.ChannelPoint())
        }
        if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
                return fmt.Errorf("unable to send chan sync message for "+
                        "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
        }

        var msgsToReSend []lnwire.Message

        // Next, we'll wait indefinitely to receive the ChanSync message. The
        // first message sent MUST be the ChanSync message.
        select {
        case msg := <-l.upstream:
                l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
                        l.cfg.Peer.PubKey())

                remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
                if !ok {
                        return fmt.Errorf("first message sent to sync "+
                                "should be ChannelReestablish, instead "+
                                "received: %T", msg)
                }

                // If the remote party indicates that they think we haven't
                // done any state updates yet, then we'll retransmit the
                // channel_ready message first. We do this, as at this point
                // we can't be sure if they've really received the
                // ChannelReady message.
                if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
                        localChanSyncMsg.NextLocalCommitHeight == 1 &&
                        !l.channel.IsPending() {

                        l.log.Infof("resending ChannelReady message to peer")

                        nextRevocation, err := l.channel.NextRevocationKey()
                        if err != nil {
                                return fmt.Errorf("unable to create next "+
                                        "revocation: %v", err)
                        }

                        channelReadyMsg := lnwire.NewChannelReady(
                                l.ChanID(), nextRevocation,
                        )

                        // If this is a taproot channel, then we'll send the
                        // very same nonce that we sent above, as they should
                        // take the latest verification nonce we send.
                        if chanState.ChanType.IsTaproot() {
                                //nolint:ll
                                channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
                        }

                        // For channels that negotiated the option-scid-alias
                        // feature bit, ensure that we send over the alias in
                        // the channel_ready message. We'll send the first
                        // alias we find for the channel since it does not
                        // matter which alias we send. We'll error out if no
                        // aliases are found.
                        if l.negotiatedAliasFeature() {
                                aliases := l.getAliases()
                                if len(aliases) == 0 {
                                        // This shouldn't happen since we
                                        // always add at least one alias before
                                        // the channel reaches the link.
                                        return fmt.Errorf("no aliases found")
                                }

                                // getAliases returns a copy of the alias slice
                                // so it is ok to use a pointer to the first
                                // entry.
                                channelReadyMsg.AliasScid = &aliases[0]
                        }

                        err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
                        if err != nil {
                                return fmt.Errorf("unable to re-send "+
                                        "ChannelReady: %v", err)
                        }
                }

                // In any case, we'll then process their ChanSync message.
                l.log.Info("received re-establishment message from remote side")

                var (
                        openedCircuits []CircuitKey
                        closedCircuits []CircuitKey
                )

                // We've just received a ChanSync message from the remote
                // party, so we'll process the message in order to determine
                // if we need to re-transmit any messages to the remote party.
                ctx, cancel := l.cg.Create(ctx)
                defer cancel()
                msgsToReSend, openedCircuits, closedCircuits, err =
                        l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
                if err != nil {
                        return err
                }

                // Repopulate any identifiers for circuits that may have been
                // opened or unclosed. This may happen if we needed to
                // retransmit a commitment signature message.
                l.openedCircuits = openedCircuits
                l.closedCircuits = closedCircuits

                // Ensure that all packets have been removed from the link's
                // mailbox.
                if err := l.ackDownStreamPackets(); err != nil {
                        return err
                }

                if len(msgsToReSend) > 0 {
                        l.log.Infof("sending %v updates to synchronize the "+
                                "state", len(msgsToReSend))
                }

                // If we have any messages to retransmit, we'll do so
                // immediately so we return to a synchronized state as soon as
                // possible.
                for _, msg := range msgsToReSend {
                        l.cfg.Peer.SendMessage(false, msg)
                }

        case <-l.cg.Done():
                return ErrLinkShuttingDown
        }

        return nil
}

// resolveFwdPkgs loads any forwarding packages for this link from disk, and
// reprocesses them in order. The primary goal is to make sure that any HTLCs
// we previously received are reinstated in memory, and forwarded to the
// switch if necessary. After a restart, this will also delete any previously
// completed packages.
func (l *channelLink) resolveFwdPkgs(ctx context.Context) error {
        fwdPkgs, err := l.channel.LoadFwdPkgs()
        if err != nil {
                return err
        }

        l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))

        for _, fwdPkg := range fwdPkgs {
                if err := l.resolveFwdPkg(fwdPkg); err != nil {
                        return err
                }
        }

        // If any of our reprocessing steps require an update to the
        // commitment txn, we initiate a state transition to capture all
        // relevant changes.
        if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
                return l.updateCommitTx(ctx)
        }

        return nil
}

// resolveFwdPkg interprets the FwdState of the provided package, and either
// reprocesses any outstanding htlcs in the package, or performs garbage
// collection on the package.
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
        // Remove any completed packages to clear up space.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("removing completed fwd pkg for height=%d",
                        fwdPkg.Height)

                err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
                if err != nil {
                        l.log.Errorf("unable to remove fwd pkg for height=%d: "+
                                "%v", fwdPkg.Height, err)
                        return err
                }
        }

        // Otherwise this is either a new package or one that has gone through
        // processing, but contains htlcs that need to be restored in memory.
        // We replay this forwarding package to make sure our local mem state
        // is resurrected, we mimic any original responses back to the remote
        // party, and re-forward the relevant HTLCs to the switch.

        // If the package is fully acked but not completed, it must still have
        // settles and fails to propagate.
        if !fwdPkg.SettleFailFilter.IsFull() {
                l.processRemoteSettleFails(fwdPkg)
        }

        // Finally, replay *ALL ADDS* in this forwarding package. The
        // downstream logic is able to filter out any duplicates, but we must
        // shove the entire, original set of adds down the pipeline so that
        // the batch of adds presented to the sphinx router does not ever
        // change.
        if !fwdPkg.AckFilter.IsFull() {
                l.processRemoteAdds(fwdPkg)

                // If the link failed during processing the adds, we must
                // return to ensure we won't attempt to update the state
                // further.
                if l.failed {
                        return fmt.Errorf("link failed while " +
                                "processing remote adds")
                }
        }

        return nil
}

// fwdPkgGarbager periodically reads all forwarding packages from disk and
// removes those that can be discarded. It is safe to do this entirely in the
// background, since all state is coordinated on disk. This also ensures the
// link can continue to process messages and interleave database accesses.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) fwdPkgGarbager() {
        defer l.cg.WgDone()

        l.cfg.FwdPkgGCTicker.Resume()
        defer l.cfg.FwdPkgGCTicker.Stop()

        if err := l.loadAndRemove(); err != nil {
                l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
        }

        for {
                select {
                case <-l.cfg.FwdPkgGCTicker.Ticks():
                        if err := l.loadAndRemove(); err != nil {
                                l.log.Warnf("unable to remove fwd pkgs: %v",
                                        err)
                                continue
                        }
                case <-l.cg.Done():
                        return
                }
        }
}

// loadAndRemove loads all of the channel's forwarding packages and
// determines if they can be removed. It is called once before the
// FwdPkgGCTicker ticks so that a longer tick interval can be used.
func (l *channelLink) loadAndRemove() error {
        fwdPkgs, err := l.channel.LoadFwdPkgs()
        if err != nil {
                return err
        }

        var removeHeights []uint64
        for _, fwdPkg := range fwdPkgs {
                if fwdPkg.State != channeldb.FwdStateCompleted {
                        continue
                }

                removeHeights = append(removeHeights, fwdPkg.Height)
        }

        // If removeHeights is empty, return early so we don't use a db
        // transaction.
        if len(removeHeights) == 0 {
                return nil
        }

        return l.channel.RemoveFwdPkgs(removeHeights...)
}
1164

1165
// handleChanSyncErr performs the error handling logic in the case where we
1166
// could not successfully syncChanStates with our channel peer.
1167
func (l *channelLink) handleChanSyncErr(err error) {
3✔
1168
        l.log.Warnf("error when syncing channel states: %v", err)
3✔
1169

3✔
1170
        var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss
3✔
1171

3✔
1172
        switch {
3✔
1173
        case errors.Is(err, ErrLinkShuttingDown):
3✔
1174
                l.log.Debugf("unable to sync channel states, link is " +
3✔
1175
                        "shutting down")
3✔
1176
                return
3✔
1177

1178
        // We failed syncing the commit chains, probably because the remote has
1179
        // lost state. We should force close the channel.
1180
        case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
3✔
1181
                fallthrough
3✔
1182

1183
        // The remote sent us an invalid last commit secret, we should force
1184
        // close the channel.
1185
        // TODO(halseth): and permanently ban the peer?
1186
        case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
3✔
1187
                fallthrough
3✔
1188

1189
        // The remote sent us a commit point different from what they sent us
1190
        // before.
1191
        // TODO(halseth): ban peer?
1192
        case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
3✔
1193
                // We'll fail the link and tell the peer to force close the
3✔
1194
                // channel. Note that the database state is not updated here,
3✔
1195
                // but will be updated when the close transaction is ready. This
3✔
1196
                // avoids us going down before storing the transaction in the
3✔
1197
                // db.
3✔
1198
                l.failf(
3✔
1199
                        LinkFailureError{
3✔
1200
                                code:          ErrSyncError,
3✔
1201
                                FailureAction: LinkFailureForceClose,
3✔
1202
                        },
3✔
1203
                        "unable to synchronize channel states: %v", err,
3✔
1204
                )
3✔
1205

1206
        // We have lost state and cannot safely force close the channel. Fail
1207
        // the channel and wait for the remote to hopefully force close it. The
1208
        // remote has sent us its latest unrevoked commitment point, and we'll
1209
        // store it in the database, such that we can attempt to recover the
1210
        // funds if the remote force closes the channel.
1211
        case errors.As(err, &errDataLoss):
3✔
1212
                err := l.channel.MarkDataLoss(
3✔
1213
                        errDataLoss.CommitPoint,
3✔
1214
                )
3✔
1215
                if err != nil {
3✔
1216
                        l.log.Errorf("unable to mark channel data loss: %v",
×
1217
                                err)
×
1218
                }
×
1219

1220
        // We determined the commit chains were not possible to sync. We
1221
        // cautiously fail the channel, but don't force close.
1222
        // TODO(halseth): can we safely force close in any cases where this
1223
        // error is returned?
1224
        case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
×
1225
                if err := l.channel.MarkBorked(); err != nil {
×
1226
                        l.log.Errorf("unable to mark channel borked: %v", err)
×
1227
                }
×
1228

1229
        // Other, unspecified error.
1230
        default:
×
1231
        }
1232

1233
        l.failf(
3✔
1234
                LinkFailureError{
3✔
1235
                        code:          ErrRecoveryError,
3✔
1236
                        FailureAction: LinkFailureForceNone,
3✔
1237
                },
3✔
1238
                "unable to synchronize channel states: %v", err,
3✔
1239
        )
3✔
1240
}
1241

1242
// htlcManager is the primary goroutine which drives a channel's commitment
1243
// update state-machine in response to messages received via several channels.
1244
// This goroutine reads messages from the upstream (remote) peer, and also from
1245
// the downstream channel managed by the channel link. In the event that an htlc
1246
// needs to be forwarded, the send-only forward handler is used, which sends
1247
// htlc packets to the switch. Additionally, this goroutine handles acting upon
1248
// all timeouts for any active HTLCs, manages the channel's revocation window,
1249
// and also the htlc trickle queue+timer for this active channel.
1250
//
1251
// NOTE: This MUST be run as a goroutine.
1252
//
1253
//nolint:funlen
1254
func (l *channelLink) htlcManager(ctx context.Context) {
216✔
1255
        defer func() {
423✔
1256
                l.cfg.BatchTicker.Stop()
207✔
1257
                l.cg.WgDone()
207✔
1258
                l.log.Infof("exited")
207✔
1259
        }()
207✔
1260

1261
        l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())
216✔
1262

216✔
1263
        // Notify any clients that the link is now in the switch via an
216✔
1264
        // ActiveLinkEvent. We'll also defer an inactive link notification for
216✔
1265
        // when the link exits to ensure that every active notification is
216✔
1266
        // matched by an inactive one.
216✔
1267
        l.cfg.NotifyActiveLink(l.ChannelPoint())
216✔
1268
        defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())
216✔
1269

216✔
1270
        // TODO(roasbeef): need to call wipe chan whenever D/C?
216✔
1271

216✔
1272
        // If this isn't the first time that this channel link has been
216✔
1273
        // created, then we'll need to check to see if we need to
216✔
1274
        // re-synchronize state with the remote peer. settledHtlcs is a map of
216✔
1275
        // HTLCs that we re-settled as part of the channel state sync.
216✔
1276
        if l.cfg.SyncStates {
389✔
1277
                err := l.syncChanStates(ctx)
173✔
1278
                if err != nil {
176✔
1279
                        l.handleChanSyncErr(err)
3✔
1280
                        return
3✔
1281
                }
3✔
1282
        }
1283

1284
        // If a shutdown message has previously been sent on this link, then we
1285
        // need to make sure that we have disabled any HTLC adds on the outgoing
1286
        // direction of the link and that we re-send the same shutdown message
1287
        // that we previously sent.
1288
        l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
219✔
1289
                // Immediately disallow any new outgoing HTLCs.
3✔
1290
                if !l.DisableAdds(Outgoing) {
3✔
1291
                        l.log.Warnf("Outgoing link adds already disabled")
×
1292
                }
×
1293

1294
                // Re-send the shutdown message to the peer. Since syncChanStates
1295
                // would have sent any outstanding CommitSig, it is fine for us
1296
                // to immediately queue the shutdown message now.
1297
                err := l.cfg.Peer.SendMessage(false, &shutdown)
3✔
1298
                if err != nil {
3✔
1299
                        l.log.Warnf("Error sending shutdown message: %v", err)
×
1300
                }
×
1301
        })
1302

1303
        // We've successfully reestablished the channel, mark it as such to
1304
        // allow the switch to forward HTLCs in the outbound direction.
1305
        l.markReestablished()
216✔
1306

216✔
1307
        // Now that we've received both channel_ready and channel_reestablish,
216✔
1308
        // we can go ahead and send the active channel notification. We'll also
216✔
1309
        // defer the inactive notification for when the link exits to ensure
216✔
1310
        // that every active notification is matched by an inactive one.
216✔
1311
        l.cfg.NotifyActiveChannel(l.ChannelPoint())
216✔
1312
        defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())
216✔
1313

216✔
1314
        // With the channel states synced, we now reset the mailbox to ensure
216✔
1315
        // we start processing all unacked packets in order. This is done here
216✔
1316
        // to ensure that all acknowledgments that occur during channel
216✔
1317
        // resynchronization have taken effect, causing us only to pull unacked
216✔
1318
        // packets after starting to read from the downstream mailbox.
216✔
1319
        l.mailBox.ResetPackets()
216✔
1320

216✔
1321
        // After cleaning up any memory pertaining to incoming packets, we now
216✔
1322
        // replay our forwarding packages to handle any htlcs that can be
216✔
1323
        // processed locally, or need to be forwarded out to the switch. We will
216✔
1324
        // only attempt to resolve packages if our short chan id indicates that
216✔
1325
        // the channel is not pending, otherwise we should have no htlcs to
216✔
1326
        // reforward.
216✔
1327
        if l.ShortChanID() != hop.Source {
432✔
1328
                err := l.resolveFwdPkgs(ctx)
216✔
1329
                switch err {
216✔
1330
                // No error was encountered, success.
1331
                case nil:
215✔
1332

1333
                // If the duplicate keystone error was encountered, we'll fail
1334
                // without sending an Error message to the peer.
1335
                case ErrDuplicateKeystone:
×
1336
                        l.failf(LinkFailureError{code: ErrCircuitError},
×
1337
                                "temporary circuit error: %v", err)
×
1338
                        return
×
1339

1340
                // A non-nil error was encountered, send an Error message to
1341
                // the peer.
1342
                default:
1✔
1343
                        l.failf(LinkFailureError{code: ErrInternalError},
1✔
1344
                                "unable to resolve fwd pkgs: %v", err)
1✔
1345
                        return
1✔
1346
                }
1347

1348
                // With our link's in-memory state fully reconstructed, spawn a
1349
                // goroutine to manage the reclamation of disk space occupied by
1350
                // completed forwarding packages.
1351
                l.cg.WgAdd(1)
215✔
1352
                go l.fwdPkgGarbager()
215✔
1353
        }
1354

1355
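        // Main event loop: react to ticker and timer events, downstream
        // packets from the switch, upstream messages from the peer, hodl
        // resolutions, and quiescence requests until the link exits.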
        for {
4,370✔
1356
                // We must always check if we failed at some point processing
4,155✔
1357
                // the last update before processing the next.
4,155✔
1358
                if l.failed {
4,171✔
1359
                        l.log.Errorf("link failed, exiting htlcManager")
16✔
1360
                        return
16✔
1361
                }
16✔
1362

1363
                // If the previous event resulted in a non-empty batch, resume
1364
                // the batch ticker so that it can be cleared. Otherwise pause
1365
                // the ticker to prevent waking up the htlcManager while the
1366
                // batch is empty.
1367
                numUpdates := l.channel.NumPendingUpdates(
4,142✔
1368
                        lntypes.Local, lntypes.Remote,
4,142✔
1369
                )
4,142✔
1370
                if numUpdates > 0 {
4,658✔
1371
                        l.cfg.BatchTicker.Resume()
516✔
1372
                        l.log.Tracef("BatchTicker resumed, "+
516✔
1373
                                "NumPendingUpdates(Local, Remote)=%d",
516✔
1374
                                numUpdates,
516✔
1375
                        )
516✔
1376
                } else {
4,145✔
1377
                        l.cfg.BatchTicker.Pause()
3,629✔
1378
                        l.log.Trace("BatchTicker paused due to zero " +
3,629✔
1379
                                "NumPendingUpdates(Local, Remote)")
3,629✔
1380
                }
3,629✔
1381

1382
                select {
4,142✔
1383
                // We have a new hook that needs to be run when we reach a clean
1384
                // channel state.
1385
                case hook := <-l.flushHooks.newTransients:
4✔
1386
                        if l.channel.IsChannelClean() {
7✔
1387
                                hook()
3✔
1388
                        } else {
7✔
1389
                                l.flushHooks.alloc(hook)
4✔
1390
                        }
4✔
1391

1392
                // We have a new hook that needs to be run when we have
1393
                // committed all of our updates.
1394
                case hook := <-l.outgoingCommitHooks.newTransients:
4✔
1395
                        if !l.channel.OweCommitment() {
7✔
1396
                                hook()
3✔
1397
                        } else {
4✔
1398
                                l.outgoingCommitHooks.alloc(hook)
1✔
1399
                        }
1✔
1400

1401
                // We have a new hook that needs to be run when our peer has
1402
                // committed all of their updates.
1403
                case hook := <-l.incomingCommitHooks.newTransients:
×
1404
                        if !l.channel.NeedCommitment() {
×
1405
                                hook()
×
1406
                        } else {
×
1407
                                l.incomingCommitHooks.alloc(hook)
×
1408
                        }
×
1409

1410
                // Our update fee timer has fired, so we'll check the network
1411
                // fee to see if we should adjust our commitment fee.
1412
                case <-l.updateFeeTimer.C:
4✔
1413
                        l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
4✔
1414

4✔
1415
                        // If we're not the initiator of the channel, we
4✔
1416
                        // don't control the fees, so we can ignore this.
4✔
1417
                        if !l.channel.IsInitiator() {
4✔
1418
                                continue
×
1419
                        }
1420

1421
                        // If we are the initiator, then we'll sample the
1422
                        // current fee rate to get into the chain within 3
1423
                        // blocks.
1424
                        netFee, err := l.sampleNetworkFee()
4✔
1425
                        if err != nil {
4✔
1426
                                l.log.Errorf("unable to sample network fee: %v",
×
1427
                                        err)
×
1428
                                continue
×
1429
                        }
1430

1431
                        minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()
4✔
1432

4✔
1433
                        newCommitFee := l.channel.IdealCommitFeeRate(
4✔
1434
                                netFee, minRelayFee,
4✔
1435
                                l.cfg.MaxAnchorsCommitFeeRate,
4✔
1436
                                l.cfg.MaxFeeAllocation,
4✔
1437
                        )
4✔
1438

4✔
1439
                        // We determine if we should adjust the commitment fee
4✔
1440
                        // based on the current commitment fee, the suggested
4✔
1441
                        // new commitment fee and the current minimum relay fee
4✔
1442
                        // rate.
4✔
1443
                        commitFee := l.channel.CommitFeeRate()
4✔
1444
                        if !shouldAdjustCommitFee(
4✔
1445
                                newCommitFee, commitFee, minRelayFee,
4✔
1446
                        ) {
5✔
1447

1✔
1448
                                continue
1✔
1449
                        }
1450

1451
                        // If we do, then we'll send a new UpdateFee message to
1452
                        // the remote party, to be locked in with a new update.
1453
                        err = l.updateChannelFee(ctx, newCommitFee)
3✔
1454
                        if err != nil {
3✔
1455
                                l.log.Errorf("unable to update fee rate: %v",
×
1456
                                        err)
×
1457
                                continue
×
1458
                        }
1459

1460
                // The underlying channel has notified us of a unilateral close
1461
                // carried out by the remote peer. In the case of such an
1462
                // event, we'll wipe the channel state from the peer, and mark
1463
                // the contract as fully settled. Afterwards we can exit.
1464
                //
1465
                // TODO(roasbeef): add force closure? also breach?
1466
                case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
3✔
1467
                        l.log.Warnf("remote peer has closed on-chain")
3✔
1468

3✔
1469
                        // TODO(roasbeef): remove all together
3✔
1470
                        go func() {
6✔
1471
                                chanPoint := l.channel.ChannelPoint()
3✔
1472
                                l.cfg.Peer.WipeChannel(&chanPoint)
3✔
1473
                        }()
3✔
1474

1475
                        return
3✔
1476

1477
                case <-l.cfg.BatchTicker.Ticks():
199✔
1478
                        // Attempt to extend the remote commitment chain
199✔
1479
                        // including all the currently pending entries. If the
199✔
1480
                        // send was unsuccessful, then abandon the update,
199✔
1481
                        // waiting for the revocation window to open up.
199✔
1482
                        if !l.updateCommitTxOrFail(ctx) {
199✔
1483
                                return
×
1484
                        }
×
1485

1486
                case <-l.cfg.PendingCommitTicker.Ticks():
1✔
1487
                        l.failf(
1✔
1488
                                LinkFailureError{
1✔
1489
                                        code:          ErrRemoteUnresponsive,
1✔
1490
                                        FailureAction: LinkFailureDisconnect,
1✔
1491
                                },
1✔
1492
                                "unable to complete dance",
1✔
1493
                        )
1✔
1494
                        return
1✔
1495

1496
                // A message from the switch was just received. This indicates
1497
                // that the link is an intermediate hop in a multi-hop HTLC
1498
                // circuit.
1499
                case pkt := <-l.downstream:
524✔
1500
                        l.handleDownstreamPkt(ctx, pkt)
524✔
1501

1502
                // A message from the connected peer was just received. This
1503
                // indicates that we have a new incoming HTLC, either directly
1504
                // for us, or part of a multi-hop HTLC circuit.
1505
                case msg := <-l.upstream:
3,164✔
1506
                        l.handleUpstreamMsg(ctx, msg)
3,164✔
1507

1508
                // A htlc resolution is received. This means that we now have a
1509
                // resolution for a previously accepted htlc.
1510
                case hodlItem := <-l.hodlQueue.ChanOut():
58✔
1511
                        htlcResolution := hodlItem.(invoices.HtlcResolution)
58✔
1512
                        err := l.processHodlQueue(ctx, htlcResolution)
58✔
1513
                        switch err {
58✔
1514
                        // No error, success.
1515
                        case nil:
57✔
1516

1517
                        // If the duplicate keystone error was encountered,
1518
                        // fail back gracefully.
1519
                        case ErrDuplicateKeystone:
×
1520
                                l.failf(LinkFailureError{
×
1521
                                        code: ErrCircuitError,
×
1522
                                }, "process hodl queue: "+
×
1523
                                        "temporary circuit error: %v",
×
1524
                                        err,
×
1525
                                )
×
1526

1527
                        // Send an Error message to the peer.
1528
                        default:
1✔
1529
                                l.failf(LinkFailureError{
1✔
1530
                                        code: ErrInternalError,
1✔
1531
                                }, "process hodl queue: unable to update "+
1✔
1532
                                        "commitment: %v", err,
1✔
1533
                                )
1✔
1534
                        }
1535

1536
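                // A quiescence request was received. Initiate stfu and, if we
                // have no dangling local updates, immediately send the stfu
                // message we owe.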
                case qReq := <-l.quiescenceReqs:
4✔
1537
                        l.quiescer.InitStfu(qReq)
4✔
1538

4✔
1539
                        if l.noDanglingUpdates(lntypes.Local) {
8✔
1540
                                err := l.quiescer.SendOwedStfu()
4✔
1541
                                if err != nil {
4✔
1542
                                        l.stfuFailf(
×
1543
                                                "SendOwedStfu: %s", err.Error(),
×
1544
                                        )
×
1545
                                        res := fn.Err[lntypes.ChannelParty](err)
×
1546
                                        qReq.Resolve(res)
×
1547
                                }
×
1548
                        }
1549

1550
                case <-l.cg.Done():
192✔
1551
                        return
192✔
1552
                }
1553
        }
1554
}
1555

1556
// processHodlQueue processes a received htlc resolution and continues reading
1557
// from the hodl queue until no more resolutions remain. When this function
1558
// returns without an error, the commit tx should be updated.
1559
func (l *channelLink) processHodlQueue(ctx context.Context,
1560
        firstResolution invoices.HtlcResolution) error {
58✔
1561

58✔
1562
        // Try to read all waiting resolution messages, so that they can all be
58✔
1563
        // processed in a single commitment tx update.
58✔
1564
        htlcResolution := firstResolution
58✔
1565
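        // The label lets us break out of the nested select below once the
        // queue has been drained.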
loop:
58✔
1566
        for {
116✔
1567
                // Look up all hodl htlcs that can be failed or settled with this event.
58✔
1568
                // The hodl htlc must be present in the map.
58✔
1569
                circuitKey := htlcResolution.CircuitKey()
58✔
1570
                hodlHtlc, ok := l.hodlMap[circuitKey]
58✔
1571
                if !ok {
58✔
1572
                        return fmt.Errorf("hodl htlc not found: %v", circuitKey)
×
1573
                }
×
1574

1575
                if err := l.processHtlcResolution(htlcResolution, hodlHtlc); err != nil {
58✔
1576
                        return err
×
1577
                }
×
1578

1579
                // Clean up hodl map.
1580
                delete(l.hodlMap, circuitKey)
58✔
1581

58✔
1582
                select {
58✔
1583
                case item := <-l.hodlQueue.ChanOut():
3✔
1584
                        htlcResolution = item.(invoices.HtlcResolution)
3✔
1585
                default:
58✔
1586
                        break loop
58✔
1587
                }
1588
        }
1589

1590
        // Update the commitment tx.
1591
        if err := l.updateCommitTx(ctx); err != nil {
59✔
1592
                return err
1✔
1593
        }
1✔
1594

1595
        return nil
57✔
1596
}
1597

1598
// processHtlcResolution applies a received htlc resolution to the provided
1599
// htlc. When this function returns without an error, the commit tx should be
1600
// updated.
1601
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
1602
        htlc hodlHtlc) error {
204✔
1603

204✔
1604
        circuitKey := resolution.CircuitKey()
204✔
1605

204✔
1606
        // Determine required action for the resolution based on the type of
204✔
1607
        // resolution we have received.
204✔
1608
        switch res := resolution.(type) {
204✔
1609
        // Settle htlcs that returned a settle resolution using the preimage
1610
        // in the resolution.
1611
        case *invoices.HtlcSettleResolution:
200✔
1612
                l.log.Debugf("received settle resolution for %v "+
200✔
1613
                        "with outcome: %v", circuitKey, res.Outcome)
200✔
1614

200✔
1615
                return l.settleHTLC(
200✔
1616
                        res.Preimage, htlc.add.ID, htlc.sourceRef,
200✔
1617
                )
200✔
1618

1619
        // For htlc failures, we get the relevant failure message based
1620
        // on the failure resolution and then fail the htlc.
1621
        case *invoices.HtlcFailResolution:
7✔
1622
                l.log.Debugf("received cancel resolution for "+
7✔
1623
                        "%v with outcome: %v", circuitKey, res.Outcome)
7✔
1624

7✔
1625
                // Get the lnwire failure message based on the resolution
7✔
1626
                // result.
7✔
1627
                failure := getResolutionFailure(res, htlc.add.Amount)
7✔
1628

7✔
1629
                l.sendHTLCError(
7✔
1630
                        htlc.add, htlc.sourceRef, failure, htlc.obfuscator,
7✔
1631
                        true,
7✔
1632
                )
7✔
1633
                return nil
7✔
1634

1635
        // Fail if we do not get a settle or fail resolution, since we
1636
        // are only expecting to handle settles and fails.
1637
        default:
×
1638
                return fmt.Errorf("unknown htlc resolution type: %T",
×
1639
                        resolution)
×
1640
        }
1641
}
1642

1643
// getResolutionFailure returns the wire message that a htlc resolution should
1644
// be failed with.
1645
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
1646
        amount lnwire.MilliSatoshi) *LinkError {
7✔
1647

7✔
1648
        // If the htlc has been resolved as part of an MPP timeout,
7✔
1649
        // we need to fail the htlc with lnwire.FailMppTimeout.
7✔
1650
        if resolution.Outcome == invoices.ResultMppTimeout {
7✔
1651
                return NewDetailedLinkError(
×
1652
                        &lnwire.FailMPPTimeout{}, resolution.Outcome,
×
1653
                )
×
1654
        }
×
1655

1656
        // If the htlc is not an MPP timeout, we fail it with
1657
        // FailIncorrectDetails. This error is sent for invoice payment
1658
        // failures such as underpayment/expiry too soon and hodl invoices
1659
        // (which return FailIncorrectDetails to avoid leaking information).
1660
        incorrectDetails := lnwire.NewFailIncorrectDetails(
7✔
1661
                amount, uint32(resolution.AcceptHeight),
7✔
1662
        )
7✔
1663

7✔
1664
        return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
7✔
1665
}
1666

1667
// randomFeeUpdateTimeout returns a random timeout between the bounds defined
1668
// within the link's configuration that will be used to determine when the link
1669
// should propose an update to its commitment fee rate.
1670
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
220✔
1671
        lower := int64(l.cfg.MinUpdateTimeout)
220✔
1672
        upper := int64(l.cfg.MaxUpdateTimeout)
220✔
1673
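        // prand.Int63n(n) returns a value in [0, n), so the result lies in
        // [MinUpdateTimeout, MaxUpdateTimeout).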
        return time.Duration(prand.Int63n(upper-lower) + lower)
220✔
1674
}
220✔
1675

1676
// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
1677
// downstream HTLC Switch.
1678
func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context,
1679
        pkt *htlcPacket) error {
483✔
1680

483✔
1681
        htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
483✔
1682
        if !ok {
483✔
1683
                return errors.New("not an UpdateAddHTLC packet")
×
1684
        }
×
1685

1686
        // If we are flushing the link in the outgoing direction or we have
1687
        // already sent Stfu, then we can't add new htlcs to the link and we
1688
        // need to bounce it.
1689
        if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() {
483✔
1690
                l.mailBox.FailAdd(pkt)
×
1691

×
1692
                return NewDetailedLinkError(
×
1693
                        &lnwire.FailTemporaryChannelFailure{},
×
1694
                        OutgoingFailureLinkNotEligible,
×
1695
                )
×
1696
        }
×
1697

1698
        // If hodl.AddOutgoing mode is active, we exit early to simulate
1699
        // arbitrary delays between the switch adding an ADD to the
1700
        // mailbox, and the HTLC being added to the commitment state.
1701
        if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
483✔
1702
                l.log.Warnf(hodl.AddOutgoing.Warning())
×
1703
                l.mailBox.AckPacket(pkt.inKey())
×
1704
                return nil
×
1705
        }
×
1706

1707
        // Check if we can add the HTLC here without exceeding the max fee
1708
        // exposure threshold.
1709
        if l.isOverexposedWithHtlc(htlc, false) {
487✔
1710
                l.log.Debugf("Unable to handle downstream HTLC - max fee " +
4✔
1711
                        "exposure exceeded")
4✔
1712

4✔
1713
                l.mailBox.FailAdd(pkt)
4✔
1714

4✔
1715
                return NewDetailedLinkError(
4✔
1716
                        lnwire.NewTemporaryChannelFailure(nil),
4✔
1717
                        OutgoingFailureDownstreamHtlcAdd,
4✔
1718
                )
4✔
1719
        }
4✔
1720

1721
        // A new payment has been initiated via the downstream channel,
1722
        // so we add the new HTLC to our local log, then update the
1723
        // commitment chains.
1724
        htlc.ChanID = l.ChanID()
479✔
1725
        openCircuitRef := pkt.inKey()
479✔
1726

479✔
1727
        // We enforce the fee buffer for the commitment transaction because
479✔
1728
        // we are in control of adding this htlc. Nothing has been locked in yet, so
479✔
1729
        // we can securely enforce the fee buffer which is only relevant if we
479✔
1730
        // are the initiator of the channel.
479✔
1731
        index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
479✔
1732
        if err != nil {
483✔
1733
                // The HTLC was unable to be added to the state machine,
4✔
1734
                // as a result, we'll signal the switch to cancel the
4✔
1735
                // pending payment.
4✔
1736
                l.log.Warnf("Unable to handle downstream add HTLC: %v",
4✔
1737
                        err)
4✔
1738

4✔
1739
                // Remove this packet from the link's mailbox; this
4✔
1740
                // prevents it from being reprocessed if the link
4✔
1741
                // restarts and resets its mailbox. If this response
4✔
1742
                // doesn't make it back to the originating link, it will
4✔
1743
                // be rejected upon attempting to reforward the Add to
4✔
1744
                // the switch, since the circuit was never fully opened,
4✔
1745
                // and the forwarding package shows it as
4✔
1746
                // unacknowledged.
4✔
1747
                l.mailBox.FailAdd(pkt)
4✔
1748

4✔
1749
                return NewDetailedLinkError(
4✔
1750
                        lnwire.NewTemporaryChannelFailure(nil),
4✔
1751
                        OutgoingFailureDownstreamHtlcAdd,
4✔
1752
                )
4✔
1753
        }
4✔
1754

1755
        l.log.Tracef("received downstream htlc: payment_hash=%x, "+
478✔
1756
                "local_log_index=%v, pend_updates=%v",
478✔
1757
                htlc.PaymentHash[:], index,
478✔
1758
                l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))
478✔
1759

478✔
1760
        pkt.outgoingChanID = l.ShortChanID()
478✔
1761
        pkt.outgoingHTLCID = index
478✔
1762
        htlc.ID = index
478✔
1763

478✔
1764
        l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
478✔
1765
                pkt.inKey(), pkt.outKey())
478✔
1766

478✔
1767
        l.openedCircuits = append(l.openedCircuits, pkt.inKey())
478✔
1768
        l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
478✔
1769

478✔
1770
        _ = l.cfg.Peer.SendMessage(false, htlc)
478✔
1771

478✔
1772
        // Send a forward event notification to htlcNotifier.
478✔
1773
        l.cfg.HtlcNotifier.NotifyForwardingEvent(
478✔
1774
                newHtlcKey(pkt),
478✔
1775
                HtlcInfo{
478✔
1776
                        IncomingTimeLock: pkt.incomingTimeout,
478✔
1777
                        IncomingAmt:      pkt.incomingAmount,
478✔
1778
                        OutgoingTimeLock: htlc.Expiry,
478✔
1779
                        OutgoingAmt:      htlc.Amount,
478✔
1780
                },
478✔
1781
                getEventType(pkt),
478✔
1782
        )
478✔
1783

478✔
1784
        l.tryBatchUpdateCommitTx(ctx)
478✔
1785

478✔
1786
        return nil
478✔
1787
}
1788

1789
// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
1790
// Switch. Possible messages sent by the switch include requests to forward new
1791
// HTLCs, to time out previously cleared HTLCs, and finally to settle currently
1792
// cleared HTLCs with the upstream peer.
1793
//
1794
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
1795
func (l *channelLink) handleDownstreamPkt(ctx context.Context,
1796
        pkt *htlcPacket) {
524✔
1797

524✔
1798
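        // While the link is quiescent we may not send channel updates, so the
        // packet is simply dropped here.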
        if pkt.htlc.MsgType().IsChannelUpdate() &&
524✔
1799
                !l.quiescer.CanSendUpdates() {
524✔
1800

×
1801
                l.log.Warnf("unable to process channel update. "+
×
1802
                        "ChannelID=%v is quiescent.", l.ChanID)
×
1803

×
1804
                return
×
1805
        }
×
1806

1807
        switch htlc := pkt.htlc.(type) {
524✔
1808
        case *lnwire.UpdateAddHTLC:
483✔
1809
                // Handle add message. The returned error can be ignored,
483✔
1810
                // because it is also sent through the mailbox.
483✔
1811
                _ = l.handleDownstreamUpdateAdd(ctx, pkt)
483✔
1812

1813
        case *lnwire.UpdateFulfillHTLC:
26✔
1814
                // If hodl.SettleOutgoing mode is active, we exit early to
26✔
1815
                // simulate arbitrary delays between the switch adding the
26✔
1816
                // SETTLE to the mailbox, and the HTLC being added to the
26✔
1817
                // commitment state.
26✔
1818
                if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
26✔
1819
                        l.log.Warnf(hodl.SettleOutgoing.Warning())
×
1820
                        l.mailBox.AckPacket(pkt.inKey())
×
1821
                        return
×
1822
                }
×
1823

1824
                // An HTLC we forward to the switch has just settled somewhere
1825
                // upstream. Therefore we settle the HTLC within our local
1826
                // state machine.
1827
                inKey := pkt.inKey()
26✔
1828
                err := l.channel.SettleHTLC(
26✔
1829
                        htlc.PaymentPreimage,
26✔
1830
                        pkt.incomingHTLCID,
26✔
1831
                        pkt.sourceRef,
26✔
1832
                        pkt.destRef,
26✔
1833
                        &inKey,
26✔
1834
                )
26✔
1835
                if err != nil {
26✔
1836
                        l.log.Errorf("unable to settle incoming HTLC for "+
×
1837
                                "circuit-key=%v: %v", inKey, err)
×
1838

×
1839
                        // If the HTLC index for the Settle response was not known
×
1840
                        // to our commitment state, it has already been
×
1841
                        // cleaned up by a prior response. We'll thus try to
×
1842
                        // clean up any lingering state to ensure we don't
×
1843
                        // continue reforwarding.
×
1844
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
×
1845
                                l.cleanupSpuriousResponse(pkt)
×
1846
                        }
×
1847

1848
                        // Remove the packet from the link's mailbox to ensure
1849
                        // it doesn't get replayed after a reconnection.
1850
                        l.mailBox.AckPacket(inKey)
×
1851

×
1852
                        return
×
1853
                }
1854

1855
                l.log.Debugf("queueing removal of SETTLE closed circuit: "+
26✔
1856
                        "%s->%s", pkt.inKey(), pkt.outKey())
26✔
1857

26✔
1858
                l.closedCircuits = append(l.closedCircuits, pkt.inKey())
26✔
1859

26✔
1860
                // With the HTLC settled, we'll need to populate the wire
26✔
1861
                // message to target the specific channel and HTLC to be
26✔
1862
                // canceled.
26✔
1863
                htlc.ChanID = l.ChanID()
26✔
1864
                htlc.ID = pkt.incomingHTLCID
26✔
1865

26✔
1866
                // Then we send the HTLC settle message to the connected peer
26✔
1867
                // so we can continue the propagation of the settle message.
26✔
1868
                l.cfg.Peer.SendMessage(false, htlc)
26✔
1869

26✔
1870
                // Send a settle event notification to htlcNotifier.
26✔
1871
                l.cfg.HtlcNotifier.NotifySettleEvent(
26✔
1872
                        newHtlcKey(pkt),
26✔
1873
                        htlc.PaymentPreimage,
26✔
1874
                        getEventType(pkt),
26✔
1875
                )
26✔
1876

26✔
1877
                // Immediately update the commitment tx to minimize latency.
26✔
1878
                l.updateCommitTxOrFail(ctx)
26✔
1879

1880
        case *lnwire.UpdateFailHTLC:
21✔
1881
                // If hodl.FailOutgoing mode is active, we exit early to
21✔
1882
                // simulate arbitrary delays between the switch adding a FAIL to
21✔
1883
                // the mailbox, and the HTLC being added to the commitment
21✔
1884
                // state.
21✔
1885
                if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
21✔
1886
                        l.log.Warnf(hodl.FailOutgoing.Warning())
×
1887
                        l.mailBox.AckPacket(pkt.inKey())
×
1888
                        return
×
1889
                }
×
1890

1891
                // An HTLC cancellation has been triggered somewhere upstream, so
1892
                // we'll remove the HTLC from our local state machine.
1893
                inKey := pkt.inKey()
21✔
1894
                err := l.channel.FailHTLC(
21✔
1895
                        pkt.incomingHTLCID,
21✔
1896
                        htlc.Reason,
21✔
1897
                        pkt.sourceRef,
21✔
1898
                        pkt.destRef,
21✔
1899
                        &inKey,
21✔
1900
                )
21✔
1901
                if err != nil {
26✔
1902
                        l.log.Errorf("unable to cancel incoming HTLC for "+
5✔
1903
                                "circuit-key=%v: %v", inKey, err)
5✔
1904

5✔
1905
                        // If the HTLC index for the Fail response was not known to
5✔
1906
                        // our commitment state, it has already been cleaned up
5✔
1907
                        // by a prior response. We'll thus try to clean up any
5✔
1908
                        // lingering state to ensure we don't continue
5✔
1909
                        // reforwarding.
5✔
1910
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
7✔
1911
                                l.cleanupSpuriousResponse(pkt)
2✔
1912
                        }
2✔
1913

1914
                        // Remove the packet from the link's mailbox to ensure
1915
                        // it doesn't get replayed after a reconnection.
1916
                        l.mailBox.AckPacket(inKey)
5✔
1917

5✔
1918
                        return
5✔
1919
                }
1920

1921
                l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
19✔
1922
                        pkt.inKey(), pkt.outKey())
19✔
1923

19✔
1924
                l.closedCircuits = append(l.closedCircuits, pkt.inKey())
19✔
1925

19✔
1926
                // With the HTLC removed, we'll need to populate the wire
19✔
1927
                // message to target the specific channel and HTLC to be
19✔
1928
                // canceled. The "Reason" field will have already been set
19✔
1929
                // within the switch.
19✔
1930
                htlc.ChanID = l.ChanID()
19✔
1931
                htlc.ID = pkt.incomingHTLCID
19✔
1932

19✔
1933
                // We send the HTLC message to the peer which initially created
19✔
1934
                // the HTLC. If the incoming blinding point is non-nil, we
19✔
1935
                // know that we are a relaying node in a blinded path.
19✔
1936
                // Otherwise, we're either an introduction node or not part of
19✔
1937
                // a blinded path at all.
19✔
1938
                if err := l.sendIncomingHTLCFailureMsg(
19✔
1939
                        htlc.ID,
19✔
1940
                        pkt.obfuscator,
19✔
1941
                        htlc.Reason,
19✔
1942
                ); err != nil {
19✔
1943
                        l.log.Errorf("unable to send HTLC failure: %v",
×
1944
                                err)
×
1945

×
1946
                        return
×
1947
                }
×
1948

1949
                // If the packet does not have a link failure set, it failed
1950
                // further down the route so we notify a forwarding failure.
1951
                // Otherwise, we notify a link failure because it failed at our
1952
                // node.
1953
                if pkt.linkFailure != nil {
32✔
1954
                        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
13✔
1955
                                newHtlcKey(pkt),
13✔
1956
                                newHtlcInfo(pkt),
13✔
1957
                                getEventType(pkt),
13✔
1958
                                pkt.linkFailure,
13✔
1959
                                false,
13✔
1960
                        )
13✔
1961
                } else {
22✔
1962
                        l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
9✔
1963
                                newHtlcKey(pkt), getEventType(pkt),
9✔
1964
                        )
9✔
1965
                }
9✔
1966

1967
                // Immediately update the commitment tx to minimize latency.
1968
                l.updateCommitTxOrFail(ctx)
19✔
1969
        }
1970
}
1971

1972
// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
1973
// full.
1974
func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) {
478✔
1975
        pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
478✔
1976
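        // Only sign a new commitment once a full batch of pending updates has
        // accumulated; smaller batches are flushed by the BatchTicker instead.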
        if pending < uint64(l.cfg.BatchSize) {
932✔
1977
                return
454✔
1978
        }
454✔
1979

1980
        l.updateCommitTxOrFail(ctx)
27✔
1981
}
1982

1983
// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
1984
// associated with this packet. If successful in doing so, it will also purge
1985
// the open circuit from the circuit map and remove the packet from the link's
1986
// mailbox.
1987
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
2✔
1988
        inKey := pkt.inKey()
2✔
1989

2✔
1990
        l.log.Debugf("cleaning up spurious response for incoming "+
2✔
1991
                "circuit-key=%v", inKey)
2✔
1992

2✔
1993
        // If the htlc packet doesn't have a source reference, it is unsafe to
2✔
1994
        // proceed, as skipping this ack may cause the htlc to be reforwarded.
2✔
1995
        if pkt.sourceRef == nil {
3✔
1996
                l.log.Errorf("unable to cleanup response for incoming "+
1✔
1997
                        "circuit-key=%v, does not contain source reference",
1✔
1998
                        inKey)
1✔
1999
                return
1✔
2000
        }
1✔
2001

2002
        // If the source reference is present, we will try to prevent this link
2003
        // from resending the packet to the switch. To do so, we ack the AddRef
2004
        // of the incoming HTLC belonging to this link.
2005
        err := l.channel.AckAddHtlcs(*pkt.sourceRef)
1✔
2006
        if err != nil {
1✔
2007
                l.log.Errorf("unable to ack AddRef for incoming "+
×
2008
                        "circuit-key=%v: %v", inKey, err)
×
2009

×
2010
                // If this operation failed, it is unsafe to attempt removal of
×
2011
                // the destination reference or circuit, so we exit early. The
×
2012
                // cleanup may proceed with a different packet in the future
×
2013
                // that succeeds on this step.
×
2014
                return
×
2015
        }
×
2016

2017
        // Now that we know this link will stop retransmitting Adds to the
2018
        // switch, we can begin to teardown the response reference and circuit
2019
        // map.
2020
        //
2021
        // If the packet includes a destination reference, then a response for
2022
        // this HTLC was locked into the outgoing channel. Attempt to remove
2023
        // this reference, so we stop retransmitting the response internally.
2024
        // Even if this fails, we will proceed in trying to delete the circuit.
2025
        // When retransmitting responses, the destination references will be
2026
        // cleaned up if an open circuit is not found in the circuit map.
2027
        if pkt.destRef != nil {
1✔
2028
                err := l.channel.AckSettleFails(*pkt.destRef)
×
2029
                if err != nil {
×
2030
                        l.log.Errorf("unable to ack SettleFailRef "+
×
2031
                                "for incoming circuit-key=%v: %v",
×
2032
                                inKey, err)
×
2033
                }
×
2034
        }
2035

2036
        l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)
1✔
2037

1✔
2038
        // With all known references acked, we can now safely delete the circuit
1✔
2039
        // from the switch's circuit map, as the state is no longer needed.
1✔
2040
        err = l.cfg.Circuits.DeleteCircuits(inKey)
1✔
2041
        if err != nil {
1✔
2042
                l.log.Errorf("unable to delete circuit for "+
×
2043
                        "circuit-key=%v: %v", inKey, err)
×
2044
        }
×
2045
}
2046

2047
// handleUpstreamMsg processes wire messages related to commitment state
2048
// updates from the upstream peer. The upstream peer is the peer with whom we
2049
// have a direct channel, updating our respective commitment chains.
2050
//
2051
//nolint:funlen
2052
func (l *channelLink) handleUpstreamMsg(ctx context.Context,
2053
        msg lnwire.Message) {
3,164✔
2054

3,164✔
2055
        // First check if the message is an update and we are capable of
3,164✔
2056
        // receiving updates right now.
3,164✔
2057
        if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
3,164✔
2058
                l.stfuFailf("update received after stfu: %T", msg)
×
2059
                return
×
2060
        }
×
2061

2062
        switch msg := msg.(type) {
3,164✔
2063
        case *lnwire.UpdateAddHTLC:
453✔
2064
                if l.IsFlushing(Incoming) {
453✔
2065
                        // This is forbidden by the protocol specification.
×
2066
                        // The best chance we have to deal with this is to drop
×
2067
                        // the connection. This should roll back the channel
×
2068
                        // state to the last CommitSig. If the remote has
×
2069
                        // already sent a CommitSig we haven't received yet,
×
2070
                        // channel state will be re-synchronized with a
×
2071
                        // ChannelReestablish message upon reconnection and the
×
2072
                        // protocol state that caused us to flush the link will
×
2073
                        // be rolled back. In the event that there was some
×
2074
                        // non-deterministic behavior in the remote that caused
×
2075
                        // them to violate the protocol, we have a decent shot
×
2076
                        // at correcting it this way, since reconnecting will
×
2077
                        // put us in the cleanest possible state to try again.
×
2078
                        //
×
2079
                        // In addition to the above, it is possible for us to
×
2080
                        // hit this case in situations where we improperly
×
2081
                        // handle message ordering due to concurrency choices.
×
2082
                        // An issue has been filed to address this here:
×
2083
                        // https://github.com/lightningnetwork/lnd/issues/8393
×
2084
                        l.failf(
×
2085
                                LinkFailureError{
×
2086
                                        code:             ErrInvalidUpdate,
×
2087
                                        FailureAction:    LinkFailureDisconnect,
×
2088
                                        PermanentFailure: false,
×
2089
                                        Warning:          true,
×
2090
                                },
×
2091
                                "received add while link is flushing",
×
2092
                        )
×
2093

×
2094
                        return
×
2095
                }
×
2096

2097
                // Disallow htlcs with blinding points set if we haven't
2098
                // enabled the feature. This saves us from having to process
2099
                // the onion at all, but will only catch blinded payments
2100
                // where we are a relaying node (as the blinding point will
2101
                // be in the payload when we're the introduction node).
2102
                if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
453✔
2103
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2104
                                "blinding point included when route blinding "+
×
2105
                                        "is disabled")
×
2106

×
2107
                        return
×
2108
                }
×
2109

2110
                // We have to check the limit here rather than later in the
2111
                // switch because the counterparty can keep sending HTLCs
2112
                // without sending a revoke. This would mean that the switch
2113
                // check would only occur later.
2114
                if l.isOverexposedWithHtlc(msg, true) {
453✔
2115
                        l.failf(LinkFailureError{code: ErrInternalError},
×
2116
                                "peer sent us an HTLC that exceeded our max "+
×
2117
                                        "fee exposure")
×
2118

×
2119
                        return
×
2120
                }
×
2121

2122
                // We just received an add request from an upstream peer, so we
2123
                // add it to our state machine, then add the HTLC to our
2124
                // "settle" list in the event that we know the preimage.
2125
                index, err := l.channel.ReceiveHTLC(msg)
453✔
2126
                if err != nil {
453✔
2127
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2128
                                "unable to handle upstream add HTLC: %v", err)
×
2129
                        return
×
2130
                }
×
2131

2132
                l.log.Tracef("receive upstream htlc with payment hash(%x), "+
453✔
2133
                        "assigning index: %v", msg.PaymentHash[:], index)
453✔
2134

2135
        case *lnwire.UpdateFulfillHTLC:
230✔
2136
                pre := msg.PaymentPreimage
230✔
2137
                idx := msg.ID
230✔
2138

230✔
2139
                // Before we pipeline the settle, we'll check the set of active
230✔
2140
                // htlc's to see if the related UpdateAddHTLC has been fully
230✔
2141
                // locked-in.
230✔
2142
                var lockedin bool
230✔
2143
                htlcs := l.channel.ActiveHtlcs()
230✔
2144
                for _, add := range htlcs {
936✔
2145
                        // The HTLC will be outgoing and match idx.
706✔
2146
                        if !add.Incoming && add.HtlcIndex == idx {
934✔
2147
                                lockedin = true
228✔
2148
                                break
228✔
2149
                        }
2150
                }
2151

2152
                if !lockedin {
232✔
2153
                        l.failf(
2✔
2154
                                LinkFailureError{code: ErrInvalidUpdate},
2✔
2155
                                "unable to handle upstream settle",
2✔
2156
                        )
2✔
2157
                        return
2✔
2158
                }
2✔
2159

2160
                if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
231✔
2161
                        l.failf(
3✔
2162
                                LinkFailureError{
3✔
2163
                                        code:          ErrInvalidUpdate,
3✔
2164
                                        FailureAction: LinkFailureForceClose,
3✔
2165
                                },
3✔
2166
                                "unable to handle upstream settle HTLC: %v", err,
3✔
2167
                        )
3✔
2168
                        return
3✔
2169
                }
3✔
2170

2171
                settlePacket := &htlcPacket{
228✔
2172
                        outgoingChanID: l.ShortChanID(),
228✔
2173
                        outgoingHTLCID: idx,
228✔
2174
                        htlc: &lnwire.UpdateFulfillHTLC{
228✔
2175
                                PaymentPreimage: pre,
228✔
2176
                        },
228✔
2177
                }
228✔
2178

228✔
2179
                // Add the newly discovered preimage to our growing list of
228✔
2180
                // uncommitted preimage. These will be written to the witness
228✔
2181
                // cache just before accepting the next commitment signature
228✔
2182
                // from the remote peer.
228✔
2183
                l.uncommittedPreimages = append(l.uncommittedPreimages, pre)
228✔
2184

228✔
2185
                // Pipeline this settle, send it to the switch.
228✔
2186
                go l.forwardBatch(false, settlePacket)
228✔
2187

2188
        case *lnwire.UpdateFailMalformedHTLC:
6✔
2189
                // Convert the failure type encoded within the HTLC fail
6✔
2190
                // message to the proper generic lnwire error code.
6✔
2191
                var failure lnwire.FailureMessage
6✔
2192
                switch msg.FailureCode {
6✔
2193
                case lnwire.CodeInvalidOnionVersion:
4✔
2194
                        failure = &lnwire.FailInvalidOnionVersion{
4✔
2195
                                OnionSHA256: msg.ShaOnionBlob,
4✔
2196
                        }
4✔
2197
                case lnwire.CodeInvalidOnionHmac:
×
2198
                        failure = &lnwire.FailInvalidOnionHmac{
×
2199
                                OnionSHA256: msg.ShaOnionBlob,
×
2200
                        }
×
2201

2202
                case lnwire.CodeInvalidOnionKey:
×
2203
                        failure = &lnwire.FailInvalidOnionKey{
×
2204
                                OnionSHA256: msg.ShaOnionBlob,
×
2205
                        }
×
2206

2207
                // Handle malformed errors that are part of a blinded route.
2208
                // This case is slightly different, because we expect every
2209
                // relaying node in the blinded portion of the route to send
2210
                // malformed errors. If we're also a relaying node, we're
2211
                // likely going to switch this error out anyway for our own
2212
                // malformed error, but we handle the case here for
2213
                // completeness.
2214
                case lnwire.CodeInvalidBlinding:
3✔
2215
                        failure = &lnwire.FailInvalidBlinding{
3✔
2216
                                OnionSHA256: msg.ShaOnionBlob,
3✔
2217
                        }
3✔
2218

2219
                default:
2✔
2220
                        l.log.Warnf("unexpected failure code received in "+
2✔
2221
                                "UpdateFailMailformedHTLC: %v", msg.FailureCode)
2✔
2222

2✔
2223
                        // We don't just pass back the error we received from
2✔
2224
                        // our successor. Otherwise we might report a failure
2✔
2225
                        // that penalizes us more than needed. If the onion that
2✔
2226
                        // we forwarded was correct, the node should have been
2✔
2227
                        // able to send back its own failure. The node did not
2✔
2228
                        // send back its own failure, so we assume there was a
2✔
2229
                        // problem with the onion and report that back. We reuse
2✔
2230
                        // the invalid onion key failure because there is no
2✔
2231
                        // specific error for this case.
2✔
2232
                        failure = &lnwire.FailInvalidOnionKey{
2✔
2233
                                OnionSHA256: msg.ShaOnionBlob,
2✔
2234
                        }
2✔
2235
                }
2236

2237
                // With the error parsed, we'll convert it into its opaque
2238
                // form.
2239
                var b bytes.Buffer
6✔
2240
                if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
6✔
2241
                        l.log.Errorf("unable to encode malformed error: %v", err)
×
2242
                        return
×
2243
                }
×
2244

2245
                // If the remote side has been unable to parse the onion blob we
2246
                // have sent to it, then we should transform the malformed HTLC
2247
                // message to the usual HTLC fail message.
2248
                err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
6✔
2249
                if err != nil {
6✔
2250
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2251
                                "unable to handle upstream fail HTLC: %v", err)
×
2252
                        return
×
2253
                }
×
2254
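
// Illustrative sketch (not part of link.go): the malformed-HTLC handling
// above, condensed. Given the failure code and onion hash carried in a
// peer's UpdateFailMalformedHTLC, we build the matching wire failure
// (falling back to FailInvalidOnionKey) and encode it into the opaque
// reason consumed by the usual HTLC fail path. The helper name is
// hypothetical; lnwire.EncodeFailure and the failure types are taken from
// the case above.
func encodeMalformedAsReason(code lnwire.FailCode,
        shaOnionBlob [sha256.Size]byte) ([]byte, error) {

        var failure lnwire.FailureMessage
        switch code {
        case lnwire.CodeInvalidOnionVersion:
                failure = &lnwire.FailInvalidOnionVersion{
                        OnionSHA256: shaOnionBlob,
                }
        default:
                // No specific error exists, so reuse the invalid onion key
                // failure, exactly as the default case above does.
                failure = &lnwire.FailInvalidOnionKey{
                        OnionSHA256: shaOnionBlob,
                }
        }

        var b bytes.Buffer
        if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
                return nil, err
        }

        return b.Bytes(), nil
}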

2255
        case *lnwire.UpdateFailHTLC:
123✔
2256
                // Verify that the failure reason is at least 256 bytes plus
123✔
2257
                // overhead.
123✔
2258
                const minimumFailReasonLength = lnwire.FailureMessageLength +
123✔
2259
                        2 + 2 + 32
123✔
2260

123✔
2261
                if len(msg.Reason) < minimumFailReasonLength {
124✔
2262
                        // We've received a reason with a non-compliant length.
1✔
2263
                        // Older nodes happily relay back these failures that
1✔
2264
                        // may originate from a node further downstream.
1✔
2265
                        // Therefore we can't just fail the channel.
1✔
2266
                        //
1✔
2267
                        // We want to be compliant ourselves, so we also can't
1✔
2268
                        // pass back the reason unmodified. And we must make
1✔
2269
                        // sure that we don't hit the magic length check of 260
1✔
2270
                        // bytes in processRemoteSettleFails either.
1✔
2271
                        //
1✔
2272
                        // Because the reason is unreadable for the payer
1✔
2273
                        // anyway, we just replace it by a compliant-length
1✔
2274
                        // series of random bytes.
1✔
2275
                        msg.Reason = make([]byte, minimumFailReasonLength)
1✔
2276
                        _, err := crand.Read(msg.Reason[:])
1✔
2277
                        if err != nil {
1✔
2278
                                l.log.Errorf("Random generation error: %v", err)
×
2279

×
2280
                                return
×
2281
                        }
×
2282
                }
2283
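
// Illustrative sketch (not part of link.go): the padding step above. A
// non-compliant failure reason is swapped for a compliant-length series of
// random bytes, since the payer cannot read it anyway. The helper is
// hypothetical; the length mirrors minimumFailReasonLength.
func compliantRandomReason() ([]byte, error) {
        const minLen = lnwire.FailureMessageLength + 2 + 2 + 32

        reason := make([]byte, minLen)
        if _, err := crand.Read(reason); err != nil {
                return nil, err
        }

        return reason, nil
}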

2284
                // Add fail to the update log.
2285
                idx := msg.ID
123✔
2286
                err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
123✔
2287
                if err != nil {
123✔
2288
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2289
                                "unable to handle upstream fail HTLC: %v", err)
×
2290
                        return
×
2291
                }
×
2292

2293
        case *lnwire.CommitSig:
1,186✔
2294
                // Since we may have learned new preimages for the first time,
1,186✔
2295
                // we'll add them to our preimage cache. By doing this, we
1,186✔
2296
                // ensure any contested contracts watched by any on-chain
1,186✔
2297
                // arbitrators can now sweep this HTLC on-chain. We delay
1,186✔
2298
                // committing the preimages until just before accepting the new
1,186✔
2299
                // remote commitment, as afterwards the peer won't resend the
1,186✔
2300
                // Settle messages on the next channel reestablishment. Doing so
1,186✔
2301
                // allows us to more effectively batch this operation, instead
1,186✔
2302
                // of doing a single write per preimage.
1,186✔
2303
                err := l.cfg.PreimageCache.AddPreimages(
1,186✔
2304
                        l.uncommittedPreimages...,
1,186✔
2305
                )
1,186✔
2306
                if err != nil {
1,186✔
2307
                        l.failf(
×
2308
                                LinkFailureError{code: ErrInternalError},
×
2309
                                "unable to add preimages=%v to cache: %v",
×
2310
                                l.uncommittedPreimages, err,
×
2311
                        )
×
2312
                        return
×
2313
                }
×
2314

2315
                // Instead of truncating the slice to conserve memory
2316
                // allocations, we simply set the uncommitted preimage slice to
2317
                // nil so that a new one will be initialized if any more
2318
                // witnesses are discovered. We do this because the maximum size
2319
                // that the slice can occupy is 15KB, and we want to ensure we
2320
                // release that memory back to the runtime.
2321
                l.uncommittedPreimages = nil
1,186✔
2322

1,186✔
2323
                // We just received new updates to our local commitment
1,186✔
2324
                // chain. We validate this new commitment, closing the link if
1,186✔
2325
                // invalid.
1,186✔
2326
                auxSigBlob, err := msg.CustomRecords.Serialize()
1,186✔
2327
                if err != nil {
1,186✔
2328
                        l.failf(
×
2329
                                LinkFailureError{code: ErrInvalidCommitment},
×
2330
                                "unable to serialize custom records: %v", err,
×
2331
                        )
×
2332

×
2333
                        return
×
2334
                }
×
2335
                err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
1,186✔
2336
                        CommitSig:  msg.CommitSig,
1,186✔
2337
                        HtlcSigs:   msg.HtlcSigs,
1,186✔
2338
                        PartialSig: msg.PartialSig,
1,186✔
2339
                        AuxSigBlob: auxSigBlob,
1,186✔
2340
                })
1,186✔
2341
                if err != nil {
1,186✔
2342
                        // If we were unable to reconstruct their proposed
×
2343
                        // commitment, then we'll examine the type of error. If
×
2344
                        // it's an InvalidCommitSigError, then we'll send a
×
2345
                        // direct error.
×
2346
                        var sendData []byte
×
2347
                        switch err.(type) {
×
2348
                        case *lnwallet.InvalidCommitSigError:
×
2349
                                sendData = []byte(err.Error())
×
2350
                        case *lnwallet.InvalidHtlcSigError:
×
2351
                                sendData = []byte(err.Error())
×
2352
                        }
2353
                        l.failf(
×
2354
                                LinkFailureError{
×
2355
                                        code:          ErrInvalidCommitment,
×
2356
                                        FailureAction: LinkFailureForceClose,
×
2357
                                        SendData:      sendData,
×
2358
                                },
×
2359
                                "ChannelPoint(%v): unable to accept new "+
×
2360
                                        "commitment: %v",
×
2361
                                l.channel.ChannelPoint(), err,
×
2362
                        )
×
2363
                        return
×
2364
                }
2365

2366
                // As we've just accepted a new state, we'll now
2367
                // immediately send the remote peer a revocation for our prior
2368
                // state.
2369
                nextRevocation, currentHtlcs, finalHTLCs, err :=
1,186✔
2370
                        l.channel.RevokeCurrentCommitment()
1,186✔
2371
                if err != nil {
1,186✔
2372
                        l.log.Errorf("unable to revoke commitment: %v", err)
×
2373

×
2374
                        // We need to fail the channel in case revoking our
×
2375
                        // local commitment does not succeed. We might have
×
2376
                        // already advanced our channel state which would lead
×
2377
                        // us to proceed with an unclean state.
×
2378
                        //
×
2379
                        // NOTE: We do not trigger a force close because this
×
2380
                        // could resolve itself in case our db was just busy
×
2381
                        // not accepting new transactions.
×
2382
                        l.failf(
×
2383
                                LinkFailureError{
×
2384
                                        code:          ErrInternalError,
×
2385
                                        Warning:       true,
×
2386
                                        FailureAction: LinkFailureDisconnect,
×
2387
                                },
×
2388
                                "ChannelPoint(%v): unable to accept new "+
×
2389
                                        "commitment: %v",
×
2390
                                l.channel.ChannelPoint(), err,
×
2391
                        )
×
2392
                        return
×
2393
                }
×
2394

2395
                // As soon as we are ready to send our next revocation, we can
2396
                // invoke the incoming commit hooks.
2397
                l.RWMutex.Lock()
1,186✔
2398
                l.incomingCommitHooks.invoke()
1,186✔
2399
                l.RWMutex.Unlock()
1,186✔
2400

1,186✔
2401
                l.cfg.Peer.SendMessage(false, nextRevocation)
1,186✔
2402

1,186✔
2403
                // Notify the incoming HTLCs for which the resolutions were
1,186✔
2404
                // locked in.
1,186✔
2405
                for id, settled := range finalHTLCs {
1,520✔
2406
                        l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
334✔
2407
                                models.CircuitKey{
334✔
2408
                                        ChanID: l.ShortChanID(),
334✔
2409
                                        HtlcID: id,
334✔
2410
                                },
334✔
2411
                                channeldb.FinalHtlcInfo{
334✔
2412
                                        Settled:  settled,
334✔
2413
                                        Offchain: true,
334✔
2414
                                },
334✔
2415
                        )
334✔
2416
                }
334✔
2417

2418
                // Since we just revoked our commitment, we may have a new set
2419
                // of HTLCs on our commitment, so we'll send them using our
2420
                // function closure NotifyContractUpdate.
2421
                newUpdate := &contractcourt.ContractUpdate{
1,186✔
2422
                        HtlcKey: contractcourt.LocalHtlcSet,
1,186✔
2423
                        Htlcs:   currentHtlcs,
1,186✔
2424
                }
1,186✔
2425
                err = l.cfg.NotifyContractUpdate(newUpdate)
1,186✔
2426
                if err != nil {
1,186✔
2427
                        l.log.Errorf("unable to notify contract update: %v",
×
2428
                                err)
×
2429
                        return
×
2430
                }
×
2431

2432
                select {
1,186✔
2433
                case <-l.cg.Done():
×
2434
                        return
×
2435
                default:
1,186✔
2436
                }
2437

2438
                // If the remote party initiated the state transition,
2439
                // we'll reply with a signature to provide them with their
2440
                // version of the latest commitment. Otherwise, if both
2441
                // commitment chains are fully synced from our PoV, we don't
2442
                // need to reply with a signature as both sides already have
2443
                // a commitment with the latest accepted state.
2444
                if l.channel.OweCommitment() {
1,858✔
2445
                        if !l.updateCommitTxOrFail(ctx) {
672✔
2446
                                return
×
2447
                        }
×
2448
                }
2449

2450
                // If we need to send out an Stfu, this would be the time to do
2451
                // so.
2452
                if l.noDanglingUpdates(lntypes.Local) {
2,231✔
2453
                        err = l.quiescer.SendOwedStfu()
1,045✔
2454
                        if err != nil {
1,045✔
2455
                                l.stfuFailf("sendOwedStfu: %v", err.Error())
×
2456
                        }
×
2457
                }
2458

2459
                // Now that we have finished processing the incoming CommitSig
2460
                // and sent out our RevokeAndAck, we invoke the flushHooks if
2461
                // the channel state is clean.
2462
                l.RWMutex.Lock()
1,186✔
2463
                if l.channel.IsChannelClean() {
1,357✔
2464
                        l.flushHooks.invoke()
171✔
2465
                }
171✔
2466
                l.RWMutex.Unlock()
1,186✔
2467

2468
        case *lnwire.RevokeAndAck:
1,175✔
2469
                // We've received a revocation from the remote chain, if valid,
1,175✔
2470
                // this moves the remote chain forward, and expands our
1,175✔
2471
                // revocation window.
1,175✔
2472

1,175✔
2473
                // We now process the message and advance our remote commit
1,175✔
2474
                // chain.
1,175✔
2475
                fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg)
1,175✔
2476
                if err != nil {
1,175✔
2477
                        // TODO(halseth): force close?
×
2478
                        l.failf(
×
2479
                                LinkFailureError{
×
2480
                                        code:          ErrInvalidRevocation,
×
2481
                                        FailureAction: LinkFailureDisconnect,
×
2482
                                },
×
2483
                                "unable to accept revocation: %v", err,
×
2484
                        )
×
2485
                        return
×
2486
                }
×
2487

2488
                // The remote party now has a new primary commitment, so we'll
2489
                // update the contract court to be aware of this new set (the
2490
                // prior old remote pending).
2491
                newUpdate := &contractcourt.ContractUpdate{
1,175✔
2492
                        HtlcKey: contractcourt.RemoteHtlcSet,
1,175✔
2493
                        Htlcs:   remoteHTLCs,
1,175✔
2494
                }
1,175✔
2495
                err = l.cfg.NotifyContractUpdate(newUpdate)
1,175✔
2496
                if err != nil {
1,175✔
2497
                        l.log.Errorf("unable to notify contract update: %v",
×
2498
                                err)
×
2499
                        return
×
2500
                }
×
2501

2502
                select {
1,175✔
2503
                case <-l.cg.Done():
2✔
2504
                        return
2✔
2505
                default:
1,173✔
2506
                }
2507

2508
                // If we have a tower client for this channel type, we'll
2509
                // create a backup for the current state.
2510
                if l.cfg.TowerClient != nil {
1,176✔
2511
                        state := l.channel.State()
3✔
2512
                        chanID := l.ChanID()
3✔
2513

3✔
2514
                        err = l.cfg.TowerClient.BackupState(
3✔
2515
                                &chanID, state.RemoteCommitment.CommitHeight-1,
3✔
2516
                        )
3✔
2517
                        if err != nil {
3✔
2518
                                l.failf(LinkFailureError{
×
2519
                                        code: ErrInternalError,
×
2520
                                }, "unable to queue breach backup: %v", err)
×
2521
                                return
×
2522
                        }
×
2523
                }
2524

2525
                // If we can send updates then we can process adds in case we
2526
                // are the exit hop and need to send back resolutions, or in
2527
                // case there are validity issues with the packets. Otherwise
2528
                // we defer the action until resume.
2529
                //
2530
                // We are free to process the settles and fails without this
2531
                // check since processing those can't result in further updates
2532
                // to this channel link.
2533
                if l.quiescer.CanSendUpdates() {
2,345✔
2534
                        l.processRemoteAdds(fwdPkg)
1,172✔
2535
                } else {
1,173✔
2536
                        l.quiescer.OnResume(func() {
1✔
2537
                                l.processRemoteAdds(fwdPkg)
×
2538
                        })
×
2539
                }
2540
                l.processRemoteSettleFails(fwdPkg)
1,173✔
2541

1,173✔
2542
                // If the link failed while processing the adds, we must
1,173✔
2543
                // return to ensure we won't attempt to update the state
1,173✔
2544
                // further.
1,173✔
2545
                if l.failed {
1,173✔
2546
                        return
×
2547
                }
×
2548

2549
                // The revocation window opened up. If there are pending local
2550
                // updates, try to update the commit tx. Pending updates could
2551
                // already have been present because of a previously failed
2552
                // update to the commit tx or freshly added in by
2553
                // processRemoteAdds. Also in case there are no local updates,
2554
                // but there are still remote updates that are not in the remote
2555
                // commit tx yet, send out an update.
2556
                if l.channel.OweCommitment() {
1,478✔
2557
                        if !l.updateCommitTxOrFail(ctx) {
312✔
2558
                                return
7✔
2559
                        }
7✔
2560
                }
2561

2562
                // Now that we have finished processing the RevokeAndAck, we
2563
                // can invoke the flushHooks if the channel state is clean.
2564
                l.RWMutex.Lock()
1,166✔
2565
                if l.channel.IsChannelClean() {
1,328✔
2566
                        l.flushHooks.invoke()
162✔
2567
                }
162✔
2568
                l.RWMutex.Unlock()
1,166✔
2569

2570
        case *lnwire.UpdateFee:
3✔
2571
                // Check and see if their proposed fee-rate would make us
3✔
2572
                // exceed the fee threshold.
3✔
2573
                fee := chainfee.SatPerKWeight(msg.FeePerKw)
3✔
2574

3✔
2575
                isDust, err := l.exceedsFeeExposureLimit(fee)
3✔
2576
                if err != nil {
3✔
2577
                        // This shouldn't typically happen. If it does, it
×
2578
                        // indicates something is wrong with our channel state.
×
2579
                        l.log.Errorf("Unable to determine if fee threshold " +
×
2580
                                "exceeded")
×
2581
                        l.failf(LinkFailureError{code: ErrInternalError},
×
2582
                                "error calculating fee exposure: %v", err)
×
2583

×
2584
                        return
×
2585
                }
×
2586

2587
                if isDust {
3✔
2588
                        // The proposed fee-rate makes us exceed the fee
×
2589
                        // threshold.
×
2590
                        l.failf(LinkFailureError{code: ErrInternalError},
×
2591
                                "fee threshold exceeded: %v", err)
×
2592
                        return
×
2593
                }
×
2594

2595
                // We received a fee update from the peer. If we are the
2596
                // initiator we will fail the channel; otherwise we apply it.
2597
                if err := l.channel.ReceiveUpdateFee(fee); err != nil {
3✔
2598
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2599
                                "error receiving fee update: %v", err)
×
2600
                        return
×
2601
                }
×
2602

2603
                // Update the mailbox's feerate as well.
2604
                l.mailBox.SetFeeRate(fee)
3✔
2605

2606
        case *lnwire.Stfu:
5✔
2607
                err := l.handleStfu(msg)
5✔
2608
                if err != nil {
5✔
2609
                        l.stfuFailf("handleStfu: %v", err.Error())
×
2610
                }
×
2611

2612
        // In the case where we receive a warning message from our peer, just
2613
        // log it and move on. We choose not to disconnect from our peer,
2614
        // although we "MAY" do so according to the specification.
2615
        case *lnwire.Warning:
1✔
2616
                l.log.Warnf("received warning message from peer: %v",
1✔
2617
                        msg.Warning())
1✔
2618

2619
        case *lnwire.Error:
2✔
2620
                // Error received from remote, MUST fail channel, but should
2✔
2621
                // only print the contents of the error message if all
2✔
2622
                // characters are printable ASCII.
2✔
2623
                l.failf(
2✔
2624
                        LinkFailureError{
2✔
2625
                                code: ErrRemoteError,
2✔
2626

2✔
2627
                                // TODO(halseth): we currently don't fail the
2✔
2628
                                // channel permanently, as there are some sync
2✔
2629
                                // issues with other implementations that will
2✔
2630
                                // lead to them sending an error message, but
2✔
2631
                                // we can recover from on next connection. See
2✔
2632
                                // https://github.com/ElementsProject/lightning/issues/4212
2✔
2633
                                PermanentFailure: false,
2✔
2634
                        },
2✔
2635
                        "ChannelPoint(%v): received error from peer: %v",
2✔
2636
                        l.channel.ChannelPoint(), msg.Error(),
2✔
2637
                )
2✔
2638
        default:
×
2639
                l.log.Warnf("received unknown message of type %T", msg)
×
2640
        }
2641

2642
}
2643

2644
// handleStfu implements the top-level logic for handling the Stfu message from
2645
// our peer.
2646
func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
5✔
2647
        if !l.noDanglingUpdates(lntypes.Remote) {
5✔
2648
                return ErrPendingRemoteUpdates
×
2649
        }
×
2650
        err := l.quiescer.RecvStfu(*stfu)
5✔
2651
        if err != nil {
5✔
2652
                return err
×
2653
        }
×
2654

2655
        // If we can immediately send an Stfu response back, we will.
2656
        if l.noDanglingUpdates(lntypes.Local) {
9✔
2657
                return l.quiescer.SendOwedStfu()
4✔
2658
        }
4✔
2659

2660
        return nil
1✔
2661
}
2662

2663
// stfuFailf fails the link in the case where the requirements of the quiescence
2664
// protocol are violated. In all cases we opt to drop the connection as only
2665
// link state (as opposed to channel state) is affected.
2666
func (l *channelLink) stfuFailf(format string, args ...interface{}) {
×
2667
        l.failf(LinkFailureError{
×
2668
                code:             ErrStfuViolation,
×
2669
                FailureAction:    LinkFailureDisconnect,
×
2670
                PermanentFailure: false,
×
2671
                Warning:          true,
×
2672
        }, format, args...)
×
2673
}
×
2674

2675
// noDanglingUpdates returns true when no updates originally issued by the
2676
// given party (whose) remain on either the Local or Remote commitment.
2677
func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool {
1,191✔
2678
        pendingOnLocal := l.channel.NumPendingUpdates(
1,191✔
2679
                whose, lntypes.Local,
1,191✔
2680
        )
1,191✔
2681
        pendingOnRemote := l.channel.NumPendingUpdates(
1,191✔
2682
                whose, lntypes.Remote,
1,191✔
2683
        )
1,191✔
2684

1,191✔
2685
        return pendingOnLocal == 0 && pendingOnRemote == 0
1,191✔
2686
}
1,191✔
2687
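
// Illustrative sketch (not part of link.go): how the quiescence logic uses
// the check above. Stfu may only be sent or honored once the given party
// has no updates pending on either commitment. The helper name is
// hypothetical; NumPendingUpdates is the call used above.
func mayBecomeQuiescent(channel *lnwallet.LightningChannel,
        whose lntypes.ChannelParty) bool {

        return channel.NumPendingUpdates(whose, lntypes.Local) == 0 &&
                channel.NumPendingUpdates(whose, lntypes.Remote) == 0
}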

2688
// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
2689
// for packets delivered from the switch, and cleaning up any circuits closed by
2690
// signing a previous commitment txn. This method ensures that the circuits are
2691
// removed from the circuit map before removing them from the link's mailbox,
2692
// otherwise it could be possible for some circuit to be missed if this link
2693
// flaps.
2694
func (l *channelLink) ackDownStreamPackets() error {
1,365✔
2695
        // First, remove the downstream Add packets that were included in the
1,365✔
2696
        // previous commitment signature. This will prevent the Adds from being
1,365✔
2697
        // replayed if this link disconnects.
1,365✔
2698
        for _, inKey := range l.openedCircuits {
1,832✔
2699
                // In order to test the sphinx replay logic of the remote
467✔
2700
                // party, unsafe replay does not acknowledge the packets from
467✔
2701
                // the mailbox. We can then force a replay of any Add packets
467✔
2702
                // held in memory by disconnecting and reconnecting the link.
467✔
2703
                if l.cfg.UnsafeReplay {
470✔
2704
                        continue
3✔
2705
                }
2706

2707
                l.log.Debugf("removing Add packet %s from mailbox", inKey)
467✔
2708
                l.mailBox.AckPacket(inKey)
467✔
2709
        }
2710

2711
        // Now, we will delete all circuits closed by the previous commitment
2712
        // signature, which is the result of downstream Settle/Fail packets. We
2713
        // batch them here to ensure circuits are closed atomically and for
2714
        // performance.
2715
        err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
1,365✔
2716
        switch err {
1,365✔
2717
        case nil:
1,365✔
2718
                // Successful deletion.
2719

2720
        default:
×
2721
                l.log.Errorf("unable to delete %d circuits: %v",
×
2722
                        len(l.closedCircuits), err)
×
2723
                return err
×
2724
        }
2725

2726
        // With the circuits removed from memory and disk, we now ack any
2727
        // Settle/Fails in the mailbox to ensure they do not get redelivered
2728
        // after startup. If forgive is enabled and we've reached this point,
2729
        // the circuits must have been removed at some point, so it is now safe
2730
        // to un-queue the corresponding Settle/Fails.
2731
        for _, inKey := range l.closedCircuits {
1,407✔
2732
                l.log.Debugf("removing Fail/Settle packet %s from mailbox",
42✔
2733
                        inKey)
42✔
2734
                l.mailBox.AckPacket(inKey)
42✔
2735
        }
42✔
2736

2737
        // Lastly, reset our buffers to be empty while keeping any acquired
2738
        // growth in the backing array.
2739
        l.openedCircuits = l.openedCircuits[:0]
1,365✔
2740
        l.closedCircuits = l.closedCircuits[:0]
1,365✔
2741

1,365✔
2742
        return nil
1,365✔
2743
}
2744
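
// Illustrative sketch (not part of link.go): the ordering invariant above,
// in isolation. Circuits are deleted from the circuit map before the
// corresponding Settle/Fail packets are acked in the mailbox; reversing the
// order could lose a circuit if the link flaps in between. The helper is
// hypothetical and assumes this package's CircuitMap and MailBox interfaces.
func ackClosedCircuits(circuits CircuitMap, mailbox MailBox,
        closed []CircuitKey) error {

        // Persistently remove the circuits first.
        if err := circuits.DeleteCircuits(closed...); err != nil {
                return err
        }

        // Only then un-queue the corresponding packets.
        for _, inKey := range closed {
                mailbox.AckPacket(inKey)
        }

        return nil
}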

2745
// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
2746
// the link.
2747
func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool {
1,233✔
2748
        err := l.updateCommitTx(ctx)
1,233✔
2749
        switch err {
1,233✔
2750
        // No error encountered, success.
2751
        case nil:
1,223✔
2752

2753
        // A duplicate keystone error should be resolved and is not fatal, so
2754
        // we won't send an Error message to the peer.
2755
        case ErrDuplicateKeystone:
×
2756
                l.failf(LinkFailureError{code: ErrCircuitError},
×
2757
                        "temporary circuit error: %v", err)
×
2758
                return false
×
2759

2760
        // Any other error results in an Error message being sent to
2761
        // the peer.
2762
        default:
10✔
2763
                l.failf(LinkFailureError{code: ErrInternalError},
10✔
2764
                        "unable to update commitment: %v", err)
10✔
2765
                return false
10✔
2766
        }
2767

2768
        return true
1,223✔
2769
}
2770
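
// Illustrative sketch (not part of link.go): the error triage above as a
// pure function. A duplicate keystone maps to a recoverable circuit error,
// while everything else is treated as internal and fails the link. The
// helper name is hypothetical.
func commitFailure(err error) LinkFailureError {
        if err == ErrDuplicateKeystone {
                return LinkFailureError{code: ErrCircuitError}
        }

        return LinkFailureError{code: ErrInternalError}
}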

2771
// updateCommitTx signs, then sends an update to the remote peer adding a new
2772
// commitment to their commitment chain which includes all the latest updates
2773
// we've received+processed up to this point.
2774
func (l *channelLink) updateCommitTx(ctx context.Context) error {
1,291✔
2775
        // Preemptively write all pending keystones to disk, just in case the
1,291✔
2776
        // HTLCs we have in memory are included in the subsequent attempt to
1,291✔
2777
        // sign a commitment state.
1,291✔
2778
        err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
1,291✔
2779
        if err != nil {
1,291✔
2780
                // If ErrDuplicateKeystone is returned, the caller will catch
×
2781
                // it.
×
2782
                return err
×
2783
        }
×
2784

2785
        // Reset the batch, but keep the backing buffer to avoid reallocating.
2786
        l.keystoneBatch = l.keystoneBatch[:0]
1,291✔
2787

1,291✔
2788
        // If hodl.Commit mode is active, we will refrain from attempting to
1,291✔
2789
        // commit any in-memory modifications to the channel state. Exiting here
1,291✔
2790
        // permits testing of either the switch or link's ability to trim
1,291✔
2791
        // circuits that have been opened, but unsuccessfully committed.
1,291✔
2792
        if l.cfg.HodlMask.Active(hodl.Commit) {
1,298✔
2793
                l.log.Warnf(hodl.Commit.Warning())
7✔
2794
                return nil
7✔
2795
        }
7✔
2796

2797
        ctx, done := l.cg.Create(ctx)
1,287✔
2798
        defer done()
1,287✔
2799

1,287✔
2800
        newCommit, err := l.channel.SignNextCommitment(ctx)
1,287✔
2801
        if err == lnwallet.ErrNoWindow {
1,382✔
2802
                l.cfg.PendingCommitTicker.Resume()
95✔
2803
                l.log.Trace("PendingCommitTicker resumed")
95✔
2804

95✔
2805
                n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
95✔
2806
                l.log.Tracef("revocation window exhausted, unable to send: "+
95✔
2807
                        "%v, pend_updates=%v, dangling_closes%v", n,
95✔
2808
                        lnutils.SpewLogClosure(l.openedCircuits),
95✔
2809
                        lnutils.SpewLogClosure(l.closedCircuits))
95✔
2810

95✔
2811
                return nil
95✔
2812
        } else if err != nil {
1,290✔
2813
                return err
×
2814
        }
×
2815

2816
        if err := l.ackDownStreamPackets(); err != nil {
1,195✔
2817
                return err
×
2818
        }
×
2819

2820
        l.cfg.PendingCommitTicker.Pause()
1,195✔
2821
        l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")
1,195✔
2822

1,195✔
2823
        // The remote party now has a new pending commitment, so we'll update
1,195✔
2824
        // the contract court to be aware of this new set (the prior old remote
1,195✔
2825
        // pending).
1,195✔
2826
        newUpdate := &contractcourt.ContractUpdate{
1,195✔
2827
                HtlcKey: contractcourt.RemotePendingHtlcSet,
1,195✔
2828
                Htlcs:   newCommit.PendingHTLCs,
1,195✔
2829
        }
1,195✔
2830
        err = l.cfg.NotifyContractUpdate(newUpdate)
1,195✔
2831
        if err != nil {
1,195✔
2832
                l.log.Errorf("unable to notify contract update: %v", err)
×
2833
                return err
×
2834
        }
×
2835

2836
        select {
1,195✔
2837
        case <-l.cg.Done():
11✔
2838
                return ErrLinkShuttingDown
11✔
2839
        default:
1,184✔
2840
        }
2841

2842
        auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob)
1,184✔
2843
        if err != nil {
1,184✔
2844
                return fmt.Errorf("error parsing aux sigs: %w", err)
×
2845
        }
×
2846

2847
        commitSig := &lnwire.CommitSig{
1,184✔
2848
                ChanID:        l.ChanID(),
1,184✔
2849
                CommitSig:     newCommit.CommitSig,
1,184✔
2850
                HtlcSigs:      newCommit.HtlcSigs,
1,184✔
2851
                PartialSig:    newCommit.PartialSig,
1,184✔
2852
                CustomRecords: auxBlobRecords,
1,184✔
2853
        }
1,184✔
2854
        l.cfg.Peer.SendMessage(false, commitSig)
1,184✔
2855

1,184✔
2856
        // Now that we have sent out a new CommitSig, we invoke the outgoing set
1,184✔
2857
        // of commit hooks.
1,184✔
2858
        l.RWMutex.Lock()
1,184✔
2859
        l.outgoingCommitHooks.invoke()
1,184✔
2860
        l.RWMutex.Unlock()
1,184✔
2861

1,184✔
2862
        return nil
1,184✔
2863
}
2864
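
// Illustrative sketch (not part of link.go): the ErrNoWindow branch above.
// An exhausted revocation window is not an error condition; the link simply
// resumes the pending commit ticker and signs again once the window
// reopens. The helper is hypothetical.
func handleSignErr(err error, pendingCommitTicker ticker.Ticker) error {
        if err == lnwallet.ErrNoWindow {
                // Nudge the link to retry the signature later.
                pendingCommitTicker.Resume()
                return nil
        }

        return err
}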

2865
// PeerPubKey returns the pubkey of the remote peer with which we have the
2866
// channel link opened.
2867
//
2868
// NOTE: Part of the ChannelLink interface.
2869
func (l *channelLink) PeerPubKey() [33]byte {
444✔
2870
        return l.cfg.Peer.PubKey()
444✔
2871
}
444✔
2872

2873
// ChannelPoint returns the channel outpoint for the channel link.
2874
// NOTE: Part of the ChannelLink interface.
2875
func (l *channelLink) ChannelPoint() wire.OutPoint {
855✔
2876
        return l.channel.ChannelPoint()
855✔
2877
}
855✔
2878

2879
// ShortChanID returns the short channel ID for the channel link. The short
2880
// channel ID encodes the exact location in the main chain that the original
2881
// funding output can be found.
2882
//
2883
// NOTE: Part of the ChannelLink interface.
2884
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
4,251✔
2885
        l.RLock()
4,251✔
2886
        defer l.RUnlock()
4,251✔
2887

4,251✔
2888
        return l.channel.ShortChanID()
4,251✔
2889
}
4,251✔
2890

2891
// UpdateShortChanID updates the short channel ID for a link. This may be
2892
// required in the event that a link is created before the short chan ID for it
2893
// is known, or a re-org occurs, and the funding transaction changes location
2894
// within the chain.
2895
//
2896
// NOTE: Part of the ChannelLink interface.
2897
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
3✔
2898
        chanID := l.ChanID()
3✔
2899

3✔
2900
        // Refresh the channel state's short channel ID by loading it from disk.
3✔
2901
        // This ensures that the channel state accurately reflects the updated
3✔
2902
        // short channel ID.
3✔
2903
        err := l.channel.State().Refresh()
3✔
2904
        if err != nil {
3✔
2905
                l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
×
2906
                        "%v", chanID, err)
×
2907
                return hop.Source, err
×
2908
        }
×
2909

2910
        return hop.Source, nil
3✔
2911
}
2912

2913
// ChanID returns the channel ID for the channel link. The channel ID is a more
2914
// compact representation of a channel's full outpoint.
2915
//
2916
// NOTE: Part of the ChannelLink interface.
2917
func (l *channelLink) ChanID() lnwire.ChannelID {
3,924✔
2918
        return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
3,924✔
2919
}
3,924✔
2920

2921
// Bandwidth returns the total amount that can flow through the channel link at
2922
// this given instance. The value returned is expressed in millisatoshi and can
2923
// be used by callers when making forwarding decisions to determine if a link
2924
// can accept an HTLC.
2925
//
2926
// NOTE: Part of the ChannelLink interface.
2927
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
814✔
2928
        // Get the balance available on the channel for new HTLCs. This takes
814✔
2929
        // the channel reserve into account so HTLCs up to this value won't
814✔
2930
        // violate it.
814✔
2931
        return l.channel.AvailableBalance()
814✔
2932
}
814✔
2933

2934
// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
2935
// amount provided to the link. This check does not reserve a space, since
2936
// forwards or other payments may use the available slot, so it should be
2937
// considered best-effort.
2938
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
3✔
2939
        return l.channel.MayAddOutgoingHtlc(amt)
3✔
2940
}
3✔
2941

2942
// getDustSum is a wrapper method that calls the underlying channel's dust sum
2943
// method.
2944
//
2945
// NOTE: Part of the dustHandler interface.
2946
func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
2947
        dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {
2,526✔
2948

2,526✔
2949
        return l.channel.GetDustSum(whoseCommit, dryRunFee)
2,526✔
2950
}
2,526✔
2951

2952
// getFeeRate is a wrapper method that retrieves the underlying channel's
2953
// feerate.
2954
//
2955
// NOTE: Part of the dustHandler interface.
2956
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
672✔
2957
        return l.channel.CommitFeeRate()
672✔
2958
}
672✔
2959

2960
// getDustClosure returns a closure that can be used by the switch or mailbox
2961
// to evaluate whether a given HTLC is dust.
2962
//
2963
// NOTE: Part of the dustHandler interface.
2964
func (l *channelLink) getDustClosure() dustClosure {
1,602✔
2965
        localDustLimit := l.channel.State().LocalChanCfg.DustLimit
1,602✔
2966
        remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
1,602✔
2967
        chanType := l.channel.State().ChanType
1,602✔
2968

1,602✔
2969
        return dustHelper(chanType, localDustLimit, remoteDustLimit)
1,602✔
2970
}
1,602✔
2971

2972
// getCommitFee returns either the local or remote CommitFee in satoshis. This
2973
// is used so that the Switch can have access to the commitment fee without
2974
// needing to have a *LightningChannel. This doesn't include dust.
2975
//
2976
// NOTE: Part of the dustHandler interface.
2977
func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
1,867✔
2978
        if remote {
2,804✔
2979
                return l.channel.State().RemoteCommitment.CommitFee
937✔
2980
        }
937✔
2981

2982
        return l.channel.State().LocalCommitment.CommitFee
933✔
2983
}
2984

2985
// exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
2986
// increases the total dust and fees within the channel past the configured
2987
// fee threshold. It first calculates the dust sum over every update in the
2988
// update log with the proposed fee-rate and taking into account both the local
2989
// and remote dust limits. It uses every update in the update log instead of
2990
// what is actually on the local and remote commitments because it is assumed
2991
// that in a worst-case scenario, every update in the update log could
2992
// theoretically be on either commitment transaction and this needs to be
2993
// accounted for with this fee-rate. It then calculates the local and remote
2994
// commitment fees given the proposed fee-rate. Finally, it tallies the results
2995
// and determines if the fee threshold has been exceeded.
2996
func (l *channelLink) exceedsFeeExposureLimit(
2997
        feePerKw chainfee.SatPerKWeight) (bool, error) {
6✔
2998

6✔
2999
        dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)
6✔
3000

6✔
3001
        // Get the sum of dust for both the local and remote commitments using
6✔
3002
        // this "dry-run" fee.
6✔
3003
        localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
6✔
3004
        remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)
6✔
3005

6✔
3006
        // Calculate the local and remote commitment fees using this dry-run
6✔
3007
        // fee.
6✔
3008
        localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
6✔
3009
        if err != nil {
6✔
3010
                return false, err
×
3011
        }
×
3012

3013
        // Finally, check whether the max fee exposure was exceeded on either
3014
        // future commitment transaction with the fee-rate.
3015
        totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
6✔
3016
        if totalLocalDust > l.cfg.MaxFeeExposure {
6✔
3017
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
×
3018
                        "local dust: %v, local fee: %v", l.ShortChanID(),
×
3019
                        totalLocalDust, localFee)
×
3020

×
3021
                return true, nil
×
3022
        }
×
3023

3024
        totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
6✔
3025
                remoteFee,
6✔
3026
        )
6✔
3027

6✔
3028
        if totalRemoteDust > l.cfg.MaxFeeExposure {
6✔
3029
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
×
3030
                        "remote dust: %v, remote fee: %v", l.ShortChanID(),
×
3031
                        totalRemoteDust, remoteFee)
×
3032

×
3033
                return true, nil
×
3034
        }
×
3035

3036
        return false, nil
6✔
3037
}
3038
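
// Illustrative sketch (not part of link.go): the per-commitment test above
// with made-up numbers. A 400,000 msat dust sum plus a 150,000 sat commit
// fee is 150,400,000 msat of exposure, under a 500,000 sat MaxFeeExposure
// (500,000,000 msat), so the dry-run fee-rate would be accepted.
func exposureExceeded(dustSum lnwire.MilliSatoshi, commitFee btcutil.Amount,
        maxFeeExposure lnwire.MilliSatoshi) bool {

        return dustSum+lnwire.NewMSatFromSatoshis(commitFee) > maxFeeExposure
}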

3039
// isOverexposedWithHtlc calculates whether the proposed HTLC will make the
3040
// channel exceed the fee threshold. It first fetches the largest fee-rate that
3041
// may be on any unrevoked commitment transaction. Then, using this fee-rate,
3042
// determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds to
3043
// the overall dust sum. If it is not dust, it contributes to weight, which
3044
// also adds to the overall dust sum by an increase in fees. If the dust sum on
3045
// either commitment exceeds the configured fee threshold, this function
3046
// returns true.
3047
func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
3048
        incoming bool) bool {
933✔
3049

933✔
3050
        dustClosure := l.getDustClosure()
933✔
3051

933✔
3052
        feeRate := l.channel.WorstCaseFeeRate()
933✔
3053

933✔
3054
        amount := htlc.Amount.ToSatoshis()
933✔
3055

933✔
3056
        // See if this HTLC is dust on both the local and remote commitments.
933✔
3057
        isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
933✔
3058
        isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)
933✔
3059

933✔
3060
        // Calculate the dust sum for the local and remote commitments.
933✔
3061
        localDustSum := l.getDustSum(
933✔
3062
                lntypes.Local, fn.None[chainfee.SatPerKWeight](),
933✔
3063
        )
933✔
3064
        remoteDustSum := l.getDustSum(
933✔
3065
                lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
933✔
3066
        )
933✔
3067

933✔
3068
        // Grab the larger of the local and remote commitment fees w/o dust.
933✔
3069
        commitFee := l.getCommitFee(false)
933✔
3070

933✔
3071
        if l.getCommitFee(true) > commitFee {
940✔
3072
                commitFee = l.getCommitFee(true)
7✔
3073
        }
7✔
3074

3075
        commitFeeMSat := lnwire.NewMSatFromSatoshis(commitFee)
933✔
3076

933✔
3077
        localDustSum += commitFeeMSat
933✔
3078
        remoteDustSum += commitFeeMSat
933✔
3079

933✔
3080
        // Calculate the additional fee increase if this is a non-dust HTLC.
933✔
3081
        weight := lntypes.WeightUnit(input.HTLCWeight)
933✔
3082
        additional := lnwire.NewMSatFromSatoshis(
933✔
3083
                feeRate.FeeForWeight(weight),
933✔
3084
        )
933✔
3085

933✔
3086
        if isLocalDust {
1,569✔
3087
                // If this is dust, it doesn't contribute to weight but does
636✔
3088
                // contribute to the overall dust sum.
636✔
3089
                localDustSum += lnwire.NewMSatFromSatoshis(amount)
636✔
3090
        } else {
936✔
3091
                // Account for the fee increase that comes with an increase in
300✔
3092
                // weight.
300✔
3093
                localDustSum += additional
300✔
3094
        }
300✔
3095

3096
        if localDustSum > l.cfg.MaxFeeExposure {
937✔
3097
                // The max fee exposure was exceeded.
4✔
3098
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
4✔
3099
                        "overexposed, total local dust: %v (current commit "+
4✔
3100
                        "fee: %v)", l.ShortChanID(), htlc, localDustSum)
4✔
3101

4✔
3102
                return true
4✔
3103
        }
4✔
3104

3105
        if isRemoteDust {
1,562✔
3106
                // If this is dust, it doesn't contribute to weight but does
633✔
3107
                // contribute to the overall dust sum.
633✔
3108
                remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
633✔
3109
        } else {
932✔
3110
                // Account for the fee increase that comes with an increase in
299✔
3111
                // weight.
299✔
3112
                remoteDustSum += additional
299✔
3113
        }
299✔
3114

3115
        if remoteDustSum > l.cfg.MaxFeeExposure {
929✔
3116
                // The max fee exposure was exceeded.
×
3117
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
×
3118
                        "overexposed, total remote dust: %v (current commit "+
×
3119
                        "fee: %v)", l.ShortChanID(), htlc, remoteDustSum)
×
3120

×
3121
                return true
×
3122
        }
×
3123

3124
        return false
929✔
3125
}
3126
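
// Illustrative sketch (not part of link.go): the two ways a single HTLC
// moves the tally above. A dust HTLC contributes its whole amount to the
// dust sum; a non-dust HTLC contributes only the extra commitment fee for
// its weight. The helper name is hypothetical.
func htlcExposureDelta(isDust bool, amt btcutil.Amount,
        feeRate chainfee.SatPerKWeight) lnwire.MilliSatoshi {

        if isDust {
                return lnwire.NewMSatFromSatoshis(amt)
        }

        weight := lntypes.WeightUnit(input.HTLCWeight)

        return lnwire.NewMSatFromSatoshis(feeRate.FeeForWeight(weight))
}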

3127
// dustClosure is a function that evaluates whether an HTLC is dust. It returns
3128
// true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
3129
// the HTLC is incoming (i.e. one that the remote sent), a ChannelParty
3130
// denoting whose commitment to evaluate against, and finally an HTLC
3131
// amount to test.
3132
type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
3133
        whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool
3134

3135
// dustHelper is used to construct the dustClosure.
3136
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
3137
        remoteDustLimit btcutil.Amount) dustClosure {
1,802✔
3138

1,802✔
3139
        isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
1,802✔
3140
                whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {
11,257✔
3141

9,455✔
3142
                var dustLimit btcutil.Amount
9,455✔
3143
                if whoseCommit.IsLocal() {
14,184✔
3144
                        dustLimit = localDustLimit
4,729✔
3145
                } else {
9,458✔
3146
                        dustLimit = remoteDustLimit
4,729✔
3147
                }
4,729✔
3148

3149
                return lnwallet.HtlcIsDust(
9,455✔
3150
                        chantype, incoming, whoseCommit, feerate, amt,
9,455✔
3151
                        dustLimit,
9,455✔
3152
                )
9,455✔
3153
        }
3154

3155
        return isDust
1,802✔
3156
}
3157
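
// Illustrative sketch (not part of link.go): constructing and querying a
// dustClosure. The dust limits and fee-rate are made-up example values.
func exampleDustQuery(chanType channeldb.ChannelType) bool {
        isDust := dustHelper(chanType, 354, 354)

        // Would a 1,000 sat incoming HTLC be dust on our own commitment at
        // 253 sat/kw?
        return isDust(chainfee.SatPerKWeight(253), true, lntypes.Local, 1000)
}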

3158
// zeroConfConfirmed returns whether or not the zero-conf channel has
3159
// confirmed on-chain.
3160
//
3161
// Part of the scidAliasHandler interface.
3162
func (l *channelLink) zeroConfConfirmed() bool {
6✔
3163
        return l.channel.State().ZeroConfConfirmed()
6✔
3164
}
6✔
3165

3166
// confirmedScid returns the confirmed SCID for a zero-conf channel. This
3167
// should not be called for non-zero-conf channels.
3168
//
3169
// Part of the scidAliasHandler interface.
3170
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
6✔
3171
        return l.channel.State().ZeroConfRealScid()
6✔
3172
}
6✔
3173

3174
// isZeroConf returns whether or not the underlying channel is a zero-conf
3175
// channel.
3176
//
3177
// Part of the scidAliasHandler interface.
3178
func (l *channelLink) isZeroConf() bool {
216✔
3179
        return l.channel.State().IsZeroConf()
216✔
3180
}
216✔
3181

3182
// negotiatedAliasFeature returns whether or not the underlying channel has
3183
// negotiated the option-scid-alias feature bit. This will be true for both
3184
// option-scid-alias and zero-conf channel-types. It will also be true for
3185
// channels with the feature bit but without the above channel-types.
3186
//
3187
// Part of the scidAliasHandler interface.
3188
func (l *channelLink) negotiatedAliasFeature() bool {
377✔
3189
        return l.channel.State().NegotiatedAliasFeature()
377✔
3190
}
377✔
3191

3192
// getAliases returns the set of aliases for the underlying channel.
3193
//
3194
// Part of the scidAliasHandler interface.
3195
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
222✔
3196
        return l.cfg.GetAliases(l.ShortChanID())
222✔
3197
}
222✔
3198

3199
// attachFailAliasUpdate sets the link's FailAliasUpdate function.
3200
//
3201
// Part of the scidAliasHandler interface.
3202
func (l *channelLink) attachFailAliasUpdate(closure func(
3203
        sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) {
217✔
3204

217✔
3205
        l.Lock()
217✔
3206
        l.cfg.FailAliasUpdate = closure
217✔
3207
        l.Unlock()
217✔
3208
}
217✔
3209

3210
// AttachMailBox updates the current mailbox used by this link, and hooks up
3211
// the mailbox's message and packet outboxes to the link's upstream and
3212
// downstream chans, respectively.
3213
func (l *channelLink) AttachMailBox(mailbox MailBox) {
216✔
3214
        l.Lock()
216✔
3215
        l.mailBox = mailbox
216✔
3216
        l.upstream = mailbox.MessageOutBox()
216✔
3217
        l.downstream = mailbox.PacketOutBox()
216✔
3218
        l.Unlock()
216✔
3219

216✔
3220
        // Set the mailbox's fee rate. This may be refreshing a feerate that was
216✔
3221
        // never committed.
216✔
3222
        l.mailBox.SetFeeRate(l.getFeeRate())
216✔
3223

216✔
3224
        // Also set the mailbox's dust closure so that it can query whether HTLCs
216✔
3225
        // are dust given the current feerate.
216✔
3226
        l.mailBox.SetDustClosure(l.getDustClosure())
216✔
3227
}
216✔
3228

3229
// UpdateForwardingPolicy updates the forwarding policy for the target
3230
// ChannelLink. Once updated, the link will use the new forwarding policy to
3231
// govern whether an incoming HTLC should be forwarded or not. We assume that
3232
// fields that are zero are intentionally set to zero, so we'll use newPolicy to
3233
// update all of the link's FwrdingPolicy's values.
3234
//
3235
// NOTE: Part of the ChannelLink interface.
3236
func (l *channelLink) UpdateForwardingPolicy(
3237
        newPolicy models.ForwardingPolicy) {
15✔
3238

15✔
3239
        l.Lock()
15✔
3240
        defer l.Unlock()
15✔
3241

15✔
3242
        l.cfg.FwrdingPolicy = newPolicy
15✔
3243
}
15✔
3244

3245
// CheckHtlcForward should return a nil error if the passed HTLC details
3246
// satisfy the current forwarding policy of the target link. Otherwise,
3247
// a LinkError with a valid protocol failure message should be returned
3248
// in order to signal to the source of the HTLC the policy consistency
3249
// issue.
3250
//
3251
// NOTE: Part of the ChannelLink interface.
3252
func (l *channelLink) CheckHtlcForward(payHash [32]byte, incomingHtlcAmt,
3253
        amtToForward lnwire.MilliSatoshi, incomingTimeout,
3254
        outgoingTimeout uint32, inboundFee models.InboundFee,
3255
        heightNow uint32, originalScid lnwire.ShortChannelID,
3256
        customRecords lnwire.CustomRecords) *LinkError {
52✔
3257

52✔
3258
        l.RLock()
52✔
3259
        policy := l.cfg.FwrdingPolicy
52✔
3260
        l.RUnlock()
52✔
3261

52✔
3262
        // Using the outgoing HTLC amount, we'll calculate the outgoing
52✔
3263
        // fee this incoming HTLC must carry in order to satisfy the constraints
52✔
3264
        // of the outgoing link.
52✔
3265
        outFee := ExpectedFee(policy, amtToForward)
52✔
3266

52✔
3267
        // Then calculate the inbound fee that we charge based on the sum of
52✔
3268
        // outgoing HTLC amount and outgoing fee.
52✔
3269
        inFee := inboundFee.CalcFee(amtToForward + outFee)
52✔
3270

52✔
3271
        // Add up both fee components. It is important to calculate both fees
52✔
3272
        // separately. An alternative way of calculating is to first determine
52✔
3273
        // an aggregate fee and apply that to the outgoing HTLC amount. However,
52✔
3274
        // rounding may cause the result to be slightly higher than in the case
52✔
3275
        // of separately rounded fee components. This potentially causes failed
52✔
3276
        // forwards for senders and is something to be avoided.
52✔
3277
        expectedFee := inFee + int64(outFee)
52✔
3278

52✔
3279
        // If the actual fee is less than our expected fee, then we'll reject
52✔
3280
        // this HTLC as it didn't provide a sufficient amount of fees, or the
52✔
3281
        // values have been tampered with, or the send used incorrect/dated
52✔
3282
        // information to construct the forwarding information for this hop. In
52✔
3283
        // any case, we'll cancel this HTLC.
52✔
3284
        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
52✔
3285
        if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
61✔
3286
                l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
9✔
3287
                        "expected %v, got %v: incoming=%v, outgoing=%v, "+
9✔
3288
                        "inboundFee=%v",
9✔
3289
                        payHash[:], expectedFee, actualFee,
9✔
3290
                        incomingHtlcAmt, amtToForward, inboundFee,
9✔
3291
                )
9✔
3292

9✔
3293
                // As part of the returned error, we'll send our latest routing
9✔
3294
                // policy so the sending node obtains the most up to date data.
9✔
3295
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
18✔
3296
                        return lnwire.NewFeeInsufficient(amtToForward, *upd)
9✔
3297
                }
9✔
3298
                failure := l.createFailureWithUpdate(false, originalScid, cb)
9✔
3299
                return NewLinkError(failure)
9✔
3300
        }
3301

3302
        // Check whether the outgoing htlc satisfies the channel policy.
3303
        err := l.canSendHtlc(
46✔
3304
                policy, payHash, amtToForward, outgoingTimeout, heightNow,
46✔
3305
                originalScid, customRecords,
46✔
3306
        )
46✔
3307
        if err != nil {
62✔
3308
                return err
16✔
3309
        }
16✔
3310

3311
        // Finally, we'll ensure that the time-lock on the outgoing HTLC meets
3312
        // the following constraint: the incoming time-lock minus our time-lock
3313
        // delta should equal the outgoing time lock. Otherwise, either the
3314
        // sender messed up or an intermediate node tampered with the HTLC.
3315
        timeDelta := policy.TimeLockDelta
33✔
3316
        if incomingTimeout < outgoingTimeout+timeDelta {
35✔
3317
                l.log.Warnf("incoming htlc(%x) has incorrect time-lock value: "+
2✔
3318
                        "expected at least %v block delta, got %v block delta",
2✔
3319
                        payHash[:], timeDelta, incomingTimeout-outgoingTimeout)
2✔
3320

2✔
3321
                // Grab the latest routing policy so the sending node is up to
2✔
3322
                // date with our current policy.
2✔
3323
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
4✔
3324
                        return lnwire.NewIncorrectCltvExpiry(
2✔
3325
                                incomingTimeout, *upd,
2✔
3326
                        )
2✔
3327
                }
2✔
3328
                failure := l.createFailureWithUpdate(false, originalScid, cb)
2✔
3329
                return NewLinkError(failure)
2✔
3330
        }
3331

3332
        return nil
31✔
3333
}
3334
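
// Illustrative sketch (not part of link.go): the fee check above in
// isolation. The outbound and inbound components are computed and rounded
// separately, then summed; folding them into one aggregate rate first could
// round the total up and fail forwards that are actually paying enough.
// The helper name is hypothetical; ExpectedFee and CalcFee are the calls
// used above.
func expectedForwardFee(policy models.ForwardingPolicy,
        inboundFee models.InboundFee, amtToForward lnwire.MilliSatoshi) int64 {

        // Fee required by the outgoing link for the outgoing amount.
        outFee := ExpectedFee(policy, amtToForward)

        // Inbound fee charged on the outgoing amount plus the outgoing fee.
        inFee := inboundFee.CalcFee(amtToForward + outFee)

        return inFee + int64(outFee)
}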

3335
// CheckHtlcTransit should return a nil error if the passed HTLC details
3336
// satisfy the current channel policy.  Otherwise, a LinkError with a
3337
// valid protocol failure message should be returned in order to signal
3338
// the violation. This call is intended to be used for locally initiated
3339
// payments for which there is no corresponding incoming htlc.
3340
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
3341
        amt lnwire.MilliSatoshi, timeout uint32, heightNow uint32,
3342
        customRecords lnwire.CustomRecords) *LinkError {
409✔
3343

409✔
3344
        l.RLock()
409✔
3345
        policy := l.cfg.FwrdingPolicy
409✔
3346
        l.RUnlock()
409✔
3347

409✔
3348
        // We pass in hop.Source here as this is only used in the Switch when
409✔
3349
        // trying to send over a local link. This causes the fallback mechanism
409✔
3350
        // to occur.
409✔
3351
        return l.canSendHtlc(
409✔
3352
                policy, payHash, amt, timeout, heightNow, hop.Source,
409✔
3353
                customRecords,
409✔
3354
        )
409✔
3355
}
409✔
3356

// canSendHtlc checks whether the given htlc parameters satisfy
// the channel's amount and time lock constraints.
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
        payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        // As our first sanity check, we'll ensure that the passed HTLC isn't
        // too small for the next hop. If so, then we'll cancel the HTLC
        // directly.
        if amt < policy.MinHTLCOut {
                l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
                        "htlc_value=%v", payHash[:], policy.MinHTLCOut,
                        amt)

                // As part of the returned error, we'll send our latest routing
                // policy so the sending node obtains the most up-to-date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewAmountBelowMinimum(amt, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Next, ensure that the passed HTLC isn't too large. If so, we'll
        // cancel the HTLC directly.
        if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
                l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
                        "htlc_value=%v", payHash[:], policy.MaxHTLC, amt)

                // As part of the returned error, we'll send our latest routing
                // policy so the sending node obtains the most up-to-date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewDetailedLinkError(failure, OutgoingFailureHTLCExceedsMax)
        }

        // We want to avoid offering an HTLC which will expire in the near
        // future, so we'll reject an HTLC if the outgoing expiration time is
        // too close to the current height.
        if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
                l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
                        "outgoing_expiry=%v, best_height=%v", payHash[:],
                        timeout, heightNow)

                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewExpiryTooSoon(*upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check absolute max delta.
        if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
                l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
                        "the future: got %v, but maximum is %v", payHash[:],
                        timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)

                return NewLinkError(&lnwire.FailExpiryTooFar{})
        }

        // We now check the available bandwidth to see if this HTLC can be
        // forwarded.
        availableBandwidth := l.Bandwidth()
        auxBandwidth, err := fn.MapOptionZ(
                l.cfg.AuxTrafficShaper,
                func(ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
                        var htlcBlob fn.Option[tlv.Blob]
                        blob, err := customRecords.Serialize()
                        if err != nil {
                                return fn.Err[OptionalBandwidth](
                                        fmt.Errorf("unable to serialize "+
                                                "custom records: %w", err))
                        }

                        if len(blob) > 0 {
                                htlcBlob = fn.Some(blob)
                        }

                        return l.AuxBandwidth(amt, originalScid, htlcBlob, ts)
                },
        ).Unpack()
        if err != nil {
                l.log.Errorf("Unable to determine aux bandwidth: %v", err)
                return NewLinkError(&lnwire.FailTemporaryNodeFailure{})
        }

        auxBandwidth.WhenSome(func(bandwidth lnwire.MilliSatoshi) {
                availableBandwidth = bandwidth
        })

        // Check to see if there is enough balance in this channel.
        if amt > availableBandwidth {
                l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
                        "larger than %v", amt, availableBandwidth)
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewDetailedLinkError(
                        failure, OutgoingFailureInsufficientBalance,
                )
        }

        return nil
}
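// A minimal sketch of how these policy checks play out in practice
// (hypothetical values, not drawn from any real channel): with
// MinHTLCOut=1_000 msat, MaxHTLC=100_000_000 msat and a bandwidth of
// 50_000_000 msat, an HTLC of 60_000_000 msat passes the min/max checks
// but fails the final bandwidth check and yields
// OutgoingFailureInsufficientBalance:
//
//	policy := models.ForwardingPolicy{
//		MinHTLCOut: 1_000,
//		MaxHTLC:    100_000_000,
//	}
//	amt := lnwire.MilliSatoshi(60_000_000)
//
//	ok := amt >= policy.MinHTLCOut && // passes
//		amt <= policy.MaxHTLC && // passes
//		amt <= 50_000_000 // fails: insufficient bandwidth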

// AuxBandwidth returns the bandwidth that can be used for a channel,
// expressed in milli-satoshi. This might be different from the regular BTC
// bandwidth for custom channels. This will always return fn.None() for a
// regular (non-custom) channel.
func (l *channelLink) AuxBandwidth(amount lnwire.MilliSatoshi,
        cid lnwire.ShortChannelID, htlcBlob fn.Option[tlv.Blob],
        ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {

        unknownBandwidth := fn.None[lnwire.MilliSatoshi]()

        fundingBlob := l.FundingCustomBlob()
        shouldHandle, err := ts.ShouldHandleTraffic(cid, fundingBlob)
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("traffic shaper "+
                        "failed to decide whether to handle traffic: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper is handling "+
                "traffic: %v", cid, shouldHandle)

        // If this channel isn't handled by the aux traffic shaper, we'll
        // return early.
        if !shouldHandle {
                return fn.Ok(unknownBandwidth)
        }

        // Ask for a specific bandwidth to be used for the channel.
        commitmentBlob := l.CommitmentCustomBlob()
        auxBandwidth, err := ts.PaymentBandwidth(
                htlcBlob, commitmentBlob, l.Bandwidth(), amount,
        )
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to get "+
                        "bandwidth from external traffic shaper: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper reported available "+
                "bandwidth: %v", cid, auxBandwidth)

        return fn.Ok(fn.Some(auxBandwidth))
}
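// The fn.Result/fn.Option combination above composes as follows (a minimal
// sketch assuming a shaper that reports some figure; the types and methods
// are from this package and lnd's fn package, the values are made up):
//
//	res := l.AuxBandwidth(1_000, cid, fn.None[tlv.Blob](), ts)
//	bandwidth, err := res.Unpack()
//	if err != nil {
//		// The shaper failed; the caller treats this as a
//		// temporary node failure.
//	}
//	bandwidth.WhenSome(func(b lnwire.MilliSatoshi) {
//		// Only custom channels land here; regular channels
//		// return fn.None and keep the regular BTC bandwidth.
//	})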

// Stats returns the statistics of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
        snapshot := l.channel.StateSnapshot()

        return snapshot.ChannelCommitment.CommitHeight,
                snapshot.TotalMSatSent,
                snapshot.TotalMSatReceived
}

// String returns the string representation of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) String() string {
        return l.channel.ChannelPoint().String()
}

// handleSwitchPacket handles the switch packets. These packets might be
// forwarded to us from another channel link in case the htlc update came
// from another peer or if the update was created by the user.
//
// NOTE: Part of the packetHandler interface.
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
        l.log.Tracef("received switch packet inkey=%v, outkey=%v",
                pkt.inKey(), pkt.outKey())

        return l.mailBox.AddPacket(pkt)
}

// HandleChannelUpdate handles the htlc requests as settle/add/fail which are
// sent to us from the remote peer we have a channel with.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
        select {
        case <-l.cg.Done():
                // Return early if the link is already in the process of
                // quitting. It doesn't make sense to hand the message to the
                // mailbox here.
                return
        default:
        }

        err := l.mailBox.AddMessage(message)
        if err != nil {
                l.log.Errorf("failed to add Message to mailbox: %v", err)
        }
}
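// The select above is the standard non-blocking shutdown guard (a general
// Go pattern, shown here for clarity rather than taken from elsewhere in
// this file):
//
//	select {
//	case <-done:
//		return // shutting down, drop the message
//	default:
//		// not shutting down, fall through and enqueue
//	}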

// updateChannelFee updates the commitment fee-per-kw on this channel by
// committing to an update_fee message.
func (l *channelLink) updateChannelFee(ctx context.Context,
        feePerKw chainfee.SatPerKWeight) error {

        l.log.Infof("updating commit fee to %v", feePerKw)

        // We skip sending the UpdateFee message if the channel is not
        // currently eligible to forward messages.
        if !l.eligibleToUpdate() {
                l.log.Debugf("skipping fee update for inactive channel")
                return nil
        }

        // Check and see if our proposed fee-rate would make us exceed the fee
        // threshold.
        thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
        if err != nil {
                // This shouldn't typically happen. If it does, it indicates
                // something is wrong with our channel state.
                return err
        }

        if thresholdExceeded {
                return fmt.Errorf("link fee threshold exceeded")
        }

        // First, we'll update the local fee on our commitment.
        if err := l.channel.UpdateFee(feePerKw); err != nil {
                return err
        }

        // The fee passed the channel's validation checks, so we update the
        // mailbox feerate.
        l.mailBox.SetFeeRate(feePerKw)

        // We'll then attempt to send a new UpdateFee message, and also lock it
        // in immediately by triggering a commitment update.
        msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                return err
        }

        return l.updateCommitTx(ctx)
}
3604
// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
3605
// after receiving a revocation from the remote party, and reprocesses them in
3606
// the context of the provided forwarding package. Any settles or fails that
3607
// have already been acknowledged in the forwarding package will not be sent to
3608
// the switch.
3609
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
1,173✔
3610
        if len(fwdPkg.SettleFails) == 0 {
2,032✔
3611
                return
859✔
3612
        }
859✔
3613

3614
        l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)
317✔
3615

317✔
3616
        var switchPackets []*htlcPacket
317✔
3617
        for i, update := range fwdPkg.SettleFails {
634✔
3618
                destRef := fwdPkg.DestRef(uint16(i))
317✔
3619

317✔
3620
                // Skip any settles or fails that have already been
317✔
3621
                // acknowledged by the incoming link that originated the
317✔
3622
                // forwarded Add.
317✔
3623
                if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
317✔
3624
                        continue
×
3625
                }
3626

3627
                // TODO(roasbeef): rework log entries to a shared
3628
                // interface.
3629

3630
                switch msg := update.UpdateMsg.(type) {
317✔
3631
                // A settle for an HTLC we previously forwarded HTLC has been
3632
                // received. So we'll forward the HTLC to the switch which will
3633
                // handle propagating the settle to the prior hop.
3634
                case *lnwire.UpdateFulfillHTLC:
194✔
3635
                        // If hodl.SettleIncoming is requested, we will not
194✔
3636
                        // forward the SETTLE to the switch and will not signal
194✔
3637
                        // a free slot on the commitment transaction.
194✔
3638
                        if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
194✔
3639
                                l.log.Warnf(hodl.SettleIncoming.Warning())
×
3640
                                continue
×
3641
                        }
3642

3643
                        settlePacket := &htlcPacket{
194✔
3644
                                outgoingChanID: l.ShortChanID(),
194✔
3645
                                outgoingHTLCID: msg.ID,
194✔
3646
                                destRef:        &destRef,
194✔
3647
                                htlc:           msg,
194✔
3648
                        }
194✔
3649

194✔
3650
                        // Add the packet to the batch to be forwarded, and
194✔
3651
                        // notify the overflow queue that a spare spot has been
194✔
3652
                        // freed up within the commitment state.
194✔
3653
                        switchPackets = append(switchPackets, settlePacket)
194✔
3654

3655
                // A failureCode message for a previously forwarded HTLC has
3656
                // been received. As a result a new slot will be freed up in
3657
                // our commitment state, so we'll forward this to the switch so
3658
                // the backwards undo can continue.
3659
                case *lnwire.UpdateFailHTLC:
126✔
3660
                        // If hodl.SettleIncoming is requested, we will not
126✔
3661
                        // forward the FAIL to the switch and will not signal a
126✔
3662
                        // free slot on the commitment transaction.
126✔
3663
                        if l.cfg.HodlMask.Active(hodl.FailIncoming) {
126✔
3664
                                l.log.Warnf(hodl.FailIncoming.Warning())
×
3665
                                continue
×
3666
                        }
3667

3668
                        // Fetch the reason the HTLC was canceled so we can
3669
                        // continue to propagate it. This failure originated
3670
                        // from another node, so the linkFailure field is not
3671
                        // set on the packet.
3672
                        failPacket := &htlcPacket{
126✔
3673
                                outgoingChanID: l.ShortChanID(),
126✔
3674
                                outgoingHTLCID: msg.ID,
126✔
3675
                                destRef:        &destRef,
126✔
3676
                                htlc:           msg,
126✔
3677
                        }
126✔
3678

126✔
3679
                        l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID)
126✔
3680

126✔
3681
                        // If the failure message lacks an HMAC (but includes
126✔
3682
                        // the 4 bytes for encoding the message and padding
126✔
3683
                        // lengths, then this means that we received it as an
126✔
3684
                        // UpdateFailMalformedHTLC. As a result, we'll signal
126✔
3685
                        // that we need to convert this error within the switch
126✔
3686
                        // to an actual error, by encrypting it as if we were
126✔
3687
                        // the originating hop.
126✔
3688
                        convertedErrorSize := lnwire.FailureMessageLength + 4
126✔
3689
                        if len(msg.Reason) == convertedErrorSize {
132✔
3690
                                failPacket.convertedError = true
6✔
3691
                        }
6✔
3692

3693
                        // Add the packet to the batch to be forwarded, and
3694
                        // notify the overflow queue that a spare spot has been
3695
                        // freed up within the commitment state.
3696
                        switchPackets = append(switchPackets, failPacket)
126✔
3697
                }
3698
        }
3699

3700
        // Only spawn the task forward packets we have a non-zero number.
3701
        if len(switchPackets) > 0 {
634✔
3702
                go l.forwardBatch(false, switchPackets...)
317✔
3703
        }
317✔
3704
}
3705
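// A note on the convertedError check above (arithmetic only; the constant
// lives in lnwire): an encrypted failure normally carries an HMAC, while a
// failure converted from UpdateFailMalformedHTLC is the plaintext failure
// message plus two 2-byte length prefixes:
//
//	// 256 (lnwire.FailureMessageLength) + 2 (message length)
//	//     + 2 (pad length) = 260 bytes.
//	convertedErrorSize := lnwire.FailureMessageLength + 4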

// processRemoteAdds serially processes each of the Add payment descriptors
// which have been "locked-in" by receiving a revocation from the remote party.
// The forwarding package provided instructs how to process this batch,
// indicating whether this is the first time these Adds are being processed, or
// whether we are reprocessing as a result of a failure or restart. Adds that
// have already been acknowledged in the forwarding package will be ignored.
//
//nolint:funlen
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
        l.log.Tracef("processing %d remote adds for height %d",
                len(fwdPkg.Adds), fwdPkg.Height)

        decodeReqs := make(
                []hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds),
        )
        for _, update := range fwdPkg.Adds {
                if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
                        // Before adding the new htlc to the state machine,
                        // parse the onion object in order to obtain the
                        // routing information with the DecodeHopIterator
                        // function which processes the Sphinx packet.
                        onionReader := bytes.NewReader(msg.OnionBlob[:])

                        req := hop.DecodeHopIteratorRequest{
                                OnionReader:    onionReader,
                                RHash:          msg.PaymentHash[:],
                                IncomingCltv:   msg.Expiry,
                                IncomingAmount: msg.Amount,
                                BlindingPoint:  msg.BlindingPoint,
                        }

                        decodeReqs = append(decodeReqs, req)
                }
        }

        // Atomically decode the incoming htlcs, simultaneously checking for
        // replay attempts. A particular index in the returned, spare list of
        // channel iterators should only be used if the failure code at the
        // same index is lnwire.FailCodeNone.
        decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
                fwdPkg.ID(), decodeReqs,
        )
        if sphinxErr != nil {
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to decode hop iterators: %v", sphinxErr)
                return
        }

        var switchPackets []*htlcPacket

        for i, update := range fwdPkg.Adds {
                idx := uint16(i)

                //nolint:forcetypeassert
                add := *update.UpdateMsg.(*lnwire.UpdateAddHTLC)
                sourceRef := fwdPkg.SourceRef(idx)

                if fwdPkg.State == channeldb.FwdStateProcessed &&
                        fwdPkg.AckFilter.Contains(idx) {

                        // If this index is already found in the ack filter,
                        // the response to this forwarding decision has already
                        // been committed by one of our commitment txns. ADDs
                        // in this state are waiting for the rest of the fwding
                        // package to get acked before being garbage collected.
                        continue
                }

                // An incoming HTLC add has been fully locked in. As a result
                // we can now examine the forwarding details of the HTLC, and
                // the HTLC itself to decide if: we should forward it, cancel
                // it, or are able to settle it (and it adheres to our fee
                // related constraints).

                // Before adding the new htlc to the state machine, parse the
                // onion object in order to obtain the routing information with
                // the DecodeHopIterator function which processes the Sphinx
                // packet.
                chanIterator, failureCode := decodeResps[i].Result()
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob, &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion hop "+
                                "iterator: %v", failureCode)
                        continue
                }

                heightNow := l.cfg.BestHeight()

                pld, routeRole, pldErr := chanIterator.HopPayload()
                if pldErr != nil {
                        // If we're unable to process the onion payload, or we
                        // received an invalid onion payload failure, then we
                        // should send an error back to the caller so the HTLC
                        // can be canceled.
                        var failedType uint64

                        // We need to get the underlying error value, so we
                        // can't use errors.As as suggested by the linter.
                        //nolint:errorlint
                        if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
                                failedType = uint64(e.Type)
                        }

                        // If we couldn't parse the payload, make our best
                        // effort at creating an error encrypter that knows
                        // what blinding type we were, but if we couldn't
                        // parse the payload we have no way of knowing whether
                        // we were the introduction node or not.
                        //
                        //nolint:ll
                        obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
                                l.cfg.ExtractErrorEncrypter,
                                // We need our route role here because we
                                // couldn't parse or validate the payload.
                                routeRole == hop.RouteRoleIntroduction,
                        )
                        if failCode != lnwire.CodeNone {
                                l.log.Errorf("could not extract error "+
                                        "encrypter: %v", pldErr)

                                // We can't process this htlc, send back
                                // malformed.
                                l.sendMalformedHTLCError(
                                        add.ID, failureCode, add.OnionBlob,
                                        &sourceRef,
                                )

                                continue
                        }

                        // TODO: currently none of the test unit infrastructure
                        // is setup to handle TLV payloads, so testing this
                        // would require implementing a separate mock iterator
                        // for TLV payloads that also supports injecting invalid
                        // payloads. Deferring this non-trivial effort till a
                        // later date.
                        failure := lnwire.NewInvalidOnionPayload(failedType, 0)

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Errorf("unable to decode forwarding "+
                                "instructions: %v", pldErr)

                        continue
                }

                // Retrieve the onion obfuscator from the onion blob in order
                // to produce the initial obfuscation of the onion failureCode.
                obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
                        l.cfg.ExtractErrorEncrypter,
                        routeRole == hop.RouteRoleIntroduction,
                )
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob,
                                &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion "+
                                "obfuscator: %v", failureCode)

                        continue
                }

                fwdInfo := pld.ForwardingInfo()

                // Check whether the payload we've just processed uses our
                // node as the introduction point (gave us a blinding key in
                // the payload itself) and fail it back if we don't support
                // route blinding.
                if fwdInfo.NextBlinding.IsSome() &&
                        l.cfg.DisallowRouteBlinding {

                        failure := lnwire.NewInvalidBlinding(
                                fn.Some(add.OnionBlob),
                        )

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Error("rejected htlc that uses us as an " +
                                "introduction point when we do not support " +
                                "route blinding")

                        continue
                }

                switch fwdInfo.NextHop {
                case hop.Exit:
                        err := l.processExitHop(
                                add, sourceRef, obfuscator, fwdInfo,
                                heightNow, pld,
                        )
                        if err != nil {
                                l.failf(LinkFailureError{
                                        code: ErrInternalError,
                                }, err.Error()) //nolint

                                return
                        }

                // There are additional channels left within this route. So
                // we'll simply do some forwarding package book-keeping.
                default:
                        // If hodl.AddIncoming is requested, we will not
                        // validate the forwarded ADD, nor will we send the
                        // packet to the htlc switch.
                        if l.cfg.HodlMask.Active(hodl.AddIncoming) {
                                l.log.Warnf(hodl.AddIncoming.Warning())
                                continue
                        }

                        endorseValue := l.experimentalEndorsement(
                                record.CustomSet(add.CustomRecords),
                        )
                        endorseType := uint64(
                                lnwire.ExperimentalEndorsementType,
                        )

                        switch fwdPkg.State {
                        case channeldb.FwdStateProcessed:
                                // This add was not forwarded on the previous
                                // processing phase, run it through our
                                // validation pipeline to reproduce an error.
                                // This may trigger a different error due to
                                // expiring timelocks, but we expect that an
                                // error will be reproduced.
                                if !fwdPkg.FwdFilter.Contains(idx) {
                                        break
                                }

                                // Otherwise, it was already processed, so we
                                // can collect it and continue.
                                outgoingAdd := &lnwire.UpdateAddHTLC{
                                        Expiry:        fwdInfo.OutgoingCTLV,
                                        Amount:        fwdInfo.AmountToForward,
                                        PaymentHash:   add.PaymentHash,
                                        BlindingPoint: fwdInfo.NextBlinding,
                                }

                                endorseValue.WhenSome(func(e byte) {
                                        custRecords := map[uint64][]byte{
                                                endorseType: {e},
                                        }

                                        outgoingAdd.CustomRecords = custRecords
                                })

                                // Finally, we'll encode the onion packet for
                                // the _next_ hop using the hop iterator
                                // decoded for the current hop.
                                buf := bytes.NewBuffer(
                                        outgoingAdd.OnionBlob[0:0],
                                )

                                // We know this cannot fail, as this ADD
                                // was marked forwarded in a previous
                                // round of processing.
                                chanIterator.EncodeNextHop(buf)

                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               outgoingAdd.Amount,
                                        htlc:                 outgoingAdd,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }
                                switchPackets = append(
                                        switchPackets, updatePacket,
                                )

                                continue
                        }

                        // TODO(roasbeef): ensure don't accept outrageous
                        // timeout for htlc

                        // With all our forwarding constraints met, we'll
                        // create the outgoing HTLC using the parameters as
                        // specified in the forwarding info.
                        addMsg := &lnwire.UpdateAddHTLC{
                                Expiry:        fwdInfo.OutgoingCTLV,
                                Amount:        fwdInfo.AmountToForward,
                                PaymentHash:   add.PaymentHash,
                                BlindingPoint: fwdInfo.NextBlinding,
                        }

                        endorseValue.WhenSome(func(e byte) {
                                addMsg.CustomRecords = map[uint64][]byte{
                                        endorseType: {e},
                                }
                        })

                        // Finally, we'll encode the onion packet for the
                        // _next_ hop using the hop iterator decoded for the
                        // current hop.
                        buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
                        err := chanIterator.EncodeNextHop(buf)
                        if err != nil {
                                l.log.Errorf("unable to encode the "+
                                        "remaining route %v", err)

                                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:ll
                                        return lnwire.NewTemporaryChannelFailure(upd)
                                }

                                failure := l.createFailureWithUpdate(
                                        true, hop.Source, cb,
                                )

                                l.sendHTLCError(
                                        add, sourceRef, NewLinkError(failure),
                                        obfuscator, false,
                                )
                                continue
                        }

                        // Now that this add has been reprocessed, only append
                        // it to our list of packets to forward to the switch
                        // if this is the first time processing the add. If the
                        // fwd pkg has already been processed, then we entered
                        // the above section to recreate a previous error. If
                        // the packet had previously been forwarded, it would
                        // have been added to switchPackets at the top of this
                        // section.
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               addMsg.Amount,
                                        htlc:                 addMsg,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }

                                fwdPkg.FwdFilter.Set(idx)
                                switchPackets = append(switchPackets,
                                        updatePacket)
                        }
                }
        }

        // Commit the htlcs we are intending to forward if this package has not
        // been fully processed.
        if fwdPkg.State == channeldb.FwdStateLockedIn {
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
                if err != nil {
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "unable to set fwd filter: %v", err)
                        return
                }
        }

        if len(switchPackets) == 0 {
                return
        }

        replay := fwdPkg.State != channeldb.FwdStateLockedIn

        l.log.Debugf("forwarding %d packets to switch: replay=%v",
                len(switchPackets), replay)

        // NOTE: This call is made synchronous so that we ensure all circuits
        // are committed in the exact order that they are processed in the link.
        // Failing to do this could cause reorderings/gaps in the range of
        // opened circuits, which violates assumptions made by the circuit
        // trimming.
        l.forwardBatch(replay, switchPackets...)
}
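// The forwarding-package states referenced above drive the replay logic (a
// summary of the behaviour implemented in this function, with the constant
// names as used in channeldb):
//
//	switch fwdPkg.State {
//	case channeldb.FwdStateLockedIn:
//		// First pass: validate, set the FwdFilter and forward.
//	case channeldb.FwdStateProcessed:
//		// Restart/replay: re-forward only the Adds already marked
//		// in FwdFilter; unmarked Adds re-run validation so that
//		// their original error is reproduced.
//	}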

// experimentalEndorsement returns the value to set for our outgoing
// experimental endorsement field. The returned option is None when the
// signal should not be populated on the outgoing htlc.
func (l *channelLink) experimentalEndorsement(
        customUpdateAdd record.CustomSet) fn.Option[byte] {

        // Only relay the experimental signal if we are within the experiment
        // period.
        if !l.cfg.ShouldFwdExpEndorsement() {
                return fn.None[byte]()
        }

        // If we don't have any custom records or the experimental field is
        // not set, just forward a zero value.
        if len(customUpdateAdd) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        t := uint64(lnwire.ExperimentalEndorsementType)
        value, set := customUpdateAdd[t]
        if !set {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // We expect at least one byte for this field; consider it invalid if
        // it has no data and just forward a zero value.
        if len(value) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // Only forward endorsed if the incoming link is endorsed.
        if value[0] == lnwire.ExperimentalEndorsed {
                return fn.Some[byte](lnwire.ExperimentalEndorsed)
        }

        // Forward as unendorsed otherwise, including cases where we've
        // received an invalid value that uses more than 3 bits of information.
        return fn.Some[byte](lnwire.ExperimentalUnendorsed)
}

// processExitHop handles an htlc for which this link is the exit hop. It
// returns a non-nil error only if the htlc could not be handed off to the
// invoice registry.
func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter,
        fwdInfo hop.ForwardingInfo, heightNow uint32,
        payload invoices.Payload) error {

        // If hodl.ExitSettle is requested, we will not validate the final hop's
        // ADD, nor will we settle the corresponding invoice or respond with the
        // preimage.
        if l.cfg.HodlMask.Active(hodl.ExitSettle) {
                l.log.Warnf("%s for htlc(rhash=%x,htlcIndex=%v)",
                        hodl.ExitSettle.Warning(), add.PaymentHash, add.ID)

                return nil
        }

        // In case the traffic shaper is active, we'll check if the HTLC has
        // custom records and skip the amount check in the onion payload below.
        isCustomHTLC := fn.MapOptionZ(
                l.cfg.AuxTrafficShaper,
                func(ts AuxTrafficShaper) bool {
                        return ts.IsCustomHTLC(add.CustomRecords)
                },
        )

        // As we're the exit hop, we'll double check the hop-payload included in
        // the HTLC to ensure that it was crafted correctly by the sender and
        // is compatible with the HTLC we were extended. If an external
        // validator is active we might bypass the amount check.
        if !isCustomHTLC && add.Amount < fwdInfo.AmountToForward {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible value: expected <=%v, got %v",
                        add.PaymentHash, add.Amount, fwdInfo.AmountToForward)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectHtlcAmount(add.Amount),
                )
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

                return nil
        }

        // We'll also ensure that our time-lock value has been computed
        // correctly.
        if add.Expiry < fwdInfo.OutgoingCTLV {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible time-lock: expected <=%v, got %v",
                        add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectCltvExpiry(add.Expiry),
                )

                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

                return nil
        }

        // Notify the invoiceRegistry of the exit hop htlc. If we crash right
        // after this, this code will be re-executed after restart. We will
        // receive back a resolution event.
        invoiceHash := lntypes.Hash(add.PaymentHash)

        circuitKey := models.CircuitKey{
                ChanID: l.ShortChanID(),
                HtlcID: add.ID,
        }

        event, err := l.cfg.Registry.NotifyExitHopHtlc(
                invoiceHash, add.Amount, add.Expiry, int32(heightNow),
                circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload,
        )
        if err != nil {
                return err
        }

        // Create a hodlHtlc struct and decide whether to resolve it now or
        // later.
        htlc := hodlHtlc{
                add:        add,
                sourceRef:  sourceRef,
                obfuscator: obfuscator,
        }

        // If the event is nil, the invoice is being held, so we save the
        // payment descriptor for future reference.
        if event == nil {
                l.hodlMap[circuitKey] = htlc
                return nil
        }

        // Process the received resolution.
        return l.processHtlcResolution(event, htlc)
}
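// A minimal sketch of the hodl-invoice flow implemented above (the types
// are from this package; the control flow is a simplification):
//
//	event, err := registry.NotifyExitHopHtlc(...)
//	switch {
//	case err != nil:
//		// Hard failure, bubbled up to the caller.
//	case event == nil:
//		// Invoice is held: park the htlc in hodlMap and wait for
//		// a resolution to arrive on hodlQueue.ChanIn().
//	default:
//		// Immediate settle/cancel resolution.
//	}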

// settleHTLC settles the HTLC on the channel.
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
        htlcIndex uint64, sourceRef channeldb.AddRef) error {

        hash := preimage.Hash()

        l.log.Infof("settling htlc %v as exit hop", hash)

        err := l.channel.SettleHTLC(
                preimage, htlcIndex, &sourceRef, nil, nil,
        )
        if err != nil {
                return fmt.Errorf("unable to settle htlc: %w", err)
        }

        // If the link is in hodl.BogusSettle mode, replace the preimage with a
        // fake one before sending it to the peer.
        if l.cfg.HodlMask.Active(hodl.BogusSettle) {
                l.log.Warnf(hodl.BogusSettle.Warning())
                preimage = [32]byte{}
                copy(preimage[:], bytes.Repeat([]byte{2}, 32))
        }

        // The HTLC was successfully settled locally, so send a notification
        // about it to the remote peer.
        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
                ChanID:          l.ChanID(),
                ID:              htlcIndex,
                PaymentPreimage: preimage,
        })

        // Once we have successfully settled the htlc, notify a settle event.
        l.cfg.HtlcNotifier.NotifySettleEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: htlcIndex,
                        },
                },
                preimage,
                HtlcEventTypeReceive,
        )

        return nil
}

// forwardBatch forwards the given htlcPackets to the switch, and waits on the
// err chan for the individual responses. This method is intended to be spawned
// as a goroutine so the responses can be handled in the background.
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
        // Don't forward packets for which we already have a response in our
        // mailbox. This could happen if a packet fails and is buffered in the
        // mailbox, and the incoming link flaps.
        var filteredPkts = make([]*htlcPacket, 0, len(packets))
        for _, pkt := range packets {
                if l.mailBox.HasPacket(pkt.inKey()) {
                        continue
                }

                filteredPkts = append(filteredPkts, pkt)
        }

        err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...)
        if err != nil {
                log.Errorf("Unhandled error while reforwarding htlc "+
                        "settle/fail over htlcswitch: %v", err)
        }
}

// sendHTLCError cancels the HTLC and sends a cancel message back to the
// peer from which the HTLC was received.
func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, failure *LinkError,
        e hop.ErrorEncrypter, isReceive bool) {

        reason, err := e.EncryptFirstHop(failure.WireMessage())
        if err != nil {
                l.log.Errorf("unable to obfuscate error: %v", err)
                return
        }

        err = l.channel.FailHTLC(add.ID, reason, &sourceRef, nil, nil)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        // Send the appropriate failure message depending on whether we're
        // in a blinded route or not.
        if err := l.sendIncomingHTLCFailureMsg(
                add.ID, e, reason,
        ); err != nil {
                l.log.Errorf("unable to send HTLC failure: %v", err)
                return
        }

        // Notify a link failure on our incoming link. Outgoing htlc information
        // is not available at this point, because we have not decrypted the
        // onion, so it is excluded.
        var eventType HtlcEventType
        if isReceive {
                eventType = HtlcEventTypeReceive
        } else {
                eventType = HtlcEventTypeForward
        }

        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: add.ID,
                        },
                },
                HtlcInfo{
                        IncomingTimeLock: add.Expiry,
                        IncomingAmt:      add.Amount,
                },
                eventType,
                failure,
                true,
        )
}

// sendIncomingHTLCFailureMsg handles sending an HTLC failure message back to
// the peer from which the HTLC was received. This function is primarily used
// to handle the special requirements of route blinding, specifically:
// - Forwarding nodes must switch out any errors with MalformedFailHTLC.
// - Introduction nodes should return regular HTLC failure messages.
//
// It accepts the original opaque failure, which will be used in the case
// that we're not part of a blinded route, and an error encrypter that'll be
// used if we are the introduction node and need to present an error as if
// we're the failing party.
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
        e hop.ErrorEncrypter,
        originalFailure lnwire.OpaqueReason) error {

        var msg lnwire.Message
        switch {
        // Our circuit's error encrypter will be nil if this was a locally
        // initiated payment. We can only hit a blinded error for a locally
        // initiated payment if we allow ourselves to be picked as the
        // introduction node for our own payments and in that case we
        // shouldn't reach this code. To prevent the HTLC getting stuck,
        // we fail it back and log an error.
        case e == nil:
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

                l.log.Errorf("Unexpected blinded failure when "+
                        "we are the sending node, incoming htlc: %v(%v)",
                        l.ShortChanID(), htlcIndex)

        // For cleartext hops (ie, non-blinded/normal) we don't need any
        // transformation on the error message and can just send the original.
        case !e.Type().IsBlinded():
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

        // When we're the introduction node, we need to convert the error to
        // an UpdateFailHTLC.
        case e.Type() == hop.EncrypterTypeIntroduction:
                l.log.Debugf("Introduction blinded node switching out failure "+
                        "error: %v", htlcIndex)

                // The specification does not require that we set the onion
                // blob.
                failureMsg := lnwire.NewInvalidBlinding(
                        fn.None[[lnwire.OnionPacketSize]byte](),
                )
                reason, err := e.EncryptFirstHop(failureMsg)
                if err != nil {
                        return err
                }

                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: reason,
                }

        // If we are a relaying node, we need to switch out any error that
        // we've received to a malformed HTLC error.
        case e.Type() == hop.EncrypterTypeRelaying:
                l.log.Debugf("Relaying blinded node switching out malformed "+
                        "error: %v", htlcIndex)

                msg = &lnwire.UpdateFailMalformedHTLC{
                        ChanID:      l.ChanID(),
                        ID:          htlcIndex,
                        FailureCode: lnwire.CodeInvalidBlinding,
                }

        default:
                return fmt.Errorf("unexpected encrypter: %d", e)
        }

        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                l.log.Warnf("Send update fail failed: %v", err)
        }

        return nil
}
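// Summarizing the dispatch above (a sketch of the behaviour implemented in
// this function, not additional logic):
//
//	encrypter     -> wire message sent upstream
//	-----------------------------------------------------------------
//	nil           -> UpdateFailHTLC with the original reason (bug
//	                 guard for locally initiated payments)
//	cleartext     -> UpdateFailHTLC with the original reason
//	introduction  -> UpdateFailHTLC carrying invalid_blinding
//	relaying      -> UpdateFailMalformedHTLC, code invalid_blinding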

// sendMalformedHTLCError is a helper function which sends the malformed HTLC
// update to the payment sender.
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
        code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte,
        sourceRef *channeldb.AddRef) {

        shaOnionBlob := sha256.Sum256(onionBlob[:])
        err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
                ChanID:       l.ChanID(),
                ID:           htlcIndex,
                ShaOnionBlob: shaOnionBlob,
                FailureCode:  code,
        })
}

// failf is a function which is used to encapsulate the action necessary for
// properly failing the link. It takes a LinkFailureError, which will be passed
// to the OnChannelFailure closure, in order for it to determine if we should
// force close the channel, and if we should send an error message to the
// remote peer.
func (l *channelLink) failf(linkErr LinkFailureError, format string,
        a ...interface{}) {

        reason := fmt.Errorf(format, a...)

        // Return if we have already notified about a failure.
        if l.failed {
                l.log.Warnf("ignoring link failure (%v), as link already "+
                        "failed", reason)
                return
        }

        l.log.Errorf("failing link: %s with error: %v", reason, linkErr)

        // Set failed, such that we won't process any more updates, and notify
        // the peer about the failure.
        l.failed = true
        l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
}

// FundingCustomBlob returns the custom funding blob of the channel that this
// link is associated with. The funding blob represents static information about
// the channel that was created at channel funding time.
func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        if l.channel.State() == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.State().CustomBlob
}

// CommitmentCustomBlob returns the custom blob of the current local commitment
// of the channel that this link is associated with.
func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.LocalCommitmentBlob()
}