• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16034782542

02 Jul 2025 08:10PM UTC coverage: 57.802% (-9.8%) from 67.589%
16034782542

Pull #10001

github

web-flow
Merge 30348038e into 8a0341419
Pull Request #10001: Enable quiescence in production and add timeout config

5 of 8 new or added lines in 4 files covered. (62.5%)

28413 existing lines in 456 files now uncovered.

98494 of 170400 relevant lines covered (57.8%)

1.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.04
/htlcswitch/link.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "context"
6
        crand "crypto/rand"
7
        "crypto/sha256"
8
        "errors"
9
        "fmt"
10
        prand "math/rand"
11
        "sync"
12
        "sync/atomic"
13
        "time"
14

15
        "github.com/btcsuite/btcd/btcutil"
16
        "github.com/btcsuite/btcd/wire"
17
        "github.com/btcsuite/btclog/v2"
18
        "github.com/lightningnetwork/lnd/channeldb"
19
        "github.com/lightningnetwork/lnd/contractcourt"
20
        "github.com/lightningnetwork/lnd/fn/v2"
21
        "github.com/lightningnetwork/lnd/graph/db/models"
22
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
23
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
24
        "github.com/lightningnetwork/lnd/input"
25
        "github.com/lightningnetwork/lnd/invoices"
26
        "github.com/lightningnetwork/lnd/lnpeer"
27
        "github.com/lightningnetwork/lnd/lntypes"
28
        "github.com/lightningnetwork/lnd/lnutils"
29
        "github.com/lightningnetwork/lnd/lnwallet"
30
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
31
        "github.com/lightningnetwork/lnd/lnwire"
32
        "github.com/lightningnetwork/lnd/queue"
33
        "github.com/lightningnetwork/lnd/record"
34
        "github.com/lightningnetwork/lnd/routing/route"
35
        "github.com/lightningnetwork/lnd/ticker"
36
        "github.com/lightningnetwork/lnd/tlv"
37
)
38

39
func init() {
3✔
40
        prand.Seed(time.Now().UnixNano())
3✔
41
}
3✔
42

43
const (
44
        // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
45
        // the node accepts for forwarded payments. The value is relative to the
46
        // current block height. The reason to have a maximum is to prevent
47
        // funds getting locked up unreasonably long. Otherwise, an attacker
48
        // willing to lock its own funds too, could force the funds of this node
49
        // to be locked up for an indefinite (max int32) number of blocks.
50
        //
51
        // The value 2016 corresponds to on average two weeks worth of blocks
52
        // and is based on the maximum number of hops (20), the default CLTV
53
        // delta (40), and some extra margin to account for the other lightning
54
        // implementations and past lnd versions which used to have a default
55
        // CLTV delta of 144.
56
        DefaultMaxOutgoingCltvExpiry = 2016
57

58
        // DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
59
        // which a link should propose to update its commitment fee rate.
60
        DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute
61

62
        // DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
63
        // which a link should propose to update its commitment fee rate.
64
        DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute
65

66
        // DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
67
        // a channel's commitment fee to be of its balance. This only applies to
68
        // the initiator of the channel.
69
        DefaultMaxLinkFeeAllocation float64 = 0.5
70
)
71

72
// ExpectedFee computes the expected fee for a given htlc amount. The value
73
// returned from this function is to be used as a sanity check when forwarding
74
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
75
// forwarding policy.
76
//
77
// TODO(roasbeef): also add in current available channel bandwidth, inverse
78
// func
79
func ExpectedFee(f models.ForwardingPolicy,
80
        htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {
3✔
81

3✔
82
        return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
3✔
83
}
3✔
84

85
// ChannelLinkConfig defines the configuration for the channel link. ALL
86
// elements within the configuration MUST be non-nil for channel link to carry
87
// out its duties.
88
type ChannelLinkConfig struct {
89
        // FwrdingPolicy is the initial forwarding policy to be used when
90
        // deciding whether to forwarding incoming HTLC's or not. This value
91
        // can be updated with subsequent calls to UpdateForwardingPolicy
92
        // targeted at a given ChannelLink concrete interface implementation.
93
        FwrdingPolicy models.ForwardingPolicy
94

95
        // Circuits provides restricted access to the switch's circuit map,
96
        // allowing the link to open and close circuits.
97
        Circuits CircuitModifier
98

99
        // BestHeight returns the best known height.
100
        BestHeight func() uint32
101

102
        // ForwardPackets attempts to forward the batch of htlcs through the
103
        // switch. The function returns and error in case it fails to send one or
104
        // more packets. The link's quit signal should be provided to allow
105
        // cancellation of forwarding during link shutdown.
106
        ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error
107

108
        // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
109
        // blobs, which are then used to inform how to forward an HTLC.
110
        //
111
        // NOTE: This function assumes the same set of readers and preimages
112
        // are always presented for the same identifier. The last boolean is
113
        // used to decide whether this is a reforwarding or not - when it's
114
        // reforwarding, we skip the replay check enforced in our decay log.
115
        DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest, bool) (
116
                []hop.DecodeHopIteratorResponse, error)
117

118
        // ExtractErrorEncrypter function is responsible for decoding HTLC
119
        // Sphinx onion blob, and creating onion failure obfuscator.
120
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter
121

122
        // FetchLastChannelUpdate retrieves the latest routing policy for a
123
        // target channel. This channel will typically be the outgoing channel
124
        // specified when we receive an incoming HTLC.  This will be used to
125
        // provide payment senders our latest policy when sending encrypted
126
        // error messages.
127
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
128
                *lnwire.ChannelUpdate1, error)
129

130
        // Peer is a lightning network node with which we have the channel link
131
        // opened.
132
        Peer lnpeer.Peer
133

134
        // Registry is a sub-system which responsible for managing the invoices
135
        // in thread-safe manner.
136
        Registry InvoiceDatabase
137

138
        // PreimageCache is a global witness beacon that houses any new
139
        // preimages discovered by other links. We'll use this to add new
140
        // witnesses that we discover which will notify any sub-systems
141
        // subscribed to new events.
142
        PreimageCache contractcourt.WitnessBeacon
143

144
        // OnChannelFailure is a function closure that we'll call if the
145
        // channel failed for some reason. Depending on the severity of the
146
        // error, the closure potentially must force close this channel and
147
        // disconnect the peer.
148
        //
149
        // NOTE: The method must return in order for the ChannelLink to be able
150
        // to shut down properly.
151
        OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
152
                LinkFailureError)
153

154
        // UpdateContractSignals is a function closure that we'll use to update
155
        // outside sub-systems with this channel's latest ShortChannelID.
156
        UpdateContractSignals func(*contractcourt.ContractSignals) error
157

158
        // NotifyContractUpdate is a function closure that we'll use to update
159
        // the contractcourt and more specifically the ChannelArbitrator of the
160
        // latest channel state.
161
        NotifyContractUpdate func(*contractcourt.ContractUpdate) error
162

163
        // ChainEvents is an active subscription to the chain watcher for this
164
        // channel to be notified of any on-chain activity related to this
165
        // channel.
166
        ChainEvents *contractcourt.ChainEventSubscription
167

168
        // FeeEstimator is an instance of a live fee estimator which will be
169
        // used to dynamically regulate the current fee of the commitment
170
        // transaction to ensure timely confirmation.
171
        FeeEstimator chainfee.Estimator
172

173
        // hodl.Mask is a bitvector composed of hodl.Flags, specifying breakpoints
174
        // for HTLC forwarding internal to the switch.
175
        //
176
        // NOTE: This should only be used for testing.
177
        HodlMask hodl.Mask
178

179
        // SyncStates is used to indicate that we need send the channel
180
        // reestablishment message to the remote peer. It should be done if our
181
        // clients have been restarted, or remote peer have been reconnected.
182
        SyncStates bool
183

184
        // BatchTicker is the ticker that determines the interval that we'll
185
        // use to check the batch to see if there're any updates we should
186
        // flush out. By batching updates into a single commit, we attempt to
187
        // increase throughput by maximizing the number of updates coalesced
188
        // into a single commit.
189
        BatchTicker ticker.Ticker
190

191
        // FwdPkgGCTicker is the ticker determining the frequency at which
192
        // garbage collection of forwarding packages occurs. We use a
193
        // time-based approach, as opposed to block epochs, as to not hinder
194
        // syncing.
195
        FwdPkgGCTicker ticker.Ticker
196

197
        // PendingCommitTicker is a ticker that allows the link to determine if
198
        // a locally initiated commitment dance gets stuck waiting for the
199
        // remote party to revoke.
200
        PendingCommitTicker ticker.Ticker
201

202
        // BatchSize is the max size of a batch of updates done to the link
203
        // before we do a state update.
204
        BatchSize uint32
205

206
        // UnsafeReplay will cause a link to replay the adds in its latest
207
        // commitment txn after the link is restarted. This should only be used
208
        // in testing, it is here to ensure the sphinx replay detection on the
209
        // receiving node is persistent.
210
        UnsafeReplay bool
211

212
        // MinUpdateTimeout represents the minimum interval in which a link
213
        // will propose to update its commitment fee rate. A random timeout will
214
        // be selected between this and MaxUpdateTimeout.
215
        MinUpdateTimeout time.Duration
216

217
        // MaxUpdateTimeout represents the maximum interval in which a link
218
        // will propose to update its commitment fee rate. A random timeout will
219
        // be selected between this and MinUpdateTimeout.
220
        MaxUpdateTimeout time.Duration
221

222
        // OutgoingCltvRejectDelta defines the number of blocks before expiry of
223
        // an htlc where we don't offer an htlc anymore. This should be at least
224
        // the outgoing broadcast delta, because in any case we don't want to
225
        // risk offering an htlc that triggers channel closure.
226
        OutgoingCltvRejectDelta uint32
227

228
        // TowerClient is an optional engine that manages the signing,
229
        // encrypting, and uploading of justice transactions to the daemon's
230
        // configured set of watchtowers for legacy channels.
231
        TowerClient TowerClient
232

233
        // MaxOutgoingCltvExpiry is the maximum outgoing timelock that the link
234
        // should accept for a forwarded HTLC. The value is relative to the
235
        // current block height.
236
        MaxOutgoingCltvExpiry uint32
237

238
        // MaxFeeAllocation is the highest allocation we'll allow a channel's
239
        // commitment fee to be of its balance. This only applies to the
240
        // initiator of the channel.
241
        MaxFeeAllocation float64
242

243
        // MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
244
        // the initiator for channels of the anchor type.
245
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight
246

247
        // NotifyActiveLink allows the link to tell the ChannelNotifier when a
248
        // link is first started.
249
        NotifyActiveLink func(wire.OutPoint)
250

251
        // NotifyActiveChannel allows the link to tell the ChannelNotifier when
252
        // channels becomes active.
253
        NotifyActiveChannel func(wire.OutPoint)
254

255
        // NotifyInactiveChannel allows the switch to tell the ChannelNotifier
256
        // when channels become inactive.
257
        NotifyInactiveChannel func(wire.OutPoint)
258

259
        // NotifyInactiveLinkEvent allows the switch to tell the
260
        // ChannelNotifier when a channel link become inactive.
261
        NotifyInactiveLinkEvent func(wire.OutPoint)
262

263
        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
264
        // events through.
265
        HtlcNotifier htlcNotifier
266

267
        // FailAliasUpdate is a function used to fail an HTLC for an
268
        // option_scid_alias channel.
269
        FailAliasUpdate func(sid lnwire.ShortChannelID,
270
                incoming bool) *lnwire.ChannelUpdate1
271

272
        // GetAliases is used by the link and switch to fetch the set of
273
        // aliases for a given link.
274
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID
275

276
        // PreviouslySentShutdown is an optional value that is set if, at the
277
        // time of the link being started, persisted shutdown info was found for
278
        // the channel. This value being set means that we previously sent a
279
        // Shutdown message to our peer, and so we should do so again on
280
        // re-establish and should not allow anymore HTLC adds on the outgoing
281
        // direction of the link.
282
        PreviouslySentShutdown fn.Option[lnwire.Shutdown]
283

284
        // Adds the option to disable forwarding payments in blinded routes
285
        // by failing back any blinding-related payloads as if they were
286
        // invalid.
287
        DisallowRouteBlinding bool
288

289
        // DisallowQuiescence is a flag that can be used to disable the
290
        // quiescence protocol.
291
        DisallowQuiescence bool
292

293
        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
294
        // restrict the flow of HTLCs and fee updates.
295
        MaxFeeExposure lnwire.MilliSatoshi
296

297
        // ShouldFwdExpEndorsement is a closure that indicates whether the link
298
        // should forward experimental endorsement signals.
299
        ShouldFwdExpEndorsement func() bool
300

301
        // AuxTrafficShaper is an optional auxiliary traffic shaper that can be
302
        // used to manage the bandwidth of the link.
303
        AuxTrafficShaper fn.Option[AuxTrafficShaper]
304

305
        // QuiescenceTimeout is the max duration that the channel can be
306
        // quiesced. Any dependent protocols (dynamic commitments, splicing,
307
        // etc.) must finish their operations under this timeout value,
308
        // otherwise the node will disconnect.
309
        QuiescenceTimeout time.Duration
310
}
311

312
// channelLink is the service which drives a channel's commitment update
313
// state-machine. In the event that an HTLC needs to be propagated to another
314
// link, the forward handler from config is used which sends HTLC to the
315
// switch. Additionally, the link encapsulate logic of commitment protocol
316
// message ordering and updates.
317
type channelLink struct {
318
        // The following fields are only meant to be used *atomically*
319
        started       int32
320
        reestablished int32
321
        shutdown      int32
322

323
        // failed should be set to true in case a link error happens, making
324
        // sure we don't process any more updates.
325
        failed bool
326

327
        // keystoneBatch represents a volatile list of keystones that must be
328
        // written before attempting to sign the next commitment txn. These
329
        // represent all the HTLC's forwarded to the link from the switch. Once
330
        // we lock them into our outgoing commitment, then the circuit has a
331
        // keystone, and is fully opened.
332
        keystoneBatch []Keystone
333

334
        // openedCircuits is the set of all payment circuits that will be open
335
        // once we make our next commitment. After making the commitment we'll
336
        // ACK all these from our mailbox to ensure that they don't get
337
        // re-delivered if we reconnect.
338
        openedCircuits []CircuitKey
339

340
        // closedCircuits is the set of all payment circuits that will be
341
        // closed once we make our next commitment. After taking the commitment
342
        // we'll ACK all these to ensure that they don't get re-delivered if we
343
        // reconnect.
344
        closedCircuits []CircuitKey
345

346
        // channel is a lightning network channel to which we apply htlc
347
        // updates.
348
        channel *lnwallet.LightningChannel
349

350
        // cfg is a structure which carries all dependable fields/handlers
351
        // which may affect behaviour of the service.
352
        cfg ChannelLinkConfig
353

354
        // mailBox is the main interface between the outside world and the
355
        // link. All incoming messages will be sent over this mailBox. Messages
356
        // include new updates from our connected peer, and new packets to be
357
        // forwarded sent by the switch.
358
        mailBox MailBox
359

360
        // upstream is a channel that new messages sent from the remote peer to
361
        // the local peer will be sent across.
362
        upstream chan lnwire.Message
363

364
        // downstream is a channel in which new multi-hop HTLC's to be
365
        // forwarded will be sent across. Messages from this channel are sent
366
        // by the HTLC switch.
367
        downstream chan *htlcPacket
368

369
        // updateFeeTimer is the timer responsible for updating the link's
370
        // commitment fee every time it fires.
371
        updateFeeTimer *time.Timer
372

373
        // uncommittedPreimages stores a list of all preimages that have been
374
        // learned since receiving the last CommitSig from the remote peer. The
375
        // batch will be flushed just before accepting the subsequent CommitSig
376
        // or on shutdown to avoid doing a write for each preimage received.
377
        uncommittedPreimages []lntypes.Preimage
378

379
        sync.RWMutex
380

381
        // hodlQueue is used to receive exit hop htlc resolutions from invoice
382
        // registry.
383
        hodlQueue *queue.ConcurrentQueue
384

385
        // hodlMap stores related htlc data for a circuit key. It allows
386
        // resolving those htlcs when we receive a message on hodlQueue.
387
        hodlMap map[models.CircuitKey]hodlHtlc
388

389
        // log is a link-specific logging instance.
390
        log btclog.Logger
391

392
        // isOutgoingAddBlocked tracks whether the channelLink can send an
393
        // UpdateAddHTLC.
394
        isOutgoingAddBlocked atomic.Bool
395

396
        // isIncomingAddBlocked tracks whether the channelLink can receive an
397
        // UpdateAddHTLC.
398
        isIncomingAddBlocked atomic.Bool
399

400
        // flushHooks is a hookMap that is triggered when we reach a channel
401
        // state with no live HTLCs.
402
        flushHooks hookMap
403

404
        // outgoingCommitHooks is a hookMap that is triggered after we send our
405
        // next CommitSig.
406
        outgoingCommitHooks hookMap
407

408
        // incomingCommitHooks is a hookMap that is triggered after we receive
409
        // our next CommitSig.
410
        incomingCommitHooks hookMap
411

412
        // quiescer is the state machine that tracks where this channel is with
413
        // respect to the quiescence protocol.
414
        quiescer Quiescer
415

416
        // quiescenceReqs is a queue of requests to quiesce this link. The
417
        // members of the queue are send-only channels we should call back with
418
        // the result.
419
        quiescenceReqs chan StfuReq
420

421
        // cg is a helper that encapsulates a wait group and quit channel and
422
        // allows contexts that either block or cancel on those depending on
423
        // the use case.
424
        cg *fn.ContextGuard
425
}
426

427
// hookMap is a data structure that is used to track the hooks that need to be
428
// called in various parts of the channelLink's lifecycle.
429
//
430
// WARNING: NOT thread-safe.
431
type hookMap struct {
432
        // allocIdx keeps track of the next id we haven't yet allocated.
433
        allocIdx atomic.Uint64
434

435
        // transient is a map of hooks that are only called the next time invoke
436
        // is called. These hooks are deleted during invoke.
437
        transient map[uint64]func()
438

439
        // newTransients is a channel that we use to accept new hooks into the
440
        // hookMap.
441
        newTransients chan func()
442
}
443

444
// newHookMap initializes a new empty hookMap.
445
func newHookMap() hookMap {
3✔
446
        return hookMap{
3✔
447
                allocIdx:      atomic.Uint64{},
3✔
448
                transient:     make(map[uint64]func()),
3✔
449
                newTransients: make(chan func()),
3✔
450
        }
3✔
451
}
3✔
452

453
// alloc allocates space in the hook map for the supplied hook, the second
454
// argument determines whether it goes into the transient or persistent part
455
// of the hookMap.
456
func (m *hookMap) alloc(hook func()) uint64 {
3✔
457
        // We assume we never overflow a uint64. Seems OK.
3✔
458
        hookID := m.allocIdx.Add(1)
3✔
459
        if hookID == 0 {
3✔
460
                panic("hookMap allocIdx overflow")
×
461
        }
462
        m.transient[hookID] = hook
3✔
463

3✔
464
        return hookID
3✔
465
}
466

467
// invoke is used on a hook map to call all the registered hooks and then clear
468
// out the transient hooks so they are not called again.
469
func (m *hookMap) invoke() {
3✔
470
        for _, hook := range m.transient {
6✔
471
                hook()
3✔
472
        }
3✔
473

474
        m.transient = make(map[uint64]func())
3✔
475
}
476

477
// hodlHtlc contains htlc data that is required for resolution.
478
type hodlHtlc struct {
479
        add        lnwire.UpdateAddHTLC
480
        sourceRef  channeldb.AddRef
481
        obfuscator hop.ErrorEncrypter
482
}
483

484
// NewChannelLink creates a new instance of a ChannelLink given a configuration
485
// and active channel that will be used to verify/apply updates to.
486
func NewChannelLink(cfg ChannelLinkConfig,
487
        channel *lnwallet.LightningChannel) ChannelLink {
3✔
488

3✔
489
        logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())
3✔
490

3✔
491
        // If the max fee exposure isn't set, use the default.
3✔
492
        if cfg.MaxFeeExposure == 0 {
3✔
UNCOV
493
                cfg.MaxFeeExposure = DefaultMaxFeeExposure
×
UNCOV
494
        }
×
495

496
        var qsm Quiescer
3✔
497
        if !cfg.DisallowQuiescence {
6✔
498
                qsm = NewQuiescer(QuiescerCfg{
3✔
499
                        chanID: lnwire.NewChanIDFromOutPoint(
3✔
500
                                channel.ChannelPoint(),
3✔
501
                        ),
3✔
502
                        channelInitiator: channel.Initiator(),
3✔
503
                        sendMsg: func(s lnwire.Stfu) error {
6✔
504
                                return cfg.Peer.SendMessage(false, &s)
3✔
505
                        },
3✔
506
                        timeoutDuration: cfg.QuiescenceTimeout,
507
                        onTimeout: func() {
3✔
508
                                cfg.Peer.Disconnect(ErrQuiescenceTimeout)
3✔
509
                        },
3✔
510
                })
511
        } else {
×
512
                qsm = &quiescerNoop{}
×
513
        }
×
514

515
        quiescenceReqs := make(
3✔
516
                chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1,
3✔
517
        )
3✔
518

3✔
519
        return &channelLink{
3✔
520
                cfg:                 cfg,
3✔
521
                channel:             channel,
3✔
522
                hodlMap:             make(map[models.CircuitKey]hodlHtlc),
3✔
523
                hodlQueue:           queue.NewConcurrentQueue(10),
3✔
524
                log:                 log.WithPrefix(logPrefix),
3✔
525
                flushHooks:          newHookMap(),
3✔
526
                outgoingCommitHooks: newHookMap(),
3✔
527
                incomingCommitHooks: newHookMap(),
3✔
528
                quiescer:            qsm,
3✔
529
                quiescenceReqs:      quiescenceReqs,
3✔
530
                cg:                  fn.NewContextGuard(),
3✔
531
        }
3✔
532
}
533

534
// A compile time check to ensure channelLink implements the ChannelLink
535
// interface.
536
var _ ChannelLink = (*channelLink)(nil)
537

538
// Start starts all helper goroutines required for the operation of the channel
539
// link.
540
//
541
// NOTE: Part of the ChannelLink interface.
542
func (l *channelLink) Start() error {
3✔
543
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
3✔
544
                err := fmt.Errorf("channel link(%v): already started", l)
×
545
                l.log.Warn("already started")
×
546
                return err
×
547
        }
×
548

549
        l.log.Info("starting")
3✔
550

3✔
551
        // If the config supplied watchtower client, ensure the channel is
3✔
552
        // registered before trying to use it during operation.
3✔
553
        if l.cfg.TowerClient != nil {
6✔
554
                err := l.cfg.TowerClient.RegisterChannel(
3✔
555
                        l.ChanID(), l.channel.State().ChanType,
3✔
556
                )
3✔
557
                if err != nil {
3✔
558
                        return err
×
559
                }
×
560
        }
561

562
        l.mailBox.ResetMessages()
3✔
563
        l.hodlQueue.Start()
3✔
564

3✔
565
        // Before launching the htlcManager messages, revert any circuits that
3✔
566
        // were marked open in the switch's circuit map, but did not make it
3✔
567
        // into a commitment txn. We use the next local htlc index as the cut
3✔
568
        // off point, since all indexes below that are committed. This action
3✔
569
        // is only performed if the link's final short channel ID has been
3✔
570
        // assigned, otherwise we would try to trim the htlcs belonging to the
3✔
571
        // all-zero, hop.Source ID.
3✔
572
        if l.ShortChanID() != hop.Source {
6✔
573
                localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
3✔
574
                if err != nil {
3✔
575
                        return fmt.Errorf("unable to retrieve next local "+
×
576
                                "htlc index: %v", err)
×
577
                }
×
578

579
                // NOTE: This is automatically done by the switch when it
580
                // starts up, but is necessary to prevent inconsistencies in
581
                // the case that the link flaps. This is a result of a link's
582
                // life-cycle being shorter than that of the switch.
583
                chanID := l.ShortChanID()
3✔
584
                err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
3✔
585
                if err != nil {
3✔
586
                        return fmt.Errorf("unable to trim circuits above "+
×
587
                                "local htlc index %d: %v", localHtlcIndex, err)
×
588
                }
×
589

590
                // Since the link is live, before we start the link we'll update
591
                // the ChainArbitrator with the set of new channel signals for
592
                // this channel.
593
                //
594
                // TODO(roasbeef): split goroutines within channel arb to avoid
595
                go func() {
6✔
596
                        signals := &contractcourt.ContractSignals{
3✔
597
                                ShortChanID: l.channel.ShortChanID(),
3✔
598
                        }
3✔
599

3✔
600
                        err := l.cfg.UpdateContractSignals(signals)
3✔
601
                        if err != nil {
3✔
602
                                l.log.Errorf("unable to update signals")
×
603
                        }
×
604
                }()
605
        }
606

607
        l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())
3✔
608

3✔
609
        l.cg.WgAdd(1)
3✔
610
        go l.htlcManager(context.TODO())
3✔
611

3✔
612
        return nil
3✔
613
}
614

