lightningnetwork / lnd / build 13558005087

27 Feb 2025 03:04AM UTC coverage: 58.834% (-0.001%) from 58.835%

Pull Request #8453: [4/4] - multi: integrate new rbf coop close FSM into the existing peer flow
Commit (Roasbeef): lnwallet/chancloser: increase test coverage of state machine

1079 of 1370 new or added lines in 23 files covered. (78.76%)
578 existing lines in 40 files now uncovered.
137063 of 232965 relevant lines covered (58.83%)
19205.84 hits per line

Source File: /htlcswitch/link.go (77.78% covered)
package htlcswitch

import (
        "bytes"
        "context"
        crand "crypto/rand"
        "crypto/sha256"
        "errors"
        "fmt"
        prand "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/wire"
        "github.com/btcsuite/btclog/v2"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/contractcourt"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/invoices"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lntypes"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/queue"
        "github.com/lightningnetwork/lnd/record"
        "github.com/lightningnetwork/lnd/ticker"
        "github.com/lightningnetwork/lnd/tlv"
)

func init() {
        prand.Seed(time.Now().UnixNano())
}

const (
        // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
        // the node accepts for forwarded payments. The value is relative to the
        // current block height. The reason to have a maximum is to prevent
        // funds getting locked up unreasonably long. Otherwise, an attacker
        // willing to lock its own funds too, could force the funds of this node
        // to be locked up for an indefinite (max int32) number of blocks.
        //
        // The value 2016 corresponds to on average two weeks worth of blocks
        // and is based on the maximum number of hops (20), the default CLTV
        // delta (40), and some extra margin to account for the other lightning
        // implementations and past lnd versions which used to have a default
        // CLTV delta of 144.
        DefaultMaxOutgoingCltvExpiry = 2016

        // DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute

        // DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute

        // DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
        // a channel's commitment fee to be of its balance. This only applies to
        // the initiator of the channel.
        DefaultMaxLinkFeeAllocation float64 = 0.5
)

// ExpectedFee computes the expected fee for a given htlc amount. The value
// returned from this function is to be used as a sanity check when forwarding
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
// forwarding policy.
//
// TODO(roasbeef): also add in current available channel bandwidth, inverse
// func
func ExpectedFee(f models.ForwardingPolicy,
        htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {

        return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
}
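
// exampleExpectedFee is an editor's illustration, not part of the original
// link.go: it shows the fee formula above with concrete numbers. FeeRate is
// expressed in millionths (parts-per-million), so for BaseFee=1000 msat and
// FeeRate=100, a 1,000,000 msat HTLC is expected to pay
// 1000 + (1000000*100)/1000000 = 1100 msat in fees.
func exampleExpectedFee() lnwire.MilliSatoshi {
        policy := models.ForwardingPolicy{
                BaseFee: 1000, // In milli-satoshis.
                FeeRate: 100,  // Proportional fee, parts-per-million.
        }

        // Returns 1100 msat for a 1,000,000 msat HTLC.
        return ExpectedFee(policy, 1000000)
}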

// ChannelLinkConfig defines the configuration for the channel link. ALL
// elements within the configuration MUST be non-nil for channel link to carry
// out its duties.
type ChannelLinkConfig struct {
        // FwrdingPolicy is the initial forwarding policy to be used when
        // deciding whether to forward incoming HTLC's or not. This value
        // can be updated with subsequent calls to UpdateForwardingPolicy
        // targeted at a given ChannelLink concrete interface implementation.
        FwrdingPolicy models.ForwardingPolicy

        // Circuits provides restricted access to the switch's circuit map,
        // allowing the link to open and close circuits.
        Circuits CircuitModifier

        // BestHeight returns the best known height.
        BestHeight func() uint32

        // ForwardPackets attempts to forward the batch of htlcs through the
        // switch. The function returns an error in case it fails to send one
        // or more packets. The link's quit signal should be provided to allow
        // cancellation of forwarding during link shutdown.
        ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error

        // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
        // blobs, which are then used to inform how to forward an HTLC.
        //
        // NOTE: This function assumes the same set of readers and preimages
        // are always presented for the same identifier.
        DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest) (
                []hop.DecodeHopIteratorResponse, error)

        // ExtractErrorEncrypter function is responsible for decoding HTLC
        // Sphinx onion blob, and creating onion failure obfuscator.
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter

        // FetchLastChannelUpdate retrieves the latest routing policy for a
        // target channel. This channel will typically be the outgoing channel
        // specified when we receive an incoming HTLC. This will be used to
        // provide payment senders our latest policy when sending encrypted
        // error messages.
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
                *lnwire.ChannelUpdate1, error)

        // Peer is a lightning network node with which we have the channel link
        // opened.
        Peer lnpeer.Peer

        // Registry is a sub-system which is responsible for managing the
        // invoices in a thread-safe manner.
        Registry InvoiceDatabase

        // PreimageCache is a global witness beacon that houses any new
        // preimages discovered by other links. We'll use this to add new
        // witnesses that we discover which will notify any sub-systems
        // subscribed to new events.
        PreimageCache contractcourt.WitnessBeacon

        // OnChannelFailure is a function closure that we'll call if the
        // channel failed for some reason. Depending on the severity of the
        // error, the closure potentially must force close this channel and
        // disconnect the peer.
        //
        // NOTE: The method must return in order for the ChannelLink to be able
        // to shut down properly.
        OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
                LinkFailureError)

        // UpdateContractSignals is a function closure that we'll use to update
        // outside sub-systems with this channel's latest ShortChannelID.
        UpdateContractSignals func(*contractcourt.ContractSignals) error

        // NotifyContractUpdate is a function closure that we'll use to update
        // the contractcourt and more specifically the ChannelArbitrator of the
        // latest channel state.
        NotifyContractUpdate func(*contractcourt.ContractUpdate) error

        // ChainEvents is an active subscription to the chain watcher for this
        // channel to be notified of any on-chain activity related to this
        // channel.
        ChainEvents *contractcourt.ChainEventSubscription

        // FeeEstimator is an instance of a live fee estimator which will be
        // used to dynamically regulate the current fee of the commitment
        // transaction to ensure timely confirmation.
        FeeEstimator chainfee.Estimator

        // HodlMask is a bitvector composed of hodl.Flags, specifying
        // breakpoints for HTLC forwarding internal to the switch.
        //
        // NOTE: This should only be used for testing.
        HodlMask hodl.Mask

        // SyncStates is used to indicate that we need to send the channel
        // reestablishment message to the remote peer. It should be done if our
        // clients have been restarted, or the remote peer has been
        // reconnected.
        SyncStates bool

        // BatchTicker is the ticker that determines the interval that we'll
        // use to check the batch to see if there are any updates we should
        // flush out. By batching updates into a single commit, we attempt to
        // increase throughput by maximizing the number of updates coalesced
        // into a single commit.
        BatchTicker ticker.Ticker

        // FwdPkgGCTicker is the ticker determining the frequency at which
        // garbage collection of forwarding packages occurs. We use a
        // time-based approach, as opposed to block epochs, as to not hinder
        // syncing.
        FwdPkgGCTicker ticker.Ticker

        // PendingCommitTicker is a ticker that allows the link to determine if
        // a locally initiated commitment dance gets stuck waiting for the
        // remote party to revoke.
        PendingCommitTicker ticker.Ticker

        // BatchSize is the max size of a batch of updates done to the link
        // before we do a state update.
        BatchSize uint32

        // UnsafeReplay will cause a link to replay the adds in its latest
        // commitment txn after the link is restarted. This should only be used
        // in testing, it is here to ensure the sphinx replay detection on the
        // receiving node is persistent.
        UnsafeReplay bool

        // MinUpdateTimeout represents the minimum interval in which a link
        // will propose to update its commitment fee rate. A random timeout will
        // be selected between this and MaxUpdateTimeout.
        MinUpdateTimeout time.Duration

        // MaxUpdateTimeout represents the maximum interval in which a link
        // will propose to update its commitment fee rate. A random timeout will
        // be selected between this and MinUpdateTimeout.
        MaxUpdateTimeout time.Duration

        // OutgoingCltvRejectDelta defines the number of blocks before expiry of
        // an htlc where we don't offer an htlc anymore. This should be at least
        // the outgoing broadcast delta, because in any case we don't want to
        // risk offering an htlc that triggers channel closure.
        OutgoingCltvRejectDelta uint32

        // TowerClient is an optional engine that manages the signing,
        // encrypting, and uploading of justice transactions to the daemon's
        // configured set of watchtowers for legacy channels.
        TowerClient TowerClient

        // MaxOutgoingCltvExpiry is the maximum outgoing timelock that the link
        // should accept for a forwarded HTLC. The value is relative to the
        // current block height.
        MaxOutgoingCltvExpiry uint32

        // MaxFeeAllocation is the highest allocation we'll allow a channel's
        // commitment fee to be of its balance. This only applies to the
        // initiator of the channel.
        MaxFeeAllocation float64

        // MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
        // the initiator for channels of the anchor type.
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight

        // NotifyActiveLink allows the link to tell the ChannelNotifier when a
        // link is first started.
        NotifyActiveLink func(wire.OutPoint)

        // NotifyActiveChannel allows the link to tell the ChannelNotifier when
        // a channel becomes active.
        NotifyActiveChannel func(wire.OutPoint)

        // NotifyInactiveChannel allows the switch to tell the ChannelNotifier
        // when channels become inactive.
        NotifyInactiveChannel func(wire.OutPoint)

        // NotifyInactiveLinkEvent allows the switch to tell the
        // ChannelNotifier when a channel link becomes inactive.
        NotifyInactiveLinkEvent func(wire.OutPoint)

        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
        // events through.
        HtlcNotifier htlcNotifier

        // FailAliasUpdate is a function used to fail an HTLC for an
        // option_scid_alias channel.
        FailAliasUpdate func(sid lnwire.ShortChannelID,
                incoming bool) *lnwire.ChannelUpdate1

        // GetAliases is used by the link and switch to fetch the set of
        // aliases for a given link.
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID

        // PreviouslySentShutdown is an optional value that is set if, at the
        // time of the link being started, persisted shutdown info was found for
        // the channel. This value being set means that we previously sent a
        // Shutdown message to our peer, and so we should do so again on
        // re-establish and should not allow any more HTLC adds on the outgoing
        // direction of the link.
        PreviouslySentShutdown fn.Option[lnwire.Shutdown]

        // DisallowRouteBlinding adds the option to disable forwarding payments
        // in blinded routes by failing back any blinding-related payloads as
        // if they were invalid.
        DisallowRouteBlinding bool

        // DisallowQuiescence is a flag that can be used to disable the
        // quiescence protocol.
        DisallowQuiescence bool

        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
        // restrict the flow of HTLCs and fee updates.
        MaxFeeExposure lnwire.MilliSatoshi

        // ShouldFwdExpEndorsement is a closure that indicates whether the link
        // should forward experimental endorsement signals.
        ShouldFwdExpEndorsement func() bool

        // AuxTrafficShaper is an optional auxiliary traffic shaper that can be
        // used to manage the bandwidth of the link.
        AuxTrafficShaper fn.Option[AuxTrafficShaper]
}

// channelLink is the service which drives a channel's commitment update
// state-machine. In the event that an HTLC needs to be propagated to another
// link, the forward handler from config is used which sends HTLC to the
// switch. Additionally, the link encapsulates the logic of commitment protocol
// message ordering and updates.
type channelLink struct {
        // The following fields are only meant to be used *atomically*
        started       int32
        reestablished int32
        shutdown      int32

        // failed should be set to true in case a link error happens, making
        // sure we don't process any more updates.
        failed bool

        // keystoneBatch represents a volatile list of keystones that must be
        // written before attempting to sign the next commitment txn. These
        // represent all the HTLC's forwarded to the link from the switch. Once
        // we lock them into our outgoing commitment, then the circuit has a
        // keystone, and is fully opened.
        keystoneBatch []Keystone

        // openedCircuits is the set of all payment circuits that will be open
        // once we make our next commitment. After making the commitment we'll
        // ACK all these from our mailbox to ensure that they don't get
        // re-delivered if we reconnect.
        openedCircuits []CircuitKey

        // closedCircuits is the set of all payment circuits that will be
        // closed once we make our next commitment. After taking the commitment
        // we'll ACK all these to ensure that they don't get re-delivered if we
        // reconnect.
        closedCircuits []CircuitKey

        // channel is a lightning network channel to which we apply htlc
        // updates.
        channel *lnwallet.LightningChannel

        // cfg is a structure which carries all dependable fields/handlers
        // which may affect behaviour of the service.
        cfg ChannelLinkConfig

        // mailBox is the main interface between the outside world and the
        // link. All incoming messages will be sent over this mailBox. Messages
        // include new updates from our connected peer, and new packets to be
        // forwarded sent by the switch.
        mailBox MailBox

        // upstream is a channel that new messages sent from the remote peer to
        // the local peer will be sent across.
        upstream chan lnwire.Message

        // downstream is a channel in which new multi-hop HTLC's to be
        // forwarded will be sent across. Messages from this channel are sent
        // by the HTLC switch.
        downstream chan *htlcPacket

        // updateFeeTimer is the timer responsible for updating the link's
        // commitment fee every time it fires.
        updateFeeTimer *time.Timer

        // uncommittedPreimages stores a list of all preimages that have been
        // learned since receiving the last CommitSig from the remote peer. The
        // batch will be flushed just before accepting the subsequent CommitSig
        // or on shutdown to avoid doing a write for each preimage received.
        uncommittedPreimages []lntypes.Preimage

        sync.RWMutex

        // hodlQueue is used to receive exit hop htlc resolutions from invoice
        // registry.
        hodlQueue *queue.ConcurrentQueue

        // hodlMap stores related htlc data for a circuit key. It allows
        // resolving those htlcs when we receive a message on hodlQueue.
        hodlMap map[models.CircuitKey]hodlHtlc

        // log is a link-specific logging instance.
        log btclog.Logger

        // isOutgoingAddBlocked tracks whether the channelLink can send an
        // UpdateAddHTLC.
        isOutgoingAddBlocked atomic.Bool

        // isIncomingAddBlocked tracks whether the channelLink can receive an
        // UpdateAddHTLC.
        isIncomingAddBlocked atomic.Bool

        // flushHooks is a hookMap that is triggered when we reach a channel
        // state with no live HTLCs.
        flushHooks hookMap

        // outgoingCommitHooks is a hookMap that is triggered after we send our
        // next CommitSig.
        outgoingCommitHooks hookMap

        // incomingCommitHooks is a hookMap that is triggered after we receive
        // our next CommitSig.
        incomingCommitHooks hookMap

        // quiescer is the state machine that tracks where this channel is with
        // respect to the quiescence protocol.
        quiescer Quiescer

        // quiescenceReqs is a queue of requests to quiesce this link. The
        // members of the queue are send-only channels we should call back with
        // the result.
        quiescenceReqs chan StfuReq

        // cg is a helper that encapsulates a wait group and quit channel and
        // allows contexts that either block or cancel on those depending on
        // the use case.
        cg *fn.ContextGuard
}

// hookMap is a data structure that is used to track the hooks that need to be
// called in various parts of the channelLink's lifecycle.
//
// WARNING: NOT thread-safe.
type hookMap struct {
        // allocIdx keeps track of the next id we haven't yet allocated.
        allocIdx atomic.Uint64

        // transient is a map of hooks that are only called the next time invoke
        // is called. These hooks are deleted during invoke.
        transient map[uint64]func()

        // newTransients is a channel that we use to accept new hooks into the
        // hookMap.
        newTransients chan func()
}

// newHookMap initializes a new empty hookMap.
func newHookMap() hookMap {
        return hookMap{
                allocIdx:      atomic.Uint64{},
                transient:     make(map[uint64]func()),
                newTransients: make(chan func()),
        }
}

// alloc allocates space in the hook map for the supplied hook and returns the
// ID under which it was registered in the transient part of the hookMap.
func (m *hookMap) alloc(hook func()) uint64 {
        // We assume we never overflow a uint64. Seems OK.
        hookID := m.allocIdx.Add(1)
        if hookID == 0 {
                panic("hookMap allocIdx overflow")
        }
        m.transient[hookID] = hook

        return hookID
}

// invoke is used on a hook map to call all the registered hooks and then clear
// out the transient hooks so they are not called again.
func (m *hookMap) invoke() {
        for _, hook := range m.transient {
                hook()
        }

        m.transient = make(map[uint64]func())
}

// hodlHtlc contains htlc data that is required for resolution.
type hodlHtlc struct {
        add        lnwire.UpdateAddHTLC
        sourceRef  channeldb.AddRef
        obfuscator hop.ErrorEncrypter
}

// NewChannelLink creates a new instance of a ChannelLink given a configuration
// and active channel that will be used to verify/apply updates to.
func NewChannelLink(cfg ChannelLinkConfig,
        channel *lnwallet.LightningChannel) ChannelLink {

        logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())

        // If the max fee exposure isn't set, use the default.
        if cfg.MaxFeeExposure == 0 {
                cfg.MaxFeeExposure = DefaultMaxFeeExposure
        }

        var qsm Quiescer
        if !cfg.DisallowQuiescence {
                qsm = NewQuiescer(QuiescerCfg{
                        chanID: lnwire.NewChanIDFromOutPoint(
                                channel.ChannelPoint(),
                        ),
                        channelInitiator: channel.Initiator(),
                        sendMsg: func(s lnwire.Stfu) error {
                                return cfg.Peer.SendMessage(false, &s)
                        },
                        timeoutDuration: defaultQuiescenceTimeout,
                        onTimeout: func() {
                                cfg.Peer.Disconnect(ErrQuiescenceTimeout)
                        },
                })
        } else {
                qsm = &quiescerNoop{}
        }

        quiescenceReqs := make(
                chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1,
        )

        return &channelLink{
                cfg:                 cfg,
                channel:             channel,
                hodlMap:             make(map[models.CircuitKey]hodlHtlc),
                hodlQueue:           queue.NewConcurrentQueue(10),
                log:                 log.WithPrefix(logPrefix),
                flushHooks:          newHookMap(),
                outgoingCommitHooks: newHookMap(),
                incomingCommitHooks: newHookMap(),
                quiescer:            qsm,
                quiescenceReqs:      quiescenceReqs,
                cg:                  fn.NewContextGuard(),
        }
}

// A compile time check to ensure channelLink implements the ChannelLink
// interface.
var _ ChannelLink = (*channelLink)(nil)

// Start starts all helper goroutines required for the operation of the channel
// link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Start() error {
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
                err := fmt.Errorf("channel link(%v): already started", l)
                l.log.Warn("already started")
                return err
        }

        l.log.Info("starting")

        // If the config supplied a watchtower client, ensure the channel is
        // registered before trying to use it during operation.
        if l.cfg.TowerClient != nil {
                err := l.cfg.TowerClient.RegisterChannel(
                        l.ChanID(), l.channel.State().ChanType,
                )
                if err != nil {
                        return err
                }
        }

        l.mailBox.ResetMessages()
        l.hodlQueue.Start()

        // Before launching the htlcManager messages, revert any circuits that
        // were marked open in the switch's circuit map, but did not make it
        // into a commitment txn. We use the next local htlc index as the cut
        // off point, since all indexes below that are committed. This action
        // is only performed if the link's final short channel ID has been
        // assigned, otherwise we would try to trim the htlcs belonging to the
        // all-zero, hop.Source ID.
        if l.ShortChanID() != hop.Source {
                localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
                if err != nil {
                        return fmt.Errorf("unable to retrieve next local "+
                                "htlc index: %v", err)
                }

                // NOTE: This is automatically done by the switch when it
                // starts up, but is necessary to prevent inconsistencies in
                // the case that the link flaps. This is a result of a link's
                // life-cycle being shorter than that of the switch.
                chanID := l.ShortChanID()
                err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
                if err != nil {
                        return fmt.Errorf("unable to trim circuits above "+
                                "local htlc index %d: %v", localHtlcIndex, err)
                }

                // Since the link is live, before we start the link we'll update
                // the ChainArbitrator with the set of new channel signals for
                // this channel.
                //
                // TODO(roasbeef): split goroutines within channel arb to avoid
                go func() {
                        signals := &contractcourt.ContractSignals{
                                ShortChanID: l.channel.ShortChanID(),
                        }

                        err := l.cfg.UpdateContractSignals(signals)
                        if err != nil {
                                l.log.Errorf("unable to update signals")
                        }
                }()
        }

        l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())

        l.cg.WgAdd(1)
        go l.htlcManager(context.TODO())

        return nil
}

// Stop gracefully stops all active helper goroutines, then waits until they've
// exited.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stop() {
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
                l.log.Warn("already stopped")
                return
        }

        l.log.Info("stopping")

        // As the link is stopping, we are no longer interested in htlc
        // resolutions coming from the invoice registry.
        l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())

        if l.cfg.ChainEvents.Cancel != nil {
                l.cfg.ChainEvents.Cancel()
        }

        // Ensure the channel for the timer is drained.
        if l.updateFeeTimer != nil {
                if !l.updateFeeTimer.Stop() {
                        select {
                        case <-l.updateFeeTimer.C:
                        default:
                        }
                }
        }

        if l.hodlQueue != nil {
                l.hodlQueue.Stop()
        }

        l.cg.Quit()
        l.cg.WgWait()

        // Now that the htlcManager has completely exited, reset the packet
        // courier. This allows the mailbox to reevaluate any lingering Adds
        // that were delivered but didn't make it on a commitment to be failed
        // back if the link is offline for an extended period of time. The
        // error is ignored since it can only fail when the daemon is exiting.
        _ = l.mailBox.ResetPackets()

        // As a final precaution, we will attempt to flush any uncommitted
        // preimages to the preimage cache. The preimages should be re-delivered
        // after channel reestablishment, however this adds an extra layer of
        // protection in case the peer never returns. Without this, we will be
        // unable to settle any contracts depending on the preimages even though
        // we had learned them at some point.
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
        if err != nil {
                l.log.Errorf("unable to add preimages=%v to cache: %v",
                        l.uncommittedPreimages, err)
        }
}

// WaitForShutdown blocks until the link finishes shutting down, which includes
// termination of all dependent goroutines.
func (l *channelLink) WaitForShutdown() {
        l.cg.WgWait()
}

// EligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST NOT be called from the main event loop.
func (l *channelLink) EligibleToForward() bool {
        l.RLock()
        defer l.RUnlock()

        return l.eligibleToForward()
}

// eligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToForward() bool {
        return l.eligibleToUpdate() && !l.IsFlushing(Outgoing)
}

// eligibleToUpdate returns a bool indicating if the channel is able to update
// channel state. We're able to update channel state if we know the remote
// party's next revocation point. Otherwise, we can't initiate new channel
// state. We also require that the short channel ID not be the all-zero source
// ID, meaning that the channel has had its ID finalized.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToUpdate() bool {
        return l.channel.RemoteNextRevocation() != nil &&
                l.channel.ShortChanID() != hop.Source &&
                l.isReestablished() &&
                l.quiescer.CanSendUpdates()
}

// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns true if the state was changed and false
// if the desired state was already set before the method was called.
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Swap(false)
        }

        return l.isIncomingAddBlocked.Swap(false)
}

// DisableAdds sets the ChannelUpdateHandler state to disallow UpdateAddHtlc's
// in the specified direction. It returns true if the state was changed and
// false if the desired state was already set before the method was called.
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return !l.isOutgoingAddBlocked.Swap(true)
        }

        return !l.isIncomingAddBlocked.Swap(true)
}

// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction
// of the argument.
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Load()
        }

        return l.isIncomingAddBlocked.Load()
}
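
// exampleFlushControl is an editor's illustration, not part of the original
// link.go: it demonstrates the Enable/DisableAdds return-value contract
// above, where each call reports whether it actually changed the blocked
// state, letting callers detect redundant transitions.
func exampleFlushControl(l *channelLink) {
        // First call flips the state and returns true.
        changed := l.DisableAdds(Outgoing)

        // Second call is a no-op and returns false.
        unchanged := l.DisableAdds(Outgoing)

        // While adds are blocked, the link reports that it is flushing.
        flushing := l.IsFlushing(Outgoing)

        _, _, _ = changed, unchanged, flushing
}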

// OnFlushedOnce adds a hook that will be called the next time the channel
// state reaches zero htlcs. This hook will only ever be called once. If the
// channel state already has zero htlcs, then this will be called immediately.
func (l *channelLink) OnFlushedOnce(hook func()) {
        select {
        case l.flushHooks.newTransients <- hook:
        case <-l.cg.Done():
        }
}

// OnCommitOnce adds a hook that will be called the next time a CommitSig
// message is sent in the argument's LinkDirection. This hook will only ever be
// called once. If no CommitSig is owed in the argument's LinkDirection, then
// the hook will be run immediately.
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
        var queue chan func()

        if direction == Outgoing {
                queue = l.outgoingCommitHooks.newTransients
        } else {
                queue = l.incomingCommitHooks.newTransients
        }

        select {
        case queue <- hook:
        case <-l.cg.Done():
        }
}
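
// exampleFlushHook is an editor's illustration, not part of the original
// link.go: it shows how the transient hooks above are meant to be used,
// registering a one-shot callback and then blocking new adds so the channel
// can actually drain to zero HTLCs.
func exampleFlushHook(l *channelLink) {
        l.OnFlushedOnce(func() {
                // Runs exactly once, the next time the channel has no live
                // HTLCs (or immediately if it is already empty).
                l.log.Info("channel is now flushed")
        })

        // Block new outgoing adds so the flush can complete.
        l.DisableAdds(Outgoing)
}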

// InitStfu allows us to initiate quiescence on this link. It returns a
// receive-only channel that will block until quiescence has been achieved, or
// definitively fails.
//
// This operation has been added to allow channels to be quiesced via RPC. It
// may be removed or reworked in the future as RPC initiated quiescence is a
// holdover until we have downstream protocols that use it.
func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
        req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]](
                fn.Unit{},
        )

        select {
        case l.quiescenceReqs <- req:
        case <-l.cg.Done():
                req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
        }

        return out
}
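
// exampleInitStfu is an editor's illustration, not part of the original
// link.go: because InitStfu returns a receive-only channel carrying an
// fn.Result, a caller can block until quiescence completes or definitively
// fails. (This sketch assumes fn.Result exposes an Unpack accessor.)
func exampleInitStfu(l *channelLink) {
        result := <-l.InitStfu()

        initiator, err := result.Unpack()
        if err != nil {
                // E.g. ErrLinkShuttingDown if the link quit first.
                return
        }

        // initiator reports which party initiated quiescence.
        _ = initiator
}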

// isReestablished returns true if the link has successfully completed the
// channel reestablishment dance.
func (l *channelLink) isReestablished() bool {
        return atomic.LoadInt32(&l.reestablished) == 1
}

// markReestablished signals that the remote peer has successfully exchanged
// channel reestablish messages and that the channel is ready to process
// subsequent messages.
func (l *channelLink) markReestablished() {
        atomic.StoreInt32(&l.reestablished, 1)
}

// IsUnadvertised returns true if the underlying channel is unadvertised.
func (l *channelLink) IsUnadvertised() bool {
        state := l.channel.State()
        return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
}

// sampleNetworkFee samples the current fee rate on the network to get into the
// chain in a timely manner. The returned value is expressed in fee-per-kw, as
// this is the native rate used when computing the fee for commitment
// transactions, and the second-level HTLC transactions.
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
        // We'll first query for the sat/kw recommended to be confirmed within 3
        // blocks.
        feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
        if err != nil {
                return 0, err
        }

        l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
                int64(feePerKw))

        return feePerKw, nil
}

// shouldAdjustCommitFee returns true if we should update our commitment fee to
// match that of the network fee. We'll only update our commitment fee if the
// network fee differs from our commitment fee by at least 10%, or if our
// current commitment fee is below the minimum relay fee.
func shouldAdjustCommitFee(netFee, chanFee,
        minRelayFee chainfee.SatPerKWeight) bool {

        switch {
        // If the network fee is greater than our current commitment fee and
        // our current commitment fee is below the minimum relay fee then
        // we should switch to it no matter if it is less than a 10% increase.
        case netFee > chanFee && chanFee < minRelayFee:
                return true

        // If the network fee is greater than the commitment fee, then we'll
        // switch to it if it's at least 10% greater than the commit fee.
        case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
                return true

        // If the network fee is less than our commitment fee, then we'll
        // switch to it if it's at least 10% less than the commitment fee.
        case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
                return true

        // Otherwise, we won't modify our fee.
        default:
                return false
        }
}
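
// exampleShouldAdjustCommitFee is an editor's illustration, not part of the
// original link.go: it walks the cases above with a channel fee of
// 1000 sat/kw and a minimum relay fee of 253 sat/kw.
func exampleShouldAdjustCommitFee() {
        minRelay := chainfee.SatPerKWeight(253)

        _ = shouldAdjustCommitFee(1100, 1000, minRelay) // true: at least +10%
        _ = shouldAdjustCommitFee(900, 1000, minRelay)  // true: at least -10%
        _ = shouldAdjustCommitFee(1050, 1000, minRelay) // false: within the 10% band
        _ = shouldAdjustCommitFee(300, 200, minRelay)   // true: chan fee below relay fee
}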

// failCb is used to cut down on the argument verbosity.
type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage

// createFailureWithUpdate creates a ChannelUpdate when failing an incoming or
// outgoing HTLC. It may return a FailureMessage that references a channel's
// alias. If the channel does not have an alias, then the regular channel
// update from disk will be returned.
func (l *channelLink) createFailureWithUpdate(incoming bool,
        outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {

        // Determine which SCID to use in case we need to use aliases in the
        // ChannelUpdate.
        scid := outgoingScid
        if incoming {
                scid = l.ShortChanID()
        }

        // Try using the FailAliasUpdate function. If it returns nil, fallback
        // to the non-alias behavior.
        update := l.cfg.FailAliasUpdate(scid, incoming)
        if update == nil {
                // Fallback to the non-alias behavior.
                var err error
                update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
                if err != nil {
                        return &lnwire.FailTemporaryNodeFailure{}
                }
        }

        return cb(update)
}
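
// exampleFailWithUpdate is an editor's illustration, not part of the original
// link.go: it shows the failCb pattern above, where the callback wraps the
// fetched update in a concrete failure message. (This sketch assumes
// lnwire.NewTemporaryChannelFailure accepts the *lnwire.ChannelUpdate1
// produced by the link.)
func exampleFailWithUpdate(l *channelLink,
        scid lnwire.ShortChannelID) lnwire.FailureMessage {

        cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                return lnwire.NewTemporaryChannelFailure(upd)
        }

        return l.createFailureWithUpdate(false, scid, cb)
}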
886

887
// syncChanState attempts to synchronize channel states with the remote party.
888
// This method is to be called upon reconnection after the initial funding
889
// flow. We'll compare out commitment chains with the remote party, and re-send
890
// either a danging commit signature, a revocation, or both.
891
func (l *channelLink) syncChanStates(ctx context.Context) error {
171✔
892
        chanState := l.channel.State()
171✔
893

171✔
894
        l.log.Infof("Attempting to re-synchronize channel: %v", chanState)
171✔
895

171✔
896
        // First, we'll generate our ChanSync message to send to the other
171✔
897
        // side. Based on this message, the remote party will decide if they
171✔
898
        // need to retransmit any data or not.
171✔
899
        localChanSyncMsg, err := chanState.ChanSyncMsg()
171✔
900
        if err != nil {
171✔
901
                return fmt.Errorf("unable to generate chan sync message for "+
×
902
                        "ChannelPoint(%v)", l.channel.ChannelPoint())
×
903
        }
×
904
        if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
171✔
905
                return fmt.Errorf("unable to send chan sync message for "+
×
906
                        "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
×
907
        }
×
908

909
        var msgsToReSend []lnwire.Message
171✔
910

171✔
911
        // Next, we'll wait indefinitely to receive the ChanSync message. The
171✔
912
        // first message sent MUST be the ChanSync message.
171✔
913
        select {
171✔
914
        case msg := <-l.upstream:
171✔
915
                l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
171✔
916
                        l.cfg.Peer.PubKey())
171✔
917

171✔
918
                remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
171✔
919
                if !ok {
171✔
920
                        return fmt.Errorf("first message sent to sync "+
×
921
                                "should be ChannelReestablish, instead "+
×
922
                                "received: %T", msg)
×
923
                }
×
924

925
                // If the remote party indicates that they think we haven't
926
                // done any state updates yet, then we'll retransmit the
927
                // channel_ready message first. We do this, as at this point
928
                // we can't be sure if they've really received the
929
                // ChannelReady message.
930
                if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
171✔
931
                        localChanSyncMsg.NextLocalCommitHeight == 1 &&
171✔
932
                        !l.channel.IsPending() {
336✔
933

165✔
934
                        l.log.Infof("resending ChannelReady message to peer")
165✔
935

165✔
936
                        nextRevocation, err := l.channel.NextRevocationKey()
165✔
937
                        if err != nil {
165✔
938
                                return fmt.Errorf("unable to create next "+
×
939
                                        "revocation: %v", err)
×
940
                        }
×
941

942
                        channelReadyMsg := lnwire.NewChannelReady(
165✔
943
                                l.ChanID(), nextRevocation,
165✔
944
                        )
165✔
945

165✔
946
                        // If this is a taproot channel, then we'll send the
165✔
947
                        // very same nonce that we sent above, as they should
165✔
948
                        // take the latest verification nonce we send.
165✔
949
                        if chanState.ChanType.IsTaproot() {
166✔
950
                                //nolint:ll
1✔
951
                                channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
1✔
952
                        }
1✔
953

954
                        // For channels that negotiated the option-scid-alias
955
                        // feature bit, ensure that we send over the alias in
956
                        // the channel_ready message. We'll send the first
957
                        // alias we find for the channel since it does not
958
                        // matter which alias we send. We'll error out if no
959
                        // aliases are found.
960
                        if l.negotiatedAliasFeature() {
166✔
961
                                aliases := l.getAliases()
1✔
962
                                if len(aliases) == 0 {
1✔
963
                                        // This shouldn't happen since we
×
964
                                        // always add at least one alias before
×
965
                                        // the channel reaches the link.
×
966
                                        return fmt.Errorf("no aliases found")
×
967
                                }
×
968

969
                                // getAliases returns a copy of the alias slice
970
                                // so it is ok to use a pointer to the first
971
                                // entry.
972
                                channelReadyMsg.AliasScid = &aliases[0]
1✔
973
                        }
974

975
                        err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
165✔
976
                        if err != nil {
165✔
977
                                return fmt.Errorf("unable to re-send "+
×
978
                                        "ChannelReady: %v", err)
×
979
                        }
×
980
                }
981

982
                // In any case, we'll then process their ChanSync message.
983
                l.log.Info("received re-establishment message from remote side")
171✔
984

171✔
985
                var (
171✔
986
                        openedCircuits []CircuitKey
171✔
987
                        closedCircuits []CircuitKey
171✔
988
                )
171✔
989

171✔
990
                // We've just received a ChanSync message from the remote
171✔
991
                // party, so we'll process the message  in order to determine
171✔
992
                // if we need to re-transmit any messages to the remote party.
171✔
993
                ctx, cancel := l.cg.Create(ctx)
171✔
994
                defer cancel()
171✔
995
                msgsToReSend, openedCircuits, closedCircuits, err =
171✔
996
                        l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
171✔
997
                if err != nil {
172✔
998
                        return err
1✔
999
                }
1✔
1000

1001
                // Repopulate any identifiers for circuits that may have been
1002
                // opened or unclosed. This may happen if we needed to
1003
                // retransmit a commitment signature message.
1004
                l.openedCircuits = openedCircuits
171✔
1005
                l.closedCircuits = closedCircuits
171✔
1006

171✔
1007
                // Ensure that all packets have been have been removed from the
171✔
1008
                // link's mailbox.
171✔
1009
                if err := l.ackDownStreamPackets(); err != nil {
171✔
1010
                        return err
×
1011
                }
×
1012

1013
                if len(msgsToReSend) > 0 {
176✔
1014
                        l.log.Infof("sending %v updates to synchronize the "+
5✔
1015
                                "state", len(msgsToReSend))
5✔
1016
                }
5✔
1017

1018
                // If we have any messages to retransmit, we'll do so
1019
                // immediately so we return to a synchronized state as soon as
1020
                // possible.
1021
                for _, msg := range msgsToReSend {
182✔
1022
                        l.cfg.Peer.SendMessage(false, msg)
11✔
1023
                }
11✔
1024

1025
        case <-l.cg.Done():
1✔
1026
                return ErrLinkShuttingDown
1✔
1027
        }
1028

1029
        return nil
171✔
1030
}
1031

1032
// resolveFwdPkgs loads any forwarding packages for this link from disk, and
1033
// reprocesses them in order. The primary goal is to make sure that any HTLCs
1034
// we previously received are reinstated in memory, and forwarded to the switch
1035
// if necessary. After a restart, this will also delete any previously
1036
// completed packages.
1037
func (l *channelLink) resolveFwdPkgs(ctx context.Context) error {
214✔
1038
        fwdPkgs, err := l.channel.LoadFwdPkgs()
214✔
1039
        if err != nil {
214✔
UNCOV
1040
                return err
×
UNCOV
1041
        }
×
1042

1043
        l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))
214✔
1044

214✔
1045
        for _, fwdPkg := range fwdPkgs {
221✔
1046
                if err := l.resolveFwdPkg(fwdPkg); err != nil {
7✔
UNCOV
1047
                        return err
×
UNCOV
1048
                }
×
1049
        }
1050

1051
        // If any of our reprocessing steps require an update to the commitment
1052
        // txn, we initiate a state transition to capture all relevant changes.
1053
        if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
215✔
1054
                return l.updateCommitTx(ctx)
1✔
1055
        }
1✔
1056

1057
        return nil
214✔
1058
}
1059

1060
// resolveFwdPkg interprets the FwdState of the provided package, either
1061
// reprocesses any outstanding htlcs in the package, or performs garbage
1062
// collection on the package.
1063
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
7✔
1064
        // Remove any completed packages to clear up space.
7✔
1065
        if fwdPkg.State == channeldb.FwdStateCompleted {
9✔
1066
                l.log.Debugf("removing completed fwd pkg for height=%d",
2✔
1067
                        fwdPkg.Height)
2✔
1068

2✔
1069
                err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
2✔
1070
                if err != nil {
2✔
UNCOV
1071
                        l.log.Errorf("unable to remove fwd pkg for height=%d: "+
×
UNCOV
1072
                                "%v", fwdPkg.Height, err)
×
UNCOV
1073
                        return err
×
UNCOV
1074
                }
×
1075
        }
1076

1077
        // Otherwise this is either a new package or one has gone through
1078
        // processing, but contains htlcs that need to be restored in memory.
1079
        // We replay this forwarding package to make sure our local mem state
1080
        // is resurrected, we mimic any original responses back to the remote
1081
        // party, and re-forward the relevant HTLCs to the switch.
1082

1083
        // If the package is fully acked but not completed, it must still have
1084
        // settles and fails to propagate.
1085
        if !fwdPkg.SettleFailFilter.IsFull() {
8✔
1086
                l.processRemoteSettleFails(fwdPkg)
1✔
1087
        }
1✔
1088

1089
        // Finally, replay *ALL ADDS* in this forwarding package. The
1090
        // downstream logic is able to filter out any duplicates, but we must
1091
        // shove the entire, original set of adds down the pipeline so that the
1092
        // batch of adds presented to the sphinx router does not ever change.
1093
        if !fwdPkg.AckFilter.IsFull() {
11✔
1094
                l.processRemoteAdds(fwdPkg)
4✔
1095

4✔
1096
                // If the link failed during processing the adds, we must
4✔
1097
                // return to ensure we won't attempted to update the state
4✔
1098
                // further.
4✔
1099
                if l.failed {
4✔
1100
                        return fmt.Errorf("link failed while " +
×
1101
                                "processing remote adds")
×
1102
                }
×
1103
        }
1104

1105
        return nil
7✔
1106
}
1107

1108
// fwdPkgGarbager periodically reads all forwarding packages from disk and
1109
// removes those that can be discarded. It is safe to do this entirely in the
1110
// background, since all state is coordinated on disk. This also ensures the
1111
// link can continue to process messages and interleave database accesses.
1112
//
1113
// NOTE: This MUST be run as a goroutine.
1114
func (l *channelLink) fwdPkgGarbager() {
214✔
1115
        defer l.cg.WgDone()
214✔
1116

214✔
1117
        l.cfg.FwdPkgGCTicker.Resume()
214✔
1118
        defer l.cfg.FwdPkgGCTicker.Stop()
214✔
1119

214✔
1120
        if err := l.loadAndRemove(); err != nil {
215✔
1121
                l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
1✔
1122
        }
1✔
1123

1124
        for {
444✔
1125
                select {
230✔
1126
                case <-l.cfg.FwdPkgGCTicker.Ticks():
16✔
1127
                        if err := l.loadAndRemove(); err != nil {
32✔
1128
                                l.log.Warnf("unable to remove fwd pkgs: %v",
16✔
1129
                                        err)
16✔
1130
                                continue
16✔
1131
                        }
1132
                case <-l.cg.Done():
203✔
1133
                        return
203✔
1134
                }
1135
        }
1136
}
1137

1138
// loadAndRemove loads all the channels forwarding packages and determines if
1139
// they can be removed. It is called once before the FwdPkgGCTicker ticks so that
1140
// a longer tick interval can be used.
1141
func (l *channelLink) loadAndRemove() error {
230✔
1142
        fwdPkgs, err := l.channel.LoadFwdPkgs()
230✔
1143
        if err != nil {
247✔
1144
                return err
17✔
1145
        }
17✔
1146

1147
        var removeHeights []uint64
213✔
1148
        for _, fwdPkg := range fwdPkgs {
219✔
1149
                if fwdPkg.State != channeldb.FwdStateCompleted {
12✔
1150
                        continue
6✔
1151
                }
1152

1153
                removeHeights = append(removeHeights, fwdPkg.Height)
1✔
1154
        }
1155

1156
        // If removeHeights is empty, return early so we don't use a db
1157
        // transaction.
1158
        if len(removeHeights) == 0 {
426✔
1159
                return nil
213✔
1160
        }
213✔
1161

1162
        return l.channel.RemoveFwdPkgs(removeHeights...)
1✔
1163
}
1164

1165
// handleChanSyncErr performs the error handling logic in the case where we
// could not successfully syncChanStates with our channel peer.
func (l *channelLink) handleChanSyncErr(err error) {
	l.log.Warnf("error when syncing channel states: %v", err)

	var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss

	switch {
	case errors.Is(err, ErrLinkShuttingDown):
		l.log.Debugf("unable to sync channel states, link is " +
			"shutting down")
		return

	// We failed syncing the commit chains, probably because the remote has
	// lost state. We should force close the channel.
	case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
		fallthrough

	// The remote sent us an invalid last commit secret, we should force
	// close the channel.
	// TODO(halseth): and permanently ban the peer?
	case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
		fallthrough

	// The remote sent us a commit point different from what they sent us
	// before.
	// TODO(halseth): ban peer?
	case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
		// We'll fail the link and tell the peer to force close the
		// channel. Note that the database state is not updated here;
		// it will be updated once the close transaction is ready, so
		// that we don't go down before the transaction is stored in
		// the db.
		l.failf(
			LinkFailureError{
				code:          ErrSyncError,
				FailureAction: LinkFailureForceClose,
			},
			"unable to synchronize channel states: %v", err,
		)

	// We have lost state and cannot safely force close the channel. Fail
	// the channel and wait for the remote to hopefully force close it. The
	// remote has sent us its latest unrevoked commitment point, and we'll
	// store it in the database, such that we can attempt to recover the
	// funds if the remote force closes the channel.
	case errors.As(err, &errDataLoss):
		err := l.channel.MarkDataLoss(
			errDataLoss.CommitPoint,
		)
		if err != nil {
			l.log.Errorf("unable to mark channel data loss: %v",
				err)
		}

	// We determined the commit chains were not possible to sync. We
	// cautiously fail the channel, but don't force close.
	// TODO(halseth): can we safely force close in any cases where this
	// error is returned?
	case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
		if err := l.channel.MarkBorked(); err != nil {
			l.log.Errorf("unable to mark channel borked: %v", err)
		}

	// Other, unspecified error.
	default:
	}

	l.failf(
		LinkFailureError{
			code:          ErrRecoveryError,
			FailureAction: LinkFailureForceNone,
		},
		"unable to synchronize channel states: %v", err,
	)
}

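// Note on the local data-loss case above: when we are the party that lost
// state, our latest known commitment may already have been revoked, and
// broadcasting it would expose us to a breach claim. The safe recovery path
// is therefore to mark the loss, wait for the remote to force close, and
// then sweep our funds using the unrevoked commit point they provided.
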
// htlcManager is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels.
// This goroutine reads messages from the upstream (remote) peer, and also from
// the downstream channel managed by the channel link. In the event that an
// htlc needs to be forwarded, then a send-only forward handler is used which
// sends htlc packets to the switch. Additionally, this goroutine handles
// acting upon all timeouts for any active HTLCs, manages the channel's
// revocation window, and also the htlc trickle queue+timer for this active
// channel.
//
// NOTE: This MUST be run as a goroutine.
//
//nolint:funlen
func (l *channelLink) htlcManager(ctx context.Context) {
	defer func() {
		l.cfg.BatchTicker.Stop()
		l.cg.WgDone()
		l.log.Infof("exited")
	}()

	l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())

	// Notify any clients that the link is now in the switch via an
	// ActiveLinkEvent. We'll also defer an inactive link notification for
	// when the link exits to ensure that every active notification is
	// matched by an inactive one.
	l.cfg.NotifyActiveLink(l.ChannelPoint())
	defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())

	// TODO(roasbeef): need to call wipe chan whenever D/C?

	// If this isn't the first time that this channel link has been
	// created, then we'll need to check to see if we need to
	// re-synchronize state with the remote peer. settledHtlcs is a map of
	// HTLC's that we re-settled as part of the channel state sync.
	if l.cfg.SyncStates {
		err := l.syncChanStates(ctx)
		if err != nil {
			l.handleChanSyncErr(err)
			return
		}
	}

	// If a shutdown message has previously been sent on this link, then we
	// need to make sure that we have disabled any HTLC adds on the outgoing
	// direction of the link and that we re-send the same shutdown message
	// that we previously sent.
	l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
		// Immediately disallow any new outgoing HTLCs.
		if !l.DisableAdds(Outgoing) {
			l.log.Warnf("Outgoing link adds already disabled")
		}

		// Re-send the shutdown message to the peer. Since
		// syncChanStates would have sent any outstanding CommitSig, it
		// is fine for us to immediately queue the shutdown message now.
		err := l.cfg.Peer.SendMessage(false, &shutdown)
		if err != nil {
			l.log.Warnf("Error sending shutdown message: %v", err)
		}
	})

	// We've successfully reestablished the channel, mark it as such to
	// allow the switch to forward HTLCs in the outbound direction.
	l.markReestablished()

	// Now that we've received both channel_ready and channel reestablish,
	// we can go ahead and send the active channel notification. We'll also
	// defer the inactive notification for when the link exits to ensure
	// that every active notification is matched by an inactive one.
	l.cfg.NotifyActiveChannel(l.ChannelPoint())
	defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())

	// With the channel states synced, we now reset the mailbox to ensure
	// we start processing all unacked packets in order. This is done here
	// to ensure that all acknowledgments that occur during channel
	// resynchronization have taken effect, causing us only to pull unacked
	// packets after starting to read from the downstream mailbox.
	l.mailBox.ResetPackets()

	// After cleaning up any memory pertaining to incoming packets, we now
	// replay our forwarding packages to handle any htlcs that can be
	// processed locally, or need to be forwarded out to the switch. We will
	// only attempt to resolve packages if our short chan id indicates that
	// the channel is not pending, otherwise we should have no htlcs to
	// reforward.
	if l.ShortChanID() != hop.Source {
		err := l.resolveFwdPkgs(ctx)
		switch err {
		// No error was encountered, success.
		case nil:

		// If the duplicate keystone error was encountered, we'll fail
		// without sending an Error message to the peer.
		case ErrDuplicateKeystone:
			l.failf(LinkFailureError{code: ErrCircuitError},
				"temporary circuit error: %v", err)
			return

		// A non-nil error was encountered, send an Error message to
		// the peer.
		default:
			l.failf(LinkFailureError{code: ErrInternalError},
				"unable to resolve fwd pkgs: %v", err)
			return
		}

		// With our link's in-memory state fully reconstructed, spawn a
		// goroutine to manage the reclamation of disk space occupied by
		// completed forwarding packages.
		l.cg.WgAdd(1)
		go l.fwdPkgGarbager()
	}

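	// For context: replaying forwarding packages here is expected to be
	// idempotent. Each package persists per-HTLC ack and settle/fail
	// filters on disk (per the channeldb forwarding-package design), so
	// an htlc that was already forwarded before a restart is not handed
	// to the switch a second time.
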
	for {
		// We must always check if we failed at some point processing
		// the last update before processing the next.
		if l.failed {
			l.log.Errorf("link failed, exiting htlcManager")
			return
		}

		// If the previous event resulted in a non-empty batch, resume
		// the batch ticker so that it can be cleared. Otherwise pause
		// the ticker to prevent waking up the htlcManager while the
		// batch is empty.
		numUpdates := l.channel.NumPendingUpdates(
			lntypes.Local, lntypes.Remote,
		)
		if numUpdates > 0 {
			l.cfg.BatchTicker.Resume()
			l.log.Tracef("BatchTicker resumed, "+
				"NumPendingUpdates(Local, Remote)=%d",
				numUpdates,
			)
		} else {
			l.cfg.BatchTicker.Pause()
			l.log.Trace("BatchTicker paused due to zero " +
				"NumPendingUpdates(Local, Remote)")
		}

		select {
		// We have a new hook that needs to be run when we reach a clean
		// channel state.
		case hook := <-l.flushHooks.newTransients:
			if l.channel.IsChannelClean() {
				hook()
			} else {
				l.flushHooks.alloc(hook)
			}

		// We have a new hook that needs to be run when we have
		// committed all of our updates.
		case hook := <-l.outgoingCommitHooks.newTransients:
			if !l.channel.OweCommitment() {
				hook()
			} else {
				l.outgoingCommitHooks.alloc(hook)
			}

		// We have a new hook that needs to be run when our peer has
		// committed all of their updates.
		case hook := <-l.incomingCommitHooks.newTransients:
			if !l.channel.NeedCommitment() {
				hook()
			} else {
				l.incomingCommitHooks.alloc(hook)
			}

		// Our update fee timer has fired, so we'll check the network
		// fee to see if we should adjust our commitment fee.
		case <-l.updateFeeTimer.C:
			l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())

			// If we're not the initiator of the channel, we don't
			// control the fees, so we can ignore this.
			if !l.channel.IsInitiator() {
				continue
			}

			// If we are the initiator, then we'll sample the
			// current fee rate to get into the chain within 3
			// blocks.
			netFee, err := l.sampleNetworkFee()
			if err != nil {
				l.log.Errorf("unable to sample network fee: %v",
					err)
				continue
			}

			minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()

			newCommitFee := l.channel.IdealCommitFeeRate(
				netFee, minRelayFee,
				l.cfg.MaxAnchorsCommitFeeRate,
				l.cfg.MaxFeeAllocation,
			)

			// We determine if we should adjust the commitment fee
			// based on the current commitment fee, the suggested
			// new commitment fee and the current minimum relay fee
			// rate.
			commitFee := l.channel.CommitFeeRate()
			if !shouldAdjustCommitFee(
				newCommitFee, commitFee, minRelayFee,
			) {

				continue
			}

			// If we do, then we'll send a new UpdateFee message to
			// the remote party, to be locked in with a new update.
			err = l.updateChannelFee(ctx, newCommitFee)
			if err != nil {
				l.log.Errorf("unable to update fee rate: %v",
					err)
				continue
			}

		// The underlying channel has notified us of a unilateral close
		// carried out by the remote peer. In the case of such an
		// event, we'll wipe the channel state from the peer, and mark
		// the contract as fully settled. Afterwards we can exit.
		//
		// TODO(roasbeef): add force closure? also breach?
		case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
			l.log.Warnf("remote peer has closed on-chain")

			// TODO(roasbeef): remove altogether
			go func() {
				chanPoint := l.channel.ChannelPoint()
				l.cfg.Peer.WipeChannel(&chanPoint)
			}()

			return

		case <-l.cfg.BatchTicker.Ticks():
			// Attempt to extend the remote commitment chain
			// including all the currently pending entries. If the
			// send was unsuccessful, then abandon the update,
			// waiting for the revocation window to open up.
			if !l.updateCommitTxOrFail(ctx) {
				return
			}

		case <-l.cfg.PendingCommitTicker.Ticks():
			l.failf(
				LinkFailureError{
					code:          ErrRemoteUnresponsive,
					FailureAction: LinkFailureDisconnect,
				},
				"unable to complete dance",
			)
			return

		// A message from the switch was just received. This indicates
		// that the link is an intermediate hop in a multi-hop HTLC
		// circuit.
		case pkt := <-l.downstream:
			l.handleDownstreamPkt(ctx, pkt)

		// A message from the connected peer was just received. This
		// indicates that we have a new incoming HTLC, either directly
		// for us, or part of a multi-hop HTLC circuit.
		case msg := <-l.upstream:
			l.handleUpstreamMsg(ctx, msg)

		// An htlc resolution is received. This means that we now have
		// a resolution for a previously accepted htlc.
		case hodlItem := <-l.hodlQueue.ChanOut():
			htlcResolution := hodlItem.(invoices.HtlcResolution)
			err := l.processHodlQueue(ctx, htlcResolution)
			switch err {
			// No error, success.
			case nil:

			// If the duplicate keystone error was encountered,
			// fail back gracefully.
			case ErrDuplicateKeystone:
				l.failf(LinkFailureError{
					code: ErrCircuitError,
				}, "process hodl queue: "+
					"temporary circuit error: %v",
					err,
				)

			// Send an Error message to the peer.
			default:
				l.failf(LinkFailureError{
					code: ErrInternalError,
				}, "process hodl queue: unable to update "+
					"commitment: %v", err,
				)
			}

		case qReq := <-l.quiescenceReqs:
			l.quiescer.InitStfu(qReq)

			if l.noDanglingUpdates(lntypes.Local) {
				err := l.quiescer.SendOwedStfu()
				if err != nil {
					l.stfuFailf(
						"SendOwedStfu: %s", err.Error(),
					)
					res := fn.Err[lntypes.ChannelParty](err)
					qReq.Resolve(res)
				}
			}

		case <-l.cg.Done():
			return
		}
	}
}

// processHodlQueue processes a received htlc resolution and continues reading
// from the hodl queue until no more resolutions remain. When this function
// returns without an error, the commit tx should be updated.
func (l *channelLink) processHodlQueue(ctx context.Context,
	firstResolution invoices.HtlcResolution) error {

	// Try to read all waiting resolution messages, so that they can all be
	// processed in a single commitment tx update.
	htlcResolution := firstResolution
loop:
	for {
		// Look up the hodl htlc that can be failed or settled with
		// this event. The hodl htlc must be present in the map.
		circuitKey := htlcResolution.CircuitKey()
		hodlHtlc, ok := l.hodlMap[circuitKey]
		if !ok {
			return fmt.Errorf("hodl htlc not found: %v", circuitKey)
		}

		err := l.processHtlcResolution(htlcResolution, hodlHtlc)
		if err != nil {
			return err
		}

		// Clean up hodl map.
		delete(l.hodlMap, circuitKey)

		select {
		case item := <-l.hodlQueue.ChanOut():
			htlcResolution = item.(invoices.HtlcResolution)
		default:
			break loop
		}
	}

	// Update the commitment tx.
	if err := l.updateCommitTx(ctx); err != nil {
		return err
	}

	return nil
}

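// The select/default above is the usual non-blocking drain pattern, used
// here to batch every queued resolution into a single commitment update. A
// minimal standalone sketch of the same idiom (queue and process are
// placeholders, not lnd APIs):
//
//	for {
//		select {
//		case item := <-queue:
//			process(item)
//		default:
//			return // queue drained
//		}
//	}
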
// processHtlcResolution applies a received htlc resolution to the provided
// htlc. When this function returns without an error, the commit tx should be
// updated.
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
	htlc hodlHtlc) error {

	circuitKey := resolution.CircuitKey()

	// Determine the required action for the resolution based on the type
	// of resolution we have received.
	switch res := resolution.(type) {
	// Settle htlcs that returned a settle resolution using the preimage
	// in the resolution.
	case *invoices.HtlcSettleResolution:
		l.log.Debugf("received settle resolution for %v "+
			"with outcome: %v", circuitKey, res.Outcome)

		return l.settleHTLC(
			res.Preimage, htlc.add.ID, htlc.sourceRef,
		)

	// For htlc failures, we get the relevant failure message based
	// on the failure resolution and then fail the htlc.
	case *invoices.HtlcFailResolution:
		l.log.Debugf("received cancel resolution for "+
			"%v with outcome: %v", circuitKey, res.Outcome)

		// Get the lnwire failure message based on the resolution
		// result.
		failure := getResolutionFailure(res, htlc.add.Amount)

		l.sendHTLCError(
			htlc.add, htlc.sourceRef, failure, htlc.obfuscator,
			true,
		)
		return nil

	// Fail if we do not get a settle or fail resolution, since we
	// are only expecting to handle settles and fails.
	default:
		return fmt.Errorf("unknown htlc resolution type: %T",
			resolution)
	}
}

// getResolutionFailure returns the wire message that a htlc resolution should
// be failed with.
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
	amount lnwire.MilliSatoshi) *LinkError {

	// If the resolution has been resolved as part of an MPP timeout,
	// we need to fail the htlc with lnwire.FailMppTimeout.
	if resolution.Outcome == invoices.ResultMppTimeout {
		return NewDetailedLinkError(
			&lnwire.FailMPPTimeout{}, resolution.Outcome,
		)
	}

	// If the htlc is not an MPP timeout, we fail it with
	// FailIncorrectDetails. This error is sent for invoice payment
	// failures such as underpayment or expiry too soon, and for hodl
	// invoices (which return FailIncorrectDetails to avoid leaking
	// information).
	incorrectDetails := lnwire.NewFailIncorrectDetails(
		amount, uint32(resolution.AcceptHeight),
	)

	return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
}

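// Note: collapsing underpayment, near-expiry, and canceled hodl invoices
// into the single opaque FailIncorrectDetails code is deliberate; a distinct
// failure per cause would let a probing sender learn about the recipient's
// invoice state.
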
// randomFeeUpdateTimeout returns a random timeout between the bounds defined
// within the link's configuration that will be used to determine when the link
// should propose an update to its commitment fee rate.
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
	lower := int64(l.cfg.MinUpdateTimeout)
	upper := int64(l.cfg.MaxUpdateTimeout)
	return time.Duration(prand.Int63n(upper-lower) + lower)
}

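// For illustration (placeholder values, not necessarily the configured
// defaults): with MinUpdateTimeout = 10m and MaxUpdateTimeout = 60m,
// prand.Int63n(upper-lower)+lower yields a duration uniformly distributed
// over [10m, 60m), so fee-update proposals are jittered rather than fired
// on a fixed, predictable interval.
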
// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
// downstream HTLC Switch.
func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context,
	pkt *htlcPacket) error {

	htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
	if !ok {
		return errors.New("not an UpdateAddHTLC packet")
	}

	// If we are flushing the link in the outgoing direction or we have
	// already sent Stfu, then we can't add new htlcs to the link and we
	// need to bounce it.
	if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() {
		l.mailBox.FailAdd(pkt)

		return NewDetailedLinkError(
			&lnwire.FailTemporaryChannelFailure{},
			OutgoingFailureLinkNotEligible,
		)
	}

	// If hodl.AddOutgoing mode is active, we exit early to simulate
	// arbitrary delays between the switch adding an ADD to the
	// mailbox, and the HTLC being added to the commitment state.
	if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
		l.log.Warnf(hodl.AddOutgoing.Warning())
		l.mailBox.AckPacket(pkt.inKey())
		return nil
	}

	// Check if we can add the HTLC here without exceeding the max fee
	// exposure threshold.
	if l.isOverexposedWithHtlc(htlc, false) {
		l.log.Debugf("Unable to handle downstream HTLC - max fee " +
			"exposure exceeded")

		l.mailBox.FailAdd(pkt)

		return NewDetailedLinkError(
			lnwire.NewTemporaryChannelFailure(nil),
			OutgoingFailureDownstreamHtlcAdd,
		)
	}

	// A new payment has been initiated via the downstream channel,
	// so we add the new HTLC to our local log, then update the
	// commitment chains.
	htlc.ChanID = l.ChanID()
	openCircuitRef := pkt.inKey()

	// We enforce the fee buffer for the commitment transaction because
	// we are in control of adding this htlc. Nothing has locked-in yet so
	// we can securely enforce the fee buffer which is only relevant if we
	// are the initiator of the channel.
	index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
	if err != nil {
		// The HTLC was unable to be added to the state machine,
		// as a result, we'll signal the switch to cancel the
		// pending payment.
		l.log.Warnf("Unable to handle downstream add HTLC: %v",
			err)

		// Remove this packet from the link's mailbox, this
		// prevents it from being reprocessed if the link
		// restarts and resets its mailbox. If this response
		// doesn't make it back to the originating link, it will
		// be rejected upon attempting to reforward the Add to
		// the switch, since the circuit was never fully opened,
		// and the forwarding package shows it as
		// unacknowledged.
		l.mailBox.FailAdd(pkt)

		return NewDetailedLinkError(
			lnwire.NewTemporaryChannelFailure(nil),
			OutgoingFailureDownstreamHtlcAdd,
		)
	}

	l.log.Tracef("received downstream htlc: payment_hash=%x, "+
		"local_log_index=%v, pend_updates=%v",
		htlc.PaymentHash[:], index,
		l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))

	pkt.outgoingChanID = l.ShortChanID()
	pkt.outgoingHTLCID = index
	htlc.ID = index

	l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
		pkt.inKey(), pkt.outKey())

	l.openedCircuits = append(l.openedCircuits, pkt.inKey())
	l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())

	_ = l.cfg.Peer.SendMessage(false, htlc)

	// Send a forward event notification to htlcNotifier.
	l.cfg.HtlcNotifier.NotifyForwardingEvent(
		newHtlcKey(pkt),
		HtlcInfo{
			IncomingTimeLock: pkt.incomingTimeout,
			IncomingAmt:      pkt.incomingAmount,
			OutgoingTimeLock: htlc.Expiry,
			OutgoingAmt:      htlc.Amount,
		},
		getEventType(pkt),
	)

	l.tryBatchUpdateCommitTx(ctx)

	return nil
}

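// For context: the keystone queued above pairs the circuit's incoming key
// (inbound channel, HTLC id) with the outgoing key just assigned, so that a
// returning settle or fail can be mapped back to the inbound HTLC even after
// a restart. As assumed here, the batch is persisted when the commitment is
// next updated.
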
// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
// cleared HTLCs with the upstream peer.
//
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
func (l *channelLink) handleDownstreamPkt(ctx context.Context,
	pkt *htlcPacket) {

	if pkt.htlc.MsgType().IsChannelUpdate() &&
		!l.quiescer.CanSendUpdates() {

		l.log.Warnf("unable to process channel update. "+
			"ChannelID=%v is quiescent.", l.ChanID)

		return
	}

	switch htlc := pkt.htlc.(type) {
	case *lnwire.UpdateAddHTLC:
		// Handle add message. The returned error can be ignored,
		// because it is also sent through the mailbox.
		_ = l.handleDownstreamUpdateAdd(ctx, pkt)

	case *lnwire.UpdateFulfillHTLC:
		// If hodl.SettleOutgoing mode is active, we exit early to
		// simulate arbitrary delays between the switch adding the
		// SETTLE to the mailbox, and the HTLC being added to the
		// commitment state.
		if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
			l.log.Warnf(hodl.SettleOutgoing.Warning())
			l.mailBox.AckPacket(pkt.inKey())
			return
		}

		// An HTLC we forward to the switch has just settled somewhere
		// upstream. Therefore we settle the HTLC within our local
		// state machine.
		inKey := pkt.inKey()
		err := l.channel.SettleHTLC(
			htlc.PaymentPreimage,
			pkt.incomingHTLCID,
			pkt.sourceRef,
			pkt.destRef,
			&inKey,
		)
		if err != nil {
			l.log.Errorf("unable to settle incoming HTLC for "+
				"circuit-key=%v: %v", inKey, err)

			// If the HTLC index for Settle response was not known
			// to our commitment state, it has already been
			// cleaned up by a prior response. We'll thus try to
			// clean up any lingering state to ensure we don't
			// continue reforwarding.
			if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
				l.cleanupSpuriousResponse(pkt)
			}

			// Remove the packet from the link's mailbox to ensure
			// it doesn't get replayed after a reconnection.
			l.mailBox.AckPacket(inKey)

			return
		}

		l.log.Debugf("queueing removal of SETTLE closed circuit: "+
			"%s->%s", pkt.inKey(), pkt.outKey())

		l.closedCircuits = append(l.closedCircuits, pkt.inKey())

		// With the HTLC settled, we'll need to populate the wire
		// message to target the specific channel and HTLC to be
		// canceled.
		htlc.ChanID = l.ChanID()
		htlc.ID = pkt.incomingHTLCID

		// Then we send the HTLC settle message to the connected peer
		// so we can continue the propagation of the settle message.
		l.cfg.Peer.SendMessage(false, htlc)

		// Send a settle event notification to htlcNotifier.
		l.cfg.HtlcNotifier.NotifySettleEvent(
			newHtlcKey(pkt),
			htlc.PaymentPreimage,
			getEventType(pkt),
		)

		// Immediately update the commitment tx to minimize latency.
		l.updateCommitTxOrFail(ctx)

	case *lnwire.UpdateFailHTLC:
		// If hodl.FailOutgoing mode is active, we exit early to
		// simulate arbitrary delays between the switch adding a FAIL to
		// the mailbox, and the HTLC being added to the commitment
		// state.
		if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
			l.log.Warnf(hodl.FailOutgoing.Warning())
			l.mailBox.AckPacket(pkt.inKey())
			return
		}

		// An HTLC cancellation has been triggered somewhere upstream,
		// so we'll remove the HTLC from our local state machine.
		inKey := pkt.inKey()
		err := l.channel.FailHTLC(
			pkt.incomingHTLCID,
			htlc.Reason,
			pkt.sourceRef,
			pkt.destRef,
			&inKey,
		)
		if err != nil {
			l.log.Errorf("unable to cancel incoming HTLC for "+
				"circuit-key=%v: %v", inKey, err)

			// If the HTLC index for Fail response was not known to
			// our commitment state, it has already been cleaned up
			// by a prior response. We'll thus try to clean up any
			// lingering state to ensure we don't continue
			// reforwarding.
			if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
				l.cleanupSpuriousResponse(pkt)
			}

			// Remove the packet from the link's mailbox to ensure
			// it doesn't get replayed after a reconnection.
			l.mailBox.AckPacket(inKey)

			return
		}

		l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
			pkt.inKey(), pkt.outKey())

		l.closedCircuits = append(l.closedCircuits, pkt.inKey())

		// With the HTLC removed, we'll need to populate the wire
		// message to target the specific channel and HTLC to be
		// canceled. The "Reason" field will have already been set
		// within the switch.
		htlc.ChanID = l.ChanID()
		htlc.ID = pkt.incomingHTLCID

		// We send the HTLC message to the peer which initially created
		// the HTLC. If the incoming blinding point is non-nil, we
		// know that we are a relaying node in a blinded path.
		// Otherwise, we're either an introduction node or not part of
		// a blinded path at all.
		if err := l.sendIncomingHTLCFailureMsg(
			htlc.ID,
			pkt.obfuscator,
			htlc.Reason,
		); err != nil {
			l.log.Errorf("unable to send HTLC failure: %v",
				err)

			return
		}

		// If the packet does not have a link failure set, it failed
		// further down the route so we notify a forwarding failure.
		// Otherwise, we notify a link failure because it failed at our
		// node.
		if pkt.linkFailure != nil {
			l.cfg.HtlcNotifier.NotifyLinkFailEvent(
				newHtlcKey(pkt),
				newHtlcInfo(pkt),
				getEventType(pkt),
				pkt.linkFailure,
				false,
			)
		} else {
			l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
				newHtlcKey(pkt), getEventType(pkt),
			)
		}

		// Immediately update the commitment tx to minimize latency.
		l.updateCommitTxOrFail(ctx)
	}
}

// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
// full.
func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) {
	pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
	if pending < uint64(l.cfg.BatchSize) {
		return
	}

	l.updateCommitTxOrFail(ctx)
}

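// Design note: only Adds are batched through tryBatchUpdateCommitTx. The
// settle and fail paths in handleDownstreamPkt call updateCommitTxOrFail
// immediately to minimize latency, while Adds accumulate until BatchSize
// pending updates are reached or the BatchTicker fires, trading a little
// latency for fewer commitment signatures.
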
// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
// associated with this packet. If successful in doing so, it will also purge
// the open circuit from the circuit map and remove the packet from the link's
// mailbox.
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
	inKey := pkt.inKey()

	l.log.Debugf("cleaning up spurious response for incoming "+
		"circuit-key=%v", inKey)

	// If the htlc packet doesn't have a source reference, it is unsafe to
	// proceed, as skipping this ack may cause the htlc to be reforwarded.
	if pkt.sourceRef == nil {
		l.log.Errorf("unable to cleanup response for incoming "+
			"circuit-key=%v, does not contain source reference",
			inKey)
		return
	}

	// If the source reference is present, we will try to prevent this link
	// from resending the packet to the switch. To do so, we ack the AddRef
	// of the incoming HTLC belonging to this link.
	err := l.channel.AckAddHtlcs(*pkt.sourceRef)
	if err != nil {
		l.log.Errorf("unable to ack AddRef for incoming "+
			"circuit-key=%v: %v", inKey, err)

		// If this operation failed, it is unsafe to attempt removal of
		// the destination reference or circuit, so we exit early. The
		// cleanup may proceed with a different packet in the future
		// that succeeds on this step.
		return
	}

	// Now that we know this link will stop retransmitting Adds to the
	// switch, we can begin to teardown the response reference and circuit
	// map.
	//
	// If the packet includes a destination reference, then a response for
	// this HTLC was locked into the outgoing channel. Attempt to remove
	// this reference, so we stop retransmitting the response internally.
	// Even if this fails, we will proceed in trying to delete the circuit.
	// When retransmitting responses, the destination references will be
	// cleaned up if an open circuit is not found in the circuit map.
	if pkt.destRef != nil {
		err := l.channel.AckSettleFails(*pkt.destRef)
		if err != nil {
			l.log.Errorf("unable to ack SettleFailRef "+
				"for incoming circuit-key=%v: %v",
				inKey, err)
		}
	}

	l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)

	// With all known references acked, we can now safely delete the circuit
	// from the switch's circuit map, as the state is no longer needed.
	err = l.cfg.Circuits.DeleteCircuits(inKey)
	if err != nil {
		l.log.Errorf("unable to delete circuit for "+
			"circuit-key=%v: %v", inKey, err)
	}
}

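// Ordering note: the AckAddHtlcs step above must succeed before any teardown
// of the destination reference or the circuit. Once the Add can no longer be
// re-forwarded to the switch, the remaining cleanup steps are safe to retry
// later if they fail here.
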
// handleUpstreamMsg processes wire messages related to commitment state
// updates from the upstream peer. The upstream peer is the peer with whom we
// have a direct channel, updating our respective commitment chains.
//
//nolint:funlen
func (l *channelLink) handleUpstreamMsg(ctx context.Context,
	msg lnwire.Message) {

	// First check if the message is an update and we are capable of
	// receiving updates right now.
	if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
		l.stfuFailf("update received after stfu: %T", msg)
		return
	}

	switch msg := msg.(type) {
	case *lnwire.UpdateAddHTLC:
		if l.IsFlushing(Incoming) {
			// This is forbidden by the protocol specification.
			// The best chance we have to deal with this is to drop
			// the connection. This should roll back the channel
			// state to the last CommitSig. If the remote has
			// already sent a CommitSig we haven't received yet,
			// channel state will be re-synchronized with a
			// ChannelReestablish message upon reconnection and the
			// protocol state that caused us to flush the link will
			// be rolled back. In the event that there was some
			// non-deterministic behavior in the remote that caused
			// them to violate the protocol, we have a decent shot
			// at correcting it this way, since reconnecting will
			// put us in the cleanest possible state to try again.
			//
			// In addition to the above, it is possible for us to
			// hit this case in situations where we improperly
			// handle message ordering due to concurrency choices.
			// An issue has been filed to address this here:
			// https://github.com/lightningnetwork/lnd/issues/8393
			l.failf(
				LinkFailureError{
					code:             ErrInvalidUpdate,
					FailureAction:    LinkFailureDisconnect,
					PermanentFailure: false,
					Warning:          true,
				},
				"received add while link is flushing",
			)

			return
		}

		// Disallow htlcs with blinding points set if we haven't
		// enabled the feature. This saves us from having to process
		// the onion at all, but will only catch blinded payments
		// where we are a relaying node (as the blinding point will
		// be in the payload when we're the introduction node).
		if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
			l.failf(LinkFailureError{code: ErrInvalidUpdate},
				"blinding point included when route blinding "+
					"is disabled")

			return
		}

		// We have to check the limit here rather than later in the
		// switch because the counterparty can keep sending HTLC's
		// without sending a revoke. This would mean that the switch
		// check would only occur later.
		if l.isOverexposedWithHtlc(msg, true) {
			l.failf(LinkFailureError{code: ErrInternalError},
				"peer sent us an HTLC that exceeded our max "+
					"fee exposure")

			return
		}

		// We just received an add request from an upstream peer, so we
		// add it to our state machine, then add the HTLC to our
		// "settle" list in the event that we know the preimage.
		index, err := l.channel.ReceiveHTLC(msg)
		if err != nil {
			l.failf(LinkFailureError{code: ErrInvalidUpdate},
				"unable to handle upstream add HTLC: %v", err)
			return
		}

		l.log.Tracef("receive upstream htlc with payment hash(%x), "+
			"assigning index: %v", msg.PaymentHash[:], index)

	case *lnwire.UpdateFulfillHTLC:
		pre := msg.PaymentPreimage
		idx := msg.ID

		// Before we pipeline the settle, we'll check the set of active
		// htlc's to see if the related UpdateAddHTLC has been fully
		// locked-in.
		var lockedin bool
		htlcs := l.channel.ActiveHtlcs()
		for _, add := range htlcs {
			// The HTLC will be outgoing and match idx.
			if !add.Incoming && add.HtlcIndex == idx {
				lockedin = true
				break
			}
		}

		if !lockedin {
			l.failf(
				LinkFailureError{code: ErrInvalidUpdate},
				"unable to handle upstream settle",
			)
			return
		}

		if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
			l.failf(
				LinkFailureError{
					code:          ErrInvalidUpdate,
					FailureAction: LinkFailureForceClose,
				},
				"unable to handle upstream settle HTLC: %v", err,
			)
			return
		}

2171
                settlePacket := &htlcPacket{
226✔
2172
                        outgoingChanID: l.ShortChanID(),
226✔
2173
                        outgoingHTLCID: idx,
226✔
2174
                        htlc: &lnwire.UpdateFulfillHTLC{
226✔
2175
                                PaymentPreimage: pre,
226✔
2176
                        },
226✔
2177
                }
226✔
2178

226✔
2179
                // Add the newly discovered preimage to our growing list of
226✔
2180
                // uncommitted preimage. These will be written to the witness
226✔
2181
                // cache just before accepting the next commitment signature
226✔
2182
                // from the remote peer.
226✔
2183
                l.uncommittedPreimages = append(l.uncommittedPreimages, pre)
226✔
2184

226✔
2185
                // Pipeline this settle, send it to the switch.
226✔
2186
                go l.forwardBatch(false, settlePacket)
226✔
2187

2188
        case *lnwire.UpdateFailMalformedHTLC:
4✔
2189
                // Convert the failure type encoded within the HTLC fail
4✔
2190
                // message to the proper generic lnwire error code.
4✔
2191
                var failure lnwire.FailureMessage
4✔
2192
                switch msg.FailureCode {
4✔
2193
                case lnwire.CodeInvalidOnionVersion:
                        failure = &lnwire.FailInvalidOnionVersion{
                                OnionSHA256: msg.ShaOnionBlob,
                        }
                case lnwire.CodeInvalidOnionHmac:
                        failure = &lnwire.FailInvalidOnionHmac{
                                OnionSHA256: msg.ShaOnionBlob,
                        }

                case lnwire.CodeInvalidOnionKey:
                        failure = &lnwire.FailInvalidOnionKey{
                                OnionSHA256: msg.ShaOnionBlob,
                        }

                // Handle malformed errors that are part of a blinded route.
                // This case is slightly different, because we expect every
                // relaying node in the blinded portion of the route to send
                // malformed errors. If we're also a relaying node, we're
                // likely going to switch this error out anyway for our own
                // malformed error, but we handle the case here for
                // completeness.
                case lnwire.CodeInvalidBlinding:
                        failure = &lnwire.FailInvalidBlinding{
                                OnionSHA256: msg.ShaOnionBlob,
                        }

                default:
                        l.log.Warnf("unexpected failure code received in "+
                                "UpdateFailMalformedHTLC: %v", msg.FailureCode)

                        // We don't just pass back the error we received from
                        // our successor. Otherwise we might report a failure
                        // that penalizes us more than needed. If the onion that
                        // we forwarded was correct, the node should have been
                        // able to send back its own failure. The node did not
                        // send back its own failure, so we assume there was a
                        // problem with the onion and report that back. We reuse
                        // the invalid onion key failure because there is no
                        // specific error for this case.
                        failure = &lnwire.FailInvalidOnionKey{
                                OnionSHA256: msg.ShaOnionBlob,
                        }
                }

                // With the error parsed, we'll convert it into its opaque
                // form.
                var b bytes.Buffer
                if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
                        l.log.Errorf("unable to encode malformed error: %v", err)
                        return
                }

                // If the remote side was unable to parse the onion blob we
                // sent to it, then we should transform the malformed HTLC
                // message into the usual HTLC fail message.
                err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
                if err != nil {
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
                                "unable to handle upstream fail HTLC: %v", err)
                        return
                }

        case *lnwire.UpdateFailHTLC:
                // Verify that the failure reason is at least 256 bytes plus
                // overhead.
                const minimumFailReasonLength = lnwire.FailureMessageLength +
                        2 + 2 + 32

                if len(msg.Reason) < minimumFailReasonLength {
                        // We've received a reason with a non-compliant length.
                        // Older nodes happily relay back these failures that
                        // may originate from a node further downstream.
                        // Therefore we can't just fail the channel.
                        //
                        // We want to be compliant ourselves, so we also can't
                        // pass back the reason unmodified. And we must make
                        // sure that we don't hit the magic length check of 260
                        // bytes in processRemoteSettleFails either.
                        //
                        // Because the reason is unreadable for the payer
                        // anyway, we just replace it with a compliant-length
                        // series of random bytes.
                        msg.Reason = make([]byte, minimumFailReasonLength)
                        _, err := crand.Read(msg.Reason[:])
                        if err != nil {
                                l.log.Errorf("Random generation error: %v", err)

                                return
                        }
                }

                // Add fail to the update log.
                idx := msg.ID
                err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
                if err != nil {
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
                                "unable to handle upstream fail HTLC: %v", err)
                        return
                }

        case *lnwire.CommitSig:
                // Since we may have learned new preimages for the first time,
                // we'll add them to our preimage cache. By doing this, we
                // ensure any contested contracts watched by any on-chain
                // arbitrators can now sweep this HTLC on-chain. We delay
                // committing the preimages until just before accepting the new
                // remote commitment, as afterwards the peer won't resend the
                // Settle messages on the next channel reestablishment. Doing so
                // allows us to more effectively batch this operation, instead
                // of doing a single write per preimage.
                err := l.cfg.PreimageCache.AddPreimages(
                        l.uncommittedPreimages...,
                )
                if err != nil {
                        l.failf(
                                LinkFailureError{code: ErrInternalError},
                                "unable to add preimages=%v to cache: %v",
                                l.uncommittedPreimages, err,
                        )
                        return
                }

                // Instead of truncating the slice to conserve memory
                // allocations, we simply set the uncommitted preimage slice to
                // nil so that a new one will be initialized if any more
                // witnesses are discovered. We do this because the maximum size
                // that the slice can occupy is 15KB, and we want to ensure we
                // release that memory back to the runtime.
                l.uncommittedPreimages = nil

                // We just received new updates to our local commitment chain,
                // so we'll validate this new commitment, closing the link if
                // it's invalid.
                auxSigBlob, err := msg.CustomRecords.Serialize()
                if err != nil {
                        l.failf(
                                LinkFailureError{code: ErrInvalidCommitment},
                                "unable to serialize custom records: %v", err,
                        )

                        return
                }
                err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
                        CommitSig:  msg.CommitSig,
                        HtlcSigs:   msg.HtlcSigs,
                        PartialSig: msg.PartialSig,
                        AuxSigBlob: auxSigBlob,
                })
                if err != nil {
                        // If we were unable to reconstruct their proposed
                        // commitment, then we'll examine the type of error. If
                        // it's an InvalidCommitSigError, then we'll send a
                        // direct error.
                        var sendData []byte
                        switch err.(type) {
                        case *lnwallet.InvalidCommitSigError:
                                sendData = []byte(err.Error())
                        case *lnwallet.InvalidHtlcSigError:
                                sendData = []byte(err.Error())
                        }
                        l.failf(
                                LinkFailureError{
                                        code:          ErrInvalidCommitment,
                                        FailureAction: LinkFailureForceClose,
                                        SendData:      sendData,
                                },
                                "ChannelPoint(%v): unable to accept new "+
                                        "commitment: %v",
                                l.channel.ChannelPoint(), err,
                        )
                        return
                }

                // As we've just accepted a new state, we'll now
                // immediately send the remote peer a revocation for our prior
                // state.
                nextRevocation, currentHtlcs, finalHTLCs, err :=
                        l.channel.RevokeCurrentCommitment()
                if err != nil {
                        l.log.Errorf("unable to revoke commitment: %v", err)

                        // We need to fail the channel in case revoking our
                        // local commitment does not succeed. We might have
                        // already advanced our channel state which would lead
                        // us to proceed with an unclean state.
                        //
                        // NOTE: We do not trigger a force close because this
                        // could resolve itself in case our db was just busy
                        // not accepting new transactions.
                        l.failf(
                                LinkFailureError{
                                        code:          ErrInternalError,
                                        Warning:       true,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "ChannelPoint(%v): unable to accept new "+
                                        "commitment: %v",
                                l.channel.ChannelPoint(), err,
                        )
                        return
                }

                // As soon as we are ready to send our next revocation, we can
                // invoke the incoming commit hooks.
                l.RWMutex.Lock()
                l.incomingCommitHooks.invoke()
                l.RWMutex.Unlock()

                l.cfg.Peer.SendMessage(false, nextRevocation)

                // Notify the incoming HTLCs for which the resolutions were
                // locked in.
                for id, settled := range finalHTLCs {
                        l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
                                models.CircuitKey{
                                        ChanID: l.ShortChanID(),
                                        HtlcID: id,
                                },
                                channeldb.FinalHtlcInfo{
                                        Settled:  settled,
                                        Offchain: true,
                                },
                        )
                }

                // Since we just revoked our commitment, we may have a new set
                // of HTLC's on our commitment, so we'll send them using our
                // function closure NotifyContractUpdate.
                newUpdate := &contractcourt.ContractUpdate{
                        HtlcKey: contractcourt.LocalHtlcSet,
                        Htlcs:   currentHtlcs,
                }
                err = l.cfg.NotifyContractUpdate(newUpdate)
                if err != nil {
                        l.log.Errorf("unable to notify contract update: %v",
                                err)
                        return
                }

                select {
                case <-l.cg.Done():
                        return
                default:
                }

                // If the remote party initiated the state transition, we'll
                // reply with a signature to provide them with their version of
                // the latest commitment. Otherwise, if both commitment chains
                // are fully synced from our PoV, we don't need to reply with a
                // signature, as both sides already have a commitment with the
                // latest accepted state.
                if l.channel.OweCommitment() {
                        if !l.updateCommitTxOrFail(ctx) {
                                return
                        }
                }

                // If we need to send out an Stfu, this would be the time to do
                // so.
                if l.noDanglingUpdates(lntypes.Local) {
                        err = l.quiescer.SendOwedStfu()
                        if err != nil {
                                l.stfuFailf("sendOwedStfu: %v", err.Error())
                        }
                }

                // Now that we have finished processing the incoming CommitSig
                // and sent out our RevokeAndAck, we invoke the flushHooks if
                // the channel state is clean.
                l.RWMutex.Lock()
                if l.channel.IsChannelClean() {
                        l.flushHooks.invoke()
                }
                l.RWMutex.Unlock()

        case *lnwire.RevokeAndAck:
                // We've received a revocation from the remote chain. If valid,
                // this moves the remote chain forward and expands our
                // revocation window.

                // We now process the message and advance our remote commit
                // chain.
                fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg)
                if err != nil {
                        // TODO(halseth): force close?
                        l.failf(
                                LinkFailureError{
                                        code:          ErrInvalidRevocation,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "unable to accept revocation: %v", err,
                        )
                        return
                }

                // The remote party now has a new primary commitment, so we'll
                // update the contract court to be aware of this new set (the
                // prior old remote pending).
                newUpdate := &contractcourt.ContractUpdate{
                        HtlcKey: contractcourt.RemoteHtlcSet,
                        Htlcs:   remoteHTLCs,
                }
                err = l.cfg.NotifyContractUpdate(newUpdate)
                if err != nil {
                        l.log.Errorf("unable to notify contract update: %v",
                                err)
                        return
                }

                select {
                case <-l.cg.Done():
                        return
                default:
                }

                // If we have a tower client for this channel type, we'll
                // create a backup for the current state.
                if l.cfg.TowerClient != nil {
                        state := l.channel.State()
                        chanID := l.ChanID()

                        err = l.cfg.TowerClient.BackupState(
                                &chanID, state.RemoteCommitment.CommitHeight-1,
                        )
                        if err != nil {
                                l.failf(LinkFailureError{
                                        code: ErrInternalError,
                                }, "unable to queue breach backup: %v", err)
                                return
                        }
                }

                // If we can send updates then we can process adds in case we
                // are the exit hop and need to send back resolutions, or in
                // case there are validity issues with the packets. Otherwise
                // we defer the action until resume.
                //
                // We are free to process the settles and fails without this
                // check since processing those can't result in further updates
                // to this channel link.
                if l.quiescer.CanSendUpdates() {
                        l.processRemoteAdds(fwdPkg)
                } else {
                        l.quiescer.OnResume(func() {
                                l.processRemoteAdds(fwdPkg)
                        })
                }
                l.processRemoteSettleFails(fwdPkg)

                // If the link failed during processing the adds, we must
                // return to ensure we won't attempt to update the state
                // further.
                if l.failed {
                        return
                }

                // The revocation window opened up. If there are pending local
                // updates, try to update the commit tx. Pending updates could
                // already have been present because of a previously failed
                // update to the commit tx or freshly added in by
                // processRemoteAdds. Also in case there are no local updates,
                // but there are still remote updates that are not in the remote
                // commit tx yet, send out an update.
                if l.channel.OweCommitment() {
                        if !l.updateCommitTxOrFail(ctx) {
                                return
                        }
                }

                // Now that we have finished processing the RevokeAndAck, we
                // can invoke the flushHooks if the channel state is clean.
                l.RWMutex.Lock()
                if l.channel.IsChannelClean() {
                        l.flushHooks.invoke()
                }
                l.RWMutex.Unlock()

        case *lnwire.UpdateFee:
                // Check and see if their proposed fee-rate would make us
                // exceed the fee threshold.
                fee := chainfee.SatPerKWeight(msg.FeePerKw)

                exceedsLimit, err := l.exceedsFeeExposureLimit(fee)
                if err != nil {
                        // This shouldn't typically happen. If it does, it
                        // indicates something is wrong with our channel state.
                        l.log.Errorf("Unable to determine if fee threshold " +
                                "exceeded")
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "error calculating fee exposure: %v", err)

                        return
                }

                if exceedsLimit {
                        // The proposed fee-rate makes us exceed the fee
                        // threshold.
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "fee threshold exceeded with fee-rate %v", fee)
                        return
                }

                // We received a fee update from the peer. If we are the
                // initiator we will fail the channel; if not, we will apply
                // the update.
                if err := l.channel.ReceiveUpdateFee(fee); err != nil {
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
                                "error receiving fee update: %v", err)
                        return
                }

                // Update the mailbox's feerate as well.
                l.mailBox.SetFeeRate(fee)

        case *lnwire.Stfu:
                err := l.handleStfu(msg)
                if err != nil {
                        l.stfuFailf("handleStfu: %v", err.Error())
                }

        // In the case where we receive a warning message from our peer, just
        // log it and move on. We choose not to disconnect from our peer,
        // although we "MAY" do so according to the specification.
        case *lnwire.Warning:
                l.log.Warnf("received warning message from peer: %v",
                        msg.Warning())

        case *lnwire.Error:
                // Error received from remote, MUST fail channel, but should
                // only print the contents of the error message if all
                // characters are printable ASCII.
                l.failf(
                        LinkFailureError{
                                code: ErrRemoteError,

                                // TODO(halseth): we currently don't fail the
                                // channel permanently, as there are some sync
                                // issues with other implementations that will
                                // lead to them sending an error message which
                                // we can recover from on the next connection.
                                // See
                                // https://github.com/ElementsProject/lightning/issues/4212
                                PermanentFailure: false,
                        },
                        "ChannelPoint(%v): received error from peer: %v",
                        l.channel.ChannelPoint(), msg.Error(),
                )
        default:
                l.log.Warnf("received unknown message of type %T", msg)
        }
}

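// exampleCommitSigOrder is an illustrative sketch, not used by the link
// itself, of the strict ordering followed above when a CommitSig arrives:
// persist newly learned preimages, validate and accept the new commitment,
// revoke the prior local state, and only then sign a new remote commitment
// if one is still owed. The function parameters are hypothetical stand-ins
// for the corresponding channel operations.
func exampleCommitSigOrder(persistPreimages, acceptCommit, revoke,
        sign func() error, oweCommitment func() bool) error {

        // Preimages must hit the cache before the commitment is accepted,
        // since the peer won't resend Settles after reestablishment.
        if err := persistPreimages(); err != nil {
                return err
        }

        // Validate the proposed commitment before revoking anything.
        if err := acceptCommit(); err != nil {
                return err
        }

        // Hand the peer the revocation for our now-stale local state.
        if err := revoke(); err != nil {
                return err
        }

        // If the remote initiated the transition, reply with our signature.
        if oweCommitment() {
                return sign()
        }

        return nil
}
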
// handleStfu implements the top-level logic for handling the Stfu message from
// our peer.
func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
        if !l.noDanglingUpdates(lntypes.Remote) {
                return ErrPendingRemoteUpdates
        }
        err := l.quiescer.RecvStfu(*stfu)
        if err != nil {
                return err
        }

        // If we can immediately send an Stfu response back, we will.
        if l.noDanglingUpdates(lntypes.Local) {
                return l.quiescer.SendOwedStfu()
        }

        return nil
}

// stfuFailf fails the link in the case where the requirements of the
// quiescence protocol are violated. In all cases we opt to drop the connection
// as only link state (as opposed to channel state) is affected.
func (l *channelLink) stfuFailf(format string, args ...interface{}) {
        l.failf(LinkFailureError{
                code:             ErrStfuViolation,
                FailureAction:    LinkFailureDisconnect,
                PermanentFailure: false,
                Warning:          true,
        }, format, args...)
}

// noDanglingUpdates returns true when there are 0 updates that were originally
// issued by the given party on either the Local or Remote commitment
// transaction.
func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool {
        pendingOnLocal := l.channel.NumPendingUpdates(
                whose, lntypes.Local,
        )
        pendingOnRemote := l.channel.NumPendingUpdates(
                whose, lntypes.Remote,
        )

        return pendingOnLocal == 0 && pendingOnRemote == 0
}

// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
// for packets delivered from the server, and cleaning up any circuits closed
// by signing a previous commitment txn. This method ensures that the circuits
// are removed from the circuit map before removing them from the link's
// mailbox, otherwise it could be possible for some circuit to be missed if
// this link flaps.
func (l *channelLink) ackDownStreamPackets() error {
        // First, remove the downstream Add packets that were included in the
        // previous commitment signature. This will prevent the Adds from being
        // replayed if this link disconnects.
        for _, inKey := range l.openedCircuits {
                // In order to test the sphinx replay logic of the remote
                // party, unsafe replay does not acknowledge the packets from
                // the mailbox. We can then force a replay of any Add packets
                // held in memory by disconnecting and reconnecting the link.
                if l.cfg.UnsafeReplay {
                        continue
                }

                l.log.Debugf("removing Add packet %s from mailbox", inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Now, we will delete all circuits closed by the previous commitment
        // signature, which is the result of downstream Settle/Fail packets. We
        // batch them here to ensure circuits are closed atomically and for
        // performance.
        err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
        switch err {
        case nil:
                // Successful deletion.

        default:
                l.log.Errorf("unable to delete %d circuits: %v",
                        len(l.closedCircuits), err)
                return err
        }

        // With the circuits removed from memory and disk, we now ack any
        // Settle/Fails in the mailbox to ensure they do not get redelivered
        // after startup. If forgive is enabled and we've reached this point,
        // the circuits must have been removed at some point, so it is now safe
        // to un-queue the corresponding Settle/Fails.
        for _, inKey := range l.closedCircuits {
                l.log.Debugf("removing Fail/Settle packet %s from mailbox",
                        inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Lastly, reset our buffers to be empty while keeping any acquired
        // growth in the backing array.
        l.openedCircuits = l.openedCircuits[:0]
        l.closedCircuits = l.closedCircuits[:0]

        return nil
}

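// exampleAckOrder is an illustrative sketch, not part of the link's logic,
// of the invariant enforced by ackDownStreamPackets: circuits are deleted
// from the persistent circuit map before the corresponding Settle/Fail
// packets are acked in the mailbox. The closures are hypothetical stand-ins.
func exampleAckOrder(deleteCircuits func() error, ackPackets func()) error {
        // Remove the circuits from disk and memory first. If we crash before
        // the mailbox ack below, the packets are simply redelivered on
        // restart and no circuit is ever missed.
        if err := deleteCircuits(); err != nil {
                return err
        }

        // Only now is it safe to un-queue the Settle/Fail packets.
        ackPackets()

        return nil
}
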
// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
// the link.
func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool {
        err := l.updateCommitTx(ctx)
        switch err {
        // No error encountered, success.
        case nil:

        // A duplicate keystone error should be resolved and is not fatal, so
        // we won't send an Error message to the peer.
        case ErrDuplicateKeystone:
                l.failf(LinkFailureError{code: ErrCircuitError},
                        "temporary circuit error: %v", err)
                return false

        // Any other error results in an Error message being sent to the peer.
        default:
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to update commitment: %v", err)
                return false
        }

        return true
}

// updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point.
func (l *channelLink) updateCommitTx(ctx context.Context) error {
        // Preemptively write all pending keystones to disk, just in case the
        // HTLCs we have in memory are included in the subsequent attempt to
        // sign a commitment state.
        err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
        if err != nil {
                // If ErrDuplicateKeystone is returned, the caller will catch
                // it.
                return err
        }

        // Reset the batch, but keep the backing buffer to avoid reallocating.
        l.keystoneBatch = l.keystoneBatch[:0]

        // If hodl.Commit mode is active, we will refrain from attempting to
        // commit any in-memory modifications to the channel state. Exiting here
        // permits testing of either the switch or link's ability to trim
        // circuits that have been opened, but unsuccessfully committed.
        if l.cfg.HodlMask.Active(hodl.Commit) {
                l.log.Warnf(hodl.Commit.Warning())
                return nil
        }

        ctx, done := l.cg.Create(ctx)
        defer done()

        newCommit, err := l.channel.SignNextCommitment(ctx)
        if err == lnwallet.ErrNoWindow {
                l.cfg.PendingCommitTicker.Resume()
                l.log.Trace("PendingCommitTicker resumed")

                n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
                l.log.Tracef("revocation window exhausted, unable to send: "+
                        "%v, pend_updates=%v, dangling_closes=%v", n,
                        lnutils.SpewLogClosure(l.openedCircuits),
                        lnutils.SpewLogClosure(l.closedCircuits))

                return nil
        } else if err != nil {
                return err
        }

        if err := l.ackDownStreamPackets(); err != nil {
                return err
        }

        l.cfg.PendingCommitTicker.Pause()
        l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")

        // The remote party now has a new pending commitment, so we'll update
        // the contract court to be aware of this new set (the prior old remote
        // pending).
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.RemotePendingHtlcSet,
                Htlcs:   newCommit.PendingHTLCs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                l.log.Errorf("unable to notify contract update: %v", err)
                return err
        }

        select {
        case <-l.cg.Done():
                return ErrLinkShuttingDown
        default:
        }

        auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob)
        if err != nil {
                return fmt.Errorf("error parsing aux sigs: %w", err)
        }

        commitSig := &lnwire.CommitSig{
                ChanID:        l.ChanID(),
                CommitSig:     newCommit.CommitSig,
                HtlcSigs:      newCommit.HtlcSigs,
                PartialSig:    newCommit.PartialSig,
                CustomRecords: auxBlobRecords,
        }
        l.cfg.Peer.SendMessage(false, commitSig)

        // Now that we have sent out a new CommitSig, we invoke the outgoing set
        // of commit hooks.
        l.RWMutex.Lock()
        l.outgoingCommitHooks.invoke()
        l.RWMutex.Unlock()

        return nil
}

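// exampleKeystoneFirst is an illustrative sketch, with hypothetical
// closures, of the write ordering in updateCommitTx: pending keystones are
// flushed to disk before a new commitment is signed, so the in-memory HTLCs
// referenced by the signature are always recoverable after a restart.
func exampleKeystoneFirst(openCircuits, signCommitment func() error) error {
        // Persist the keystones preemptively.
        if err := openCircuits(); err != nil {
                return err
        }

        // Only then attempt to sign the next commitment state.
        return signCommitment()
}
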
// PeerPubKey returns the public key of the remote peer with which we have the
// channel link opened.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) PeerPubKey() [33]byte {
        return l.cfg.Peer.PubKey()
}

// ChannelPoint returns the channel outpoint for the channel link.
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChannelPoint() wire.OutPoint {
        return l.channel.ChannelPoint()
}

// ShortChanID returns the short channel ID for the channel link. The short
// channel ID encodes the exact location in the main chain that the original
// funding output can be found.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
        l.RLock()
        defer l.RUnlock()

        return l.channel.ShortChanID()
}

// UpdateShortChanID updates the short channel ID for a link. This may be
// required in the event that a link is created before the short chan ID for it
// is known, or a re-org occurs, and the funding transaction changes location
// within the chain.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
        chanID := l.ChanID()

        // Refresh the channel state's short channel ID by loading it from disk.
        // This ensures that the channel state accurately reflects the updated
        // short channel ID.
        err := l.channel.State().Refresh()
        if err != nil {
                l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
                        "%v", chanID, err)
                return hop.Source, err
        }

        return hop.Source, nil
}

// ChanID returns the channel ID for the channel link. The channel ID is a more
// compact representation of a channel's full outpoint.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChanID() lnwire.ChannelID {
        return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
}

// Bandwidth returns the total amount that can flow through the channel link at
// this given instance. The value returned is expressed in millisatoshi and can
// be used by callers when making forwarding decisions to determine if a link
// can accept an HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
        // Get the balance available on the channel for new HTLCs. This takes
        // the channel reserve into account so HTLCs up to this value won't
        // violate it.
        return l.channel.AvailableBalance()
}

// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
// amount provided to the link. This check does not reserve a space, since
// forwards or other payments may use the available slot, so it should be
// considered best-effort.
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
        return l.channel.MayAddOutgoingHtlc(amt)
}

// getDustSum is a wrapper method that calls the underlying channel's dust sum
// method.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
        dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {

        return l.channel.GetDustSum(whoseCommit, dryRunFee)
}

// getFeeRate is a wrapper method that retrieves the underlying channel's
// feerate.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
        return l.channel.CommitFeeRate()
}

// getDustClosure returns a closure that can be used by the switch or mailbox
// to evaluate whether a given HTLC is dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustClosure() dustClosure {
        localDustLimit := l.channel.State().LocalChanCfg.DustLimit
        remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
        chanType := l.channel.State().ChanType

        return dustHelper(chanType, localDustLimit, remoteDustLimit)
}

// getCommitFee returns either the local or remote CommitFee in satoshis. This
// is used so that the Switch can have access to the commitment fee without
// needing to have a *LightningChannel. This doesn't include dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
        if remote {
                return l.channel.State().RemoteCommitment.CommitFee
        }

        return l.channel.State().LocalCommitment.CommitFee
}

// exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
// increases the total dust and fees within the channel past the configured
// fee threshold. It first calculates the dust sum over every update in the
// update log with the proposed fee-rate and taking into account both the local
// and remote dust limits. It uses every update in the update log instead of
// what is actually on the local and remote commitments because it is assumed
// that in a worst-case scenario, every update in the update log could
// theoretically be on either commitment transaction and this needs to be
// accounted for with this fee-rate. It then calculates the local and remote
// commitment fees given the proposed fee-rate. Finally, it tallies the results
// and determines if the fee threshold has been exceeded.
func (l *channelLink) exceedsFeeExposureLimit(
        feePerKw chainfee.SatPerKWeight) (bool, error) {

        dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)

        // Get the sum of dust for both the local and remote commitments using
        // this "dry-run" fee.
        localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
        remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)

        // Calculate the local and remote commitment fees using this dry-run
        // fee.
        localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
        if err != nil {
                return false, err
        }

        // Finally, check whether the max fee exposure was exceeded on either
        // future commitment transaction with the fee-rate.
        totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
        if totalLocalDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "local dust: %v, local fee: %v", l.ShortChanID(),
                        totalLocalDust, localFee)

                return true, nil
        }

        totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
                remoteFee,
        )

        if totalRemoteDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "remote dust: %v, remote fee: %v", l.ShortChanID(),
                        totalRemoteDust, remoteFee)

                return true, nil
        }

        return false, nil
}

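// exampleFeeExposure is an illustrative sketch of the tally performed by
// exceedsFeeExposureLimit for one commitment: the dust sum at the dry-run
// fee-rate plus the commitment fee at that rate is compared against the
// configured threshold. All concrete values here are hypothetical.
func exampleFeeExposure(dustSum lnwire.MilliSatoshi, commitFee btcutil.Amount,
        maxFeeExposure lnwire.MilliSatoshi) bool {

        // E.g. 400k sats of dust HTLCs plus a 50k sat commitment fee exceeds
        // a 300k sat threshold, so the proposed fee-rate would be rejected.
        total := dustSum + lnwire.NewMSatFromSatoshis(commitFee)

        return total > maxFeeExposure
}
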
// isOverexposedWithHtlc calculates whether the proposed HTLC will make the
// channel exceed the fee threshold. It first fetches the largest fee-rate that
// may be on any unrevoked commitment transaction. Then, using this fee-rate,
// determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds to
// the overall dust sum. If it is not dust, it contributes to weight, which
// also adds to the overall dust sum by an increase in fees. If the dust sum on
// either commitment exceeds the configured fee threshold, this function
// returns true.
func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
        incoming bool) bool {

        dustClosure := l.getDustClosure()

        feeRate := l.channel.WorstCaseFeeRate()

        amount := htlc.Amount.ToSatoshis()

        // See if this HTLC is dust on both the local and remote commitments.
        isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
        isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)

        // Calculate the dust sum for the local and remote commitments.
        localDustSum := l.getDustSum(
                lntypes.Local, fn.None[chainfee.SatPerKWeight](),
        )
        remoteDustSum := l.getDustSum(
                lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
        )

        // Grab the larger of the local and remote commitment fees w/o dust.
        commitFee := l.getCommitFee(false)

        if l.getCommitFee(true) > commitFee {
                commitFee = l.getCommitFee(true)
        }

        commitFeeMSat := lnwire.NewMSatFromSatoshis(commitFee)

        localDustSum += commitFeeMSat
        remoteDustSum += commitFeeMSat

        // Calculate the additional fee increase if this is a non-dust HTLC.
        weight := lntypes.WeightUnit(input.HTLCWeight)
        additional := lnwire.NewMSatFromSatoshis(
                feeRate.FeeForWeight(weight),
        )

        if isLocalDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                localDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase in
                // weight.
                localDustSum += additional
        }

        if localDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total local dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, localDustSum,
                        commitFeeMSat)

                return true
        }

        if isRemoteDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase in
                // weight.
                remoteDustSum += additional
        }

        if remoteDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total remote dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, remoteDustSum,
                        commitFeeMSat)

                return true
        }

        return false
}

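// exampleHtlcExposureDelta is an illustrative sketch of the marginal
// exposure a single HTLC adds in isOverexposedWithHtlc: a dust HTLC
// contributes its full amount to the dust sum, while a non-dust HTLC only
// contributes the fee for the extra commitment weight it adds.
func exampleHtlcExposureDelta(isDust bool, amt btcutil.Amount,
        feeRate chainfee.SatPerKWeight) lnwire.MilliSatoshi {

        if isDust {
                // The whole amount is burnable as fees if it lands on chain.
                return lnwire.NewMSatFromSatoshis(amt)
        }

        // Non-dust HTLCs raise the commitment fee by their added weight.
        weight := lntypes.WeightUnit(input.HTLCWeight)

        return lnwire.NewMSatFromSatoshis(feeRate.FeeForWeight(weight))
}
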
// dustClosure is a function that evaluates whether an HTLC is dust. It returns
// true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
// the HTLC is incoming (i.e. one that the remote sent), a ChannelParty
// denoting whether to evaluate on the local or remote commit, and finally an
// HTLC amount to test.
type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
        whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool

// dustHelper is used to construct the dustClosure.
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
        remoteDustLimit btcutil.Amount) dustClosure {

        isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
                whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {

                var dustLimit btcutil.Amount
                if whoseCommit.IsLocal() {
                        dustLimit = localDustLimit
                } else {
                        dustLimit = remoteDustLimit
                }

                return lnwallet.HtlcIsDust(
                        chantype, incoming, whoseCommit, feerate, amt,
                        dustLimit,
                )
        }

        return isDust
}

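// exampleDustQuery is an illustrative sketch of how a dustClosure built by
// dustHelper is consumed elsewhere in this file: the same candidate HTLC is
// evaluated against both commitments, since the local and remote dust limits
// may differ. The feerate and amount below are hypothetical.
func exampleDustQuery(isDust dustClosure) (bool, bool) {
        feeRate := chainfee.SatPerKWeight(2500)
        amt := btcutil.Amount(1_000)

        // An incoming HTLC appears on both commitment transactions, so it
        // must be checked against each party's dust limit.
        onLocal := isDust(feeRate, true, lntypes.Local, amt)
        onRemote := isDust(feeRate, true, lntypes.Remote, amt)

        return onLocal, onRemote
}
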
3158
// zeroConfConfirmed returns whether or not the zero-conf channel has
3159
// confirmed on-chain.
3160
//
3161
// Part of the scidAliasHandler interface.
3162
func (l *channelLink) zeroConfConfirmed() bool {
4✔
3163
        return l.channel.State().ZeroConfConfirmed()
4✔
3164
}
4✔
3165

3166
// confirmedScid returns the confirmed SCID for a zero-conf channel. This
3167
// should not be called for non-zero-conf channels.
3168
//
3169
// Part of the scidAliasHandler interface.
3170
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
4✔
3171
        return l.channel.State().ZeroConfRealScid()
4✔
3172
}
4✔
3173

3174
// isZeroConf returns whether or not the underlying channel is a zero-conf
3175
// channel.
3176
//
3177
// Part of the scidAliasHandler interface.
3178
func (l *channelLink) isZeroConf() bool {
214✔
3179
        return l.channel.State().IsZeroConf()
214✔
3180
}
214✔
3181

3182
// negotiatedAliasFeature returns whether or not the underlying channel has
3183
// negotiated the option-scid-alias feature bit. This will be true for both
3184
// option-scid-alias and zero-conf channel-types. It will also be true for
3185
// channels with the feature bit but without the above channel-types.
3186
//
3187
// Part of the scidAliasFeature interface.
3188
func (l *channelLink) negotiatedAliasFeature() bool {
375✔
3189
        return l.channel.State().NegotiatedAliasFeature()
375✔
3190
}
375✔
3191

3192
// getAliases returns the set of aliases for the underlying channel.
3193
//
3194
// Part of the scidAliasHandler interface.
3195
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
220✔
3196
        return l.cfg.GetAliases(l.ShortChanID())
220✔
3197
}
220✔
3198

3199
// attachFailAliasUpdate sets the link's FailAliasUpdate function.
3200
//
3201
// Part of the scidAliasHandler interface.
3202
func (l *channelLink) attachFailAliasUpdate(closure func(
3203
        sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) {
215✔
3204

215✔
3205
        l.Lock()
215✔
3206
        l.cfg.FailAliasUpdate = closure
215✔
3207
        l.Unlock()
215✔
3208
}
215✔
3209

3210
// AttachMailBox updates the current mailbox used by this link, and hooks up
3211
// the mailbox's message and packet outboxes to the link's upstream and
3212
// downstream chans, respectively.
3213
func (l *channelLink) AttachMailBox(mailbox MailBox) {
214✔
3214
        l.Lock()
214✔
3215
        l.mailBox = mailbox
214✔
3216
        l.upstream = mailbox.MessageOutBox()
214✔
3217
        l.downstream = mailbox.PacketOutBox()
214✔
3218
        l.Unlock()
214✔
3219

214✔
3220
        // Set the mailbox's fee rate. This may be refreshing a feerate that was
214✔
3221
        // never committed.
214✔
3222
        l.mailBox.SetFeeRate(l.getFeeRate())
214✔
3223

214✔
3224
        // Also set the mailbox's dust closure so that it can query whether HTLC's
214✔
3225
        // are dust given the current feerate.
214✔
3226
        l.mailBox.SetDustClosure(l.getDustClosure())
214✔
3227
}
214✔
3228

3229
// UpdateForwardingPolicy updates the forwarding policy for the target
3230
// ChannelLink. Once updated, the link will use the new forwarding policy to
3231
// govern if it an incoming HTLC should be forwarded or not. We assume that
3232
// fields that are zero are intentionally set to zero, so we'll use newPolicy to
3233
// update all of the link's FwrdingPolicy's values.
3234
//
3235
// NOTE: Part of the ChannelLink interface.
3236
func (l *channelLink) UpdateForwardingPolicy(
3237
        newPolicy models.ForwardingPolicy) {
13✔
3238

13✔
3239
        l.Lock()
13✔
3240
        defer l.Unlock()
13✔
3241

13✔
3242
        l.cfg.FwrdingPolicy = newPolicy
13✔
3243
}
13✔
3244

// CheckHtlcForward should return a nil error if the passed HTLC details
// satisfy the current forwarding policy of the target link. Otherwise,
// a LinkError with a valid protocol failure message should be returned
// in order to signal the policy consistency issue to the source of the
// HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) CheckHtlcForward(payHash [32]byte, incomingHtlcAmt,
        amtToForward lnwire.MilliSatoshi, incomingTimeout,
        outgoingTimeout uint32, inboundFee models.InboundFee,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // Using the outgoing HTLC amount, we'll calculate the outgoing
        // fee this incoming HTLC must carry in order to satisfy the constraints
        // of the outgoing link.
        outFee := ExpectedFee(policy, amtToForward)

        // Then calculate the inbound fee that we charge based on the sum of
        // outgoing HTLC amount and outgoing fee.
        inFee := inboundFee.CalcFee(amtToForward + outFee)

        // Add up both fee components. It is important to calculate both fees
        // separately. An alternative way of calculating is to first determine
        // an aggregate fee and apply that to the outgoing HTLC amount. However,
        // rounding may cause the result to be slightly higher than in the case
        // of separately rounded fee components. This potentially causes failed
        // forwards for senders and is something to be avoided. A short
        // illustrative sketch follows this function.
        expectedFee := inFee + int64(outFee)

        // If the actual fee is less than our expected fee, then we'll reject
        // this HTLC as it didn't provide a sufficient amount of fees, or the
        // values have been tampered with, or the sender used incorrect/dated
        // information to construct the forwarding information for this hop. In
        // any case, we'll cancel this HTLC.
        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
        if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
                l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
                        "expected %v, got %v: incoming=%v, outgoing=%v, "+
                        "inboundFee=%v",
                        payHash[:], expectedFee, actualFee,
                        incomingHtlcAmt, amtToForward, inboundFee,
                )

                // As part of the returned error, we'll send our latest routing
                // policy so the sending node obtains the most up-to-date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewFeeInsufficient(amtToForward, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check whether the outgoing htlc satisfies the channel policy.
        err := l.canSendHtlc(
                policy, payHash, amtToForward, outgoingTimeout, heightNow,
                originalScid, customRecords,
        )
        if err != nil {
                return err
        }

        // Finally, we'll ensure that the time-lock on the outgoing HTLC meets
        // the following constraint: the incoming time-lock minus our time-lock
        // delta should be no less than the outgoing time lock. Otherwise,
        // either the sender messed up or an intermediate node tampered with
        // the HTLC.
        timeDelta := policy.TimeLockDelta
        if incomingTimeout < outgoingTimeout+timeDelta {
                l.log.Warnf("incoming htlc(%x) has incorrect time-lock value: "+
                        "expected at least %v block delta, got %v block delta",
                        payHash[:], timeDelta, incomingTimeout-outgoingTimeout)

                // Grab the latest routing policy so the sending node is up to
                // date with our current policy.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewIncorrectCltvExpiry(
                                incomingTimeout, *upd,
                        )
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        return nil
}

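// exampleForwardFeeCheck is an illustrative sketch of the fee verification
// in CheckHtlcForward: the outbound fee is computed on the forwarded amount,
// the inbound fee on the forwarded amount plus that outbound fee, and the
// two components are summed separately so that rounding never overstates the
// total. The policy and fee values supplied by a caller are hypothetical.
func exampleForwardFeeCheck(policy models.ForwardingPolicy,
        inboundFee models.InboundFee, incomingAmt,
        amtToForward lnwire.MilliSatoshi) bool {

        outFee := ExpectedFee(policy, amtToForward)
        inFee := inboundFee.CalcFee(amtToForward + outFee)

        // The HTLC provides enough fees only if the actual margin covers
        // both components; inFee may be negative for inbound discounts.
        actualFee := int64(incomingAmt) - int64(amtToForward)

        return actualFee >= inFee+int64(outFee)
}
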
// CheckHtlcTransit should return a nil error if the passed HTLC details
// satisfy the current channel policy. Otherwise, a LinkError with a
// valid protocol failure message should be returned in order to signal
// the violation. This call is intended to be used for locally initiated
// payments for which there is no corresponding incoming htlc.
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
        amt lnwire.MilliSatoshi, timeout uint32, heightNow uint32,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // We pass in hop.Source here as this is only used in the Switch when
        // trying to send over a local link. This causes the fallback mechanism
        // to occur.
        return l.canSendHtlc(
                policy, payHash, amt, timeout, heightNow, hop.Source,
                customRecords,
        )
}

3357
// canSendHtlc checks whether the given htlc parameters satisfy
// the channel's amount and time lock constraints.
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
	payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
	heightNow uint32, originalScid lnwire.ShortChannelID,
	customRecords lnwire.CustomRecords) *LinkError {

	// As our first sanity check, we'll ensure that the passed HTLC isn't
	// too small for the next hop. If so, then we'll cancel the HTLC
	// directly.
	if amt < policy.MinHTLCOut {
		l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
			"htlc_value=%v", payHash[:], policy.MinHTLCOut,
			amt)

		// As part of the returned error, we'll send our latest routing
		// policy so the sending node obtains the most up to date data.
		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
			return lnwire.NewAmountBelowMinimum(amt, *upd)
		}
		failure := l.createFailureWithUpdate(false, originalScid, cb)
		return NewLinkError(failure)
	}

	// Next, ensure that the passed HTLC isn't too large. If so, we'll
	// cancel the HTLC directly.
	if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
		l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
			"htlc_value=%v", payHash[:], policy.MaxHTLC, amt)

		// As part of the returned error, we'll send our latest routing
		// policy so the sending node obtains the most up-to-date data.
		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
			return lnwire.NewTemporaryChannelFailure(upd)
		}
		failure := l.createFailureWithUpdate(false, originalScid, cb)
		return NewDetailedLinkError(failure, OutgoingFailureHTLCExceedsMax)
	}

	// We want to avoid offering an HTLC which will expire in the near
	// future, so we'll reject an HTLC if the outgoing expiration time is
	// too close to the current height.
	if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
		l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
			"outgoing_expiry=%v, best_height=%v", payHash[:],
			timeout, heightNow)

		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
			return lnwire.NewExpiryTooSoon(*upd)
		}
		failure := l.createFailureWithUpdate(false, originalScid, cb)
		return NewLinkError(failure)
	}

	// Check absolute max delta.
	if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
		l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
			"the future: got %v, but maximum is %v", payHash[:],
			timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)

		return NewLinkError(&lnwire.FailExpiryTooFar{})
	}

	// We now check the available bandwidth to see if this HTLC can be
	// forwarded.
	availableBandwidth := l.Bandwidth()
	auxBandwidth, err := fn.MapOptionZ(
		l.cfg.AuxTrafficShaper,
		func(ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
			var htlcBlob fn.Option[tlv.Blob]
			blob, err := customRecords.Serialize()
			if err != nil {
				return fn.Err[OptionalBandwidth](
					fmt.Errorf("unable to serialize "+
						"custom records: %w", err))
			}

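			// Note: only a non-empty serialized blob is wrapped in
			// fn.Some below; HTLCs without custom records leave
			// htlcBlob as fn.None for the aux bandwidth query.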
			if len(blob) > 0 {
				htlcBlob = fn.Some(blob)
			}

			return l.AuxBandwidth(amt, originalScid, htlcBlob, ts)
		},
	).Unpack()
	if err != nil {
		l.log.Errorf("Unable to determine aux bandwidth: %v", err)
		return NewLinkError(&lnwire.FailTemporaryNodeFailure{})
	}

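	// If the aux traffic shaper reported a bandwidth of its own, it
	// replaces the regular channel bandwidth for the check below.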
	auxBandwidth.WhenSome(func(bandwidth lnwire.MilliSatoshi) {
		availableBandwidth = bandwidth
	})

	// Check to see if there is enough balance in this channel.
	if amt > availableBandwidth {
		l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
			"larger than %v", amt, availableBandwidth)
		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
			return lnwire.NewTemporaryChannelFailure(upd)
		}
		failure := l.createFailureWithUpdate(false, originalScid, cb)
		return NewDetailedLinkError(
			failure, OutgoingFailureInsufficientBalance,
		)
	}

	return nil
}

// AuxBandwidth returns the bandwidth that can be used for a channel, expressed
// in milli-satoshi. This might be different from the regular BTC bandwidth for
// custom channels. This will always return fn.None() for a regular (non-custom)
// channel.
func (l *channelLink) AuxBandwidth(amount lnwire.MilliSatoshi,
	cid lnwire.ShortChannelID, htlcBlob fn.Option[tlv.Blob],
	ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {

	unknownBandwidth := fn.None[lnwire.MilliSatoshi]()

	fundingBlob := l.FundingCustomBlob()
	shouldHandle, err := ts.ShouldHandleTraffic(cid, fundingBlob)
	if err != nil {
		return fn.Err[OptionalBandwidth](fmt.Errorf("traffic shaper "+
			"failed to decide whether to handle traffic: %w", err))
	}

	log.Debugf("ShortChannelID=%v: aux traffic shaper is handling "+
		"traffic: %v", cid, shouldHandle)

	// If this channel isn't handled by the aux traffic shaper, we'll return
	// early.
	if !shouldHandle {
		return fn.Ok(unknownBandwidth)
	}

	// Ask for a specific bandwidth to be used for the channel.
	commitmentBlob := l.CommitmentCustomBlob()
	auxBandwidth, err := ts.PaymentBandwidth(
		htlcBlob, commitmentBlob, l.Bandwidth(), amount,
	)
	if err != nil {
		return fn.Err[OptionalBandwidth](fmt.Errorf("failed to get "+
			"bandwidth from external traffic shaper: %w", err))
	}

	log.Debugf("ShortChannelID=%v: aux traffic shaper reported available "+
		"bandwidth: %v", cid, auxBandwidth)

	return fn.Ok(fn.Some(auxBandwidth))
}

// Stats returns the statistics of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
	snapshot := l.channel.StateSnapshot()

	return snapshot.ChannelCommitment.CommitHeight,
		snapshot.TotalMSatSent,
		snapshot.TotalMSatReceived
}

// String returns the string representation of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) String() string {
	return l.channel.ChannelPoint().String()
}

// handleSwitchPacket handles switch packets. These packets might be forwarded
// to us from another channel link in case the htlc update came from another
// peer, or the update was created by the user.
//
// NOTE: Part of the packetHandler interface.
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
	l.log.Tracef("received switch packet inkey=%v, outkey=%v",
		pkt.inKey(), pkt.outKey())

	return l.mailBox.AddPacket(pkt)
}

// HandleChannelUpdate handles the htlc requests (settle/add/fail) sent to us
// from the remote peer we have a channel with.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
	select {
	case <-l.cg.Done():
		// Return early if the link is already in the process of
		// quitting. It doesn't make sense to hand the message to the
		// mailbox here.
		return
	default:
	}

	err := l.mailBox.AddMessage(message)
	if err != nil {
		l.log.Errorf("failed to add Message to mailbox: %v", err)
	}
}

// updateChannelFee updates the commitment fee-per-kw on this channel by
// committing to an update_fee message.
func (l *channelLink) updateChannelFee(ctx context.Context,
	feePerKw chainfee.SatPerKWeight) error {

	l.log.Infof("updating commit fee to %v", feePerKw)

	// We skip sending the UpdateFee message if the channel is not
	// currently eligible to forward messages.
	if !l.eligibleToUpdate() {
		l.log.Debugf("skipping fee update for inactive channel")
		return nil
	}

	// Check and see if our proposed fee-rate would make us exceed the fee
	// threshold.
	thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
	if err != nil {
		// This shouldn't typically happen. If it does, it indicates
		// something is wrong with our channel state.
		return err
	}

	if thresholdExceeded {
		return fmt.Errorf("link fee threshold exceeded")
	}

	// First, we'll update the local fee on our commitment.
	if err := l.channel.UpdateFee(feePerKw); err != nil {
		return err
	}

	// The fee passed the channel's validation checks, so we update the
	// mailbox feerate.
	l.mailBox.SetFeeRate(feePerKw)

	// We'll then attempt to send a new UpdateFee message, and also lock it
	// in immediately by triggering a commitment update.
	msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
	if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
		return err
	}

	return l.updateCommitTx(ctx)
}

// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
// after receiving a revocation from the remote party, and reprocesses them in
// the context of the provided forwarding package. Any settles or fails that
// have already been acknowledged in the forwarding package will not be sent to
// the switch.
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
	if len(fwdPkg.SettleFails) == 0 {
		return
	}

	l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)

	var switchPackets []*htlcPacket
	for i, update := range fwdPkg.SettleFails {
		destRef := fwdPkg.DestRef(uint16(i))

		// Skip any settles or fails that have already been
		// acknowledged by the incoming link that originated the
		// forwarded Add.
		if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
			continue
		}

		// TODO(roasbeef): rework log entries to a shared
		// interface.

		switch msg := update.UpdateMsg.(type) {
		// A settle for an HTLC we previously forwarded has been
		// received. So we'll forward the HTLC to the switch which will
		// handle propagating the settle to the prior hop.
		case *lnwire.UpdateFulfillHTLC:
			// If hodl.SettleIncoming is requested, we will not
			// forward the SETTLE to the switch and will not signal
			// a free slot on the commitment transaction.
			if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
				l.log.Warnf(hodl.SettleIncoming.Warning())
				continue
			}

			settlePacket := &htlcPacket{
				outgoingChanID: l.ShortChanID(),
				outgoingHTLCID: msg.ID,
				destRef:        &destRef,
				htlc:           msg,
			}

			// Add the packet to the batch to be forwarded, and
			// notify the overflow queue that a spare spot has been
			// freed up within the commitment state.
			switchPackets = append(switchPackets, settlePacket)

		// A failureCode message for a previously forwarded HTLC has
		// been received. As a result a new slot will be freed up in
		// our commitment state, so we'll forward this to the switch so
		// the backwards undo can continue.
		case *lnwire.UpdateFailHTLC:
			// If hodl.FailIncoming is requested, we will not
			// forward the FAIL to the switch and will not signal a
			// free slot on the commitment transaction.
			if l.cfg.HodlMask.Active(hodl.FailIncoming) {
				l.log.Warnf(hodl.FailIncoming.Warning())
				continue
			}

			// Fetch the reason the HTLC was canceled so we can
			// continue to propagate it. This failure originated
			// from another node, so the linkFailure field is not
			// set on the packet.
			failPacket := &htlcPacket{
				outgoingChanID: l.ShortChanID(),
				outgoingHTLCID: msg.ID,
				destRef:        &destRef,
				htlc:           msg,
			}

			l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID)

			// If the failure message lacks an HMAC (but includes
			// the 4 bytes for encoding the message and padding
			// lengths), then this means that we received it as an
			// UpdateFailMalformedHTLC. As a result, we'll signal
			// that we need to convert this error within the switch
			// to an actual error, by encrypting it as if we were
			// the originating hop.
			convertedErrorSize := lnwire.FailureMessageLength + 4
			if len(msg.Reason) == convertedErrorSize {
				failPacket.convertedError = true
			}

			// Add the packet to the batch to be forwarded, and
			// notify the overflow queue that a spare spot has been
			// freed up within the commitment state.
			switchPackets = append(switchPackets, failPacket)
		}
	}

	// Only spawn the task to forward packets if we have a non-zero number.
	if len(switchPackets) > 0 {
		go l.forwardBatch(false, switchPackets...)
	}
}

// processRemoteAdds serially processes each of the Add payment descriptors
// which have been "locked-in" by receiving a revocation from the remote party.
// The forwarding package provided instructs how to process this batch,
// indicating whether this is the first time these Adds are being processed, or
// whether we are reprocessing as a result of a failure or restart. Adds that
// have already been acknowledged in the forwarding package will be ignored.
//
//nolint:funlen
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
	l.log.Tracef("processing %d remote adds for height %d",
		len(fwdPkg.Adds), fwdPkg.Height)

	decodeReqs := make(
		[]hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds),
	)
	for _, update := range fwdPkg.Adds {
		if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
			// Before adding the new htlc to the state machine,
			// parse the onion object in order to obtain the
			// routing information with the DecodeHopIterator
			// function which processes the Sphinx packet.
			onionReader := bytes.NewReader(msg.OnionBlob[:])

			req := hop.DecodeHopIteratorRequest{
				OnionReader:    onionReader,
				RHash:          msg.PaymentHash[:],
				IncomingCltv:   msg.Expiry,
				IncomingAmount: msg.Amount,
				BlindingPoint:  msg.BlindingPoint,
			}

			decodeReqs = append(decodeReqs, req)
		}
	}

	// Atomically decode the incoming htlcs, simultaneously checking for
	// replay attempts. A particular index in the returned, spare list of
	// channel iterators should only be used if the failure code at the
	// same index is lnwire.CodeNone.
	decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
		fwdPkg.ID(), decodeReqs,
	)
	if sphinxErr != nil {
		l.failf(LinkFailureError{code: ErrInternalError},
			"unable to decode hop iterators: %v", sphinxErr)
		return
	}

	var switchPackets []*htlcPacket

	for i, update := range fwdPkg.Adds {
		idx := uint16(i)

		//nolint:forcetypeassert
		add := *update.UpdateMsg.(*lnwire.UpdateAddHTLC)
		sourceRef := fwdPkg.SourceRef(idx)

		if fwdPkg.State == channeldb.FwdStateProcessed &&
			fwdPkg.AckFilter.Contains(idx) {

			// If this index is already found in the ack filter,
			// the response to this forwarding decision has already
			// been committed by one of our commitment txns. ADDs
			// in this state are waiting for the rest of the fwding
			// package to get acked before being garbage collected.
			continue
		}

		// An incoming HTLC add has been fully locked in. As a result
		// we can now examine the forwarding details of the HTLC, and
		// the HTLC itself to decide if: we should forward it, cancel
		// it, or are able to settle it (and it adheres to our fee
		// related constraints).

		// Before adding the new htlc to the state machine, parse the
		// onion object in order to obtain the routing information with
		// the DecodeHopIterator function which processes the Sphinx
		// packet.
		chanIterator, failureCode := decodeResps[i].Result()
		if failureCode != lnwire.CodeNone {
			// If we're unable to process the onion blob then we
			// should send the malformed htlc error to the payment
			// sender.
			l.sendMalformedHTLCError(
				add.ID, failureCode, add.OnionBlob, &sourceRef,
			)

			l.log.Errorf("unable to decode onion hop "+
				"iterator: %v", failureCode)
			continue
		}

		heightNow := l.cfg.BestHeight()

		pld, routeRole, pldErr := chanIterator.HopPayload()
		if pldErr != nil {
			// If we're unable to process the onion payload, or we
			// received an invalid onion payload failure, then we
			// should send an error back to the caller so the HTLC
			// can be canceled.
			var failedType uint64

			// We need to get the underlying error value, so we
			// can't use errors.As as suggested by the linter.
			//nolint:errorlint
			if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
				failedType = uint64(e.Type)
			}

			// If we couldn't parse the payload, make our best
			// effort at creating an error encrypter that knows
			// what blinding type we were, but if we couldn't
			// parse the payload we have no way of knowing whether
			// we were the introduction node or not.
			//
			//nolint:ll
			obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
				l.cfg.ExtractErrorEncrypter,
				// We need our route role here because we
				// couldn't parse or validate the payload.
				routeRole == hop.RouteRoleIntroduction,
			)
			if failCode != lnwire.CodeNone {
				l.log.Errorf("could not extract error "+
					"encrypter: %v", pldErr)

				// We can't process this htlc, send back
				// malformed.
				l.sendMalformedHTLCError(
					add.ID, failureCode, add.OnionBlob,
					&sourceRef,
				)

				continue
			}

			// TODO: currently none of the test unit infrastructure
			// is setup to handle TLV payloads, so testing this
			// would require implementing a separate mock iterator
			// for TLV payloads that also supports injecting invalid
			// payloads. Deferring this non-trivial effort till a
			// later date.
			failure := lnwire.NewInvalidOnionPayload(failedType, 0)

			l.sendHTLCError(
				add, sourceRef, NewLinkError(failure),
				obfuscator, false,
			)

			l.log.Errorf("unable to decode forwarding "+
				"instructions: %v", pldErr)

			continue
		}

		// Retrieve the onion obfuscator from the onion blob in order
		// to produce the initial obfuscation of the onion failureCode.
		obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
			l.cfg.ExtractErrorEncrypter,
			routeRole == hop.RouteRoleIntroduction,
		)
		if failureCode != lnwire.CodeNone {
			// If we're unable to process the onion blob then we
			// should send the malformed htlc error to the payment
			// sender.
			l.sendMalformedHTLCError(
				add.ID, failureCode, add.OnionBlob,
				&sourceRef,
			)

			l.log.Errorf("unable to decode onion "+
				"obfuscator: %v", failureCode)

			continue
		}

		fwdInfo := pld.ForwardingInfo()

		// Check whether the payload we've just processed uses our
		// node as the introduction point (gave us a blinding key in
		// the payload itself) and fail it back if we don't support
		// route blinding.
		if fwdInfo.NextBlinding.IsSome() &&
			l.cfg.DisallowRouteBlinding {

			failure := lnwire.NewInvalidBlinding(
				fn.Some(add.OnionBlob),
			)

			l.sendHTLCError(
				add, sourceRef, NewLinkError(failure),
				obfuscator, false,
			)

			l.log.Error("rejected htlc that uses us as an " +
				"introduction point when we do not support " +
				"route blinding")

			continue
		}

		switch fwdInfo.NextHop {
		case hop.Exit:
			err := l.processExitHop(
				add, sourceRef, obfuscator, fwdInfo,
				heightNow, pld,
			)
			if err != nil {
				l.failf(LinkFailureError{
					code: ErrInternalError,
				}, err.Error()) //nolint

				return
			}

		// There are additional channels left within this route. So
		// we'll simply do some forwarding package book-keeping.
		default:
			// If hodl.AddIncoming is requested, we will not
			// validate the forwarded ADD, nor will we send the
			// packet to the htlc switch.
			if l.cfg.HodlMask.Active(hodl.AddIncoming) {
				l.log.Warnf(hodl.AddIncoming.Warning())
				continue
			}

			endorseValue := l.experimentalEndorsement(
				record.CustomSet(add.CustomRecords),
			)
			endorseType := uint64(
				lnwire.ExperimentalEndorsementType,
			)

			switch fwdPkg.State {
			case channeldb.FwdStateProcessed:
				// This add was not forwarded on the previous
				// processing phase, run it through our
				// validation pipeline to reproduce an error.
				// This may trigger a different error due to
				// expiring timelocks, but we expect that an
				// error will be reproduced.
				if !fwdPkg.FwdFilter.Contains(idx) {
					break
				}

				// Otherwise, it was already processed, so we
				// can collect it and continue.
				outgoingAdd := &lnwire.UpdateAddHTLC{
					Expiry:        fwdInfo.OutgoingCTLV,
					Amount:        fwdInfo.AmountToForward,
					PaymentHash:   add.PaymentHash,
					BlindingPoint: fwdInfo.NextBlinding,
				}

				endorseValue.WhenSome(func(e byte) {
					custRecords := map[uint64][]byte{
						endorseType: {e},
					}

					outgoingAdd.CustomRecords = custRecords
				})

				// Finally, we'll encode the onion packet for
				// the _next_ hop using the hop iterator
				// decoded for the current hop.
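				//
				// Note: OnionBlob[0:0] yields a zero-length
				// slice backed by the blob's array, so
				// EncodeNextHop below writes the re-wrapped
				// onion directly into the outgoing message.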
				buf := bytes.NewBuffer(
					outgoingAdd.OnionBlob[0:0],
				)

				// We know this cannot fail, as this ADD
				// was marked forwarded in a previous
				// round of processing.
				chanIterator.EncodeNextHop(buf)

				inboundFee := l.cfg.FwrdingPolicy.InboundFee

				//nolint:ll
				updatePacket := &htlcPacket{
					incomingChanID:       l.ShortChanID(),
					incomingHTLCID:       add.ID,
					outgoingChanID:       fwdInfo.NextHop,
					sourceRef:            &sourceRef,
					incomingAmount:       add.Amount,
					amount:               outgoingAdd.Amount,
					htlc:                 outgoingAdd,
					obfuscator:           obfuscator,
					incomingTimeout:      add.Expiry,
					outgoingTimeout:      fwdInfo.OutgoingCTLV,
					inOnionCustomRecords: pld.CustomRecords(),
					inboundFee:           inboundFee,
					inWireCustomRecords:  add.CustomRecords.Copy(),
				}
				switchPackets = append(
					switchPackets, updatePacket,
				)

				continue
			}

			// TODO(roasbeef): ensure don't accept outrageous
			// timeout for htlc

			// With all our forwarding constraints met, we'll
			// create the outgoing HTLC using the parameters as
			// specified in the forwarding info.
			addMsg := &lnwire.UpdateAddHTLC{
				Expiry:        fwdInfo.OutgoingCTLV,
				Amount:        fwdInfo.AmountToForward,
				PaymentHash:   add.PaymentHash,
				BlindingPoint: fwdInfo.NextBlinding,
			}

			endorseValue.WhenSome(func(e byte) {
				addMsg.CustomRecords = map[uint64][]byte{
					endorseType: {e},
				}
			})

			// Finally, we'll encode the onion packet for the
			// _next_ hop using the hop iterator decoded for the
			// current hop.
37✔
4027
                        err := chanIterator.EncodeNextHop(buf)
37✔
4028
                        if err != nil {
37✔
4029
                                l.log.Errorf("unable to encode the "+
×
4030
                                        "remaining route %v", err)
×
4031

×
4032
                                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:ll
×
4033
                                        return lnwire.NewTemporaryChannelFailure(upd)
×
4034
                                }
×
4035

4036
                                failure := l.createFailureWithUpdate(
×
4037
                                        true, hop.Source, cb,
×
4038
                                )
×
4039

×
4040
                                l.sendHTLCError(
×
4041
                                        add, sourceRef, NewLinkError(failure),
×
4042
                                        obfuscator, false,
×
4043
                                )
×
4044
                                continue
×
4045
                        }
4046

4047
                        // Now that this add has been reprocessed, only append
4048
                        // it to our list of packets to forward to the switch
4049
                        // this is the first time processing the add. If the
4050
                        // fwd pkg has already been processed, then we entered
4051
                        // the above section to recreate a previous error.  If
4052
                        // the packet had previously been forwarded, it would
4053
                        // have been added to switchPackets at the top of this
4054
                        // section.
4055
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
74✔
4056
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee
37✔
4057

37✔
4058
                                //nolint:ll
37✔
4059
                                updatePacket := &htlcPacket{
37✔
4060
                                        incomingChanID:       l.ShortChanID(),
37✔
4061
                                        incomingHTLCID:       add.ID,
37✔
4062
                                        outgoingChanID:       fwdInfo.NextHop,
37✔
4063
                                        sourceRef:            &sourceRef,
37✔
4064
                                        incomingAmount:       add.Amount,
37✔
4065
                                        amount:               addMsg.Amount,
37✔
4066
                                        htlc:                 addMsg,
37✔
4067
                                        obfuscator:           obfuscator,
37✔
4068
                                        incomingTimeout:      add.Expiry,
37✔
4069
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
37✔
4070
                                        inOnionCustomRecords: pld.CustomRecords(),
37✔
4071
                                        inboundFee:           inboundFee,
37✔
4072
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
37✔
4073
                                }
37✔
4074

37✔
4075
                                fwdPkg.FwdFilter.Set(idx)
37✔
4076
                                switchPackets = append(switchPackets,
37✔
4077
                                        updatePacket)
37✔
4078
                        }
37✔
4079
                }
4080
        }
4081

4082
        // Commit the htlcs we are intending to forward if this package has not
4083
        // been fully processed.
4084
        if fwdPkg.State == channeldb.FwdStateLockedIn {
2,373✔
4085
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
1,185✔
4086
                if err != nil {
1,185✔
4087
                        l.failf(LinkFailureError{code: ErrInternalError},
×
4088
                                "unable to set fwd filter: %v", err)
×
4089
                        return
×
4090
                }
×
4091
        }
4092

4093
        if len(switchPackets) == 0 {
2,340✔
4094
                return
1,152✔
4095
        }
1,152✔
4096

4097
        replay := fwdPkg.State != channeldb.FwdStateLockedIn
37✔
4098

37✔
4099
        l.log.Debugf("forwarding %d packets to switch: replay=%v",
37✔
4100
                len(switchPackets), replay)
37✔
4101

37✔
4102
        // NOTE: This call is made synchronous so that we ensure all circuits
37✔
4103
        // are committed in the exact order that they are processed in the link.
37✔
4104
        // Failing to do this could cause reorderings/gaps in the range of
37✔
4105
        // opened circuits, which violates assumptions made by the circuit
37✔
4106
        // trimming.
37✔
4107
        l.forwardBatch(replay, switchPackets...)
37✔
4108
}
4109

4110
// experimentalEndorsement returns the value to set for our outgoing
4111
// experimental endorsement field, and a boolean indicating whether it should
4112
// be populated on the outgoing htlc.
4113
func (l *channelLink) experimentalEndorsement(
4114
        customUpdateAdd record.CustomSet) fn.Option[byte] {
37✔
4115

37✔
4116
        // Only relay experimental signal if we are within the experiment
37✔
4117
        // period.
37✔
4118
        if !l.cfg.ShouldFwdExpEndorsement() {
38✔
4119
                return fn.None[byte]()
1✔
4120
        }
1✔
4121

4122
        // If we don't have any custom records or the experimental field is
4123
        // not set, just forward a zero value.
4124
        if len(customUpdateAdd) == 0 {
74✔
4125
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
37✔
4126
        }
37✔
4127

4128
        t := uint64(lnwire.ExperimentalEndorsementType)
1✔
4129
        value, set := customUpdateAdd[t]
1✔
4130
        if !set {
1✔
4131
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
×
4132
        }
×
4133

4134
        // We expect at least one byte for this field, consider it invalid if
4135
        // it has no data and just forward a zero value.
4136
        if len(value) == 0 {
1✔
4137
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
×
4138
        }
×
4139

4140
        // Only forward endorsed if the incoming link is endorsed.
4141
        if value[0] == lnwire.ExperimentalEndorsed {
2✔
4142
                return fn.Some[byte](lnwire.ExperimentalEndorsed)
1✔
4143
        }
1✔
4144

4145
        // Forward as unendorsed otherwise, including cases where we've
4146
        // received an invalid value that uses more than 3 bits of information.
4147
        return fn.Some[byte](lnwire.ExperimentalUnendorsed)
1✔
4148
}

// processExitHop handles an htlc for which this link is the exit hop. Policy
// violations are failed back to the sender; an error is returned only if the
// htlc could not be processed at all.
func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC,
	sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter,
	fwdInfo hop.ForwardingInfo, heightNow uint32,
	payload invoices.Payload) error {

	// If hodl.ExitSettle is requested, we will not validate the final hop's
	// ADD, nor will we settle the corresponding invoice or respond with the
	// preimage.
	if l.cfg.HodlMask.Active(hodl.ExitSettle) {
		l.log.Warnf("%s for htlc(rhash=%x,htlcIndex=%v)",
			hodl.ExitSettle.Warning(), add.PaymentHash, add.ID)

		return nil
	}

	// In case the traffic shaper is active, we'll check if the HTLC has
	// custom records and skip the amount check in the onion payload below.
	isCustomHTLC := fn.MapOptionZ(
		l.cfg.AuxTrafficShaper,
		func(ts AuxTrafficShaper) bool {
			return ts.IsCustomHTLC(add.CustomRecords)
		},
	)

	// As we're the exit hop, we'll double check the hop-payload included in
	// the HTLC to ensure that it was crafted correctly by the sender and
	// is compatible with the HTLC we were extended. If an external
	// validator is active we might bypass the amount check.
	if !isCustomHTLC && add.Amount < fwdInfo.AmountToForward {
		l.log.Errorf("onion payload of incoming htlc(%x) has "+
			"incompatible value: expected <=%v, got %v",
			add.PaymentHash, add.Amount, fwdInfo.AmountToForward)

		failure := NewLinkError(
			lnwire.NewFinalIncorrectHtlcAmount(add.Amount),
		)
		l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

		return nil
	}

	// We'll also ensure that our time-lock value has been computed
	// correctly.
	if add.Expiry < fwdInfo.OutgoingCTLV {
		l.log.Errorf("onion payload of incoming htlc(%x) has "+
			"incompatible time-lock: expected <=%v, got %v",
			add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV)

		failure := NewLinkError(
			lnwire.NewFinalIncorrectCltvExpiry(add.Expiry),
		)

		l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

		return nil
	}

	// Notify the invoiceRegistry of the exit hop htlc. If we crash right
	// after this, this code will be re-executed after restart. We will
	// receive back a resolution event.
	invoiceHash := lntypes.Hash(add.PaymentHash)

	circuitKey := models.CircuitKey{
		ChanID: l.ShortChanID(),
		HtlcID: add.ID,
	}

	event, err := l.cfg.Registry.NotifyExitHopHtlc(
		invoiceHash, add.Amount, add.Expiry, int32(heightNow),
		circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload,
	)
	if err != nil {
		return err
	}

	// Create a hodlHtlc struct and decide whether to resolve it now or
	// later.
	htlc := hodlHtlc{
		add:        add,
		sourceRef:  sourceRef,
		obfuscator: obfuscator,
	}

	// If the event is nil, the invoice is being held, so we save the
	// payment descriptor for future reference.
	if event == nil {
		l.hodlMap[circuitKey] = htlc
		return nil
	}

	// Process the received resolution.
	return l.processHtlcResolution(event, htlc)
}

// settleHTLC settles the HTLC on the channel.
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
	htlcIndex uint64, sourceRef channeldb.AddRef) error {

	hash := preimage.Hash()

	l.log.Infof("settling htlc %v as exit hop", hash)

	err := l.channel.SettleHTLC(
		preimage, htlcIndex, &sourceRef, nil, nil,
	)
	if err != nil {
		return fmt.Errorf("unable to settle htlc: %w", err)
	}

	// If the link is in hodl.BogusSettle mode, replace the preimage with a
	// fake one before sending it to the peer.
	if l.cfg.HodlMask.Active(hodl.BogusSettle) {
		l.log.Warnf(hodl.BogusSettle.Warning())
		preimage = [32]byte{}
		copy(preimage[:], bytes.Repeat([]byte{2}, 32))
	}

	// The HTLC was successfully settled locally, so send a notification
	// about it to the remote peer.
	l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
		ChanID:          l.ChanID(),
		ID:              htlcIndex,
		PaymentPreimage: preimage,
	})

	// Once we have successfully settled the htlc, notify a settle event.
	l.cfg.HtlcNotifier.NotifySettleEvent(
		HtlcKey{
			IncomingCircuit: models.CircuitKey{
				ChanID: l.ShortChanID(),
				HtlcID: htlcIndex,
			},
		},
		preimage,
		HtlcEventTypeReceive,
	)

	return nil
}

// forwardBatch forwards the given htlcPackets to the switch, and waits on the
// err chan for the individual responses. This method is intended to be spawned
// as a goroutine so the responses can be handled in the background.
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
	// Don't forward packets for which we already have a response in our
	// mailbox. This could happen if a packet fails and is buffered in the
	// mailbox, and the incoming link flaps.
	var filteredPkts = make([]*htlcPacket, 0, len(packets))
	for _, pkt := range packets {
		if l.mailBox.HasPacket(pkt.inKey()) {
			continue
		}

		filteredPkts = append(filteredPkts, pkt)
	}

	err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...)
	if err != nil {
		log.Errorf("Unhandled error while reforwarding htlc "+
			"settle/fail over htlcswitch: %v", err)
	}
}

// sendHTLCError cancels the HTLC and sends a cancel message back to the peer
// from which the HTLC was received.
func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC,
	sourceRef channeldb.AddRef, failure *LinkError,
	e hop.ErrorEncrypter, isReceive bool) {

	reason, err := e.EncryptFirstHop(failure.WireMessage())
	if err != nil {
		l.log.Errorf("unable to obfuscate error: %v", err)
		return
	}

	err = l.channel.FailHTLC(add.ID, reason, &sourceRef, nil, nil)
	if err != nil {
		l.log.Errorf("unable to cancel htlc: %v", err)
		return
	}

	// Send the appropriate failure message depending on whether we're
	// in a blinded route or not.
	if err := l.sendIncomingHTLCFailureMsg(
		add.ID, e, reason,
	); err != nil {
		l.log.Errorf("unable to send HTLC failure: %v", err)
		return
	}

	// Notify a link failure on our incoming link. Outgoing htlc information
	// is not available at this point, because we have not decrypted the
	// onion, so it is excluded.
	var eventType HtlcEventType
	if isReceive {
		eventType = HtlcEventTypeReceive
	} else {
		eventType = HtlcEventTypeForward
	}

	l.cfg.HtlcNotifier.NotifyLinkFailEvent(
		HtlcKey{
			IncomingCircuit: models.CircuitKey{
				ChanID: l.ShortChanID(),
				HtlcID: add.ID,
			},
		},
		HtlcInfo{
			IncomingTimeLock: add.Expiry,
			IncomingAmt:      add.Amount,
		},
		eventType,
		failure,
		true,
	)
}

// sendIncomingHTLCFailureMsg handles sending an HTLC failure message back to
// the peer from which the HTLC was received. This function is primarily used
// to handle the special requirements of route blinding, specifically:
// - Forwarding nodes must switch out any errors with MalformedFailHTLC
// - Introduction nodes should return regular HTLC failure messages.
//
// It accepts the original opaque failure, which will be used in the case
// that we're not part of a blinded route, and an error encrypter that'll be
// used if we are the introduction node and need to present an error as if
// we're the failing party.
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
	e hop.ErrorEncrypter,
	originalFailure lnwire.OpaqueReason) error {

	var msg lnwire.Message
	switch {
	// Our circuit's error encrypter will be nil if this was a locally
	// initiated payment. We can only hit a blinded error for a locally
	// initiated payment if we allow ourselves to be picked as the
	// introduction node for our own payments and in that case we
	// shouldn't reach this code. To prevent the HTLC getting stuck,
	// we fail it back and log an error.
	case e == nil:
		msg = &lnwire.UpdateFailHTLC{
			ChanID: l.ChanID(),
			ID:     htlcIndex,
			Reason: originalFailure,
		}

		l.log.Errorf("Unexpected blinded failure when "+
			"we are the sending node, incoming htlc: %v(%v)",
			l.ShortChanID(), htlcIndex)

	// For cleartext hops (ie, non-blinded/normal) we don't need any
	// transformation on the error message and can just send the original.
	case !e.Type().IsBlinded():
		msg = &lnwire.UpdateFailHTLC{
			ChanID: l.ChanID(),
			ID:     htlcIndex,
			Reason: originalFailure,
		}

	// When we're the introduction node, we need to convert the error to
	// a UpdateFailHTLC.
	case e.Type() == hop.EncrypterTypeIntroduction:
		l.log.Debugf("Introduction blinded node switching out failure "+
			"error: %v", htlcIndex)

		// The specification does not require that we set the onion
		// blob.
		failureMsg := lnwire.NewInvalidBlinding(
			fn.None[[lnwire.OnionPacketSize]byte](),
		)
		reason, err := e.EncryptFirstHop(failureMsg)
		if err != nil {
			return err
		}

		msg = &lnwire.UpdateFailHTLC{
			ChanID: l.ChanID(),
			ID:     htlcIndex,
			Reason: reason,
		}

	// If we are a relaying node, we need to switch out any error that
	// we've received to a malformed HTLC error.
	case e.Type() == hop.EncrypterTypeRelaying:
		l.log.Debugf("Relaying blinded node switching out malformed "+
			"error: %v", htlcIndex)

		msg = &lnwire.UpdateFailMalformedHTLC{
			ChanID:      l.ChanID(),
			ID:          htlcIndex,
			FailureCode: lnwire.CodeInvalidBlinding,
		}

	default:
		return fmt.Errorf("unexpected encrypter: %d", e)
	}

	if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
		l.log.Warnf("Send update fail failed: %v", err)
	}

	return nil
}

// sendMalformedHTLCError is a helper function which sends a malformed HTLC
// update to the payment sender.
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
	code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte,
	sourceRef *channeldb.AddRef) {

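	// The update_fail_malformed_htlc message carries the SHA-256 of the
	// onion we received, letting the origin node verify which onion the
	// failing hop actually saw.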
	shaOnionBlob := sha256.Sum256(onionBlob[:])
	err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
	if err != nil {
		l.log.Errorf("unable to cancel htlc: %v", err)
		return
	}

	l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
		ChanID:       l.ChanID(),
		ID:           htlcIndex,
		ShaOnionBlob: shaOnionBlob,
		FailureCode:  code,
	})
}

// failf is a function which is used to encapsulate the action necessary for
// properly failing the link. It takes a LinkFailureError, which will be passed
// to the OnChannelFailure closure, in order for it to determine if we should
// force close the channel, and if we should send an error message to the
// remote peer.
func (l *channelLink) failf(linkErr LinkFailureError, format string,
	a ...interface{}) {

	reason := fmt.Errorf(format, a...)

	// Return if we have already notified about a failure.
	if l.failed {
		l.log.Warnf("ignoring link failure (%v), as link already "+
			"failed", reason)
		return
	}

	l.log.Errorf("failing link: %s with error: %v", reason, linkErr)

	// Set failed, such that we won't process any more updates, and notify
	// the peer about the failure.
	l.failed = true
	l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
}

// FundingCustomBlob returns the custom funding blob of the channel that this
// link is associated with. The funding blob represents static information about
// the channel that was created at channel funding time.
func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] {
	if l.channel == nil {
		return fn.None[tlv.Blob]()
	}

	if l.channel.State() == nil {
		return fn.None[tlv.Blob]()
	}

	return l.channel.State().CustomBlob
}

// CommitmentCustomBlob returns the custom blob of the current local commitment
// of the channel that this link is associated with.
func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] {
	if l.channel == nil {
		return fn.None[tlv.Blob]()
	}

	return l.channel.LocalCommitmentBlob()
}