615
// Stop gracefully stops all active helper goroutines, then waits until they've
616
// exited.
617
//
618
// NOTE: Part of the ChannelLink interface.
619
func (l *channelLink) Stop() {
3✔
620
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
3✔
UNCOV
621
                l.log.Warn("already stopped")
×
UNCOV
622
                return
×
UNCOV
623
        }
×
624

625
        l.log.Info("stopping")
3✔
626

3✔
627
        // As the link is stopping, we are no longer interested in htlc
3✔
628
        // resolutions coming from the invoice registry.
3✔
629
        l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())
3✔
630

3✔
631
        if l.cfg.ChainEvents.Cancel != nil {
6✔
632
                l.cfg.ChainEvents.Cancel()
3✔
633
        }
3✔
634

635
        // Ensure the channel for the timer is drained.
636
        if l.updateFeeTimer != nil {
6✔
637
                if !l.updateFeeTimer.Stop() {
3✔
638
                        select {
×
639
                        case <-l.updateFeeTimer.C:
×
640
                        default:
×
641
                        }
642
                }
643
        }
644

645
        if l.hodlQueue != nil {
6✔
646
                l.hodlQueue.Stop()
3✔
647
        }
3✔
648

649
        l.cg.Quit()
3✔
650
        l.cg.WgWait()
3✔
651

3✔
652
        // Now that the htlcManager has completely exited, reset the packet
3✔
653
        // courier. This allows the mailbox to revaluate any lingering Adds that
3✔
654
        // were delivered but didn't make it on a commitment to be failed back
3✔
655
        // if the link is offline for an extended period of time. The error is
3✔
656
        // ignored since it can only fail when the daemon is exiting.
3✔
657
        _ = l.mailBox.ResetPackets()
3✔
658

3✔
659
        // As a final precaution, we will attempt to flush any uncommitted
3✔
660
        // preimages to the preimage cache. The preimages should be re-delivered
3✔
661
        // after channel reestablishment, however this adds an extra layer of
3✔
662
        // protection in case the peer never returns. Without this, we will be
3✔
663
        // unable to settle any contracts depending on the preimages even though
3✔
664
        // we had learned them at some point.
3✔
665
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
3✔
666
        if err != nil {
3✔
667
                l.log.Errorf("unable to add preimages=%v to cache: %v",
×
668
                        l.uncommittedPreimages, err)
×
669
        }
×
670
}
671

672
// WaitForShutdown blocks until the link finishes shutting down, which includes
673
// termination of all dependent goroutines.
674
func (l *channelLink) WaitForShutdown() {
×
675
        l.cg.WgWait()
×
676
}
×
677

678
// EligibleToForward returns a bool indicating if the channel is able to
679
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
680
// we are eligible to update AND the channel isn't currently flushing the
681
// outgoing half of the channel.
682
//
683
// NOTE: MUST NOT be called from the main event loop.
684
func (l *channelLink) EligibleToForward() bool {
3✔
685
        l.RLock()
3✔
686
        defer l.RUnlock()
3✔
687

3✔
688
        return l.eligibleToForward()
3✔
689
}
3✔
690

691
// eligibleToForward returns a bool indicating if the channel is able to
692
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
693
// we are eligible to update AND the channel isn't currently flushing the
694
// outgoing half of the channel.
695
//
696
// NOTE: MUST be called from the main event loop.
697
func (l *channelLink) eligibleToForward() bool {
3✔
698
        return l.eligibleToUpdate() && !l.IsFlushing(Outgoing)
3✔
699
}
3✔
700

701
// eligibleToUpdate returns a bool indicating if the channel is able to update
702
// channel state. We're able to update channel state if we know the remote
703
// party's next revocation point. Otherwise, we can't initiate new channel
704
// state. We also require that the short channel ID not be the all-zero source
705
// ID, meaning that the channel has had its ID finalized.
706
//
707
// NOTE: MUST be called from the main event loop.
708
func (l *channelLink) eligibleToUpdate() bool {
3✔
709
        return l.channel.RemoteNextRevocation() != nil &&
3✔
710
                l.channel.ShortChanID() != hop.Source &&
3✔
711
                l.isReestablished() &&
3✔
712
                l.quiescer.CanSendUpdates()
3✔
713
}
3✔
714

715
// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
716
// the specified direction. It returns true if the state was changed and false
717
// if the desired state was already set before the method was called.
UNCOV
718
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
×
UNCOV
719
        if linkDirection == Outgoing {
×
UNCOV
720
                return l.isOutgoingAddBlocked.Swap(false)
×
UNCOV
721
        }
×
722

UNCOV
723
        return l.isIncomingAddBlocked.Swap(false)
×
724
}
725

726
// DisableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
727
// the specified direction. It returns true if the state was changed and false
728
// if the desired state was already set before the method was called.
729
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
3✔
730
        if linkDirection == Outgoing {
6✔
731
                return !l.isOutgoingAddBlocked.Swap(true)
3✔
732
        }
3✔
733

734
        return !l.isIncomingAddBlocked.Swap(true)
3✔
735
}
736

737
// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction of
738
// the argument.
739
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
3✔
740
        if linkDirection == Outgoing {
6✔
741
                return l.isOutgoingAddBlocked.Load()
3✔
742
        }
3✔
743

744
        return l.isIncomingAddBlocked.Load()
3✔
745
}
746

747
// OnFlushedOnce adds a hook that will be called the next time the channel
748
// state reaches zero htlcs. This hook will only ever be called once. If the
749
// channel state already has zero htlcs, then this will be called immediately.
750
func (l *channelLink) OnFlushedOnce(hook func()) {
3✔
751
        select {
3✔
752
        case l.flushHooks.newTransients <- hook:
3✔
753
        case <-l.cg.Done():
×
754
        }
755
}
756

757
// OnCommitOnce adds a hook that will be called the next time a CommitSig
758
// message is sent in the argument's LinkDirection. This hook will only ever be
759
// called once. If no CommitSig is owed in the argument's LinkDirection, then
760
// we will call this hook be run immediately.
761
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
3✔
762
        var queue chan func()
3✔
763

3✔
764
        if direction == Outgoing {
6✔
765
                queue = l.outgoingCommitHooks.newTransients
3✔
766
        } else {
3✔
767
                queue = l.incomingCommitHooks.newTransients
×
768
        }
×
769

770
        select {
3✔
771
        case queue <- hook:
3✔
772
        case <-l.cg.Done():
×
773
        }
774
}
775

776
// InitStfu allows us to initiate quiescence on this link. It returns a receive
777
// only channel that will block until quiescence has been achieved, or
778
// definitively fails.
779
//
780
// This operation has been added to allow channels to be quiesced via RPC. It
781
// may be removed or reworked in the future as RPC initiated quiescence is a
782
// holdover until we have downstream protocols that use it.
783
func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
3✔
784
        req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]](
3✔
785
                fn.Unit{},
3✔
786
        )
3✔
787

3✔
788
        select {
3✔
789
        case l.quiescenceReqs <- req:
3✔
790
        case <-l.cg.Done():
×
791
                req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
×
792
        }
793

794
        return out
3✔
795
}
796

797
// isReestablished returns true if the link has successfully completed the
798
// channel reestablishment dance.
799
func (l *channelLink) isReestablished() bool {
3✔
800
        return atomic.LoadInt32(&l.reestablished) == 1
3✔
801
}
3✔
802

803
// markReestablished signals that the remote peer has successfully exchanged
804
// channel reestablish messages and that the channel is ready to process
805
// subsequent messages.
806
func (l *channelLink) markReestablished() {
3✔
807
        atomic.StoreInt32(&l.reestablished, 1)
3✔
808
}
3✔
809

810
// IsUnadvertised returns true if the underlying channel is unadvertised.
811
func (l *channelLink) IsUnadvertised() bool {
3✔
812
        state := l.channel.State()
3✔
813
        return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
3✔
814
}
3✔
815

816
// sampleNetworkFee samples the current fee rate on the network to get into the
817
// chain in a timely manner. The returned value is expressed in fee-per-kw, as
818
// this is the native rate used when computing the fee for commitment
819
// transactions, and the second-level HTLC transactions.
UNCOV
820
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
×
UNCOV
821
        // We'll first query for the sat/kw recommended to be confirmed within 3
×
UNCOV
822
        // blocks.
×
UNCOV
823
        feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
×
UNCOV
824
        if err != nil {
×
825
                return 0, err
×
826
        }
×
827

UNCOV
828
        l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
×
UNCOV
829
                int64(feePerKw))
×
UNCOV
830

×
UNCOV
831
        return feePerKw, nil
×
832
}
833

834
// shouldAdjustCommitFee returns true if we should update our commitment fee to
835
// match that of the network fee. We'll only update our commitment fee if the
836
// network fee is +/- 10% to our commitment fee or if our current commitment
837
// fee is below the minimum relay fee.
838
func shouldAdjustCommitFee(netFee, chanFee,
UNCOV
839
        minRelayFee chainfee.SatPerKWeight) bool {
×
UNCOV
840

×
UNCOV
841
        switch {
×
842
        // If the network fee is greater than our current commitment fee and
843
        // our current commitment fee is below the minimum relay fee then
844
        // we should switch to it no matter if it is less than a 10% increase.
UNCOV
845
        case netFee > chanFee && chanFee < minRelayFee:
×
UNCOV
846
                return true
×
847

848
        // If the network fee is greater than the commitment fee, then we'll
849
        // switch to it if it's at least 10% greater than the commit fee.
UNCOV
850
        case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
×
UNCOV
851
                return true
×
852

853
        // If the network fee is less than our commitment fee, then we'll
854
        // switch to it if it's at least 10% less than the commitment fee.
UNCOV
855
        case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
×
UNCOV
856
                return true
×
857

858
        // Otherwise, we won't modify our fee.
UNCOV
859
        default:
×
UNCOV
860
                return false
×
861
        }
862
}
863

864
// failCb is used to cut down on the argument verbosity.
865
type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage
866

867
// createFailureWithUpdate creates a ChannelUpdate when failing an incoming or
868
// outgoing HTLC. It may return a FailureMessage that references a channel's
869
// alias. If the channel does not have an alias, then the regular channel
870
// update from disk will be returned.
871
func (l *channelLink) createFailureWithUpdate(incoming bool,
872
        outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {
3✔
873

3✔
874
        // Determine which SCID to use in case we need to use aliases in the
3✔
875
        // ChannelUpdate.
3✔
876
        scid := outgoingScid
3✔
877
        if incoming {
3✔
878
                scid = l.ShortChanID()
×
879
        }
×
880

881
        // Try using the FailAliasUpdate function. If it returns nil, fallback
882
        // to the non-alias behavior.
883
        update := l.cfg.FailAliasUpdate(scid, incoming)
3✔
884
        if update == nil {
6✔
885
                // Fallback to the non-alias behavior.
3✔
886
                var err error
3✔
887
                update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
3✔
888
                if err != nil {
3✔
889
                        return &lnwire.FailTemporaryNodeFailure{}
×
890
                }
×
891
        }
892

893
        return cb(update)
3✔
894
}
895

896
// syncChanState attempts to synchronize channel states with the remote party.
897
// This method is to be called upon reconnection after the initial funding
898
// flow. We'll compare out commitment chains with the remote party, and re-send
899
// either a danging commit signature, a revocation, or both.
900
func (l *channelLink) syncChanStates(ctx context.Context) error {
3✔
901
        chanState := l.channel.State()
3✔
902

3✔
903
        l.log.Infof("Attempting to re-synchronize channel: %v", chanState)
3✔
904

3✔
905
        // First, we'll generate our ChanSync message to send to the other
3✔
906
        // side. Based on this message, the remote party will decide if they
3✔
907
        // need to retransmit any data or not.
3✔
908
        localChanSyncMsg, err := chanState.ChanSyncMsg()
3✔
909
        if err != nil {
3✔
910
                return fmt.Errorf("unable to generate chan sync message for "+
×
911
                        "ChannelPoint(%v)", l.channel.ChannelPoint())
×
912
        }
×
913
        if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
3✔
914
                return fmt.Errorf("unable to send chan sync message for "+
×
915
                        "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
×
916
        }
×
917

918
        var msgsToReSend []lnwire.Message
3✔
919

3✔
920
        // Next, we'll wait indefinitely to receive the ChanSync message. The
3✔
921
        // first message sent MUST be the ChanSync message.
3✔
922
        select {
3✔
923
        case msg := <-l.upstream:
3✔
924
                l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
3✔
925
                        l.cfg.Peer.PubKey())
3✔
926

3✔
927
                remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
3✔
928
                if !ok {
3✔
929
                        return fmt.Errorf("first message sent to sync "+
×
930
                                "should be ChannelReestablish, instead "+
×
931
                                "received: %T", msg)
×
932
                }
×
933

934
                // If the remote party indicates that they think we haven't
935
                // done any state updates yet, then we'll retransmit the
936
                // channel_ready message first. We do this, as at this point
937
                // we can't be sure if they've really received the
938
                // ChannelReady message.
939
                if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
3✔
940
                        localChanSyncMsg.NextLocalCommitHeight == 1 &&
3✔
941
                        !l.channel.IsPending() {
6✔
942

3✔
943
                        l.log.Infof("resending ChannelReady message to peer")
3✔
944

3✔
945
                        nextRevocation, err := l.channel.NextRevocationKey()
3✔
946
                        if err != nil {
3✔
947
                                return fmt.Errorf("unable to create next "+
×
948
                                        "revocation: %v", err)
×
949
                        }
×
950

951
                        channelReadyMsg := lnwire.NewChannelReady(
3✔
952
                                l.ChanID(), nextRevocation,
3✔
953
                        )
3✔
954

3✔
955
                        // If this is a taproot channel, then we'll send the
3✔
956
                        // very same nonce that we sent above, as they should
3✔
957
                        // take the latest verification nonce we send.
3✔
958
                        if chanState.ChanType.IsTaproot() {
6✔
959
                                //nolint:ll
3✔
960
                                channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
3✔
961
                        }
3✔
962

963
                        // For channels that negotiated the option-scid-alias
964
                        // feature bit, ensure that we send over the alias in
965
                        // the channel_ready message. We'll send the first
966
                        // alias we find for the channel since it does not
967
                        // matter which alias we send. We'll error out if no
968
                        // aliases are found.
969
                        if l.negotiatedAliasFeature() {
6✔
970
                                aliases := l.getAliases()
3✔
971
                                if len(aliases) == 0 {
3✔
972
                                        // This shouldn't happen since we
×
973
                                        // always add at least one alias before
×
974
                                        // the channel reaches the link.
×
975
                                        return fmt.Errorf("no aliases found")
×
976
                                }
×
977

978
                                // getAliases returns a copy of the alias slice
979
                                // so it is ok to use a pointer to the first
980
                                // entry.
981
                                channelReadyMsg.AliasScid = &aliases[0]
3✔
982
                        }
983

984
                        err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
3✔
985
                        if err != nil {
3✔
986
                                return fmt.Errorf("unable to re-send "+
×
987
                                        "ChannelReady: %v", err)
×
988
                        }
×
989
                }
990

991
                // In any case, we'll then process their ChanSync message.
992
                l.log.Info("received re-establishment message from remote side")
3✔
993

3✔
994
                var (
3✔
995
                        openedCircuits []CircuitKey
3✔
996
                        closedCircuits []CircuitKey
3✔
997
                )
3✔
998

3✔
999
                // We've just received a ChanSync message from the remote
3✔
1000
                // party, so we'll process the message  in order to determine
3✔
1001
                // if we need to re-transmit any messages to the remote party.
3✔
1002
                ctx, cancel := l.cg.Create(ctx)
3✔
1003
                defer cancel()
3✔
1004
                msgsToReSend, openedCircuits, closedCircuits, err =
3✔
1005
                        l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
3✔
1006
                if err != nil {
6✔
1007
                        return err
3✔
1008
                }
3✔
1009

1010
                // Repopulate any identifiers for circuits that may have been
1011
                // opened or unclosed. This may happen if we needed to
1012
                // retransmit a commitment signature message.
1013
                l.openedCircuits = openedCircuits
3✔
1014
                l.closedCircuits = closedCircuits
3✔
1015

3✔
1016
                // Ensure that all packets have been have been removed from the
3✔
1017
                // link's mailbox.
3✔
1018
                if err := l.ackDownStreamPackets(); err != nil {
3✔
1019
                        return err
×
1020
                }
×
1021

1022
                if len(msgsToReSend) > 0 {
3✔
UNCOV
1023
                        l.log.Infof("sending %v updates to synchronize the "+
×
UNCOV
1024
                                "state", len(msgsToReSend))
×
UNCOV
1025
                }
×
1026

1027
                // If we have any messages to retransmit, we'll do so
1028
                // immediately so we return to a synchronized state as soon as
1029
                // possible.
1030
                for _, msg := range msgsToReSend {
3✔
UNCOV
1031
                        l.cfg.Peer.SendMessage(false, msg)
×
UNCOV
1032
                }
×
1033

1034
        case <-l.cg.Done():
3✔
1035
                return ErrLinkShuttingDown
3✔
1036
        }
1037

1038
        return nil
3✔
1039
}
1040

1041
// resolveFwdPkgs loads any forwarding packages for this link from disk, and
1042
// reprocesses them in order. The primary goal is to make sure that any HTLCs
1043
// we previously received are reinstated in memory, and forwarded to the switch
1044
// if necessary. After a restart, this will also delete any previously
1045
// completed packages.
1046
func (l *channelLink) resolveFwdPkgs(ctx context.Context) error {
3✔
1047
        fwdPkgs, err := l.channel.LoadFwdPkgs()
3✔
1048
        if err != nil {
3✔
UNCOV
1049
                return err
×
UNCOV
1050
        }
×
1051

1052
        l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))
3✔
1053

3✔
1054
        for _, fwdPkg := range fwdPkgs {
6✔
1055
                if err := l.resolveFwdPkg(fwdPkg); err != nil {
3✔
1056
                        return err
×
1057
                }
×
1058
        }
1059

1060
        // If any of our reprocessing steps require an update to the commitment
1061
        // txn, we initiate a state transition to capture all relevant changes.
1062
        if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
6✔
1063
                return l.updateCommitTx(ctx)
3✔
1064
        }
3✔
1065

1066
        return nil
3✔
1067
}
1068

1069
// resolveFwdPkg interprets the FwdState of the provided package, either
1070
// reprocesses any outstanding htlcs in the package, or performs garbage
1071
// collection on the package.
1072
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
3✔
1073
        // Remove any completed packages to clear up space.
3✔
1074
        if fwdPkg.State == channeldb.FwdStateCompleted {
6✔
1075
                l.log.Debugf("removing completed fwd pkg for height=%d",
3✔
1076
                        fwdPkg.Height)
3✔
1077

3✔
1078
                err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
3✔
1079
                if err != nil {
3✔
1080
                        l.log.Errorf("unable to remove fwd pkg for height=%d: "+
×
1081
                                "%v", fwdPkg.Height, err)
×
1082
                        return err
×
1083
                }
×
1084
        }
1085

1086
        // Otherwise this is either a new package or one has gone through
1087
        // processing, but contains htlcs that need to be restored in memory.
1088
        // We replay this forwarding package to make sure our local mem state
1089
        // is resurrected, we mimic any original responses back to the remote
1090
        // party, and re-forward the relevant HTLCs to the switch.
1091

1092
        // If the package is fully acked but not completed, it must still have
1093
        // settles and fails to propagate.
1094
        if !fwdPkg.SettleFailFilter.IsFull() {
6✔
1095
                l.processRemoteSettleFails(fwdPkg)
3✔
1096
        }
3✔
1097

1098
        // Finally, replay *ALL ADDS* in this forwarding package. The
1099
        // downstream logic is able to filter out any duplicates, but we must
1100
        // shove the entire, original set of adds down the pipeline so that the
1101
        // batch of adds presented to the sphinx router does not ever change.
1102
        if !fwdPkg.AckFilter.IsFull() {
6✔
1103
                l.processRemoteAdds(fwdPkg)
3✔
1104

3✔
1105
                // If the link failed during processing the adds, we must
3✔
1106
                // return to ensure we won't attempted to update the state
3✔
1107
                // further.
3✔
1108
                if l.failed {
3✔
1109
                        return fmt.Errorf("link failed while " +
×
1110
                                "processing remote adds")
×
1111
                }
×
1112
        }
1113

1114
        return nil
3✔
1115
}
1116

1117
// fwdPkgGarbager periodically reads all forwarding packages from disk and
1118
// removes those that can be discarded. It is safe to do this entirely in the
1119
// background, since all state is coordinated on disk. This also ensures the
1120
// link can continue to process messages and interleave database accesses.
1121
//
1122
// NOTE: This MUST be run as a goroutine.
1123
func (l *channelLink) fwdPkgGarbager() {
3✔
1124
        defer l.cg.WgDone()
3✔
1125

3✔
1126
        l.cfg.FwdPkgGCTicker.Resume()
3✔
1127
        defer l.cfg.FwdPkgGCTicker.Stop()
3✔
1128

3✔
1129
        if err := l.loadAndRemove(); err != nil {
3✔
1130
                l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
×
1131
        }
×
1132

1133
        for {
6✔
1134
                select {
3✔
UNCOV
1135
                case <-l.cfg.FwdPkgGCTicker.Ticks():
×
UNCOV
1136
                        if err := l.loadAndRemove(); err != nil {
×
UNCOV
1137
                                l.log.Warnf("unable to remove fwd pkgs: %v",
×
UNCOV
1138
                                        err)
×
UNCOV
1139
                                continue
×
1140
                        }
1141
                case <-l.cg.Done():
3✔
1142
                        return
3✔
1143
                }
1144
        }
1145
}
1146

1147
// loadAndRemove loads all the channels forwarding packages and determines if
1148
// they can be removed. It is called once before the FwdPkgGCTicker ticks so that
1149
// a longer tick interval can be used.
1150
func (l *channelLink) loadAndRemove() error {
3✔
1151
        fwdPkgs, err := l.channel.LoadFwdPkgs()
3✔
1152
        if err != nil {
3✔
UNCOV
1153
                return err
×
UNCOV
1154
        }
×
1155

1156
        var removeHeights []uint64
3✔
1157
        for _, fwdPkg := range fwdPkgs {
6✔
1158
                if fwdPkg.State != channeldb.FwdStateCompleted {
6✔
1159
                        continue
3✔
1160
                }
1161

1162
                removeHeights = append(removeHeights, fwdPkg.Height)
3✔
1163
        }
1164

1165
        // If removeHeights is empty, return early so we don't use a db
1166
        // transaction.
1167
        if len(removeHeights) == 0 {
6✔
1168
                return nil
3✔
1169
        }
3✔
1170

1171
        return l.channel.RemoveFwdPkgs(removeHeights...)
3✔
1172
}
1173

1174
// handleChanSyncErr performs the error handling logic in the case where we
1175
// could not successfully syncChanStates with our channel peer.
1176
func (l *channelLink) handleChanSyncErr(err error) {
3✔
1177
        l.log.Warnf("error when syncing channel states: %v", err)
3✔
1178

3✔
1179
        var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss
3✔
1180

3✔
1181
        switch {
3✔
1182
        case errors.Is(err, ErrLinkShuttingDown):
3✔
1183
                l.log.Debugf("unable to sync channel states, link is " +
3✔
1184
                        "shutting down")
3✔
1185
                return
3✔
1186

1187
        // We failed syncing the commit chains, probably because the remote has
1188
        // lost state. We should force close the channel.
1189
        case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
3✔
1190
                fallthrough
3✔
1191

1192
        // The remote sent us an invalid last commit secret, we should force
1193
        // close the channel.
1194
        // TODO(halseth): and permanently ban the peer?
1195
        case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
3✔
1196
                fallthrough
3✔
1197

1198
        // The remote sent us a commit point different from what they sent us
1199
        // before.
1200
        // TODO(halseth): ban peer?
1201
        case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
3✔
1202
                // We'll fail the link and tell the peer to force close the
3✔
1203
                // channel. Note that the database state is not updated here,
3✔
1204
                // but will be updated when the close transaction is ready to
3✔
1205
                // avoid that we go down before storing the transaction in the
3✔
1206
                // db.
3✔
1207
                l.failf(
3✔
1208
                        LinkFailureError{
3✔
1209
                                code:          ErrSyncError,
3✔
1210
                                FailureAction: LinkFailureForceClose,
3✔
1211
                        },
3✔
1212
                        "unable to synchronize channel states: %v", err,
3✔
1213
                )
3✔
1214

1215
        // We have lost state and cannot safely force close the channel. Fail
1216
        // the channel and wait for the remote to hopefully force close it. The
1217
        // remote has sent us its latest unrevoked commitment point, and we'll
1218
        // store it in the database, such that we can attempt to recover the
1219
        // funds if the remote force closes the channel.
1220
        case errors.As(err, &errDataLoss):
3✔
1221
                err := l.channel.MarkDataLoss(
3✔
1222
                        errDataLoss.CommitPoint,
3✔
1223
                )
3✔
1224
                if err != nil {
3✔
1225
                        l.log.Errorf("unable to mark channel data loss: %v",
×
1226
                                err)
×
1227
                }
×
1228

1229
        // We determined the commit chains were not possible to sync. We
1230
        // cautiously fail the channel, but don't force close.
1231
        // TODO(halseth): can we safely force close in any cases where this
1232
        // error is returned?
1233
        case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
×
1234
                if err := l.channel.MarkBorked(); err != nil {
×
1235
                        l.log.Errorf("unable to mark channel borked: %v", err)
×
1236
                }
×
1237

1238
        // Other, unspecified error.
1239
        default:
×
1240
        }
1241

1242
        l.failf(
3✔
1243
                LinkFailureError{
3✔
1244
                        code:          ErrRecoveryError,
3✔
1245
                        FailureAction: LinkFailureForceNone,
3✔
1246
                },
3✔
1247
                "unable to synchronize channel states: %v", err,
3✔
1248
        )
3✔
1249
}
1250

1251
// htlcManager is the primary goroutine which drives a channel's commitment
1252
// update state-machine in response to messages received via several channels.
1253
// This goroutine reads messages from the upstream (remote) peer, and also from
1254
// downstream channel managed by the channel link. In the event that an htlc
1255
// needs to be forwarded, then send-only forward handler is used which sends
1256
// htlc packets to the switch. Additionally, this goroutine handles acting upon
1257
// all timeouts for any active HTLCs, manages the channel's revocation window,
1258
// and also the htlc trickle queue+timer for this active channels.
1259
//
1260
// NOTE: This MUST be run as a goroutine.
1261
//
1262
//nolint:funlen
1263
func (l *channelLink) htlcManager(ctx context.Context) {
3✔
1264
        defer func() {
6✔
1265
                l.cfg.BatchTicker.Stop()
3✔
1266
                l.cg.WgDone()
3✔
1267
                l.log.Infof("exited")
3✔
1268
        }()
3✔
1269

1270
        l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())
3✔
1271

3✔
1272
        // Notify any clients that the link is now in the switch via an
3✔
1273
        // ActiveLinkEvent. We'll also defer an inactive link notification for
3✔
1274
        // when the link exits to ensure that every active notification is
3✔
1275
        // matched by an inactive one.
3✔
1276
        l.cfg.NotifyActiveLink(l.ChannelPoint())
3✔
1277
        defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())
3✔
1278

3✔
1279
        // TODO(roasbeef): need to call wipe chan whenever D/C?
3✔
1280

3✔
1281
        // If this isn't the first time that this channel link has been
3✔
1282
        // created, then we'll need to check to see if we need to
3✔
1283
        // re-synchronize state with the remote peer. settledHtlcs is a map of
3✔
1284
        // HTLC's that we re-settled as part of the channel state sync.
3✔
1285
        if l.cfg.SyncStates {
6✔
1286
                err := l.syncChanStates(ctx)
3✔
1287
                if err != nil {
6✔
1288
                        l.handleChanSyncErr(err)
3✔
1289
                        return
3✔
1290
                }
3✔
1291
        }
1292

1293
        // If a shutdown message has previously been sent on this link, then we
1294
        // need to make sure that we have disabled any HTLC adds on the outgoing
1295
        // direction of the link and that we re-resend the same shutdown message
1296
        // that we previously sent.
1297
        l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
6✔
1298
                // Immediately disallow any new outgoing HTLCs.
3✔
1299
                if !l.DisableAdds(Outgoing) {
3✔
1300
                        l.log.Warnf("Outgoing link adds already disabled")
×
1301
                }
×
1302

1303
                // Re-send the shutdown message the peer. Since syncChanStates
1304
                // would have sent any outstanding CommitSig, it is fine for us
1305
                // to immediately queue the shutdown message now.
1306
                err := l.cfg.Peer.SendMessage(false, &shutdown)
3✔
1307
                if err != nil {
3✔
1308
                        l.log.Warnf("Error sending shutdown message: %v", err)
×
1309
                }
×
1310
        })
1311

1312
        // We've successfully reestablished the channel, mark it as such to
1313
        // allow the switch to forward HTLCs in the outbound direction.
1314
        l.markReestablished()
3✔
1315

3✔
1316
        // Now that we've received both channel_ready and channel reestablish,
3✔
1317
        // we can go ahead and send the active channel notification. We'll also
3✔
1318
        // defer the inactive notification for when the link exits to ensure
3✔
1319
        // that every active notification is matched by an inactive one.
3✔
1320
        l.cfg.NotifyActiveChannel(l.ChannelPoint())
3✔
1321
        defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())
3✔
1322

3✔
1323
        // With the channel states synced, we now reset the mailbox to ensure
3✔
1324
        // we start processing all unacked packets in order. This is done here
3✔
1325
        // to ensure that all acknowledgments that occur during channel
3✔
1326
        // resynchronization have taken affect, causing us only to pull unacked
3✔
1327
        // packets after starting to read from the downstream mailbox.
3✔
1328
        l.mailBox.ResetPackets()
3✔
1329

3✔
1330
        // After cleaning up any memory pertaining to incoming packets, we now
3✔
1331
        // replay our forwarding packages to handle any htlcs that can be
3✔
1332
        // processed locally, or need to be forwarded out to the switch. We will
3✔
1333
        // only attempt to resolve packages if our short chan id indicates that
3✔
1334
        // the channel is not pending, otherwise we should have no htlcs to
3✔
1335
        // reforward.
3✔
1336
        if l.ShortChanID() != hop.Source {
6✔
1337
                err := l.resolveFwdPkgs(ctx)
3✔
1338
                switch err {
3✔
1339
                // No error was encountered, success.
1340
                case nil:
3✔
1341

1342
                // If the duplicate keystone error was encountered, we'll fail
1343
                // without sending an Error message to the peer.
1344
                case ErrDuplicateKeystone:
×
1345
                        l.failf(LinkFailureError{code: ErrCircuitError},
×
1346
                                "temporary circuit error: %v", err)
×
1347
                        return
×
1348

1349
                // A non-nil error was encountered, send an Error message to
1350
                // the peer.
UNCOV
1351
                default:
×
UNCOV
1352
                        l.failf(LinkFailureError{code: ErrInternalError},
×
UNCOV
1353
                                "unable to resolve fwd pkgs: %v", err)
×
UNCOV
1354
                        return
×
1355
                }
1356

1357
                // With our link's in-memory state fully reconstructed, spawn a
1358
                // goroutine to manage the reclamation of disk space occupied by
1359
                // completed forwarding packages.
1360
                l.cg.WgAdd(1)
3✔
1361
                go l.fwdPkgGarbager()
3✔
1362
        }
1363

1364
        for {
6✔
1365
                // We must always check if we failed at some point processing
3✔
1366
                // the last update before processing the next.
3✔
1367
                if l.failed {
6✔
1368
                        l.log.Errorf("link failed, exiting htlcManager")
3✔
1369
                        return
3✔
1370
                }
3✔
1371

1372
                // If the previous event resulted in a non-empty batch, resume
1373
                // the batch ticker so that it can be cleared. Otherwise pause
1374
                // the ticker to prevent waking up the htlcManager while the
1375
                // batch is empty.
1376
                numUpdates := l.channel.NumPendingUpdates(
3✔
1377
                        lntypes.Local, lntypes.Remote,
3✔
1378
                )
3✔
1379
                if numUpdates > 0 {
6✔
1380
                        l.cfg.BatchTicker.Resume()
3✔
1381
                        l.log.Tracef("BatchTicker resumed, "+
3✔
1382
                                "NumPendingUpdates(Local, Remote)=%d",
3✔
1383
                                numUpdates,
3✔
1384
                        )
3✔
1385
                } else {
6✔
1386
                        l.cfg.BatchTicker.Pause()
3✔
1387
                        l.log.Trace("BatchTicker paused due to zero " +
3✔
1388
                                "NumPendingUpdates(Local, Remote)")
3✔
1389
                }
3✔
1390

1391
                select {
3✔
1392
                // We have a new hook that needs to be run when we reach a clean
1393
                // channel state.
1394
                case hook := <-l.flushHooks.newTransients:
3✔
1395
                        if l.channel.IsChannelClean() {
6✔
1396
                                hook()
3✔
1397
                        } else {
6✔
1398
                                l.flushHooks.alloc(hook)
3✔
1399
                        }
3✔
1400

1401
                // We have a new hook that needs to be run when we have
1402
                // committed all of our updates.
1403
                case hook := <-l.outgoingCommitHooks.newTransients:
3✔
1404
                        if !l.channel.OweCommitment() {
6✔
1405
                                hook()
3✔
1406
                        } else {
3✔
UNCOV
1407
                                l.outgoingCommitHooks.alloc(hook)
×
UNCOV
1408
                        }
×
1409

1410
                // We have a new hook that needs to be run when our peer has
1411
                // committed all of their updates.
1412
                case hook := <-l.incomingCommitHooks.newTransients:
×
1413
                        if !l.channel.NeedCommitment() {
×
1414
                                hook()
×
1415
                        } else {
×
1416
                                l.incomingCommitHooks.alloc(hook)
×
1417
                        }
×
1418

1419
                // Our update fee timer has fired, so we'll check the network
1420
                // fee to see if we should adjust our commitment fee.
UNCOV
1421
                case <-l.updateFeeTimer.C:
×
UNCOV
1422
                        l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
×
UNCOV
1423

×
UNCOV
1424
                        // If we're not the initiator of the channel, don't we
×
UNCOV
1425
                        // don't control the fees, so we can ignore this.
×
UNCOV
1426
                        if !l.channel.IsInitiator() {
×
1427
                                continue
×
1428
                        }
1429

1430
                        // If we are the initiator, then we'll sample the
1431
                        // current fee rate to get into the chain within 3
1432
                        // blocks.
UNCOV
1433
                        netFee, err := l.sampleNetworkFee()
×
UNCOV
1434
                        if err != nil {
×
1435
                                l.log.Errorf("unable to sample network fee: %v",
×
1436
                                        err)
×
1437
                                continue
×
1438
                        }
1439

UNCOV
1440
                        minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()
×
UNCOV
1441

×
UNCOV
1442
                        newCommitFee := l.channel.IdealCommitFeeRate(
×
UNCOV
1443
                                netFee, minRelayFee,
×
UNCOV
1444
                                l.cfg.MaxAnchorsCommitFeeRate,
×
UNCOV
1445
                                l.cfg.MaxFeeAllocation,
×
UNCOV
1446
                        )
×
UNCOV
1447

×
UNCOV
1448
                        // We determine if we should adjust the commitment fee
×
UNCOV
1449
                        // based on the current commitment fee, the suggested
×
UNCOV
1450
                        // new commitment fee and the current minimum relay fee
×
UNCOV
1451
                        // rate.
×
UNCOV
1452
                        commitFee := l.channel.CommitFeeRate()
×
UNCOV
1453
                        if !shouldAdjustCommitFee(
×
UNCOV
1454
                                newCommitFee, commitFee, minRelayFee,
×
UNCOV
1455
                        ) {
×
UNCOV
1456

×
UNCOV
1457
                                continue
×
1458
                        }
1459

1460
                        // If we do, then we'll send a new UpdateFee message to
1461
                        // the remote party, to be locked in with a new update.
UNCOV
1462
                        err = l.updateChannelFee(ctx, newCommitFee)
×
UNCOV
1463
                        if err != nil {
×
1464
                                l.log.Errorf("unable to update fee rate: %v",
×
1465
                                        err)
×
1466
                                continue
×
1467
                        }
1468

1469
                // The underlying channel has notified us of a unilateral close
1470
                // carried out by the remote peer. In the case of such an
1471
                // event, we'll wipe the channel state from the peer, and mark
1472
                // the contract as fully settled. Afterwards we can exit.
1473
                //
1474
                // TODO(roasbeef): add force closure? also breach?
1475
                case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
3✔
1476
                        l.log.Warnf("remote peer has closed on-chain")
3✔
1477

3✔
1478
                        // TODO(roasbeef): remove all together
3✔
1479
                        go func() {
6✔
1480
                                chanPoint := l.channel.ChannelPoint()
3✔
1481
                                l.cfg.Peer.WipeChannel(&chanPoint)
3✔
1482
                        }()
3✔
1483

1484
                        return
3✔
1485

1486
                case <-l.cfg.BatchTicker.Ticks():
3✔
1487
                        // Attempt to extend the remote commitment chain
3✔
1488
                        // including all the currently pending entries. If the
3✔
1489
                        // send was unsuccessful, then abandon the update,
3✔
1490
                        // waiting for the revocation window to open up.
3✔
1491
                        if !l.updateCommitTxOrFail(ctx) {
3✔
1492
                                return
×
1493
                        }
×
1494

UNCOV
1495
                case <-l.cfg.PendingCommitTicker.Ticks():
×
UNCOV
1496
                        l.failf(
×
UNCOV
1497
                                LinkFailureError{
×
UNCOV
1498
                                        code:          ErrRemoteUnresponsive,
×
UNCOV
1499
                                        FailureAction: LinkFailureDisconnect,
×
UNCOV
1500
                                },
×
UNCOV
1501
                                "unable to complete dance",
×
UNCOV
1502
                        )
×
UNCOV
1503
                        return
×
1504

1505
                // A message from the switch was just received. This indicates
1506
                // that the link is an intermediate hop in a multi-hop HTLC
1507
                // circuit.
1508
                case pkt := <-l.downstream:
3✔
1509
                        l.handleDownstreamPkt(ctx, pkt)
3✔
1510

1511
                // A message from the connected peer was just received. This
1512
                // indicates that we have a new incoming HTLC, either directly
1513
                // for us, or part of a multi-hop HTLC circuit.
1514
                case msg := <-l.upstream:
3✔
1515
                        l.handleUpstreamMsg(ctx, msg)
3✔
1516

1517
                // A htlc resolution is received. This means that we now have a
1518
                // resolution for a previously accepted htlc.
1519
                case hodlItem := <-l.hodlQueue.ChanOut():
3✔
1520
                        htlcResolution := hodlItem.(invoices.HtlcResolution)
3✔
1521
                        err := l.processHodlQueue(ctx, htlcResolution)
3✔
1522
                        switch err {
3✔
1523
                        // No error, success.
1524
                        case nil:
3✔
1525

1526
                        // If the duplicate keystone error was encountered,
1527
                        // fail back gracefully.
1528
                        case ErrDuplicateKeystone:
×
1529
                                l.failf(LinkFailureError{
×
1530
                                        code: ErrCircuitError,
×
1531
                                }, "process hodl queue: "+
×
1532
                                        "temporary circuit error: %v",
×
1533
                                        err,
×
1534
                                )
×
1535

1536
                        // Send an Error message to the peer.
UNCOV
1537
                        default:
×
UNCOV
1538
                                l.failf(LinkFailureError{
×
UNCOV
1539
                                        code: ErrInternalError,
×
UNCOV
1540
                                }, "process hodl queue: unable to update "+
×
UNCOV
1541
                                        "commitment: %v", err,
×
UNCOV
1542
                                )
×
1543
                        }
1544

1545
                case qReq := <-l.quiescenceReqs:
3✔
1546
                        l.quiescer.InitStfu(qReq)
3✔
1547

3✔
1548
                        if l.noDanglingUpdates(lntypes.Local) {
6✔
1549
                                err := l.quiescer.SendOwedStfu()
3✔
1550
                                if err != nil {
3✔
1551
                                        l.stfuFailf(
×
1552
                                                "SendOwedStfu: %s", err.Error(),
×
1553
                                        )
×
1554
                                        res := fn.Err[lntypes.ChannelParty](err)
×
1555
                                        qReq.Resolve(res)
×
1556
                                }
×
1557
                        }
1558

1559
                case <-l.cg.Done():
3✔
1560
                        return
3✔
1561
                }
1562
        }
1563
}
1564

1565
// processHodlQueue processes a received htlc resolution and continues reading
1566
// from the hodl queue until no more resolutions remain. When this function
1567
// returns without an error, the commit tx should be updated.
1568
func (l *channelLink) processHodlQueue(ctx context.Context,
1569
        firstResolution invoices.HtlcResolution) error {
3✔
1570

3✔
1571
        // Try to read all waiting resolution messages, so that they can all be
3✔
1572
        // processed in a single commitment tx update.
3✔
1573
        htlcResolution := firstResolution
3✔
1574
loop:
3✔
1575
        for {
6✔
1576
                // Lookup all hodl htlcs that can be failed or settled with this event.
3✔
1577
                // The hodl htlc must be present in the map.
3✔
1578
                circuitKey := htlcResolution.CircuitKey()
3✔
1579
                hodlHtlc, ok := l.hodlMap[circuitKey]
3✔
1580
                if !ok {
3✔
1581
                        return fmt.Errorf("hodl htlc not found: %v", circuitKey)
×
1582
                }
×
1583

1584
                if err := l.processHtlcResolution(htlcResolution, hodlHtlc); err != nil {
3✔
1585
                        return err
×
1586
                }
×
1587

1588
                // Clean up hodl map.
1589
                delete(l.hodlMap, circuitKey)
3✔
1590

3✔
1591
                select {
3✔
1592
                case item := <-l.hodlQueue.ChanOut():
3✔
1593
                        htlcResolution = item.(invoices.HtlcResolution)
3✔
1594

1595
                // No need to process it if the link is broken.
1596
                case <-l.cg.Done():
×
1597
                        return ErrLinkShuttingDown
×
1598

1599
                default:
3✔
1600
                        break loop
3✔
1601
                }
1602
        }
1603

1604
        // Update the commitment tx.
1605
        if err := l.updateCommitTx(ctx); err != nil {
3✔
UNCOV
1606
                return err
×
UNCOV
1607
        }
×
1608

1609
        return nil
3✔
1610
}
1611

1612
// processHtlcResolution applies a received htlc resolution to the provided
1613
// htlc. When this function returns without an error, the commit tx should be
1614
// updated.
1615
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
1616
        htlc hodlHtlc) error {
3✔
1617

3✔
1618
        circuitKey := resolution.CircuitKey()
3✔
1619

3✔
1620
        // Determine required action for the resolution based on the type of
3✔
1621
        // resolution we have received.
3✔
1622
        switch res := resolution.(type) {
3✔
1623
        // Settle htlcs that returned a settle resolution using the preimage
1624
        // in the resolution.
1625
        case *invoices.HtlcSettleResolution:
3✔
1626
                l.log.Debugf("received settle resolution for %v "+
3✔
1627
                        "with outcome: %v", circuitKey, res.Outcome)
3✔
1628

3✔
1629
                return l.settleHTLC(
3✔
1630
                        res.Preimage, htlc.add.ID, htlc.sourceRef,
3✔
1631
                )
3✔
1632

1633
        // For htlc failures, we get the relevant failure message based
1634
        // on the failure resolution and then fail the htlc.
1635
        case *invoices.HtlcFailResolution:
3✔
1636
                l.log.Debugf("received cancel resolution for "+
3✔
1637
                        "%v with outcome: %v", circuitKey, res.Outcome)
3✔
1638

3✔
1639
                // Get the lnwire failure message based on the resolution
3✔
1640
                // result.
3✔
1641
                failure := getResolutionFailure(res, htlc.add.Amount)
3✔
1642

3✔
1643
                l.sendHTLCError(
3✔
1644
                        htlc.add, htlc.sourceRef, failure, htlc.obfuscator,
3✔
1645
                        true,
3✔
1646
                )
3✔
1647
                return nil
3✔
1648

1649
        // Fail if we do not get a settle of fail resolution, since we
1650
        // are only expecting to handle settles and fails.
1651
        default:
×
1652
                return fmt.Errorf("unknown htlc resolution type: %T",
×
1653
                        resolution)
×
1654
        }
1655
}
1656

1657
// getResolutionFailure returns the wire message that a htlc resolution should
1658
// be failed with.
1659
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
1660
        amount lnwire.MilliSatoshi) *LinkError {
3✔
1661

3✔
1662
        // If the resolution has been resolved as part of a MPP timeout,
3✔
1663
        // we need to fail the htlc with lnwire.FailMppTimeout.
3✔
1664
        if resolution.Outcome == invoices.ResultMppTimeout {
3✔
1665
                return NewDetailedLinkError(
×
1666
                        &lnwire.FailMPPTimeout{}, resolution.Outcome,
×
1667
                )
×
1668
        }
×
1669

1670
        // If the htlc is not a MPP timeout, we fail it with
1671
        // FailIncorrectDetails. This error is sent for invoice payment
1672
        // failures such as underpayment/ expiry too soon and hodl invoices
1673
        // (which return FailIncorrectDetails to avoid leaking information).
1674
        incorrectDetails := lnwire.NewFailIncorrectDetails(
3✔
1675
                amount, uint32(resolution.AcceptHeight),
3✔
1676
        )
3✔
1677

3✔
1678
        return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
3✔
1679
}
1680

1681
// randomFeeUpdateTimeout returns a random timeout between the bounds defined
1682
// within the link's configuration that will be used to determine when the link
1683
// should propose an update to its commitment fee rate.
1684
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
3✔
1685
        lower := int64(l.cfg.MinUpdateTimeout)
3✔
1686
        upper := int64(l.cfg.MaxUpdateTimeout)
3✔
1687
        return time.Duration(prand.Int63n(upper-lower) + lower)
3✔
1688
}
3✔
1689

1690
// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
1691
// downstream HTLC Switch.
1692
func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context,
1693
        pkt *htlcPacket) error {
3✔
1694

3✔
1695
        htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
3✔
1696
        if !ok {
3✔
1697
                return errors.New("not an UpdateAddHTLC packet")
×
1698
        }
×
1699

1700
        // If we are flushing the link in the outgoing direction or we have
1701
        // already sent Stfu, then we can't add new htlcs to the link and we
1702
        // need to bounce it.
1703
        if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() {
3✔
1704
                l.mailBox.FailAdd(pkt)
×
1705

×
1706
                return NewDetailedLinkError(
×
1707
                        &lnwire.FailTemporaryChannelFailure{},
×
1708
                        OutgoingFailureLinkNotEligible,
×
1709
                )
×
1710
        }
×
1711

1712
        // If hodl.AddOutgoing mode is active, we exit early to simulate
1713
        // arbitrary delays between the switch adding an ADD to the
1714
        // mailbox, and the HTLC being added to the commitment state.
1715
        if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
3✔
1716
                l.log.Warnf(hodl.AddOutgoing.Warning())
×
1717
                l.mailBox.AckPacket(pkt.inKey())
×
1718
                return nil
×
1719
        }
×
1720

1721
        // Check if we can add the HTLC here without exceededing the max fee
1722
        // exposure threshold.
1723
        if l.isOverexposedWithHtlc(htlc, false) {
3✔
UNCOV
1724
                l.log.Debugf("Unable to handle downstream HTLC - max fee " +
×
UNCOV
1725
                        "exposure exceeded")
×
UNCOV
1726

×
UNCOV
1727
                l.mailBox.FailAdd(pkt)
×
UNCOV
1728

×
UNCOV
1729
                return NewDetailedLinkError(
×
UNCOV
1730
                        lnwire.NewTemporaryChannelFailure(nil),
×
UNCOV
1731
                        OutgoingFailureDownstreamHtlcAdd,
×
UNCOV
1732
                )
×
UNCOV
1733
        }
×
1734

1735
        // A new payment has been initiated via the downstream channel,
1736
        // so we add the new HTLC to our local log, then update the
1737
        // commitment chains.
1738
        htlc.ChanID = l.ChanID()
3✔
1739
        openCircuitRef := pkt.inKey()
3✔
1740

3✔
1741
        // We enforce the fee buffer for the commitment transaction because
3✔
1742
        // we are in control of adding this htlc. Nothing has locked-in yet so
3✔
1743
        // we can securely enforce the fee buffer which is only relevant if we
3✔
1744
        // are the initiator of the channel.
3✔
1745
        index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
3✔
1746
        if err != nil {
6✔
1747
                // The HTLC was unable to be added to the state machine,
3✔
1748
                // as a result, we'll signal the switch to cancel the
3✔
1749
                // pending payment.
3✔
1750
                l.log.Warnf("Unable to handle downstream add HTLC: %v",
3✔
1751
                        err)
3✔
1752

3✔
1753
                // Remove this packet from the link's mailbox, this
3✔
1754
                // prevents it from being reprocessed if the link
3✔
1755
                // restarts and resets it mailbox. If this response
3✔
1756
                // doesn't make it back to the originating link, it will
3✔
1757
                // be rejected upon attempting to reforward the Add to
3✔
1758
                // the switch, since the circuit was never fully opened,
3✔
1759
                // and the forwarding package shows it as
3✔
1760
                // unacknowledged.
3✔
1761
                l.mailBox.FailAdd(pkt)
3✔
1762

3✔
1763
                return NewDetailedLinkError(
3✔
1764
                        lnwire.NewTemporaryChannelFailure(nil),
3✔
1765
                        OutgoingFailureDownstreamHtlcAdd,
3✔
1766
                )
3✔
1767
        }
3✔
1768

1769
        l.log.Tracef("received downstream htlc: payment_hash=%x, "+
3✔
1770
                "local_log_index=%v, pend_updates=%v",
3✔
1771
                htlc.PaymentHash[:], index,
3✔
1772
                l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))
3✔
1773

3✔
1774
        pkt.outgoingChanID = l.ShortChanID()
3✔
1775
        pkt.outgoingHTLCID = index
3✔
1776
        htlc.ID = index
3✔
1777

3✔
1778
        l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
3✔
1779
                pkt.inKey(), pkt.outKey())
3✔
1780

3✔
1781
        l.openedCircuits = append(l.openedCircuits, pkt.inKey())
3✔
1782
        l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
3✔
1783

3✔
1784
        _ = l.cfg.Peer.SendMessage(false, htlc)
3✔
1785

3✔
1786
        // Send a forward event notification to htlcNotifier.
3✔
1787
        l.cfg.HtlcNotifier.NotifyForwardingEvent(
3✔
1788
                newHtlcKey(pkt),
3✔
1789
                HtlcInfo{
3✔
1790
                        IncomingTimeLock: pkt.incomingTimeout,
3✔
1791
                        IncomingAmt:      pkt.incomingAmount,
3✔
1792
                        OutgoingTimeLock: htlc.Expiry,
3✔
1793
                        OutgoingAmt:      htlc.Amount,
3✔
1794
                },
3✔
1795
                getEventType(pkt),
3✔
1796
        )
3✔
1797

3✔
1798
        l.tryBatchUpdateCommitTx(ctx)
3✔
1799

3✔
1800
        return nil
3✔
1801
}
1802

1803
// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
1804
// Switch. Possible messages sent by the switch include requests to forward new
1805
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
1806
// cleared HTLCs with the upstream peer.
1807
//
1808
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
1809
func (l *channelLink) handleDownstreamPkt(ctx context.Context,
1810
        pkt *htlcPacket) {
3✔
1811

3✔
1812
        if pkt.htlc.MsgType().IsChannelUpdate() &&
3✔
1813
                !l.quiescer.CanSendUpdates() {
3✔
1814

×
1815
                l.log.Warnf("unable to process channel update. "+
×
1816
                        "ChannelID=%v is quiescent.", l.ChanID)
×
1817

×
1818
                return
×
1819
        }
×
1820

1821
        switch htlc := pkt.htlc.(type) {
3✔
1822
        case *lnwire.UpdateAddHTLC:
3✔
1823
                // Handle add message. The returned error can be ignored,
3✔
1824
                // because it is also sent through the mailbox.
3✔
1825
                _ = l.handleDownstreamUpdateAdd(ctx, pkt)
3✔
1826

1827
        case *lnwire.UpdateFulfillHTLC:
3✔
1828
                // If hodl.SettleOutgoing mode is active, we exit early to
3✔
1829
                // simulate arbitrary delays between the switch adding the
3✔
1830
                // SETTLE to the mailbox, and the HTLC being added to the
3✔
1831
                // commitment state.
3✔
1832
                if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
3✔
1833
                        l.log.Warnf(hodl.SettleOutgoing.Warning())
×
1834
                        l.mailBox.AckPacket(pkt.inKey())
×
1835
                        return
×
1836
                }
×
1837

1838
                // An HTLC we forward to the switch has just settled somewhere
1839
                // upstream. Therefore we settle the HTLC within the our local
1840
                // state machine.
1841
                inKey := pkt.inKey()
3✔
1842
                err := l.channel.SettleHTLC(
3✔
1843
                        htlc.PaymentPreimage,
3✔
1844
                        pkt.incomingHTLCID,
3✔
1845
                        pkt.sourceRef,
3✔
1846
                        pkt.destRef,
3✔
1847
                        &inKey,
3✔
1848
                )
3✔
1849
                if err != nil {
3✔
1850
                        l.log.Errorf("unable to settle incoming HTLC for "+
×
1851
                                "circuit-key=%v: %v", inKey, err)
×
1852

×
1853
                        // If the HTLC index for Settle response was not known
×
1854
                        // to our commitment state, it has already been
×
1855
                        // cleaned up by a prior response. We'll thus try to
×
1856
                        // clean up any lingering state to ensure we don't
×
1857
                        // continue reforwarding.
×
1858
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
×
1859
                                l.cleanupSpuriousResponse(pkt)
×
1860
                        }
×
1861

1862
                        // Remove the packet from the link's mailbox to ensure
1863
                        // it doesn't get replayed after a reconnection.
1864
                        l.mailBox.AckPacket(inKey)
×
1865

×
1866
                        return
×
1867
                }
1868

1869
                l.log.Debugf("queueing removal of SETTLE closed circuit: "+
3✔
1870
                        "%s->%s", pkt.inKey(), pkt.outKey())
3✔
1871

3✔
1872
                l.closedCircuits = append(l.closedCircuits, pkt.inKey())
3✔
1873

3✔
1874
                // With the HTLC settled, we'll need to populate the wire
3✔
1875
                // message to target the specific channel and HTLC to be
3✔
1876
                // canceled.
3✔
1877
                htlc.ChanID = l.ChanID()
3✔
1878
                htlc.ID = pkt.incomingHTLCID
3✔
1879

3✔
1880
                // Then we send the HTLC settle message to the connected peer
3✔
1881
                // so we can continue the propagation of the settle message.
3✔
1882
                l.cfg.Peer.SendMessage(false, htlc)
3✔
1883

3✔
1884
                // Send a settle event notification to htlcNotifier.
3✔
1885
                l.cfg.HtlcNotifier.NotifySettleEvent(
3✔
1886
                        newHtlcKey(pkt),
3✔
1887
                        htlc.PaymentPreimage,
3✔
1888
                        getEventType(pkt),
3✔
1889
                )
3✔
1890

3✔
1891
                // Immediately update the commitment tx to minimize latency.
3✔
1892
                l.updateCommitTxOrFail(ctx)
3✔
1893

1894
        case *lnwire.UpdateFailHTLC:
3✔
1895
                // If hodl.FailOutgoing mode is active, we exit early to
3✔
1896
                // simulate arbitrary delays between the switch adding a FAIL to
3✔
1897
                // the mailbox, and the HTLC being added to the commitment
3✔
1898
                // state.
3✔
1899
                if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
3✔
1900
                        l.log.Warnf(hodl.FailOutgoing.Warning())
×
1901
                        l.mailBox.AckPacket(pkt.inKey())
×
1902
                        return
×
1903
                }
×
1904

1905
                // An HTLC cancellation has been triggered somewhere upstream,
1906
                // we'll remove then HTLC from our local state machine.
1907
                inKey := pkt.inKey()
3✔
1908
                err := l.channel.FailHTLC(
3✔
1909
                        pkt.incomingHTLCID,
3✔
1910
                        htlc.Reason,
3✔
1911
                        pkt.sourceRef,
3✔
1912
                        pkt.destRef,
3✔
1913
                        &inKey,
3✔
1914
                )
3✔
1915
                if err != nil {
6✔
1916
                        l.log.Errorf("unable to cancel incoming HTLC for "+
3✔
1917
                                "circuit-key=%v: %v", inKey, err)
3✔
1918

3✔
1919
                        // If the HTLC index for Fail response was not known to
3✔
1920
                        // our commitment state, it has already been cleaned up
3✔
1921
                        // by a prior response. We'll thus try to clean up any
3✔
1922
                        // lingering state to ensure we don't continue
3✔
1923
                        // reforwarding.
3✔
1924
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
3✔
UNCOV
1925
                                l.cleanupSpuriousResponse(pkt)
×
UNCOV
1926
                        }
×
1927

1928
                        // Remove the packet from the link's mailbox to ensure
1929
                        // it doesn't get replayed after a reconnection.
1930
                        l.mailBox.AckPacket(inKey)
3✔
1931

3✔
1932
                        return
3✔
1933
                }
1934

1935
                l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
3✔
1936
                        pkt.inKey(), pkt.outKey())
3✔
1937

3✔
1938
                l.closedCircuits = append(l.closedCircuits, pkt.inKey())
3✔
1939

3✔
1940
                // With the HTLC removed, we'll need to populate the wire
3✔
1941
                // message to target the specific channel and HTLC to be
3✔
1942
                // canceled. The "Reason" field will have already been set
3✔
1943
                // within the switch.
3✔
1944
                htlc.ChanID = l.ChanID()
3✔
1945
                htlc.ID = pkt.incomingHTLCID
3✔
1946

3✔
1947
                // We send the HTLC message to the peer which initially created
3✔
1948
                // the HTLC. If the incoming blinding point is non-nil, we
3✔
1949
                // know that we are a relaying node in a blinded path.
3✔
1950
                // Otherwise, we're either an introduction node or not part of
3✔
1951
                // a blinded path at all.
3✔
1952
                if err := l.sendIncomingHTLCFailureMsg(
3✔
1953
                        htlc.ID,
3✔
1954
                        pkt.obfuscator,
3✔
1955
                        htlc.Reason,
3✔
1956
                ); err != nil {
3✔
1957
                        l.log.Errorf("unable to send HTLC failure: %v",
×
1958
                                err)
×
1959

×
1960
                        return
×
1961
                }
×
1962

1963
                // If the packet does not have a link failure set, it failed
1964
                // further down the route so we notify a forwarding failure.
1965
                // Otherwise, we notify a link failure because it failed at our
1966
                // node.
1967
                if pkt.linkFailure != nil {
6✔
1968
                        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
3✔
1969
                                newHtlcKey(pkt),
3✔
1970
                                newHtlcInfo(pkt),
3✔
1971
                                getEventType(pkt),
3✔
1972
                                pkt.linkFailure,
3✔
1973
                                false,
3✔
1974
                        )
3✔
1975
                } else {
6✔
1976
                        l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
3✔
1977
                                newHtlcKey(pkt), getEventType(pkt),
3✔
1978
                        )
3✔
1979
                }
3✔
1980

1981
                // Immediately update the commitment tx to minimize latency.
1982
                l.updateCommitTxOrFail(ctx)
3✔
1983
        }
1984
}
1985

1986
// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
1987
// full.
1988
func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) {
3✔
1989
        pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
3✔
1990
        if pending < uint64(l.cfg.BatchSize) {
6✔
1991
                return
3✔
1992
        }
3✔
1993

1994
        l.updateCommitTxOrFail(ctx)
3✔
1995
}
1996

1997
// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
1998
// associated with this packet. If successful in doing so, it will also purge
1999
// the open circuit from the circuit map and remove the packet from the link's
2000
// mailbox.
UNCOV
2001
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
×
UNCOV
2002
        inKey := pkt.inKey()
×
UNCOV
2003

×
UNCOV
2004
        l.log.Debugf("cleaning up spurious response for incoming "+
×
UNCOV
2005
                "circuit-key=%v", inKey)
×
UNCOV
2006

×
UNCOV
2007
        // If the htlc packet doesn't have a source reference, it is unsafe to
×
UNCOV
2008
        // proceed, as skipping this ack may cause the htlc to be reforwarded.
×
UNCOV
2009
        if pkt.sourceRef == nil {
×
UNCOV
2010
                l.log.Errorf("unable to cleanup response for incoming "+
×
UNCOV
2011
                        "circuit-key=%v, does not contain source reference",
×
UNCOV
2012
                        inKey)
×
UNCOV
2013
                return
×
UNCOV
2014
        }
×
2015

2016
        // If the source reference is present,  we will try to prevent this link
2017
        // from resending the packet to the switch. To do so, we ack the AddRef
2018
        // of the incoming HTLC belonging to this link.
UNCOV
2019
        err := l.channel.AckAddHtlcs(*pkt.sourceRef)
×
UNCOV
2020
        if err != nil {
×
2021
                l.log.Errorf("unable to ack AddRef for incoming "+
×
2022
                        "circuit-key=%v: %v", inKey, err)
×
2023

×
2024
                // If this operation failed, it is unsafe to attempt removal of
×
2025
                // the destination reference or circuit, so we exit early. The
×
2026
                // cleanup may proceed with a different packet in the future
×
2027
                // that succeeds on this step.
×
2028
                return
×
2029
        }
×
2030

2031
        // Now that we know this link will stop retransmitting Adds to the
2032
        // switch, we can begin to teardown the response reference and circuit
2033
        // map.
2034
        //
2035
        // If the packet includes a destination reference, then a response for
2036
        // this HTLC was locked into the outgoing channel. Attempt to remove
2037
        // this reference, so we stop retransmitting the response internally.
2038
        // Even if this fails, we will proceed in trying to delete the circuit.
2039
        // When retransmitting responses, the destination references will be
2040
        // cleaned up if an open circuit is not found in the circuit map.
UNCOV
2041
        if pkt.destRef != nil {
×
2042
                err := l.channel.AckSettleFails(*pkt.destRef)
×
2043
                if err != nil {
×
2044
                        l.log.Errorf("unable to ack SettleFailRef "+
×
2045
                                "for incoming circuit-key=%v: %v",
×
2046
                                inKey, err)
×
2047
                }
×
2048
        }
2049

UNCOV
2050
        l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)
×
UNCOV
2051

×
UNCOV
2052
        // With all known references acked, we can now safely delete the circuit
×
UNCOV
2053
        // from the switch's circuit map, as the state is no longer needed.
×
UNCOV
2054
        err = l.cfg.Circuits.DeleteCircuits(inKey)
×
UNCOV
2055
        if err != nil {
×
2056
                l.log.Errorf("unable to delete circuit for "+
×
2057
                        "circuit-key=%v: %v", inKey, err)
×
2058
        }
×
2059
}
2060

2061
// handleUpstreamMsg processes wire messages related to commitment state
2062
// updates from the upstream peer. The upstream peer is the peer whom we have a
2063
// direct channel with, updating our respective commitment chains.
2064
//
2065
//nolint:funlen
2066
func (l *channelLink) handleUpstreamMsg(ctx context.Context,
2067
        msg lnwire.Message) {
3✔
2068

3✔
2069
        l.log.Tracef("receive upstream msg %v, handling now... ", msg.MsgType())
3✔
2070
        defer l.log.Tracef("handled upstream msg %v", msg.MsgType())
3✔
2071

3✔
2072
        // First check if the message is an update and we are capable of
3✔
2073
        // receiving updates right now.
3✔
2074
        if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
3✔
2075
                l.stfuFailf("update received after stfu: %T", msg)
×
2076
                return
×
2077
        }
×
2078

2079
        switch msg := msg.(type) {
3✔
2080
        case *lnwire.UpdateAddHTLC:
3✔
2081
                if l.IsFlushing(Incoming) {
3✔
2082
                        // This is forbidden by the protocol specification.
×
2083
                        // The best chance we have to deal with this is to drop
×
2084
                        // the connection. This should roll back the channel
×
2085
                        // state to the last CommitSig. If the remote has
×
2086
                        // already sent a CommitSig we haven't received yet,
×
2087
                        // channel state will be re-synchronized with a
×
2088
                        // ChannelReestablish message upon reconnection and the
×
2089
                        // protocol state that caused us to flush the link will
×
2090
                        // be rolled back. In the event that there was some
×
2091
                        // non-deterministic behavior in the remote that caused
×
2092
                        // them to violate the protocol, we have a decent shot
×
2093
                        // at correcting it this way, since reconnecting will
×
2094
                        // put us in the cleanest possible state to try again.
×
2095
                        //
×
2096
                        // In addition to the above, it is possible for us to
×
2097
                        // hit this case in situations where we improperly
×
2098
                        // handle message ordering due to concurrency choices.
×
2099
                        // An issue has been filed to address this here:
×
2100
                        // https://github.com/lightningnetwork/lnd/issues/8393
×
2101
                        l.failf(
×
2102
                                LinkFailureError{
×
2103
                                        code:             ErrInvalidUpdate,
×
2104
                                        FailureAction:    LinkFailureDisconnect,
×
2105
                                        PermanentFailure: false,
×
2106
                                        Warning:          true,
×
2107
                                },
×
2108
                                "received add while link is flushing",
×
2109
                        )
×
2110

×
2111
                        return
×
2112
                }
×
2113

2114
                // Disallow htlcs with blinding points set if we haven't
2115
                // enabled the feature. This saves us from having to process
2116
                // the onion at all, but will only catch blinded payments
2117
                // where we are a relaying node (as the blinding point will
2118
                // be in the payload when we're the introduction node).
2119
                if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
3✔
2120
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2121
                                "blinding point included when route blinding "+
×
2122
                                        "is disabled")
×
2123

×
2124
                        return
×
2125
                }
×
2126

2127
                // We have to check the limit here rather than later in the
2128
                // switch because the counterparty can keep sending HTLC's
2129
                // without sending a revoke. This would mean that the switch
2130
                // check would only occur later.
2131
                if l.isOverexposedWithHtlc(msg, true) {
3✔
2132
                        l.failf(LinkFailureError{code: ErrInternalError},
×
2133
                                "peer sent us an HTLC that exceeded our max "+
×
2134
                                        "fee exposure")
×
2135

×
2136
                        return
×
2137
                }
×
2138

2139
                // We just received an add request from an upstream peer, so we
2140
                // add it to our state machine, then add the HTLC to our
2141
                // "settle" list in the event that we know the preimage.
2142
                index, err := l.channel.ReceiveHTLC(msg)
3✔
2143
                if err != nil {
3✔
2144
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2145
                                "unable to handle upstream add HTLC: %v", err)
×
2146
                        return
×
2147
                }
×
2148

2149
                l.log.Tracef("receive upstream htlc with payment hash(%x), "+
3✔
2150
                        "assigning index: %v", msg.PaymentHash[:], index)
3✔
2151

2152
        case *lnwire.UpdateFulfillHTLC:
3✔
2153
                pre := msg.PaymentPreimage
3✔
2154
                idx := msg.ID
3✔
2155

3✔
2156
                // Before we pipeline the settle, we'll check the set of active
3✔
2157
                // htlc's to see if the related UpdateAddHTLC has been fully
3✔
2158
                // locked-in.
3✔
2159
                var lockedin bool
3✔
2160
                htlcs := l.channel.ActiveHtlcs()
3✔
2161
                for _, add := range htlcs {
6✔
2162
                        // The HTLC will be outgoing and match idx.
3✔
2163
                        if !add.Incoming && add.HtlcIndex == idx {
6✔
2164
                                lockedin = true
3✔
2165
                                break
3✔
2166
                        }
2167
                }
2168

2169
                if !lockedin {
3✔
UNCOV
2170
                        l.failf(
×
UNCOV
2171
                                LinkFailureError{code: ErrInvalidUpdate},
×
UNCOV
2172
                                "unable to handle upstream settle",
×
UNCOV
2173
                        )
×
UNCOV
2174
                        return
×
UNCOV
2175
                }
×
2176

2177
                if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
6✔
2178
                        l.failf(
3✔
2179
                                LinkFailureError{
3✔
2180
                                        code:          ErrInvalidUpdate,
3✔
2181
                                        FailureAction: LinkFailureForceClose,
3✔
2182
                                },
3✔
2183
                                "unable to handle upstream settle HTLC: %v", err,
3✔
2184
                        )
3✔
2185
                        return
3✔
2186
                }
3✔
2187

2188
                settlePacket := &htlcPacket{
3✔
2189
                        outgoingChanID: l.ShortChanID(),
3✔
2190
                        outgoingHTLCID: idx,
3✔
2191
                        htlc: &lnwire.UpdateFulfillHTLC{
3✔
2192
                                PaymentPreimage: pre,
3✔
2193
                        },
3✔
2194
                }
3✔
2195

3✔
2196
                // Add the newly discovered preimage to our growing list of
3✔
2197
                // uncommitted preimage. These will be written to the witness
3✔
2198
                // cache just before accepting the next commitment signature
3✔
2199
                // from the remote peer.
3✔
2200
                l.uncommittedPreimages = append(l.uncommittedPreimages, pre)
3✔
2201

3✔
2202
                // Pipeline this settle, send it to the switch.
3✔
2203
                go l.forwardBatch(false, settlePacket)
3✔
2204

2205
        case *lnwire.UpdateFailMalformedHTLC:
3✔
2206
                // Convert the failure type encoded within the HTLC fail
3✔
2207
                // message to the proper generic lnwire error code.
3✔
2208
                var failure lnwire.FailureMessage
3✔
2209
                switch msg.FailureCode {
3✔
2210
                case lnwire.CodeInvalidOnionVersion:
3✔
2211
                        failure = &lnwire.FailInvalidOnionVersion{
3✔
2212
                                OnionSHA256: msg.ShaOnionBlob,
3✔
2213
                        }
3✔
2214
                case lnwire.CodeInvalidOnionHmac:
×
2215
                        failure = &lnwire.FailInvalidOnionHmac{
×
2216
                                OnionSHA256: msg.ShaOnionBlob,
×
2217
                        }
×
2218

2219
                case lnwire.CodeInvalidOnionKey:
×
2220
                        failure = &lnwire.FailInvalidOnionKey{
×
2221
                                OnionSHA256: msg.ShaOnionBlob,
×
2222
                        }
×
2223

2224
                // Handle malformed errors that are part of a blinded route.
2225
                // This case is slightly different, because we expect every
2226
                // relaying node in the blinded portion of the route to send
2227
                // malformed errors. If we're also a relaying node, we're
2228
                // likely going to switch this error out anyway for our own
2229
                // malformed error, but we handle the case here for
2230
                // completeness.
2231
                case lnwire.CodeInvalidBlinding:
3✔
2232
                        failure = &lnwire.FailInvalidBlinding{
3✔
2233
                                OnionSHA256: msg.ShaOnionBlob,
3✔
2234
                        }
3✔
2235

UNCOV
2236
                default:
×
UNCOV
2237
                        l.log.Warnf("unexpected failure code received in "+
×
UNCOV
2238
                                "UpdateFailMailformedHTLC: %v", msg.FailureCode)
×
UNCOV
2239

×
UNCOV
2240
                        // We don't just pass back the error we received from
×
UNCOV
2241
                        // our successor. Otherwise we might report a failure
×
UNCOV
2242
                        // that penalizes us more than needed. If the onion that
×
UNCOV
2243
                        // we forwarded was correct, the node should have been
×
UNCOV
2244
                        // able to send back its own failure. The node did not
×
UNCOV
2245
                        // send back its own failure, so we assume there was a
×
UNCOV
2246
                        // problem with the onion and report that back. We reuse
×
UNCOV
2247
                        // the invalid onion key failure because there is no
×
UNCOV
2248
                        // specific error for this case.
×
UNCOV
2249
                        failure = &lnwire.FailInvalidOnionKey{
×
UNCOV
2250
                                OnionSHA256: msg.ShaOnionBlob,
×
UNCOV
2251
                        }
×
2252
                }
2253

2254
                // With the error parsed, we'll convert the into it's opaque
2255
                // form.
2256
                var b bytes.Buffer
3✔
2257
                if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
3✔
2258
                        l.log.Errorf("unable to encode malformed error: %v", err)
×
2259
                        return
×
2260
                }
×
2261

2262
                // If remote side have been unable to parse the onion blob we
2263
                // have sent to it, than we should transform the malformed HTLC
2264
                // message to the usual HTLC fail message.
2265
                err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
3✔
2266
                if err != nil {
3✔
2267
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2268
                                "unable to handle upstream fail HTLC: %v", err)
×
2269
                        return
×
2270
                }
×
2271

2272
        case *lnwire.UpdateFailHTLC:
3✔
2273
                // Verify that the failure reason is at least 256 bytes plus
3✔
2274
                // overhead.
3✔
2275
                const minimumFailReasonLength = lnwire.FailureMessageLength +
3✔
2276
                        2 + 2 + 32
3✔
2277

3✔
2278
                if len(msg.Reason) < minimumFailReasonLength {
3✔
UNCOV
2279
                        // We've received a reason with a non-compliant length.
×
UNCOV
2280
                        // Older nodes happily relay back these failures that
×
UNCOV
2281
                        // may originate from a node further downstream.
×
UNCOV
2282
                        // Therefore we can't just fail the channel.
×
UNCOV
2283
                        //
×
UNCOV
2284
                        // We want to be compliant ourselves, so we also can't
×
UNCOV
2285
                        // pass back the reason unmodified. And we must make
×
UNCOV
2286
                        // sure that we don't hit the magic length check of 260
×
UNCOV
2287
                        // bytes in processRemoteSettleFails either.
×
UNCOV
2288
                        //
×
UNCOV
2289
                        // Because the reason is unreadable for the payer
×
UNCOV
2290
                        // anyway, we just replace it by a compliant-length
×
UNCOV
2291
                        // series of random bytes.
×
UNCOV
2292
                        msg.Reason = make([]byte, minimumFailReasonLength)
×
UNCOV
2293
                        _, err := crand.Read(msg.Reason[:])
×
UNCOV
2294
                        if err != nil {
×
2295
                                l.log.Errorf("Random generation error: %v", err)
×
2296

×
2297
                                return
×
2298
                        }
×
2299
                }
2300

2301
                // Add fail to the update log.
2302
                idx := msg.ID
3✔
2303
                err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
3✔
2304
                if err != nil {
3✔
2305
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2306
                                "unable to handle upstream fail HTLC: %v", err)
×
2307
                        return
×
2308
                }
×
2309

2310
        case *lnwire.CommitSig:
3✔
2311
                // Since we may have learned new preimages for the first time,
3✔
2312
                // we'll add them to our preimage cache. By doing this, we
3✔
2313
                // ensure any contested contracts watched by any on-chain
3✔
2314
                // arbitrators can now sweep this HTLC on-chain. We delay
3✔
2315
                // committing the preimages until just before accepting the new
3✔
2316
                // remote commitment, as afterwards the peer won't resend the
3✔
2317
                // Settle messages on the next channel reestablishment. Doing so
3✔
2318
                // allows us to more effectively batch this operation, instead
3✔
2319
                // of doing a single write per preimage.
3✔
2320
                err := l.cfg.PreimageCache.AddPreimages(
3✔
2321
                        l.uncommittedPreimages...,
3✔
2322
                )
3✔
2323
                if err != nil {
3✔
2324
                        l.failf(
×
2325
                                LinkFailureError{code: ErrInternalError},
×
2326
                                "unable to add preimages=%v to cache: %v",
×
2327
                                l.uncommittedPreimages, err,
×
2328
                        )
×
2329
                        return
×
2330
                }
×
2331

2332
                // Instead of truncating the slice to conserve memory
2333
                // allocations, we simply set the uncommitted preimage slice to
2334
                // nil so that a new one will be initialized if any more
2335
                // witnesses are discovered. We do this because the maximum size
2336
                // that the slice can occupy is 15KB, and we want to ensure we
2337
                // release that memory back to the runtime.
2338
                l.uncommittedPreimages = nil
3✔
2339

3✔
2340
                // We just received a new updates to our local commitment
3✔
2341
                // chain, validate this new commitment, closing the link if
3✔
2342
                // invalid.
3✔
2343
                auxSigBlob, err := msg.CustomRecords.Serialize()
3✔
2344
                if err != nil {
3✔
2345
                        l.failf(
×
2346
                                LinkFailureError{code: ErrInvalidCommitment},
×
2347
                                "unable to serialize custom records: %v", err,
×
2348
                        )
×
2349

×
2350
                        return
×
2351
                }
×
2352
                err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
3✔
2353
                        CommitSig:  msg.CommitSig,
3✔
2354
                        HtlcSigs:   msg.HtlcSigs,
3✔
2355
                        PartialSig: msg.PartialSig,
3✔
2356
                        AuxSigBlob: auxSigBlob,
3✔
2357
                })
3✔
2358
                if err != nil {
3✔
2359
                        // If we were unable to reconstruct their proposed
×
2360
                        // commitment, then we'll examine the type of error. If
×
2361
                        // it's an InvalidCommitSigError, then we'll send a
×
2362
                        // direct error.
×
2363
                        var sendData []byte
×
2364
                        switch err.(type) {
×
2365
                        case *lnwallet.InvalidCommitSigError:
×
2366
                                sendData = []byte(err.Error())
×
2367
                        case *lnwallet.InvalidHtlcSigError:
×
2368
                                sendData = []byte(err.Error())
×
2369
                        }
2370
                        l.failf(
×
2371
                                LinkFailureError{
×
2372
                                        code:          ErrInvalidCommitment,
×
2373
                                        FailureAction: LinkFailureForceClose,
×
2374
                                        SendData:      sendData,
×
2375
                                },
×
2376
                                "ChannelPoint(%v): unable to accept new "+
×
2377
                                        "commitment: %v",
×
2378
                                l.channel.ChannelPoint(), err,
×
2379
                        )
×
2380
                        return
×
2381
                }
2382

2383
                // As we've just accepted a new state, we'll now
2384
                // immediately send the remote peer a revocation for our prior
2385
                // state.
2386
                nextRevocation, currentHtlcs, finalHTLCs, err :=
3✔
2387
                        l.channel.RevokeCurrentCommitment()
3✔
2388
                if err != nil {
3✔
2389
                        l.log.Errorf("unable to revoke commitment: %v", err)
×
2390

×
2391
                        // We need to fail the channel in case revoking our
×
2392
                        // local commitment does not succeed. We might have
×
2393
                        // already advanced our channel state which would lead
×
2394
                        // us to proceed with an unclean state.
×
2395
                        //
×
2396
                        // NOTE: We do not trigger a force close because this
×
2397
                        // could resolve itself in case our db was just busy
×
2398
                        // not accepting new transactions.
×
2399
                        l.failf(
×
2400
                                LinkFailureError{
×
2401
                                        code:          ErrInternalError,
×
2402
                                        Warning:       true,
×
2403
                                        FailureAction: LinkFailureDisconnect,
×
2404
                                },
×
2405
                                "ChannelPoint(%v): unable to accept new "+
×
2406
                                        "commitment: %v",
×
2407
                                l.channel.ChannelPoint(), err,
×
2408
                        )
×
2409
                        return
×
2410
                }
×
2411

2412
                // As soon as we are ready to send our next revocation, we can
2413
                // invoke the incoming commit hooks.
2414
                l.RWMutex.Lock()
3✔
2415
                l.incomingCommitHooks.invoke()
3✔
2416
                l.RWMutex.Unlock()
3✔
2417

3✔
2418
                l.cfg.Peer.SendMessage(false, nextRevocation)
3✔
2419

3✔
2420
                // Notify the incoming htlcs of which the resolutions were
3✔
2421
                // locked in.
3✔
2422
                for id, settled := range finalHTLCs {
6✔
2423
                        l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
3✔
2424
                                models.CircuitKey{
3✔
2425
                                        ChanID: l.ShortChanID(),
3✔
2426
                                        HtlcID: id,
3✔
2427
                                },
3✔
2428
                                channeldb.FinalHtlcInfo{
3✔
2429
                                        Settled:  settled,
3✔
2430
                                        Offchain: true,
3✔
2431
                                },
3✔
2432
                        )
3✔
2433
                }
3✔
2434

2435
                // Since we just revoked our commitment, we may have a new set
2436
                // of HTLC's on our commitment, so we'll send them using our
2437
                // function closure NotifyContractUpdate.
2438
                newUpdate := &contractcourt.ContractUpdate{
3✔
2439
                        HtlcKey: contractcourt.LocalHtlcSet,
3✔
2440
                        Htlcs:   currentHtlcs,
3✔
2441
                }
3✔
2442
                err = l.cfg.NotifyContractUpdate(newUpdate)
3✔
2443
                if err != nil {
3✔
2444
                        l.log.Errorf("unable to notify contract update: %v",
×
2445
                                err)
×
2446
                        return
×
2447
                }
×
2448

2449
                select {
3✔
2450
                case <-l.cg.Done():
×
2451
                        return
×
2452
                default:
3✔
2453
                }
2454

2455
                // If the remote party initiated the state transition,
2456
                // we'll reply with a signature to provide them with their
2457
                // version of the latest commitment. Otherwise, both commitment
2458
                // chains are fully synced from our PoV, then we don't need to
2459
                // reply with a signature as both sides already have a
2460
                // commitment with the latest accepted.
2461
                if l.channel.OweCommitment() {
6✔
2462
                        if !l.updateCommitTxOrFail(ctx) {
3✔
2463
                                return
×
2464
                        }
×
2465
                }
2466

2467
                // If we need to send out an Stfu, this would be the time to do
2468
                // so.
2469
                if l.noDanglingUpdates(lntypes.Local) {
6✔
2470
                        err = l.quiescer.SendOwedStfu()
3✔
2471
                        if err != nil {
3✔
2472
                                l.stfuFailf("sendOwedStfu: %v", err.Error())
×
2473
                        }
×
2474
                }
2475

2476
                // Now that we have finished processing the incoming CommitSig
2477
                // and sent out our RevokeAndAck, we invoke the flushHooks if
2478
                // the channel state is clean.
2479
                l.RWMutex.Lock()
3✔
2480
                if l.channel.IsChannelClean() {
6✔
2481
                        l.flushHooks.invoke()
3✔
2482
                }
3✔
2483
                l.RWMutex.Unlock()
3✔
2484

2485
        case *lnwire.RevokeAndAck:
3✔
2486
                // We've received a revocation from the remote chain, if valid,
3✔
2487
                // this moves the remote chain forward, and expands our
3✔
2488
                // revocation window.
3✔
2489

3✔
2490
                // We now process the message and advance our remote commit
3✔
2491
                // chain.
3✔
2492
                fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg)
3✔
2493
                if err != nil {
3✔
2494
                        // TODO(halseth): force close?
×
2495
                        l.failf(
×
2496
                                LinkFailureError{
×
2497
                                        code:          ErrInvalidRevocation,
×
2498
                                        FailureAction: LinkFailureDisconnect,
×
2499
                                },
×
2500
                                "unable to accept revocation: %v", err,
×
2501
                        )
×
2502
                        return
×
2503
                }
×
2504

2505
                // The remote party now has a new primary commitment, so we'll
2506
                // update the contract court to be aware of this new set (the
2507
                // prior old remote pending).
2508
                newUpdate := &contractcourt.ContractUpdate{
3✔
2509
                        HtlcKey: contractcourt.RemoteHtlcSet,
3✔
2510
                        Htlcs:   remoteHTLCs,
3✔
2511
                }
3✔
2512
                err = l.cfg.NotifyContractUpdate(newUpdate)
3✔
2513
                if err != nil {
3✔
2514
                        l.log.Errorf("unable to notify contract update: %v",
×
2515
                                err)
×
2516
                        return
×
2517
                }
×
2518

2519
                select {
3✔
2520
                case <-l.cg.Done():
×
2521
                        return
×
2522
                default:
3✔
2523
                }
2524

2525
                // If we have a tower client for this channel type, we'll
2526
                // create a backup for the current state.
2527
                if l.cfg.TowerClient != nil {
6✔
2528
                        state := l.channel.State()
3✔
2529
                        chanID := l.ChanID()
3✔
2530

3✔
2531
                        err = l.cfg.TowerClient.BackupState(
3✔
2532
                                &chanID, state.RemoteCommitment.CommitHeight-1,
3✔
2533
                        )
3✔
2534
                        if err != nil {
3✔
2535
                                l.failf(LinkFailureError{
×
2536
                                        code: ErrInternalError,
×
2537
                                }, "unable to queue breach backup: %v", err)
×
2538
                                return
×
2539
                        }
×
2540
                }
2541

2542
                // If we can send updates then we can process adds in case we
2543
                // are the exit hop and need to send back resolutions, or in
2544
                // case there are validity issues with the packets. Otherwise
2545
                // we defer the action until resume.
2546
                //
2547
                // We are free to process the settles and fails without this
2548
                // check since processing those can't result in further updates
2549
                // to this channel link.
2550
                if l.quiescer.CanSendUpdates() {
6✔
2551
                        l.processRemoteAdds(fwdPkg)
3✔
2552
                } else {
3✔
UNCOV
2553
                        l.quiescer.OnResume(func() {
×
2554
                                l.processRemoteAdds(fwdPkg)
×
2555
                        })
×
2556
                }
2557
                l.processRemoteSettleFails(fwdPkg)
3✔
2558

3✔
2559
                // If the link failed during processing the adds, we must
3✔
2560
                // return to ensure we won't attempted to update the state
3✔
2561
                // further.
3✔
2562
                if l.failed {
3✔
2563
                        return
×
2564
                }
×
2565

2566
                // The revocation window opened up. If there are pending local
2567
                // updates, try to update the commit tx. Pending updates could
2568
                // already have been present because of a previously failed
2569
                // update to the commit tx or freshly added in by
2570
                // processRemoteAdds. Also in case there are no local updates,
2571
                // but there are still remote updates that are not in the remote
2572
                // commit tx yet, send out an update.
2573
                if l.channel.OweCommitment() {
6✔
2574
                        if !l.updateCommitTxOrFail(ctx) {
3✔
UNCOV
2575
                                return
×
UNCOV
2576
                        }
×
2577
                }
2578

2579
                // Now that we have finished processing the RevokeAndAck, we
2580
                // can invoke the flushHooks if the channel state is clean.
2581
                l.RWMutex.Lock()
3✔
2582
                if l.channel.IsChannelClean() {
6✔
2583
                        l.flushHooks.invoke()
3✔
2584
                }
3✔
2585
                l.RWMutex.Unlock()
3✔
2586

UNCOV
2587
        case *lnwire.UpdateFee:
×
UNCOV
2588
                // Check and see if their proposed fee-rate would make us
×
UNCOV
2589
                // exceed the fee threshold.
×
UNCOV
2590
                fee := chainfee.SatPerKWeight(msg.FeePerKw)
×
UNCOV
2591

×
UNCOV
2592
                isDust, err := l.exceedsFeeExposureLimit(fee)
×
UNCOV
2593
                if err != nil {
×
2594
                        // This shouldn't typically happen. If it does, it
×
2595
                        // indicates something is wrong with our channel state.
×
2596
                        l.log.Errorf("Unable to determine if fee threshold " +
×
2597
                                "exceeded")
×
2598
                        l.failf(LinkFailureError{code: ErrInternalError},
×
2599
                                "error calculating fee exposure: %v", err)
×
2600

×
2601
                        return
×
2602
                }
×
2603

UNCOV
2604
                if isDust {
×
2605
                        // The proposed fee-rate makes us exceed the fee
×
2606
                        // threshold.
×
2607
                        l.failf(LinkFailureError{code: ErrInternalError},
×
2608
                                "fee threshold exceeded: %v", err)
×
2609
                        return
×
2610
                }
×
2611

2612
                // We received fee update from peer. If we are the initiator we
2613
                // will fail the channel, if not we will apply the update.
UNCOV
2614
                if err := l.channel.ReceiveUpdateFee(fee); err != nil {
×
2615
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2616
                                "error receiving fee update: %v", err)
×
2617
                        return
×
2618
                }
×
2619

2620
                // Update the mailbox's feerate as well.
UNCOV
2621
                l.mailBox.SetFeeRate(fee)
×
2622

2623
        case *lnwire.Stfu:
3✔
2624
                err := l.handleStfu(msg)
3✔
2625
                if err != nil {
3✔
2626
                        l.stfuFailf("handleStfu: %v", err.Error())
×
2627
                }
×
2628

2629
        // In the case where we receive a warning message from our peer, just
2630
        // log it and move on. We choose not to disconnect from our peer,
2631
        // although we "MAY" do so according to the specification.
UNCOV
2632
        case *lnwire.Warning:
×
UNCOV
2633
                l.log.Warnf("received warning message from peer: %v",
×
UNCOV
2634
                        msg.Warning())
×
2635

2636
        case *lnwire.Error:
2✔
2637
                // Error received from remote, MUST fail channel, but should
2✔
2638
                // only print the contents of the error message if all
2✔
2639
                // characters are printable ASCII.
2✔
2640
                l.failf(
2✔
2641
                        LinkFailureError{
2✔
2642
                                code: ErrRemoteError,
2✔
2643

2✔
2644
                                // TODO(halseth): we currently don't fail the
2✔
2645
                                // channel permanently, as there are some sync
2✔
2646
                                // issues with other implementations that will
2✔
2647
                                // lead to them sending an error message, but
2✔
2648
                                // we can recover from on next connection. See
2✔
2649
                                // https://github.com/ElementsProject/lightning/issues/4212
2✔
2650
                                PermanentFailure: false,
2✔
2651
                        },
2✔
2652
                        "ChannelPoint(%v): received error from peer: %v",
2✔
2653
                        l.channel.ChannelPoint(), msg.Error(),
2✔
2654
                )
2✔
2655
        default:
×
2656
                l.log.Warnf("received unknown message of type %T", msg)
×
2657
        }
2658

2659
}
2660

2661
// handleStfu implements the top-level logic for handling the Stfu message from
2662
// our peer.
2663
func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
3✔
2664
        if !l.noDanglingUpdates(lntypes.Remote) {
3✔
2665
                return ErrPendingRemoteUpdates
×
2666
        }
×
2667
        err := l.quiescer.RecvStfu(*stfu)
3✔
2668
        if err != nil {
3✔
2669
                return err
×
2670
        }
×
2671

2672
        // If we can immediately send an Stfu response back, we will.
2673
        if l.noDanglingUpdates(lntypes.Local) {
6✔
2674
                return l.quiescer.SendOwedStfu()
3✔
2675
        }
3✔
2676

UNCOV
2677
        return nil
×
2678
}
2679

2680
// stfuFailf fails the link in the case where the requirements of the quiescence
2681
// protocol are violated. In all cases we opt to drop the connection as only
2682
// link state (as opposed to channel state) is affected.
2683
func (l *channelLink) stfuFailf(format string, args ...interface{}) {
×
2684
        l.failf(LinkFailureError{
×
2685
                code:             ErrStfuViolation,
×
2686
                FailureAction:    LinkFailureDisconnect,
×
2687
                PermanentFailure: false,
×
2688
                Warning:          true,
×
2689
        }, format, args...)
×
2690
}
×
2691

2692
// noDanglingUpdates returns true when there are 0 updates that were originally
2693
// issued by whose on either the Local or Remote commitment transaction.
2694
func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool {
3✔
2695
        pendingOnLocal := l.channel.NumPendingUpdates(
3✔
2696
                whose, lntypes.Local,
3✔
2697
        )
3✔
2698
        pendingOnRemote := l.channel.NumPendingUpdates(
3✔
2699
                whose, lntypes.Remote,
3✔
2700
        )
3✔
2701

3✔
2702
        return pendingOnLocal == 0 && pendingOnRemote == 0
3✔
2703
}
3✔
2704

2705
// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
2706
// for packets delivered from server, and cleaning up any circuits closed by
2707
// signing a previous commitment txn. This method ensures that the circuits are
2708
// removed from the circuit map before removing them from the link's mailbox,
2709
// otherwise it could be possible for some circuit to be missed if this link
2710
// flaps.
2711
func (l *channelLink) ackDownStreamPackets() error {
3✔
2712
        // First, remove the downstream Add packets that were included in the
3✔
2713
        // previous commitment signature. This will prevent the Adds from being
3✔
2714
        // replayed if this link disconnects.
3✔
2715
        for _, inKey := range l.openedCircuits {
6✔
2716
                // In order to test the sphinx replay logic of the remote
3✔
2717
                // party, unsafe replay does not acknowledge the packets from
3✔
2718
                // the mailbox. We can then force a replay of any Add packets
3✔
2719
                // held in memory by disconnecting and reconnecting the link.
3✔
2720
                if l.cfg.UnsafeReplay {
6✔
2721
                        continue
3✔
2722
                }
2723

2724
                l.log.Debugf("removing Add packet %s from mailbox", inKey)
3✔
2725
                l.mailBox.AckPacket(inKey)
3✔
2726
        }
2727

2728
        // Now, we will delete all circuits closed by the previous commitment
2729
        // signature, which is the result of downstream Settle/Fail packets. We
2730
        // batch them here to ensure circuits are closed atomically and for
2731
        // performance.
2732
        err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
3✔
2733
        switch err {
3✔
2734
        case nil:
3✔
2735
                // Successful deletion.
2736

2737
        default:
×
2738
                l.log.Errorf("unable to delete %d circuits: %v",
×
2739
                        len(l.closedCircuits), err)
×
2740
                return err
×
2741
        }
2742

2743
        // With the circuits removed from memory and disk, we now ack any
2744
        // Settle/Fails in the mailbox to ensure they do not get redelivered
2745
        // after startup. If forgive is enabled and we've reached this point,
2746
        // the circuits must have been removed at some point, so it is now safe
2747
        // to un-queue the corresponding Settle/Fails.
2748
        for _, inKey := range l.closedCircuits {
6✔
2749
                l.log.Debugf("removing Fail/Settle packet %s from mailbox",
3✔
2750
                        inKey)
3✔
2751
                l.mailBox.AckPacket(inKey)
3✔
2752
        }
3✔
2753

2754
        // Lastly, reset our buffers to be empty while keeping any acquired
2755
        // growth in the backing array.
2756
        l.openedCircuits = l.openedCircuits[:0]
3✔
2757
        l.closedCircuits = l.closedCircuits[:0]
3✔
2758

3✔
2759
        return nil
3✔
2760
}
2761

2762
// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
2763
// the link.
2764
func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool {
3✔
2765
        err := l.updateCommitTx(ctx)
3✔
2766
        switch err {
3✔
2767
        // No error encountered, success.
2768
        case nil:
3✔
2769

2770
        // A duplicate keystone error should be resolved and is not fatal, so
2771
        // we won't send an Error message to the peer.
2772
        case ErrDuplicateKeystone:
×
2773
                l.failf(LinkFailureError{code: ErrCircuitError},
×
2774
                        "temporary circuit error: %v", err)
×
2775
                return false
×
2776

2777
        // Any other error is treated results in an Error message being sent to
2778
        // the peer.
UNCOV
2779
        default:
×
UNCOV
2780
                l.failf(LinkFailureError{code: ErrInternalError},
×
UNCOV
2781
                        "unable to update commitment: %v", err)
×
UNCOV
2782
                return false
×
2783
        }
2784

2785
        return true
3✔
2786
}
2787

2788
// updateCommitTx signs, then sends an update to the remote peer adding a new
2789
// commitment to their commitment chain which includes all the latest updates
2790
// we've received+processed up to this point.
2791
func (l *channelLink) updateCommitTx(ctx context.Context) error {
3✔
2792
        // Preemptively write all pending keystones to disk, just in case the
3✔
2793
        // HTLCs we have in memory are included in the subsequent attempt to
3✔
2794
        // sign a commitment state.
3✔
2795
        err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
3✔
2796
        if err != nil {
3✔
2797
                // If ErrDuplicateKeystone is returned, the caller will catch
×
2798
                // it.
×
2799
                return err
×
2800
        }
×
2801

2802
        // Reset the batch, but keep the backing buffer to avoid reallocating.
2803
        l.keystoneBatch = l.keystoneBatch[:0]
3✔
2804

3✔
2805
        // If hodl.Commit mode is active, we will refrain from attempting to
3✔
2806
        // commit any in-memory modifications to the channel state. Exiting here
3✔
2807
        // permits testing of either the switch or link's ability to trim
3✔
2808
        // circuits that have been opened, but unsuccessfully committed.
3✔
2809
        if l.cfg.HodlMask.Active(hodl.Commit) {
6✔
2810
                l.log.Warnf(hodl.Commit.Warning())
3✔
2811
                return nil
3✔
2812
        }
3✔
2813

2814
        ctx, done := l.cg.Create(ctx)
3✔
2815
        defer done()
3✔
2816

3✔
2817
        newCommit, err := l.channel.SignNextCommitment(ctx)
3✔
2818
        if err == lnwallet.ErrNoWindow {
6✔
2819
                l.cfg.PendingCommitTicker.Resume()
3✔
2820
                l.log.Trace("PendingCommitTicker resumed")
3✔
2821

3✔
2822
                n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
3✔
2823
                l.log.Tracef("revocation window exhausted, unable to send: "+
3✔
2824
                        "%v, pend_updates=%v, dangling_closes%v", n,
3✔
2825
                        lnutils.SpewLogClosure(l.openedCircuits),
3✔
2826
                        lnutils.SpewLogClosure(l.closedCircuits))
3✔
2827

3✔
2828
                return nil
3✔
2829
        } else if err != nil {
6✔
2830
                return err
×
2831
        }
×
2832

2833
        if err := l.ackDownStreamPackets(); err != nil {
3✔
2834
                return err
×
2835
        }
×
2836

2837
        l.cfg.PendingCommitTicker.Pause()
3✔
2838
        l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")
3✔
2839

3✔
2840
        // The remote party now has a new pending commitment, so we'll update
3✔
2841
        // the contract court to be aware of this new set (the prior old remote
3✔
2842
        // pending).
3✔
2843
        newUpdate := &contractcourt.ContractUpdate{
3✔
2844
                HtlcKey: contractcourt.RemotePendingHtlcSet,
3✔
2845
                Htlcs:   newCommit.PendingHTLCs,
3✔
2846
        }
3✔
2847
        err = l.cfg.NotifyContractUpdate(newUpdate)
3✔
2848
        if err != nil {
3✔
2849
                l.log.Errorf("unable to notify contract update: %v", err)
×
2850
                return err
×
2851
        }
×
2852

2853
        select {
3✔
UNCOV
2854
        case <-l.cg.Done():
×
UNCOV
2855
                return ErrLinkShuttingDown
×
2856
        default:
3✔
2857
        }
2858

2859
        auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob)
3✔
2860
        if err != nil {
3✔
2861
                return fmt.Errorf("error parsing aux sigs: %w", err)
×
2862
        }
×
2863

2864
        commitSig := &lnwire.CommitSig{
3✔
2865
                ChanID:        l.ChanID(),
3✔
2866
                CommitSig:     newCommit.CommitSig,
3✔
2867
                HtlcSigs:      newCommit.HtlcSigs,
3✔
2868
                PartialSig:    newCommit.PartialSig,
3✔
2869
                CustomRecords: auxBlobRecords,
3✔
2870
        }
3✔
2871
        l.cfg.Peer.SendMessage(false, commitSig)
3✔
2872

3✔
2873
        // Now that we have sent out a new CommitSig, we invoke the outgoing set
3✔
2874
        // of commit hooks.
3✔
2875
        l.RWMutex.Lock()
3✔
2876
        l.outgoingCommitHooks.invoke()
3✔
2877
        l.RWMutex.Unlock()
3✔
2878

3✔
2879
        return nil
3✔
2880
}
2881

2882
// Peer returns the representation of remote peer with which we have the
2883
// channel link opened.
2884
//
2885
// NOTE: Part of the ChannelLink interface.
2886
func (l *channelLink) PeerPubKey() [33]byte {
3✔
2887
        return l.cfg.Peer.PubKey()
3✔
2888
}
3✔
2889

2890
// ChannelPoint returns the channel outpoint for the channel link.
2891
// NOTE: Part of the ChannelLink interface.
2892
func (l *channelLink) ChannelPoint() wire.OutPoint {
3✔
2893
        return l.channel.ChannelPoint()
3✔
2894
}
3✔
2895

2896
// ShortChanID returns the short channel ID for the channel link. The short
2897
// channel ID encodes the exact location in the main chain that the original
2898
// funding output can be found.
2899
//
2900
// NOTE: Part of the ChannelLink interface.
2901
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
3✔
2902
        l.RLock()
3✔
2903
        defer l.RUnlock()
3✔
2904

3✔
2905
        return l.channel.ShortChanID()
3✔
2906
}
3✔
2907

2908
// UpdateShortChanID updates the short channel ID for a link. This may be
2909
// required in the event that a link is created before the short chan ID for it
2910
// is known, or a re-org occurs, and the funding transaction changes location
2911
// within the chain.
2912
//
2913
// NOTE: Part of the ChannelLink interface.
2914
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
3✔
2915
        chanID := l.ChanID()
3✔
2916

3✔
2917
        // Refresh the channel state's short channel ID by loading it from disk.
3✔
2918
        // This ensures that the channel state accurately reflects the updated
3✔
2919
        // short channel ID.
3✔
2920
        err := l.channel.State().Refresh()
3✔
2921
        if err != nil {
3✔
2922
                l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
×
2923
                        "%v", chanID, err)
×
2924
                return hop.Source, err
×
2925
        }
×
2926

2927
        return hop.Source, nil
3✔
2928
}
2929

2930
// ChanID returns the channel ID for the channel link. The channel ID is a more
2931
// compact representation of a channel's full outpoint.
2932
//
2933
// NOTE: Part of the ChannelLink interface.
2934
func (l *channelLink) ChanID() lnwire.ChannelID {
3✔
2935
        return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
3✔
2936
}
3✔
2937

2938
// Bandwidth returns the total amount that can flow through the channel link at
2939
// this given instance. The value returned is expressed in millisatoshi and can
2940
// be used by callers when making forwarding decisions to determine if a link
2941
// can accept an HTLC.
2942
//
2943
// NOTE: Part of the ChannelLink interface.
2944
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
3✔
2945
        // Get the balance available on the channel for new HTLCs. This takes
3✔
2946
        // the channel reserve into account so HTLCs up to this value won't
3✔
2947
        // violate it.
3✔
2948
        return l.channel.AvailableBalance()
3✔
2949
}
3✔
2950

2951
// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
2952
// amount provided to the link. This check does not reserve a space, since
2953
// forwards or other payments may use the available slot, so it should be
2954
// considered best-effort.
2955
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
3✔
2956
        return l.channel.MayAddOutgoingHtlc(amt)
3✔
2957
}
3✔
2958

2959
// getDustSum is a wrapper method that calls the underlying channel's dust sum
2960
// method.
2961
//
2962
// NOTE: Part of the dustHandler interface.
2963
func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
2964
        dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {
3✔
2965

3✔
2966
        return l.channel.GetDustSum(whoseCommit, dryRunFee)
3✔
2967
}
3✔
2968

2969
// getFeeRate is a wrapper method that retrieves the underlying channel's
2970
// feerate.
2971
//
2972
// NOTE: Part of the dustHandler interface.
2973
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
3✔
2974
        return l.channel.CommitFeeRate()
3✔
2975
}
3✔
2976

2977
// getDustClosure returns a closure that can be used by the switch or mailbox
2978
// to evaluate whether a given HTLC is dust.
2979
//
2980
// NOTE: Part of the dustHandler interface.
2981
func (l *channelLink) getDustClosure() dustClosure {
3✔
2982
        localDustLimit := l.channel.State().LocalChanCfg.DustLimit
3✔
2983
        remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
3✔
2984
        chanType := l.channel.State().ChanType
3✔
2985

3✔
2986
        return dustHelper(chanType, localDustLimit, remoteDustLimit)
3✔
2987
}
3✔
2988

2989
// getCommitFee returns either the local or remote CommitFee in satoshis. This
2990
// is used so that the Switch can have access to the commitment fee without
2991
// needing to have a *LightningChannel. This doesn't include dust.
2992
//
2993
// NOTE: Part of the dustHandler interface.
2994
func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
3✔
2995
        if remote {
6✔
2996
                return l.channel.State().RemoteCommitment.CommitFee
3✔
2997
        }
3✔
2998

2999
        return l.channel.State().LocalCommitment.CommitFee
3✔
3000
}
3001

3002
// exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
3003
// increases the total dust and fees within the channel past the configured
3004
// fee threshold. It first calculates the dust sum over every update in the
3005
// update log with the proposed fee-rate and taking into account both the local
3006
// and remote dust limits. It uses every update in the update log instead of
3007
// what is actually on the local and remote commitments because it is assumed
3008
// that in a worst-case scenario, every update in the update log could
3009
// theoretically be on either commitment transaction and this needs to be
3010
// accounted for with this fee-rate. It then calculates the local and remote
3011
// commitment fees given the proposed fee-rate. Finally, it tallies the results
3012
// and determines if the fee threshold has been exceeded.
3013
func (l *channelLink) exceedsFeeExposureLimit(
UNCOV
3014
        feePerKw chainfee.SatPerKWeight) (bool, error) {
×
UNCOV
3015

×
UNCOV
3016
        dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)
×
UNCOV
3017

×
UNCOV
3018
        // Get the sum of dust for both the local and remote commitments using
×
UNCOV
3019
        // this "dry-run" fee.
×
UNCOV
3020
        localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
×
UNCOV
3021
        remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)
×
UNCOV
3022

×
UNCOV
3023
        // Calculate the local and remote commitment fees using this dry-run
×
UNCOV
3024
        // fee.
×
UNCOV
3025
        localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
×
UNCOV
3026
        if err != nil {
×
3027
                return false, err
×
3028
        }
×
3029

3030
        // Finally, check whether the max fee exposure was exceeded on either
3031
        // future commitment transaction with the fee-rate.
UNCOV
3032
        totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
×
UNCOV
3033
        if totalLocalDust > l.cfg.MaxFeeExposure {
×
3034
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
×
3035
                        "local dust: %v, local fee: %v", l.ShortChanID(),
×
3036
                        totalLocalDust, localFee)
×
3037

×
3038
                return true, nil
×
3039
        }
×
3040

UNCOV
3041
        totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
×
UNCOV
3042
                remoteFee,
×
UNCOV
3043
        )
×
UNCOV
3044

×
UNCOV
3045
        if totalRemoteDust > l.cfg.MaxFeeExposure {
×
3046
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
×
3047
                        "remote dust: %v, remote fee: %v", l.ShortChanID(),
×
3048
                        totalRemoteDust, remoteFee)
×
3049

×
3050
                return true, nil
×
3051
        }
×
3052

UNCOV
3053
        return false, nil
×
3054
}
3055

3056
// isOverexposedWithHtlc calculates whether the proposed HTLC will make the
3057
// channel exceed the fee threshold. It first fetches the largest fee-rate that
3058
// may be on any unrevoked commitment transaction. Then, using this fee-rate,
3059
// determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds to
3060
// the overall dust sum. If it is not dust, it contributes to weight, which
3061
// also adds to the overall dust sum by an increase in fees. If the dust sum on
3062
// either commitment exceeds the configured fee threshold, this function
3063
// returns true.
3064
func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
3065
        incoming bool) bool {
3✔
3066

3✔
3067
        dustClosure := l.getDustClosure()
3✔
3068

3✔
3069
        feeRate := l.channel.WorstCaseFeeRate()
3✔
3070

3✔
3071
        amount := htlc.Amount.ToSatoshis()
3✔
3072

3✔
3073
        // See if this HTLC is dust on both the local and remote commitments.
3✔
3074
        isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
3✔
3075
        isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)
3✔
3076

3✔
3077
        // Calculate the dust sum for the local and remote commitments.
3✔
3078
        localDustSum := l.getDustSum(
3✔
3079
                lntypes.Local, fn.None[chainfee.SatPerKWeight](),
3✔
3080
        )
3✔
3081
        remoteDustSum := l.getDustSum(
3✔
3082
                lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
3✔
3083
        )
3✔
3084

3✔
3085
        // Grab the larger of the local and remote commitment fees w/o dust.
3✔
3086
        commitFee := l.getCommitFee(false)
3✔
3087

3✔
3088
        if l.getCommitFee(true) > commitFee {
3✔
UNCOV
3089
                commitFee = l.getCommitFee(true)
×
UNCOV
3090
        }
×
3091

3092
        commitFeeMSat := lnwire.NewMSatFromSatoshis(commitFee)
3✔
3093

3✔
3094
        localDustSum += commitFeeMSat
3✔
3095
        remoteDustSum += commitFeeMSat
3✔
3096

3✔
3097
        // Calculate the additional fee increase if this is a non-dust HTLC.
3✔
3098
        weight := lntypes.WeightUnit(input.HTLCWeight)
3✔
3099
        additional := lnwire.NewMSatFromSatoshis(
3✔
3100
                feeRate.FeeForWeight(weight),
3✔
3101
        )
3✔
3102

3✔
3103
        if isLocalDust {
6✔
3104
                // If this is dust, it doesn't contribute to weight but does
3✔
3105
                // contribute to the overall dust sum.
3✔
3106
                localDustSum += lnwire.NewMSatFromSatoshis(amount)
3✔
3107
        } else {
6✔
3108
                // Account for the fee increase that comes with an increase in
3✔
3109
                // weight.
3✔
3110
                localDustSum += additional
3✔
3111
        }
3✔
3112

3113
        if localDustSum > l.cfg.MaxFeeExposure {
3✔
UNCOV
3114
                // The max fee exposure was exceeded.
×
UNCOV
3115
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
×
UNCOV
3116
                        "overexposed, total local dust: %v (current commit "+
×
UNCOV
3117
                        "fee: %v)", l.ShortChanID(), htlc, localDustSum)
×
UNCOV
3118

×
UNCOV
3119
                return true
×
UNCOV
3120
        }
×
3121

3122
        if isRemoteDust {
6✔
3123
                // If this is dust, it doesn't contribute to weight but does
3✔
3124
                // contribute to the overall dust sum.
3✔
3125
                remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
3✔
3126
        } else {
6✔
3127
                // Account for the fee increase that comes with an increase in
3✔
3128
                // weight.
3✔
3129
                remoteDustSum += additional
3✔
3130
        }
3✔
3131

3132
        if remoteDustSum > l.cfg.MaxFeeExposure {
3✔
3133
                // The max fee exposure was exceeded.
×
3134
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
×
3135
                        "overexposed, total remote dust: %v (current commit "+
×
3136
                        "fee: %v)", l.ShortChanID(), htlc, remoteDustSum)
×
3137

×
3138
                return true
×
3139
        }
×
3140

3141
        return false
3✔
3142
}
3143

3144
// dustClosure is a function that evaluates whether an HTLC is dust. It returns
3145
// true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
3146
// the HTLC is incoming (i.e. one that the remote sent), a boolean denoting
3147
// whether to evaluate on the local or remote commit, and finally an HTLC
3148
// amount to test.
3149
type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
3150
        whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool
3151

3152
// dustHelper is used to construct the dustClosure.
3153
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
3154
        remoteDustLimit btcutil.Amount) dustClosure {
3✔
3155

3✔
3156
        isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
3✔
3157
                whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {
6✔
3158

3✔
3159
                var dustLimit btcutil.Amount
3✔
3160
                if whoseCommit.IsLocal() {
6✔
3161
                        dustLimit = localDustLimit
3✔
3162
                } else {
6✔
3163
                        dustLimit = remoteDustLimit
3✔
3164
                }
3✔
3165

3166
                return lnwallet.HtlcIsDust(
3✔
3167
                        chantype, incoming, whoseCommit, feerate, amt,
3✔
3168
                        dustLimit,
3✔
3169
                )
3✔
3170
        }
3171

3172
        return isDust
3✔
3173
}
3174

3175
// zeroConfConfirmed returns whether or not the zero-conf channel has
3176
// confirmed on-chain.
3177
//
3178
// Part of the scidAliasHandler interface.
3179
func (l *channelLink) zeroConfConfirmed() bool {
3✔
3180
        return l.channel.State().ZeroConfConfirmed()
3✔
3181
}
3✔
3182

3183
// confirmedScid returns the confirmed SCID for a zero-conf channel. This
3184
// should not be called for non-zero-conf channels.
3185
//
3186
// Part of the scidAliasHandler interface.
3187
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
3✔
3188
        return l.channel.State().ZeroConfRealScid()
3✔
3189
}
3✔
3190

3191
// isZeroConf returns whether or not the underlying channel is a zero-conf
3192
// channel.
3193
//
3194
// Part of the scidAliasHandler interface.
3195
func (l *channelLink) isZeroConf() bool {
3✔
3196
        return l.channel.State().IsZeroConf()
3✔
3197
}
3✔
3198

3199
// negotiatedAliasFeature returns whether or not the underlying channel has
3200
// negotiated the option-scid-alias feature bit. This will be true for both
3201
// option-scid-alias and zero-conf channel-types. It will also be true for
3202
// channels with the feature bit but without the above channel-types.
3203
//
3204
// Part of the scidAliasFeature interface.
3205
func (l *channelLink) negotiatedAliasFeature() bool {
3✔
3206
        return l.channel.State().NegotiatedAliasFeature()
3✔
3207
}
3✔
3208

3209
// getAliases returns the set of aliases for the underlying channel.
3210
//
3211
// Part of the scidAliasHandler interface.
3212
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
3✔
3213
        return l.cfg.GetAliases(l.ShortChanID())
3✔
3214
}
3✔
3215

3216
// attachFailAliasUpdate sets the link's FailAliasUpdate function.
3217
//
3218
// Part of the scidAliasHandler interface.
3219
func (l *channelLink) attachFailAliasUpdate(closure func(
3220
        sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) {
3✔
3221

3✔
3222
        l.Lock()
3✔
3223
        l.cfg.FailAliasUpdate = closure
3✔
3224
        l.Unlock()
3✔
3225
}
3✔
3226

3227
// AttachMailBox updates the current mailbox used by this link, and hooks up
3228
// the mailbox's message and packet outboxes to the link's upstream and
3229
// downstream chans, respectively.
3230
func (l *channelLink) AttachMailBox(mailbox MailBox) {
3✔
3231
        l.Lock()
3✔
3232
        l.mailBox = mailbox
3✔
3233
        l.upstream = mailbox.MessageOutBox()
3✔
3234
        l.downstream = mailbox.PacketOutBox()
3✔
3235
        l.Unlock()
3✔
3236

3✔
3237
        // Set the mailbox's fee rate. This may be refreshing a feerate that was
3✔
3238
        // never committed.
3✔
3239
        l.mailBox.SetFeeRate(l.getFeeRate())
3✔
3240

3✔
3241
        // Also set the mailbox's dust closure so that it can query whether HTLC's
3✔
3242
        // are dust given the current feerate.
3✔
3243
        l.mailBox.SetDustClosure(l.getDustClosure())
3✔
3244
}
3✔
3245

3246
// UpdateForwardingPolicy updates the forwarding policy for the target
3247
// ChannelLink. Once updated, the link will use the new forwarding policy to
3248
// govern if it an incoming HTLC should be forwarded or not. We assume that
3249
// fields that are zero are intentionally set to zero, so we'll use newPolicy to
3250
// update all of the link's FwrdingPolicy's values.
3251
//
3252
// NOTE: Part of the ChannelLink interface.
3253
func (l *channelLink) UpdateForwardingPolicy(
3254
        newPolicy models.ForwardingPolicy) {
3✔
3255

3✔
3256
        l.Lock()
3✔
3257
        defer l.Unlock()
3✔
3258

3✔
3259
        l.cfg.FwrdingPolicy = newPolicy
3✔
3260
}
3✔
3261

3262
// CheckHtlcForward should return a nil error if the passed HTLC details
3263
// satisfy the current forwarding policy fo the target link. Otherwise,
3264
// a LinkError with a valid protocol failure message should be returned
3265
// in order to signal to the source of the HTLC, the policy consistency
3266
// issue.
3267
//
3268
// NOTE: Part of the ChannelLink interface.
3269
func (l *channelLink) CheckHtlcForward(payHash [32]byte, incomingHtlcAmt,
3270
        amtToForward lnwire.MilliSatoshi, incomingTimeout,
3271
        outgoingTimeout uint32, inboundFee models.InboundFee,
3272
        heightNow uint32, originalScid lnwire.ShortChannelID,
3273
        customRecords lnwire.CustomRecords) *LinkError {
3✔
3274

3✔
3275
        l.RLock()
3✔
3276
        policy := l.cfg.FwrdingPolicy
3✔
3277
        l.RUnlock()
3✔
3278

3✔
3279
        // Using the outgoing HTLC amount, we'll calculate the outgoing
3✔
3280
        // fee this incoming HTLC must carry in order to satisfy the constraints
3✔
3281
        // of the outgoing link.
3✔
3282
        outFee := ExpectedFee(policy, amtToForward)
3✔
3283

3✔
3284
        // Then calculate the inbound fee that we charge based on the sum of
3✔
3285
        // outgoing HTLC amount and outgoing fee.
3✔
3286
        inFee := inboundFee.CalcFee(amtToForward + outFee)
3✔
3287

3✔
3288
        // Add up both fee components. It is important to calculate both fees
3✔
3289
        // separately. An alternative way of calculating is to first determine
3✔
3290
        // an aggregate fee and apply that to the outgoing HTLC amount. However,
3✔
3291
        // rounding may cause the result to be slightly higher than in the case
3✔
3292
        // of separately rounded fee components. This potentially causes failed
3✔
3293
        // forwards for senders and is something to be avoided.
3✔
3294
        expectedFee := inFee + int64(outFee)
3✔
3295

3✔
3296
        // If the actual fee is less than our expected fee, then we'll reject
3✔
3297
        // this HTLC as it didn't provide a sufficient amount of fees, or the
3✔
3298
        // values have been tampered with, or the send used incorrect/dated
3✔
3299
        // information to construct the forwarding information for this hop. In
3✔
3300
        // any case, we'll cancel this HTLC.
3✔
3301
        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
3✔
3302
        if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
6✔
3303
                l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
3✔
3304
                        "expected %v, got %v: incoming=%v, outgoing=%v, "+
3✔
3305
                        "inboundFee=%v",
3✔
3306
                        payHash[:], expectedFee, actualFee,
3✔
3307
                        incomingHtlcAmt, amtToForward, inboundFee,
3✔
3308
                )
3✔
3309

3✔
3310
                // As part of the returned error, we'll send our latest routing
3✔
3311
                // policy so the sending node obtains the most up to date data.
3✔
3312
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
6✔
3313
                        return lnwire.NewFeeInsufficient(amtToForward, *upd)
3✔
3314
                }
3✔
3315
                failure := l.createFailureWithUpdate(false, originalScid, cb)
3✔
3316
                return NewLinkError(failure)
3✔
3317
        }
3318

3319
        // Check whether the outgoing htlc satisfies the channel policy.
3320
        err := l.canSendHtlc(
3✔
3321
                policy, payHash, amtToForward, outgoingTimeout, heightNow,
3✔
3322
                originalScid, customRecords,
3✔
3323
        )
3✔
3324
        if err != nil {
6✔
3325
                return err
3✔
3326
        }
3✔
3327

3328
        // Finally, we'll ensure that the time-lock on the outgoing HTLC meets
3329
        // the following constraint: the incoming time-lock minus our time-lock
3330
        // delta should equal the outgoing time lock. Otherwise, whether the
3331
        // sender messed up, or an intermediate node tampered with the HTLC.
3332
        timeDelta := policy.TimeLockDelta
3✔
3333
        if incomingTimeout < outgoingTimeout+timeDelta {
3✔
UNCOV
3334
                l.log.Warnf("incoming htlc(%x) has incorrect time-lock value: "+
×
UNCOV
3335
                        "expected at least %v block delta, got %v block delta",
×
UNCOV
3336
                        payHash[:], timeDelta, incomingTimeout-outgoingTimeout)
×
UNCOV
3337

×
UNCOV
3338
                // Grab the latest routing policy so the sending node is up to
×
UNCOV
3339
                // date with our current policy.
×
UNCOV
3340
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
×
UNCOV
3341
                        return lnwire.NewIncorrectCltvExpiry(
×
UNCOV
3342
                                incomingTimeout, *upd,
×
UNCOV
3343
                        )
×
UNCOV
3344
                }
×
UNCOV
3345
                failure := l.createFailureWithUpdate(false, originalScid, cb)
×
UNCOV
3346
                return NewLinkError(failure)
×
3347
        }
3348

3349
        return nil
3✔
3350
}
3351

3352
// CheckHtlcTransit should return a nil error if the passed HTLC details
3353
// satisfy the current channel policy.  Otherwise, a LinkError with a
3354
// valid protocol failure message should be returned in order to signal
3355
// the violation. This call is intended to be used for locally initiated
3356
// payments for which there is no corresponding incoming htlc.
3357
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
3358
        amt lnwire.MilliSatoshi, timeout uint32, heightNow uint32,
3359
        customRecords lnwire.CustomRecords) *LinkError {
3✔
3360

3✔
3361
        l.RLock()
3✔
3362
        policy := l.cfg.FwrdingPolicy
3✔
3363
        l.RUnlock()
3✔
3364

3✔
3365
        // We pass in hop.Source here as this is only used in the Switch when
3✔
3366
        // trying to send over a local link. This causes the fallback mechanism
3✔
3367
        // to occur.
3✔
3368
        return l.canSendHtlc(
3✔
3369
                policy, payHash, amt, timeout, heightNow, hop.Source,
3✔
3370
                customRecords,
3✔
3371
        )
3✔
3372
}
3✔
3373

3374
// canSendHtlc checks whether the given htlc parameters satisfy
3375
// the channel's amount and time lock constraints.
3376
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
3377
        payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
3378
        heightNow uint32, originalScid lnwire.ShortChannelID,
3379
        customRecords lnwire.CustomRecords) *LinkError {
3✔
3380

3✔
3381
        // As our first sanity check, we'll ensure that the passed HTLC isn't
3✔
3382
        // too small for the next hop. If so, then we'll cancel the HTLC
3✔
3383
        // directly.
3✔
3384
        if amt < policy.MinHTLCOut {
6✔
3385
                l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
3✔
3386
                        "htlc_value=%v", payHash[:], policy.MinHTLCOut,
3✔
3387
                        amt)
3✔
3388

3✔
3389
                // As part of the returned error, we'll send our latest routing
3✔
3390
                // policy so the sending node obtains the most up to date data.
3✔
3391
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
6✔
3392
                        return lnwire.NewAmountBelowMinimum(amt, *upd)
3✔
3393
                }
3✔
3394
                failure := l.createFailureWithUpdate(false, originalScid, cb)
3✔
3395
                return NewLinkError(failure)
3✔
3396
        }
3397

3398
        // Next, ensure that the passed HTLC isn't too large. If so, we'll
3399
        // cancel the HTLC directly.
3400
        if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
6✔
3401
                l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
3✔
3402
                        "htlc_value=%v", payHash[:], policy.MaxHTLC, amt)
3✔
3403

3✔
3404
                // As part of the returned error, we'll send our latest routing
3✔
3405
                // policy so the sending node obtains the most up-to-date data.
3✔
3406
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
6✔
3407
                        return lnwire.NewTemporaryChannelFailure(upd)
3✔
3408
                }
3✔
3409
                failure := l.createFailureWithUpdate(false, originalScid, cb)
3✔
3410
                return NewDetailedLinkError(failure, OutgoingFailureHTLCExceedsMax)
3✔
3411
        }
3412

3413
        // We want to avoid offering an HTLC which will expire in the near
3414
        // future, so we'll reject an HTLC if the outgoing expiration time is
3415
        // too close to the current height.
3416
        if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
3✔
UNCOV
3417
                l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
×
UNCOV
3418
                        "outgoing_expiry=%v, best_height=%v", payHash[:],
×
UNCOV
3419
                        timeout, heightNow)
×
UNCOV
3420

×
UNCOV
3421
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
×
UNCOV
3422
                        return lnwire.NewExpiryTooSoon(*upd)
×
UNCOV
3423
                }
×
UNCOV
3424
                failure := l.createFailureWithUpdate(false, originalScid, cb)
×
UNCOV
3425
                return NewLinkError(failure)
×
3426
        }
3427

3428
        // Check absolute max delta.
3429
        if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
3✔
UNCOV
3430
                l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
×
UNCOV
3431
                        "the future: got %v, but maximum is %v", payHash[:],
×
UNCOV
3432
                        timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)
×
UNCOV
3433

×
UNCOV
3434
                return NewLinkError(&lnwire.FailExpiryTooFar{})
×
UNCOV
3435
        }
×
3436

3437
        // We now check the available bandwidth to see if this HTLC can be
3438
        // forwarded.
3439
        availableBandwidth := l.Bandwidth()
3✔
3440
        auxBandwidth, err := fn.MapOptionZ(
3✔
3441
                l.cfg.AuxTrafficShaper,
3✔
3442
                func(ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
3✔
3443
                        var htlcBlob fn.Option[tlv.Blob]
×
3444
                        blob, err := customRecords.Serialize()
×
3445
                        if err != nil {
×
3446
                                return fn.Err[OptionalBandwidth](
×
3447
                                        fmt.Errorf("unable to serialize "+
×
3448
                                                "custom records: %w", err))
×
3449
                        }
×
3450

3451
                        if len(blob) > 0 {
×
3452
                                htlcBlob = fn.Some(blob)
×
3453
                        }
×
3454

3455
                        return l.AuxBandwidth(amt, originalScid, htlcBlob, ts)
×
3456
                },
3457
        ).Unpack()
3458
        if err != nil {
3✔
3459
                l.log.Errorf("Unable to determine aux bandwidth: %v", err)
×
3460
                return NewLinkError(&lnwire.FailTemporaryNodeFailure{})
×
3461
        }
×
3462

3463
        if auxBandwidth.IsHandled && auxBandwidth.Bandwidth.IsSome() {
3✔
3464
                auxBandwidth.Bandwidth.WhenSome(
×
3465
                        func(bandwidth lnwire.MilliSatoshi) {
×
3466
                                availableBandwidth = bandwidth
×
3467
                        },
×
3468
                )
3469
        }
3470

3471
        // Check to see if there is enough balance in this channel.
3472
        if amt > availableBandwidth {
6✔
3473
                l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
3✔
3474
                        "larger than %v", amt, availableBandwidth)
3✔
3475
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
6✔
3476
                        return lnwire.NewTemporaryChannelFailure(upd)
3✔
3477
                }
3✔
3478
                failure := l.createFailureWithUpdate(false, originalScid, cb)
3✔
3479
                return NewDetailedLinkError(
3✔
3480
                        failure, OutgoingFailureInsufficientBalance,
3✔
3481
                )
3✔
3482
        }
3483

3484
        return nil
3✔
3485
}
3486

3487
// AuxBandwidth returns the bandwidth that can be used for a channel, expressed
3488
// in milli-satoshi. This might be different from the regular BTC bandwidth for
3489
// custom channels. This will always return fn.None() for a regular (non-custom)
3490
// channel.
3491
func (l *channelLink) AuxBandwidth(amount lnwire.MilliSatoshi,
3492
        cid lnwire.ShortChannelID, htlcBlob fn.Option[tlv.Blob],
3493
        ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
×
3494

×
3495
        fundingBlob := l.FundingCustomBlob()
×
3496
        shouldHandle, err := ts.ShouldHandleTraffic(cid, fundingBlob, htlcBlob)
×
3497
        if err != nil {
×
3498
                return fn.Err[OptionalBandwidth](fmt.Errorf("traffic shaper "+
×
3499
                        "failed to decide whether to handle traffic: %w", err))
×
3500
        }
×
3501

3502
        log.Debugf("ShortChannelID=%v: aux traffic shaper is handling "+
×
3503
                "traffic: %v", cid, shouldHandle)
×
3504

×
3505
        // If this channel isn't handled by the aux traffic shaper, we'll return
×
3506
        // early.
×
3507
        if !shouldHandle {
×
3508
                return fn.Ok(OptionalBandwidth{
×
3509
                        IsHandled: false,
×
3510
                })
×
3511
        }
×
3512

3513
        peerBytes := l.cfg.Peer.PubKey()
×
3514

×
3515
        peer, err := route.NewVertexFromBytes(peerBytes[:])
×
3516
        if err != nil {
×
3517
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to decode "+
×
3518
                        "peer pub key: %v", err))
×
3519
        }
×
3520

3521
        // Ask for a specific bandwidth to be used for the channel.
3522
        commitmentBlob := l.CommitmentCustomBlob()
×
3523
        auxBandwidth, err := ts.PaymentBandwidth(
×
3524
                fundingBlob, htlcBlob, commitmentBlob, l.Bandwidth(), amount,
×
3525
                l.channel.FetchLatestAuxHTLCView(), peer,
×
3526
        )
×
3527
        if err != nil {
×
3528
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to get "+
×
3529
                        "bandwidth from external traffic shaper: %w", err))
×
3530
        }
×
3531

3532
        log.Debugf("ShortChannelID=%v: aux traffic shaper reported available "+
×
3533
                "bandwidth: %v", cid, auxBandwidth)
×
3534

×
3535
        return fn.Ok(OptionalBandwidth{
×
3536
                IsHandled: true,
×
3537
                Bandwidth: fn.Some(auxBandwidth),
×
3538
        })
×
3539
}
3540

3541
// Stats returns the statistics of channel link.
3542
//
3543
// NOTE: Part of the ChannelLink interface.
3544
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
3✔
3545
        snapshot := l.channel.StateSnapshot()
3✔
3546

3✔
3547
        return snapshot.ChannelCommitment.CommitHeight,
3✔
3548
                snapshot.TotalMSatSent,
3✔
3549
                snapshot.TotalMSatReceived
3✔
3550
}
3✔
3551

3552
// String returns the string representation of channel link.
3553
//
3554
// NOTE: Part of the ChannelLink interface.
3555
func (l *channelLink) String() string {
×
3556
        return l.channel.ChannelPoint().String()
×
3557
}
×
3558

3559
// handleSwitchPacket handles the switch packets. This packets which might be
3560
// forwarded to us from another channel link in case the htlc update came from
3561
// another peer or if the update was created by user
3562
//
3563
// NOTE: Part of the packetHandler interface.
3564
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
3✔
3565
        l.log.Tracef("received switch packet inkey=%v, outkey=%v",
3✔
3566
                pkt.inKey(), pkt.outKey())
3✔
3567

3✔
3568
        return l.mailBox.AddPacket(pkt)
3✔
3569
}
3✔
3570

3571
// HandleChannelUpdate handles the htlc requests as settle/add/fail which sent
3572
// to us from remote peer we have a channel with.
3573
//
3574
// NOTE: Part of the ChannelLink interface.
3575
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
3✔
3576
        select {
3✔
3577
        case <-l.cg.Done():
×
3578
                // Return early if the link is already in the process of
×
3579
                // quitting. It doesn't make sense to hand the message to the
×
3580
                // mailbox here.
×
3581
                return
×
3582
        default:
3✔
3583
        }
3584

3585
        err := l.mailBox.AddMessage(message)
3✔
3586
        if err != nil {
3✔
3587
                l.log.Errorf("failed to add Message to mailbox: %v", err)
×
3588
        }
×
3589
}
3590

3591
// updateChannelFee updates the commitment fee-per-kw on this channel by
3592
// committing to an update_fee message.
3593
func (l *channelLink) updateChannelFee(ctx context.Context,
UNCOV
3594
        feePerKw chainfee.SatPerKWeight) error {
×
UNCOV
3595

×
UNCOV
3596
        l.log.Infof("updating commit fee to %v", feePerKw)
×
UNCOV
3597

×
UNCOV
3598
        // We skip sending the UpdateFee message if the channel is not
×
UNCOV
3599
        // currently eligible to forward messages.
×
UNCOV
3600
        if !l.eligibleToUpdate() {
×
3601
                l.log.Debugf("skipping fee update for inactive channel")
×
3602
                return nil
×
3603
        }
×
3604

3605
        // Check and see if our proposed fee-rate would make us exceed the fee
3606
        // threshold.
UNCOV
3607
        thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
×
UNCOV
3608
        if err != nil {
×
3609
                // This shouldn't typically happen. If it does, it indicates
×
3610
                // something is wrong with our channel state.
×
3611
                return err
×
3612
        }
×
3613

UNCOV
3614
        if thresholdExceeded {
×
3615
                return fmt.Errorf("link fee threshold exceeded")
×
3616
        }
×
3617

3618
        // First, we'll update the local fee on our commitment.
UNCOV
3619
        if err := l.channel.UpdateFee(feePerKw); err != nil {
×
3620
                return err
×
3621
        }
×
3622

3623
        // The fee passed the channel's validation checks, so we update the
3624
        // mailbox feerate.
UNCOV
3625
        l.mailBox.SetFeeRate(feePerKw)
×
UNCOV
3626

×
UNCOV
3627
        // We'll then attempt to send a new UpdateFee message, and also lock it
×
UNCOV
3628
        // in immediately by triggering a commitment update.
×
UNCOV
3629
        msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
×
UNCOV
3630
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
×
3631
                return err
×
3632
        }
×
3633

UNCOV
3634
        return l.updateCommitTx(ctx)
×
3635
}
3636

3637
// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
3638
// after receiving a revocation from the remote party, and reprocesses them in
3639
// the context of the provided forwarding package. Any settles or fails that
3640
// have already been acknowledged in the forwarding package will not be sent to
3641
// the switch.
3642
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
3✔
3643
        if len(fwdPkg.SettleFails) == 0 {
6✔
3644
                l.log.Trace("fwd package has no settle/fails to process " +
3✔
3645
                        "exiting early")
3✔
3646

3✔
3647
                return
3✔
3648
        }
3✔
3649

3650
        // Exit early if the fwdPkg is already processed.
3651
        if fwdPkg.State == channeldb.FwdStateCompleted {
3✔
3652
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)
×
3653

×
3654
                return
×
3655
        }
×
3656

3657
        l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)
3✔
3658

3✔
3659
        var switchPackets []*htlcPacket
3✔
3660
        for i, update := range fwdPkg.SettleFails {
6✔
3661
                destRef := fwdPkg.DestRef(uint16(i))
3✔
3662

3✔
3663
                // Skip any settles or fails that have already been
3✔
3664
                // acknowledged by the incoming link that originated the
3✔
3665
                // forwarded Add.
3✔
3666
                if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
3✔
3667
                        continue
×
3668
                }
3669

3670
                // TODO(roasbeef): rework log entries to a shared
3671
                // interface.
3672

3673
                switch msg := update.UpdateMsg.(type) {
3✔
3674
                // A settle for an HTLC we previously forwarded HTLC has been
3675
                // received. So we'll forward the HTLC to the switch which will
3676
                // handle propagating the settle to the prior hop.
3677
                case *lnwire.UpdateFulfillHTLC:
3✔
3678
                        // If hodl.SettleIncoming is requested, we will not
3✔
3679
                        // forward the SETTLE to the switch and will not signal
3✔
3680
                        // a free slot on the commitment transaction.
3✔
3681
                        if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
3✔
3682
                                l.log.Warnf(hodl.SettleIncoming.Warning())
×
3683
                                continue
×
3684
                        }
3685

3686
                        settlePacket := &htlcPacket{
3✔
3687
                                outgoingChanID: l.ShortChanID(),
3✔
3688
                                outgoingHTLCID: msg.ID,
3✔
3689
                                destRef:        &destRef,
3✔
3690
                                htlc:           msg,
3✔
3691
                        }
3✔
3692

3✔
3693
                        // Add the packet to the batch to be forwarded, and
3✔
3694
                        // notify the overflow queue that a spare spot has been
3✔
3695
                        // freed up within the commitment state.
3✔
3696
                        switchPackets = append(switchPackets, settlePacket)
3✔
3697

3698
                // A failureCode message for a previously forwarded HTLC has
3699
                // been received. As a result a new slot will be freed up in
3700
                // our commitment state, so we'll forward this to the switch so
3701
                // the backwards undo can continue.
3702
                case *lnwire.UpdateFailHTLC:
3✔
3703
                        // If hodl.SettleIncoming is requested, we will not
3✔
3704
                        // forward the FAIL to the switch and will not signal a
3✔
3705
                        // free slot on the commitment transaction.
3✔
3706
                        if l.cfg.HodlMask.Active(hodl.FailIncoming) {
3✔
3707
                                l.log.Warnf(hodl.FailIncoming.Warning())
×
3708
                                continue
×
3709
                        }
3710

3711
                        // Fetch the reason the HTLC was canceled so we can
3712
                        // continue to propagate it. This failure originated
3713
                        // from another node, so the linkFailure field is not
3714
                        // set on the packet.
3715
                        failPacket := &htlcPacket{
3✔
3716
                                outgoingChanID: l.ShortChanID(),
3✔
3717
                                outgoingHTLCID: msg.ID,
3✔
3718
                                destRef:        &destRef,
3✔
3719
                                htlc:           msg,
3✔
3720
                        }
3✔
3721

3✔
3722
                        l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID)
3✔
3723

3✔
3724
                        // If the failure message lacks an HMAC (but includes
3✔
3725
                        // the 4 bytes for encoding the message and padding
3✔
3726
                        // lengths, then this means that we received it as an
3✔
3727
                        // UpdateFailMalformedHTLC. As a result, we'll signal
3✔
3728
                        // that we need to convert this error within the switch
3✔
3729
                        // to an actual error, by encrypting it as if we were
3✔
3730
                        // the originating hop.
3✔
3731
                        convertedErrorSize := lnwire.FailureMessageLength + 4
3✔
3732
                        if len(msg.Reason) == convertedErrorSize {
6✔
3733
                                failPacket.convertedError = true
3✔
3734
                        }
3✔
3735

3736
                        // Add the packet to the batch to be forwarded, and
3737
                        // notify the overflow queue that a spare spot has been
3738
                        // freed up within the commitment state.
3739
                        switchPackets = append(switchPackets, failPacket)
3✔
3740
                }
3741
        }
3742

3743
        // Only spawn the task forward packets we have a non-zero number.
3744
        if len(switchPackets) > 0 {
6✔
3745
                go l.forwardBatch(false, switchPackets...)
3✔
3746
        }
3✔
3747
}
3748

3749
// processRemoteAdds serially processes each of the Add payment descriptors
3750
// which have been "locked-in" by receiving a revocation from the remote party.
3751
// The forwarding package provided instructs how to process this batch,
3752
// indicating whether this is the first time these Adds are being processed, or
3753
// whether we are reprocessing as a result of a failure or restart. Adds that
3754
// have already been acknowledged in the forwarding package will be ignored.
3755
//
3756
//nolint:funlen
3757
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
3✔
3758
        // Exit early if there are no adds to process.
3✔
3759
        if len(fwdPkg.Adds) == 0 {
6✔
3760
                l.log.Trace("fwd package has no adds to process exiting early")
3✔
3761

3✔
3762
                return
3✔
3763
        }
3✔
3764

3765
        // Exit early if the fwdPkg is already processed.
3766
        if fwdPkg.State == channeldb.FwdStateCompleted {
3✔
3767
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)
×
3768

×
3769
                return
×
3770
        }
×
3771

3772
        l.log.Tracef("processing %d remote adds for height %d",
3✔
3773
                len(fwdPkg.Adds), fwdPkg.Height)
3✔
3774

3✔
3775
        // decodeReqs is a list of requests sent to the onion decoder. We expect
3✔
3776
        // the same length of responses to be returned.
3✔
3777
        decodeReqs := make([]hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds))
3✔
3778

3✔
3779
        // unackedAdds is a list of ADDs that's waiting for the remote's
3✔
3780
        // settle/fail update.
3✔
3781
        unackedAdds := make([]*lnwire.UpdateAddHTLC, 0, len(fwdPkg.Adds))
3✔
3782

3✔
3783
        for i, update := range fwdPkg.Adds {
6✔
3784
                // If this index is already found in the ack filter, the
3✔
3785
                // response to this forwarding decision has already been
3✔
3786
                // committed by one of our commitment txns. ADDs in this state
3✔
3787
                // are waiting for the rest of the fwding package to get acked
3✔
3788
                // before being garbage collected.
3✔
3789
                if fwdPkg.State == channeldb.FwdStateProcessed &&
3✔
3790
                        fwdPkg.AckFilter.Contains(uint16(i)) {
3✔
3791

×
3792
                        continue
×
3793
                }
3794

3795
                if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
6✔
3796
                        // Before adding the new htlc to the state machine,
3✔
3797
                        // parse the onion object in order to obtain the
3✔
3798
                        // routing information with DecodeHopIterator function
3✔
3799
                        // which process the Sphinx packet.
3✔
3800
                        onionReader := bytes.NewReader(msg.OnionBlob[:])
3✔
3801

3✔
3802
                        req := hop.DecodeHopIteratorRequest{
3✔
3803
                                OnionReader:    onionReader,
3✔
3804
                                RHash:          msg.PaymentHash[:],
3✔
3805
                                IncomingCltv:   msg.Expiry,
3✔
3806
                                IncomingAmount: msg.Amount,
3✔
3807
                                BlindingPoint:  msg.BlindingPoint,
3✔
3808
                        }
3✔
3809

3✔
3810
                        decodeReqs = append(decodeReqs, req)
3✔
3811
                        unackedAdds = append(unackedAdds, msg)
3✔
3812
                }
3✔
3813
        }
3814

3815
        // If the fwdPkg has already been processed, it means we are
3816
        // reforwarding the packets again, which happens only on a restart.
3817
        reforward := fwdPkg.State == channeldb.FwdStateProcessed
3✔
3818

3✔
3819
        // Atomically decode the incoming htlcs, simultaneously checking for
3✔
3820
        // replay attempts. A particular index in the returned, spare list of
3✔
3821
        // channel iterators should only be used if the failure code at the
3✔
3822
        // same index is lnwire.FailCodeNone.
3✔
3823
        decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
3✔
3824
                fwdPkg.ID(), decodeReqs, reforward,
3✔
3825
        )
3✔
3826
        if sphinxErr != nil {
3✔
3827
                l.failf(LinkFailureError{code: ErrInternalError},
×
3828
                        "unable to decode hop iterators: %v", sphinxErr)
×
3829
                return
×
3830
        }
×
3831

3832
        var switchPackets []*htlcPacket
3✔
3833

3✔
3834
        for i, update := range unackedAdds {
6✔
3835
                idx := uint16(i)
3✔
3836
                sourceRef := fwdPkg.SourceRef(idx)
3✔
3837
                add := *update
3✔
3838

3✔
3839
                // An incoming HTLC add has been full-locked in. As a result we
3✔
3840
                // can now examine the forwarding details of the HTLC, and the
3✔
3841
                // HTLC itself to decide if: we should forward it, cancel it,
3✔
3842
                // or are able to settle it (and it adheres to our fee related
3✔
3843
                // constraints).
3✔
3844

3✔
3845
                // Before adding the new htlc to the state machine, parse the
3✔
3846
                // onion object in order to obtain the routing information with
3✔
3847
                // DecodeHopIterator function which process the Sphinx packet.
3✔
3848
                chanIterator, failureCode := decodeResps[i].Result()
3✔
3849
                if failureCode != lnwire.CodeNone {
6✔
3850
                        // If we're unable to process the onion blob then we
3✔
3851
                        // should send the malformed htlc error to payment
3✔
3852
                        // sender.
3✔
3853
                        l.sendMalformedHTLCError(
3✔
3854
                                add.ID, failureCode, add.OnionBlob, &sourceRef,
3✔
3855
                        )
3✔
3856

3✔
3857
                        l.log.Errorf("unable to decode onion hop iterator "+
3✔
3858
                                "for htlc(id=%v, hash=%x): %v", add.ID,
3✔
3859
                                add.PaymentHash, failureCode)
3✔
3860

3✔
3861
                        continue
3✔
3862
                }
3863

3864
                heightNow := l.cfg.BestHeight()
3✔
3865

3✔
3866
                pld, routeRole, pldErr := chanIterator.HopPayload()
3✔
3867
                if pldErr != nil {
6✔
3868
                        // If we're unable to process the onion payload, or we
3✔
3869
                        // received invalid onion payload failure, then we
3✔
3870
                        // should send an error back to the caller so the HTLC
3✔
3871
                        // can be canceled.
3✔
3872
                        var failedType uint64
3✔
3873

3✔
3874
                        // We need to get the underlying error value, so we
3✔
3875
                        // can't use errors.As as suggested by the linter.
3✔
3876
                        //nolint:errorlint
3✔
3877
                        if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
3✔
3878
                                failedType = uint64(e.Type)
×
3879
                        }
×
3880

3881
                        // If we couldn't parse the payload, make our best
3882
                        // effort at creating an error encrypter that knows
3883
                        // what blinding type we were, but if we couldn't
3884
                        // parse the payload we have no way of knowing whether
3885
                        // we were the introduction node or not.
3886
                        //
3887
                        //nolint:ll
3888
                        obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
3✔
3889
                                l.cfg.ExtractErrorEncrypter,
3✔
3890
                                // We need our route role here because we
3✔
3891
                                // couldn't parse or validate the payload.
3✔
3892
                                routeRole == hop.RouteRoleIntroduction,
3✔
3893
                        )
3✔
3894
                        if failCode != lnwire.CodeNone {
3✔
3895
                                l.log.Errorf("could not extract error "+
×
3896
                                        "encrypter: %v", pldErr)
×
3897

×
3898
                                // We can't process this htlc, send back
×
3899
                                // malformed.
×
3900
                                l.sendMalformedHTLCError(
×
3901
                                        add.ID, failureCode, add.OnionBlob,
×
3902
                                        &sourceRef,
×
3903
                                )
×
3904

×
3905
                                continue
×
3906
                        }
3907

3908
                        // TODO: currently none of the test unit infrastructure
3909
                        // is setup to handle TLV payloads, so testing this
3910
                        // would require implementing a separate mock iterator
3911
                        // for TLV payloads that also supports injecting invalid
3912
                        // payloads. Deferring this non-trival effort till a
3913
                        // later date
3914
                        failure := lnwire.NewInvalidOnionPayload(failedType, 0)
3✔
3915

3✔
3916
                        l.sendHTLCError(
3✔
3917
                                add, sourceRef, NewLinkError(failure),
3✔
3918
                                obfuscator, false,
3✔
3919
                        )
3✔
3920

3✔
3921
                        l.log.Errorf("unable to decode forwarding "+
3✔
3922
                                "instructions: %v", pldErr)
3✔
3923

3✔
3924
                        continue
3✔
3925
                }
3926

3927
                // Retrieve onion obfuscator from onion blob in order to
3928
                // produce initial obfuscation of the onion failureCode.
3929
                obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
3✔
3930
                        l.cfg.ExtractErrorEncrypter,
3✔
3931
                        routeRole == hop.RouteRoleIntroduction,
3✔
3932
                )
3✔
3933
                if failureCode != lnwire.CodeNone {
3✔
UNCOV
3934
                        // If we're unable to process the onion blob than we
×
UNCOV
3935
                        // should send the malformed htlc error to payment
×
UNCOV
3936
                        // sender.
×
UNCOV
3937
                        l.sendMalformedHTLCError(
×
UNCOV
3938
                                add.ID, failureCode, add.OnionBlob,
×
UNCOV
3939
                                &sourceRef,
×
UNCOV
3940
                        )
×
UNCOV
3941

×
UNCOV
3942
                        l.log.Errorf("unable to decode onion "+
×
UNCOV
3943
                                "obfuscator: %v", failureCode)
×
UNCOV
3944

×
UNCOV
3945
                        continue
×
3946
                }
3947

3948
                fwdInfo := pld.ForwardingInfo()
3✔
3949

3✔
3950
                // Check whether the payload we've just processed uses our
3✔
3951
                // node as the introduction point (gave us a blinding key in
3✔
3952
                // the payload itself) and fail it back if we don't support
3✔
3953
                // route blinding.
3✔
3954
                if fwdInfo.NextBlinding.IsSome() &&
3✔
3955
                        l.cfg.DisallowRouteBlinding {
6✔
3956

3✔
3957
                        failure := lnwire.NewInvalidBlinding(
3✔
3958
                                fn.Some(add.OnionBlob),
3✔
3959
                        )
3✔
3960

3✔
3961
                        l.sendHTLCError(
3✔
3962
                                add, sourceRef, NewLinkError(failure),
3✔
3963
                                obfuscator, false,
3✔
3964
                        )
3✔
3965

3✔
3966
                        l.log.Error("rejected htlc that uses use as an " +
3✔
3967
                                "introduction point when we do not support " +
3✔
3968
                                "route blinding")
3✔
3969

3✔
3970
                        continue
3✔
3971
                }
3972

3973
                switch fwdInfo.NextHop {
3✔
3974
                case hop.Exit:
3✔
3975
                        err := l.processExitHop(
3✔
3976
                                add, sourceRef, obfuscator, fwdInfo,
3✔
3977
                                heightNow, pld,
3✔
3978
                        )
3✔
3979
                        if err != nil {
3✔
3980
                                l.failf(LinkFailureError{
×
3981
                                        code: ErrInternalError,
×
3982
                                }, err.Error()) //nolint
×
3983

×
3984
                                return
×
3985
                        }
×
3986

3987
                // There are additional channels left within this route. So
3988
                // we'll simply do some forwarding package book-keeping.
3989
                default:
3✔
3990
                        // If hodl.AddIncoming is requested, we will not
3✔
3991
                        // validate the forwarded ADD, nor will we send the
3✔
3992
                        // packet to the htlc switch.
3✔
3993
                        if l.cfg.HodlMask.Active(hodl.AddIncoming) {
3✔
3994
                                l.log.Warnf(hodl.AddIncoming.Warning())
×
3995
                                continue
×
3996
                        }
3997

3998
                        endorseValue := l.experimentalEndorsement(
3✔
3999
                                record.CustomSet(add.CustomRecords),
3✔
4000
                        )
3✔
4001
                        endorseType := uint64(
3✔
4002
                                lnwire.ExperimentalEndorsementType,
3✔
4003
                        )
3✔
4004

3✔
4005
                        switch fwdPkg.State {
3✔
4006
                        case channeldb.FwdStateProcessed:
3✔
4007
                                // This add was not forwarded on the previous
3✔
4008
                                // processing phase, run it through our
3✔
4009
                                // validation pipeline to reproduce an error.
3✔
4010
                                // This may trigger a different error due to
3✔
4011
                                // expiring timelocks, but we expect that an
3✔
4012
                                // error will be reproduced.
3✔
4013
                                if !fwdPkg.FwdFilter.Contains(idx) {
3✔
4014
                                        break
×
4015
                                }
4016

4017
                                // Otherwise, it was already processed, we can
4018
                                // can collect it and continue.
4019
                                outgoingAdd := &lnwire.UpdateAddHTLC{
3✔
4020
                                        Expiry:        fwdInfo.OutgoingCTLV,
3✔
4021
                                        Amount:        fwdInfo.AmountToForward,
3✔
4022
                                        PaymentHash:   add.PaymentHash,
3✔
4023
                                        BlindingPoint: fwdInfo.NextBlinding,
3✔
4024
                                }
3✔
4025

3✔
4026
                                endorseValue.WhenSome(func(e byte) {
6✔
4027
                                        custRecords := map[uint64][]byte{
3✔
4028
                                                endorseType: {e},
3✔
4029
                                        }
3✔
4030

3✔
4031
                                        outgoingAdd.CustomRecords = custRecords
3✔
4032
                                })
3✔
4033

4034
                                // Finally, we'll encode the onion packet for
4035
                                // the _next_ hop using the hop iterator
4036
                                // decoded for the current hop.
4037
                                buf := bytes.NewBuffer(
3✔
4038
                                        outgoingAdd.OnionBlob[0:0],
3✔
4039
                                )
3✔
4040

3✔
4041
                                // We know this cannot fail, as this ADD
3✔
4042
                                // was marked forwarded in a previous
3✔
4043
                                // round of processing.
3✔
4044
                                chanIterator.EncodeNextHop(buf)
3✔
4045

3✔
4046
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee
3✔
4047

3✔
4048
                                //nolint:ll
3✔
4049
                                updatePacket := &htlcPacket{
3✔
4050
                                        incomingChanID:       l.ShortChanID(),
3✔
4051
                                        incomingHTLCID:       add.ID,
3✔
4052
                                        outgoingChanID:       fwdInfo.NextHop,
3✔
4053
                                        sourceRef:            &sourceRef,
3✔
4054
                                        incomingAmount:       add.Amount,
3✔
4055
                                        amount:               outgoingAdd.Amount,
3✔
4056
                                        htlc:                 outgoingAdd,
3✔
4057
                                        obfuscator:           obfuscator,
3✔
4058
                                        incomingTimeout:      add.Expiry,
3✔
4059
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
3✔
4060
                                        inOnionCustomRecords: pld.CustomRecords(),
3✔
4061
                                        inboundFee:           inboundFee,
3✔
4062
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
3✔
4063
                                }
3✔
4064
                                switchPackets = append(
3✔
4065
                                        switchPackets, updatePacket,
3✔
4066
                                )
3✔
4067

3✔
4068
                                continue
3✔
4069
                        }
4070

4071
                        // TODO(roasbeef): ensure don't accept outrageous
4072
                        // timeout for htlc
4073

4074
                        // With all our forwarding constraints met, we'll
4075
                        // create the outgoing HTLC using the parameters as
4076
                        // specified in the forwarding info.
4077
                        addMsg := &lnwire.UpdateAddHTLC{
3✔
4078
                                Expiry:        fwdInfo.OutgoingCTLV,
3✔
4079
                                Amount:        fwdInfo.AmountToForward,
3✔
4080
                                PaymentHash:   add.PaymentHash,
3✔
4081
                                BlindingPoint: fwdInfo.NextBlinding,
3✔
4082
                        }
3✔
4083

3✔
4084
                        endorseValue.WhenSome(func(e byte) {
6✔
4085
                                addMsg.CustomRecords = map[uint64][]byte{
3✔
4086
                                        endorseType: {e},
3✔
4087
                                }
3✔
4088
                        })
3✔
4089

4090
                        // Finally, we'll encode the onion packet for the
4091
                        // _next_ hop using the hop iterator decoded for the
4092
                        // current hop.
4093
                        buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
3✔
4094
                        err := chanIterator.EncodeNextHop(buf)
3✔
4095
                        if err != nil {
3✔
4096
                                l.log.Errorf("unable to encode the "+
×
4097
                                        "remaining route %v", err)
×
4098

×
4099
                                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:ll
×
4100
                                        return lnwire.NewTemporaryChannelFailure(upd)
×
4101
                                }
×
4102

4103
                                failure := l.createFailureWithUpdate(
×
4104
                                        true, hop.Source, cb,
×
4105
                                )
×
4106

×
4107
                                l.sendHTLCError(
×
4108
                                        add, sourceRef, NewLinkError(failure),
×
4109
                                        obfuscator, false,
×
4110
                                )
×
4111
                                continue
×
4112
                        }
4113

4114
                        // Now that this add has been reprocessed, only append
4115
                        // it to our list of packets to forward to the switch
4116
                        // this is the first time processing the add. If the
4117
                        // fwd pkg has already been processed, then we entered
4118
                        // the above section to recreate a previous error.  If
4119
                        // the packet had previously been forwarded, it would
4120
                        // have been added to switchPackets at the top of this
4121
                        // section.
4122
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
6✔
4123
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee
3✔
4124

3✔
4125
                                //nolint:ll
3✔
4126
                                updatePacket := &htlcPacket{
3✔
4127
                                        incomingChanID:       l.ShortChanID(),
3✔
4128
                                        incomingHTLCID:       add.ID,
3✔
4129
                                        outgoingChanID:       fwdInfo.NextHop,
3✔
4130
                                        sourceRef:            &sourceRef,
3✔
4131
                                        incomingAmount:       add.Amount,
3✔
4132
                                        amount:               addMsg.Amount,
3✔
4133
                                        htlc:                 addMsg,
3✔
4134
                                        obfuscator:           obfuscator,
3✔
4135
                                        incomingTimeout:      add.Expiry,
3✔
4136
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
3✔
4137
                                        inOnionCustomRecords: pld.CustomRecords(),
3✔
4138
                                        inboundFee:           inboundFee,
3✔
4139
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
3✔
4140
                                }
3✔
4141

3✔
4142
                                fwdPkg.FwdFilter.Set(idx)
3✔
4143
                                switchPackets = append(switchPackets,
3✔
4144
                                        updatePacket)
3✔
4145
                        }
3✔
4146
                }
4147
        }
4148

4149
        // Commit the htlcs we are intending to forward if this package has not
4150
        // been fully processed.
4151
        if fwdPkg.State == channeldb.FwdStateLockedIn {
6✔
4152
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
3✔
4153
                if err != nil {
3✔
4154
                        l.failf(LinkFailureError{code: ErrInternalError},
×
4155
                                "unable to set fwd filter: %v", err)
×
4156
                        return
×
4157
                }
×
4158
        }
4159

4160
        if len(switchPackets) == 0 {
6✔
4161
                return
3✔
4162
        }
3✔
4163

4164
        l.log.Debugf("forwarding %d packets to switch: reforward=%v",
3✔
4165
                len(switchPackets), reforward)
3✔
4166

3✔
4167
        // NOTE: This call is made synchronous so that we ensure all circuits
3✔
4168
        // are committed in the exact order that they are processed in the link.
3✔
4169
        // Failing to do this could cause reorderings/gaps in the range of
3✔
4170
        // opened circuits, which violates assumptions made by the circuit
3✔
4171
        // trimming.
3✔
4172
        l.forwardBatch(reforward, switchPackets...)
3✔
4173
}
4174

4175
// experimentalEndorsement returns the value to set for our outgoing
4176
// experimental endorsement field, and a boolean indicating whether it should
4177
// be populated on the outgoing htlc.
4178
func (l *channelLink) experimentalEndorsement(
4179
        customUpdateAdd record.CustomSet) fn.Option[byte] {
3✔
4180

3✔
4181
        // Only relay experimental signal if we are within the experiment
3✔
4182
        // period.
3✔
4183
        if !l.cfg.ShouldFwdExpEndorsement() {
6✔
4184
                return fn.None[byte]()
3✔
4185
        }
3✔
4186

4187
        // If we don't have any custom records or the experimental field is
4188
        // not set, just forward a zero value.
4189
        if len(customUpdateAdd) == 0 {
6✔
4190
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
3✔
4191
        }
3✔
4192

4193
        t := uint64(lnwire.ExperimentalEndorsementType)
3✔
4194
        value, set := customUpdateAdd[t]
3✔
4195
        if !set {
3✔
4196
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
×
4197
        }
×
4198

4199
        // We expect at least one byte for this field, consider it invalid if
4200
        // it has no data and just forward a zero value.
4201
        if len(value) == 0 {
3✔
4202
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
×
4203
        }
×
4204

4205
        // Only forward endorsed if the incoming link is endorsed.
4206
        if value[0] == lnwire.ExperimentalEndorsed {
6✔
4207
                return fn.Some[byte](lnwire.ExperimentalEndorsed)
3✔
4208
        }
3✔
4209

4210
        // Forward as unendorsed otherwise, including cases where we've
4211
        // received an invalid value that uses more than 3 bits of information.
4212
        return fn.Some[byte](lnwire.ExperimentalUnendorsed)
3✔
4213
}
4214

4215
// processExitHop handles an htlc for which this link is the exit hop. It
4216
// returns a boolean indicating whether the commitment tx needs an update.
4217
func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC,
4218
        sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter,
4219
        fwdInfo hop.ForwardingInfo, heightNow uint32,
4220
        payload invoices.Payload) error {
3✔
4221

3✔
4222
        // If hodl.ExitSettle is requested, we will not validate the final hop's
3✔
4223
        // ADD, nor will we settle the corresponding invoice or respond with the
3✔
4224
        // preimage.
3✔
4225
        if l.cfg.HodlMask.Active(hodl.ExitSettle) {
6✔
4226
                l.log.Warnf("%s for htlc(rhash=%x,htlcIndex=%v)",
3✔
4227
                        hodl.ExitSettle.Warning(), add.PaymentHash, add.ID)
3✔
4228

3✔
4229
                return nil
3✔
4230
        }
3✔
4231

4232
        // In case the traffic shaper is active, we'll check if the HTLC has
4233
        // custom records and skip the amount check in the onion payload below.
4234
        isCustomHTLC := fn.MapOptionZ(
3✔
4235
                l.cfg.AuxTrafficShaper,
3✔
4236
                func(ts AuxTrafficShaper) bool {
3✔
4237
                        return ts.IsCustomHTLC(add.CustomRecords)
×
4238
                },
×
4239
        )
4240

4241
        // As we're the exit hop, we'll double check the hop-payload included in
4242
        // the HTLC to ensure that it was crafted correctly by the sender and
4243
        // is compatible with the HTLC we were extended. If an external
4244
        // validator is active we might bypass the amount check.
4245
        if !isCustomHTLC && add.Amount < fwdInfo.AmountToForward {
3✔
UNCOV
4246
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
×
UNCOV
4247
                        "incompatible value: expected <=%v, got %v",
×
UNCOV
4248
                        add.PaymentHash, add.Amount, fwdInfo.AmountToForward)
×
UNCOV
4249

×
UNCOV
4250
                failure := NewLinkError(
×
UNCOV
4251
                        lnwire.NewFinalIncorrectHtlcAmount(add.Amount),
×
UNCOV
4252
                )
×
UNCOV
4253
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)
×
UNCOV
4254

×
UNCOV
4255
                return nil
×
UNCOV
4256
        }
×
4257

4258
        // We'll also ensure that our time-lock value has been computed
4259
        // correctly.
4260
        if add.Expiry < fwdInfo.OutgoingCTLV {
3✔
UNCOV
4261
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
×
UNCOV
4262
                        "incompatible time-lock: expected <=%v, got %v",
×
UNCOV
4263
                        add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV)
×
UNCOV
4264

×
UNCOV
4265
                failure := NewLinkError(
×
UNCOV
4266
                        lnwire.NewFinalIncorrectCltvExpiry(add.Expiry),
×
UNCOV
4267
                )
×
UNCOV
4268

×
UNCOV
4269
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)
×
UNCOV
4270

×
UNCOV
4271
                return nil
×
UNCOV
4272
        }
×
4273

4274
        // Notify the invoiceRegistry of the exit hop htlc. If we crash right
4275
        // after this, this code will be re-executed after restart. We will
4276
        // receive back a resolution event.
4277
        invoiceHash := lntypes.Hash(add.PaymentHash)
3✔
4278

3✔
4279
        circuitKey := models.CircuitKey{
3✔
4280
                ChanID: l.ShortChanID(),
3✔
4281
                HtlcID: add.ID,
3✔
4282
        }
3✔
4283

3✔
4284
        event, err := l.cfg.Registry.NotifyExitHopHtlc(
3✔
4285
                invoiceHash, add.Amount, add.Expiry, int32(heightNow),
3✔
4286
                circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload,
3✔
4287
        )
3✔
4288
        if err != nil {
3✔
4289
                return err
×
4290
        }
×
4291

4292
        // Create a hodlHtlc struct and decide either resolved now or later.
4293
        htlc := hodlHtlc{
3✔
4294
                add:        add,
3✔
4295
                sourceRef:  sourceRef,
3✔
4296
                obfuscator: obfuscator,
3✔
4297
        }
3✔
4298

3✔
4299
        // If the event is nil, the invoice is being held, so we save payment
3✔
4300
        // descriptor for future reference.
3✔
4301
        if event == nil {
6✔
4302
                l.hodlMap[circuitKey] = htlc
3✔
4303
                return nil
3✔
4304
        }
3✔
4305

4306
        // Process the received resolution.
4307
        return l.processHtlcResolution(event, htlc)
3✔
4308
}
4309

4310
// settleHTLC settles the HTLC on the channel.
4311
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
4312
        htlcIndex uint64, sourceRef channeldb.AddRef) error {
3✔
4313

3✔
4314
        hash := preimage.Hash()
3✔
4315

3✔
4316
        l.log.Infof("settling htlc %v as exit hop", hash)
3✔
4317

3✔
4318
        err := l.channel.SettleHTLC(
3✔
4319
                preimage, htlcIndex, &sourceRef, nil, nil,
3✔
4320
        )
3✔
4321
        if err != nil {
3✔
4322
                return fmt.Errorf("unable to settle htlc: %w", err)
×
4323
        }
×
4324

4325
        // If the link is in hodl.BogusSettle mode, replace the preimage with a
4326
        // fake one before sending it to the peer.
4327
        if l.cfg.HodlMask.Active(hodl.BogusSettle) {
6✔
4328
                l.log.Warnf(hodl.BogusSettle.Warning())
3✔
4329
                preimage = [32]byte{}
3✔
4330
                copy(preimage[:], bytes.Repeat([]byte{2}, 32))
3✔
4331
        }
3✔
4332

4333
        // HTLC was successfully settled locally send notification about it
4334
        // remote peer.
4335
        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
3✔
4336
                ChanID:          l.ChanID(),
3✔
4337
                ID:              htlcIndex,
3✔
4338
                PaymentPreimage: preimage,
3✔
4339
        })
3✔
4340

3✔
4341
        // Once we have successfully settled the htlc, notify a settle event.
3✔
4342
        l.cfg.HtlcNotifier.NotifySettleEvent(
3✔
4343
                HtlcKey{
3✔
4344
                        IncomingCircuit: models.CircuitKey{
3✔
4345
                                ChanID: l.ShortChanID(),
3✔
4346
                                HtlcID: htlcIndex,
3✔
4347
                        },
3✔
4348
                },
3✔
4349
                preimage,
3✔
4350
                HtlcEventTypeReceive,
3✔
4351
        )
3✔
4352

3✔
4353
        return nil
3✔
4354
}
4355

4356
// forwardBatch forwards the given htlcPackets to the switch, and waits on the
4357
// err chan for the individual responses. This method is intended to be spawned
4358
// as a goroutine so the responses can be handled in the background.
4359
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
3✔
4360
        // Don't forward packets for which we already have a response in our
3✔
4361
        // mailbox. This could happen if a packet fails and is buffered in the
3✔
4362
        // mailbox, and the incoming link flaps.
3✔
4363
        var filteredPkts = make([]*htlcPacket, 0, len(packets))
3✔
4364
        for _, pkt := range packets {
6✔
4365
                if l.mailBox.HasPacket(pkt.inKey()) {
6✔
4366
                        continue
3✔
4367
                }
4368

4369
                filteredPkts = append(filteredPkts, pkt)
3✔
4370
        }
4371

4372
        err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...)
3✔
4373
        if err != nil {
3✔
UNCOV
4374
                log.Errorf("Unhandled error while reforwarding htlc "+
×
UNCOV
4375
                        "settle/fail over htlcswitch: %v", err)
×
UNCOV
4376
        }
×
4377
}
4378

4379
// sendHTLCError functions cancels HTLC and send cancel message back to the
4380
// peer from which HTLC was received.
4381
func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC,
4382
        sourceRef channeldb.AddRef, failure *LinkError,
4383
        e hop.ErrorEncrypter, isReceive bool) {
3✔
4384

3✔
4385
        reason, err := e.EncryptFirstHop(failure.WireMessage())
3✔
4386
        if err != nil {
3✔
4387
                l.log.Errorf("unable to obfuscate error: %v", err)
×
4388
                return
×
4389
        }
×
4390

4391
        err = l.channel.FailHTLC(add.ID, reason, &sourceRef, nil, nil)
3✔
4392
        if err != nil {
3✔
4393
                l.log.Errorf("unable cancel htlc: %v", err)
×
4394
                return
×
4395
        }
×
4396

4397
        // Send the appropriate failure message depending on whether we're
4398
        // in a blinded route or not.
4399
        if err := l.sendIncomingHTLCFailureMsg(
3✔
4400
                add.ID, e, reason,
3✔
4401
        ); err != nil {
3✔
4402
                l.log.Errorf("unable to send HTLC failure: %v", err)
×
4403
                return
×
4404
        }
×
4405

4406
        // Notify a link failure on our incoming link. Outgoing htlc information
4407
        // is not available at this point, because we have not decrypted the
4408
        // onion, so it is excluded.
4409
        var eventType HtlcEventType
3✔
4410
        if isReceive {
6✔
4411
                eventType = HtlcEventTypeReceive
3✔
4412
        } else {
6✔
4413
                eventType = HtlcEventTypeForward
3✔
4414
        }
3✔
4415

4416
        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
3✔
4417
                HtlcKey{
3✔
4418
                        IncomingCircuit: models.CircuitKey{
3✔
4419
                                ChanID: l.ShortChanID(),
3✔
4420
                                HtlcID: add.ID,
3✔
4421
                        },
3✔
4422
                },
3✔
4423
                HtlcInfo{
3✔
4424
                        IncomingTimeLock: add.Expiry,
3✔
4425
                        IncomingAmt:      add.Amount,
3✔
4426
                },
3✔
4427
                eventType,
3✔
4428
                failure,
3✔
4429
                true,
3✔
4430
        )
3✔
4431
}
4432

4433
// sendPeerHTLCFailure handles sending a HTLC failure message back to the
4434
// peer from which the HTLC was received. This function is primarily used to
4435
// handle the special requirements of route blinding, specifically:
4436
// - Forwarding nodes must switch out any errors with MalformedFailHTLC
4437
// - Introduction nodes should return regular HTLC failure messages.
4438
//
4439
// It accepts the original opaque failure, which will be used in the case
4440
// that we're not part of a blinded route and an error encrypter that'll be
4441
// used if we are the introduction node and need to present an error as if
4442
// we're the failing party.
4443
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
4444
        e hop.ErrorEncrypter,
4445
        originalFailure lnwire.OpaqueReason) error {
3✔
4446

3✔
4447
        var msg lnwire.Message
3✔
4448
        switch {
3✔
4449
        // Our circuit's error encrypter will be nil if this was a locally
4450
        // initiated payment. We can only hit a blinded error for a locally
4451
        // initiated payment if we allow ourselves to be picked as the
4452
        // introduction node for our own payments and in that case we
4453
        // shouldn't reach this code. To prevent the HTLC getting stuck,
4454
        // we fail it back and log an error.
4455
        // code.
4456
        case e == nil:
×
4457
                msg = &lnwire.UpdateFailHTLC{
×
4458
                        ChanID: l.ChanID(),
×
4459
                        ID:     htlcIndex,
×
4460
                        Reason: originalFailure,
×
4461
                }
×
4462

×
4463
                l.log.Errorf("Unexpected blinded failure when "+
×
4464
                        "we are the sending node, incoming htlc: %v(%v)",
×
4465
                        l.ShortChanID(), htlcIndex)
×
4466

4467
        // For cleartext hops (ie, non-blinded/normal) we don't need any
4468
        // transformation on the error message and can just send the original.
4469
        case !e.Type().IsBlinded():
3✔
4470
                msg = &lnwire.UpdateFailHTLC{
3✔
4471
                        ChanID: l.ChanID(),
3✔
4472
                        ID:     htlcIndex,
3✔
4473
                        Reason: originalFailure,
3✔
4474
                }
3✔
4475

4476
        // When we're the introduction node, we need to convert the error to
4477
        // a UpdateFailHTLC.
4478
        case e.Type() == hop.EncrypterTypeIntroduction:
3✔
4479
                l.log.Debugf("Introduction blinded node switching out failure "+
3✔
4480
                        "error: %v", htlcIndex)
3✔
4481

3✔
4482
                // The specification does not require that we set the onion
3✔
4483
                // blob.
3✔
4484
                failureMsg := lnwire.NewInvalidBlinding(
3✔
4485
                        fn.None[[lnwire.OnionPacketSize]byte](),
3✔
4486
                )
3✔
4487
                reason, err := e.EncryptFirstHop(failureMsg)
3✔
4488
                if err != nil {
3✔
4489
                        return err
×
4490
                }
×
4491

4492
                msg = &lnwire.UpdateFailHTLC{
3✔
4493
                        ChanID: l.ChanID(),
3✔
4494
                        ID:     htlcIndex,
3✔
4495
                        Reason: reason,
3✔
4496
                }
3✔
4497

4498
        // If we are a relaying node, we need to switch out any error that
4499
        // we've received to a malformed HTLC error.
4500
        case e.Type() == hop.EncrypterTypeRelaying:
3✔
4501
                l.log.Debugf("Relaying blinded node switching out malformed "+
3✔
4502
                        "error: %v", htlcIndex)
3✔
4503

3✔
4504
                msg = &lnwire.UpdateFailMalformedHTLC{
3✔
4505
                        ChanID:      l.ChanID(),
3✔
4506
                        ID:          htlcIndex,
3✔
4507
                        FailureCode: lnwire.CodeInvalidBlinding,
3✔
4508
                }
3✔
4509

4510
        default:
×
4511
                return fmt.Errorf("unexpected encrypter: %d", e)
×
4512
        }
4513

4514
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
3✔
4515
                l.log.Warnf("Send update fail failed: %v", err)
×
4516
        }
×
4517

4518
        return nil
3✔
4519
}
4520

4521
// sendMalformedHTLCError helper function which sends the malformed HTLC update
4522
// to the payment sender.
4523
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
4524
        code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte,
4525
        sourceRef *channeldb.AddRef) {
3✔
4526

3✔
4527
        shaOnionBlob := sha256.Sum256(onionBlob[:])
3✔
4528
        err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
3✔
4529
        if err != nil {
3✔
4530
                l.log.Errorf("unable cancel htlc: %v", err)
×
4531
                return
×
4532
        }
×
4533

4534
        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
3✔
4535
                ChanID:       l.ChanID(),
3✔
4536
                ID:           htlcIndex,
3✔
4537
                ShaOnionBlob: shaOnionBlob,
3✔
4538
                FailureCode:  code,
3✔
4539
        })
3✔
4540
}
4541

4542
// failf is a function which is used to encapsulate the action necessary for
4543
// properly failing the link. It takes a LinkFailureError, which will be passed
4544
// to the OnChannelFailure closure, in order for it to determine if we should
4545
// force close the channel, and if we should send an error message to the
4546
// remote peer.
4547
func (l *channelLink) failf(linkErr LinkFailureError, format string,
4548
        a ...interface{}) {
3✔
4549

3✔
4550
        reason := fmt.Errorf(format, a...)
3✔
4551

3✔
4552
        // Return if we have already notified about a failure.
3✔
4553
        if l.failed {
6✔
4554
                l.log.Warnf("ignoring link failure (%v), as link already "+
3✔
4555
                        "failed", reason)
3✔
4556
                return
3✔
4557
        }
3✔
4558

4559
        l.log.Errorf("failing link: %s with error: %v", reason, linkErr)
3✔
4560

3✔
4561
        // Set failed, such that we won't process any more updates, and notify
3✔
4562
        // the peer about the failure.
3✔
4563
        l.failed = true
3✔
4564
        l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
3✔
4565
}
4566

4567
// FundingCustomBlob returns the custom funding blob of the channel that this
4568
// link is associated with. The funding blob represents static information about
4569
// the channel that was created at channel funding time.
4570
func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] {
×
4571
        if l.channel == nil {
×
4572
                return fn.None[tlv.Blob]()
×
4573
        }
×
4574

4575
        if l.channel.State() == nil {
×
4576
                return fn.None[tlv.Blob]()
×
4577
        }
×
4578

4579
        return l.channel.State().CustomBlob
×
4580
}
4581

4582
// CommitmentCustomBlob returns the custom blob of the current local commitment
4583
// of the channel that this link is associated with.
4584
func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] {
×
4585
        if l.channel == nil {
×
4586
                return fn.None[tlv.Blob]()
×
4587
        }
×
4588

4589
        return l.channel.LocalCommitmentBlob()
×
4590
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc