lightningnetwork / lnd, build 17190810624
24 Aug 2025 03:58PM UTC coverage: 66.74% (+9.4%) from 57.321%

Pull Request #10167: multi: bump Go to 1.24.6
Merge 33cec4f6a into 0c2f045f5

6 of 40 new or added lines in 10 files covered (15.0%).
12 existing lines in 6 files now uncovered.
135947 of 203696 relevant lines covered (66.74%).
21470.9 hits per line.

Source file: /htlcswitch/link.go (77.26% of lines covered)

package htlcswitch

import (
        "bytes"
        "context"
        crand "crypto/rand"
        "crypto/sha256"
        "errors"
        "fmt"
        prand "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/wire"
        "github.com/btcsuite/btclog/v2"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/contractcourt"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/invoices"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lntypes"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/queue"
        "github.com/lightningnetwork/lnd/record"
        "github.com/lightningnetwork/lnd/routing/route"
        "github.com/lightningnetwork/lnd/ticker"
        "github.com/lightningnetwork/lnd/tlv"
)

const (
        // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
        // the node accepts for forwarded payments. The value is relative to
        // the current block height. The reason to have a maximum is to
        // prevent funds getting locked up unreasonably long. Otherwise, an
        // attacker willing to lock its own funds too, could force the funds
        // of this node to be locked up for an indefinite (max int32) number
        // of blocks.
        //
        // The value 2016 corresponds to on average two weeks worth of blocks
        // and is based on the maximum number of hops (20), the default CLTV
        // delta (40), and some extra margin to account for the other
        // lightning implementations and past lnd versions which used to have
        // a default CLTV delta of 144.
        DefaultMaxOutgoingCltvExpiry = 2016

        // DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute

        // DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute

        // DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
        // a channel's commitment fee to be of its balance. This only applies
        // to the initiator of the channel.
        DefaultMaxLinkFeeAllocation float64 = 0.5
)
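
// Illustrative note (not part of the original file): 2016 blocks is fourteen
// days' worth of blocks at the expected rate of 144 blocks per day
// (14 * 144 = 2016). The worst case of 20 hops at the default CLTV delta of
// 40 only needs 20 * 40 = 800 blocks, so the remainder is margin for peers
// that use larger deltas such as the historical 144.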

// ExpectedFee computes the expected fee for a given htlc amount. The value
// returned from this function is to be used as a sanity check when forwarding
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
// forwarding policy.
//
// TODO(roasbeef): also add in current available channel bandwidth, inverse
// func
func ExpectedFee(f models.ForwardingPolicy,
        htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {

        return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
}
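
// Example (illustrative sketch, not from the original file): with a base fee
// of 1,000 msat and a fee rate of 500 parts per million, a 2,000,000 msat
// HTLC is expected to pay 1,000 + (2,000,000*500)/1,000,000 = 2,000 msat:
//
//	policy := models.ForwardingPolicy{BaseFee: 1000, FeeRate: 500}
//	fee := ExpectedFee(policy, 2_000_000) // 2,000 msat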

// ChannelLinkConfig defines the configuration for the channel link. ALL
// elements within the configuration MUST be non-nil for channel link to carry
// out its duties.
type ChannelLinkConfig struct {
        // FwrdingPolicy is the initial forwarding policy to be used when
        // deciding whether to forward incoming HTLC's or not. This value
        // can be updated with subsequent calls to UpdateForwardingPolicy
        // targeted at a given ChannelLink concrete interface implementation.
        FwrdingPolicy models.ForwardingPolicy

        // Circuits provides restricted access to the switch's circuit map,
        // allowing the link to open and close circuits.
        Circuits CircuitModifier

        // BestHeight returns the best known height.
        BestHeight func() uint32

        // ForwardPackets attempts to forward the batch of htlcs through the
        // switch. The function returns an error in case it fails to send one
        // or more packets. The link's quit signal should be provided to allow
        // cancellation of forwarding during link shutdown.
        ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error

        // DecodeHopIterators facilitates batched decoding of HTLC Sphinx
        // onion blobs, which are then used to inform how to forward an HTLC.
        //
        // NOTE: This function assumes the same set of readers and preimages
        // are always presented for the same identifier. The last boolean is
        // used to decide whether this is a reforwarding or not - when it's
        // reforwarding, we skip the replay check enforced in our decay log.
        DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest, bool) (
                []hop.DecodeHopIteratorResponse, error)

        // ExtractErrorEncrypter function is responsible for decoding HTLC
        // Sphinx onion blob, and creating onion failure obfuscator.
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter

        // FetchLastChannelUpdate retrieves the latest routing policy for a
        // target channel. This channel will typically be the outgoing channel
        // specified when we receive an incoming HTLC. This will be used to
        // provide payment senders our latest policy when sending encrypted
        // error messages.
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
                *lnwire.ChannelUpdate1, error)

        // Peer is a lightning network node with which we have the channel
        // link opened.
        Peer lnpeer.Peer

        // Registry is a sub-system which is responsible for managing the
        // invoices in a thread-safe manner.
        Registry InvoiceDatabase

        // PreimageCache is a global witness beacon that houses any new
        // preimages discovered by other links. We'll use this to add new
        // witnesses that we discover which will notify any sub-systems
        // subscribed to new events.
        PreimageCache contractcourt.WitnessBeacon

        // OnChannelFailure is a function closure that we'll call if the
        // channel failed for some reason. Depending on the severity of the
        // error, the closure potentially must force close this channel and
        // disconnect the peer.
        //
        // NOTE: The method must return in order for the ChannelLink to be
        // able to shut down properly.
        OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
                LinkFailureError)

        // UpdateContractSignals is a function closure that we'll use to
        // update outside sub-systems with this channel's latest
        // ShortChannelID.
        UpdateContractSignals func(*contractcourt.ContractSignals) error

        // NotifyContractUpdate is a function closure that we'll use to update
        // the contractcourt and more specifically the ChannelArbitrator of
        // the latest channel state.
        NotifyContractUpdate func(*contractcourt.ContractUpdate) error

        // ChainEvents is an active subscription to the chain watcher for this
        // channel to be notified of any on-chain activity related to this
        // channel.
        ChainEvents *contractcourt.ChainEventSubscription

        // FeeEstimator is an instance of a live fee estimator which will be
        // used to dynamically regulate the current fee of the commitment
        // transaction to ensure timely confirmation.
        FeeEstimator chainfee.Estimator

        // HodlMask is a bitvector composed of hodl.Flags, specifying
        // breakpoints for HTLC forwarding internal to the switch.
        //
        // NOTE: This should only be used for testing.
        HodlMask hodl.Mask

        // SyncStates is used to indicate that we need to send the channel
        // reestablishment message to the remote peer. It should be done if
        // our client has been restarted, or the remote peer has reconnected.
        SyncStates bool

        // BatchTicker is the ticker that determines the interval that we'll
        // use to check the batch to see if there're any updates we should
        // flush out. By batching updates into a single commit, we attempt to
        // increase throughput by maximizing the number of updates coalesced
        // into a single commit.
        BatchTicker ticker.Ticker

        // FwdPkgGCTicker is the ticker determining the frequency at which
        // garbage collection of forwarding packages occurs. We use a
        // time-based approach, as opposed to block epochs, as to not hinder
        // syncing.
        FwdPkgGCTicker ticker.Ticker

        // PendingCommitTicker is a ticker that allows the link to determine
        // if a locally initiated commitment dance gets stuck waiting for the
        // remote party to revoke.
        PendingCommitTicker ticker.Ticker

        // BatchSize is the max size of a batch of updates done to the link
        // before we do a state update.
        BatchSize uint32

        // UnsafeReplay will cause a link to replay the adds in its latest
        // commitment txn after the link is restarted. This should only be
        // used in testing, it is here to ensure the sphinx replay detection
        // on the receiving node is persistent.
        UnsafeReplay bool

        // MinUpdateTimeout represents the minimum interval in which a link
        // will propose to update its commitment fee rate. A random timeout
        // will be selected between this and MaxUpdateTimeout.
        MinUpdateTimeout time.Duration

        // MaxUpdateTimeout represents the maximum interval in which a link
        // will propose to update its commitment fee rate. A random timeout
        // will be selected between this and MinUpdateTimeout.
        MaxUpdateTimeout time.Duration

        // OutgoingCltvRejectDelta defines the number of blocks before expiry
        // of an htlc where we don't offer an htlc anymore. This should be at
        // least the outgoing broadcast delta, because in any case we don't
        // want to risk offering an htlc that triggers channel closure.
        OutgoingCltvRejectDelta uint32

        // TowerClient is an optional engine that manages the signing,
        // encrypting, and uploading of justice transactions to the daemon's
        // configured set of watchtowers for legacy channels.
        TowerClient TowerClient

        // MaxOutgoingCltvExpiry is the maximum outgoing timelock that the
        // link should accept for a forwarded HTLC. The value is relative to
        // the current block height.
        MaxOutgoingCltvExpiry uint32

        // MaxFeeAllocation is the highest allocation we'll allow a channel's
        // commitment fee to be of its balance. This only applies to the
        // initiator of the channel.
        MaxFeeAllocation float64

        // MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
        // the initiator for channels of the anchor type.
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight

        // NotifyActiveLink allows the link to tell the ChannelNotifier when a
        // link is first started.
        NotifyActiveLink func(wire.OutPoint)

        // NotifyActiveChannel allows the link to tell the ChannelNotifier
        // when a channel becomes active.
        NotifyActiveChannel func(wire.OutPoint)

        // NotifyInactiveChannel allows the switch to tell the ChannelNotifier
        // when channels become inactive.
        NotifyInactiveChannel func(wire.OutPoint)

        // NotifyInactiveLinkEvent allows the switch to tell the
        // ChannelNotifier when a channel link becomes inactive.
        NotifyInactiveLinkEvent func(wire.OutPoint)

        // HtlcNotifier is an instance of a htlcNotifier which we will pipe
        // htlc events through.
        HtlcNotifier htlcNotifier

        // FailAliasUpdate is a function used to fail an HTLC for an
        // option_scid_alias channel.
        FailAliasUpdate func(sid lnwire.ShortChannelID,
                incoming bool) *lnwire.ChannelUpdate1

        // GetAliases is used by the link and switch to fetch the set of
        // aliases for a given link.
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID

        // PreviouslySentShutdown is an optional value that is set if, at the
        // time of the link being started, persisted shutdown info was found
        // for the channel. This value being set means that we previously sent
        // a Shutdown message to our peer, and so we should do so again on
        // re-establish and should not allow any more HTLC adds on the
        // outgoing direction of the link.
        PreviouslySentShutdown fn.Option[lnwire.Shutdown]

        // DisallowRouteBlinding adds the option to disable forwarding
        // payments in blinded routes by failing back any blinding-related
        // payloads as if they were invalid.
        DisallowRouteBlinding bool

        // DisallowQuiescence is a flag that can be used to disable the
        // quiescence protocol.
        DisallowQuiescence bool

        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
        // restrict the flow of HTLCs and fee updates.
        MaxFeeExposure lnwire.MilliSatoshi

        // ShouldFwdExpEndorsement is a closure that indicates whether the
        // link should forward experimental endorsement signals.
        ShouldFwdExpEndorsement func() bool

        // AuxTrafficShaper is an optional auxiliary traffic shaper that can
        // be used to manage the bandwidth of the link.
        AuxTrafficShaper fn.Option[AuxTrafficShaper]

        // QuiescenceTimeout is the max duration that the channel can be
        // quiesced. Any dependent protocols (dynamic commitments, splicing,
        // etc.) must finish their operations under this timeout value,
        // otherwise the node will disconnect.
        QuiescenceTimeout time.Duration
}

// channelLink is the service which drives a channel's commitment update
// state-machine. In the event that an HTLC needs to be propagated to another
// link, the forward handler from config is used which sends HTLC to the
// switch. Additionally, the link encapsulates the logic of commitment
// protocol message ordering and updates.
type channelLink struct {
        // The following fields are only meant to be used *atomically*.
        started       int32
        reestablished int32
        shutdown      int32

        // failed should be set to true in case a link error happens, making
        // sure we don't process any more updates.
        failed bool

        // keystoneBatch represents a volatile list of keystones that must be
        // written before attempting to sign the next commitment txn. These
        // represent all the HTLC's forwarded to the link from the switch.
        // Once we lock them into our outgoing commitment, then the circuit
        // has a keystone, and is fully opened.
        keystoneBatch []Keystone

        // openedCircuits is the set of all payment circuits that will be open
        // once we make our next commitment. After making the commitment we'll
        // ACK all these from our mailbox to ensure that they don't get
        // re-delivered if we reconnect.
        openedCircuits []CircuitKey

        // closedCircuits is the set of all payment circuits that will be
        // closed once we make our next commitment. After taking the
        // commitment we'll ACK all these to ensure that they don't get
        // re-delivered if we reconnect.
        closedCircuits []CircuitKey

        // channel is a lightning network channel to which we apply htlc
        // updates.
        channel *lnwallet.LightningChannel

        // cfg is a structure which carries all the dependent fields/handlers
        // which may affect the behaviour of the service.
        cfg ChannelLinkConfig

        // mailBox is the main interface between the outside world and the
        // link. All incoming messages will be sent over this mailBox.
        // Messages include new updates from our connected peer, and new
        // packets sent by the switch to be forwarded.
        mailBox MailBox

        // upstream is a channel that new messages sent from the remote peer
        // to the local peer will be sent across.
        upstream chan lnwire.Message

        // downstream is a channel in which new multi-hop HTLC's to be
        // forwarded will be sent across. Messages from this channel are sent
        // by the HTLC switch.
        downstream chan *htlcPacket

        // updateFeeTimer is the timer responsible for updating the link's
        // commitment fee every time it fires.
        updateFeeTimer *time.Timer

        // uncommittedPreimages stores a list of all preimages that have been
        // learned since receiving the last CommitSig from the remote peer.
        // The batch will be flushed just before accepting the subsequent
        // CommitSig or on shutdown to avoid doing a write for each preimage
        // received.
        uncommittedPreimages []lntypes.Preimage

        sync.RWMutex

        // hodlQueue is used to receive exit hop htlc resolutions from the
        // invoice registry.
        hodlQueue *queue.ConcurrentQueue

        // hodlMap stores related htlc data for a circuit key. It allows
        // resolving those htlcs when we receive a message on hodlQueue.
        hodlMap map[models.CircuitKey]hodlHtlc

        // log is a link-specific logging instance.
        log btclog.Logger

        // isOutgoingAddBlocked tracks whether the channelLink can send an
        // UpdateAddHTLC.
        isOutgoingAddBlocked atomic.Bool

        // isIncomingAddBlocked tracks whether the channelLink can receive an
        // UpdateAddHTLC.
        isIncomingAddBlocked atomic.Bool

        // flushHooks is a hookMap that is triggered when we reach a channel
        // state with no live HTLCs.
        flushHooks hookMap

        // outgoingCommitHooks is a hookMap that is triggered after we send
        // our next CommitSig.
        outgoingCommitHooks hookMap

        // incomingCommitHooks is a hookMap that is triggered after we receive
        // our next CommitSig.
        incomingCommitHooks hookMap

        // quiescer is the state machine that tracks where this channel is
        // with respect to the quiescence protocol.
        quiescer Quiescer

        // quiescenceReqs is a queue of requests to quiesce this link. The
        // members of the queue are send-only channels we should call back
        // with the result.
        quiescenceReqs chan StfuReq

        // cg is a helper that encapsulates a wait group and quit channel and
        // allows contexts that either block or cancel on those depending on
        // the use case.
        cg *fn.ContextGuard
}

// hookMap is a data structure that is used to track the hooks that need to
// be called in various parts of the channelLink's lifecycle.
//
// WARNING: NOT thread-safe.
type hookMap struct {
        // allocIdx keeps track of the next id we haven't yet allocated.
        allocIdx atomic.Uint64

        // transient is a map of hooks that are only called the next time
        // invoke is called. These hooks are deleted during invoke.
        transient map[uint64]func()

        // newTransients is a channel that we use to accept new hooks into
        // the hookMap.
        newTransients chan func()
}

// newHookMap initializes a new empty hookMap.
func newHookMap() hookMap {
        return hookMap{
                allocIdx:      atomic.Uint64{},
                transient:     make(map[uint64]func()),
                newTransients: make(chan func()),
        }
}

// alloc allocates space in the hook map for the supplied hook and returns
// the id that identifies it.
func (m *hookMap) alloc(hook func()) uint64 {
        // We assume we never overflow a uint64. Seems OK.
        hookID := m.allocIdx.Add(1)
        if hookID == 0 {
                panic("hookMap allocIdx overflow")
        }
        m.transient[hookID] = hook

        return hookID
}

// invoke is used on a hook map to call all the registered hooks and then
// clear out the transient hooks so they are not called again.
func (m *hookMap) invoke() {
        for _, hook := range m.transient {
                hook()
        }

        m.transient = make(map[uint64]func())
}
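
// Example (illustrative sketch, not from the original file): a transient hook
// fires exactly once on the next invoke and is then discarded:
//
//	m := newHookMap()
//	m.alloc(func() { fmt.Println("flushed") })
//	m.invoke() // prints "flushed"
//	m.invoke() // no output: the transient hooks were cleared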

// hodlHtlc contains htlc data that is required for resolution.
type hodlHtlc struct {
        add        lnwire.UpdateAddHTLC
        sourceRef  channeldb.AddRef
        obfuscator hop.ErrorEncrypter
}

// NewChannelLink creates a new instance of a ChannelLink given a
// configuration and active channel that will be used to verify/apply updates
// to.
func NewChannelLink(cfg ChannelLinkConfig,
        channel *lnwallet.LightningChannel) ChannelLink {

        logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())

        // If the max fee exposure isn't set, use the default.
        if cfg.MaxFeeExposure == 0 {
                cfg.MaxFeeExposure = DefaultMaxFeeExposure
        }

        var qsm Quiescer
        if !cfg.DisallowQuiescence {
                qsm = NewQuiescer(QuiescerCfg{
                        chanID: lnwire.NewChanIDFromOutPoint(
                                channel.ChannelPoint(),
                        ),
                        channelInitiator: channel.Initiator(),
                        sendMsg: func(s lnwire.Stfu) error {
                                return cfg.Peer.SendMessage(false, &s)
                        },
                        timeoutDuration: cfg.QuiescenceTimeout,
                        onTimeout: func() {
                                cfg.Peer.Disconnect(ErrQuiescenceTimeout)
                        },
                })
        } else {
                qsm = &quiescerNoop{}
        }

        quiescenceReqs := make(
                chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1,
        )

        return &channelLink{
                cfg:                 cfg,
                channel:             channel,
                hodlMap:             make(map[models.CircuitKey]hodlHtlc),
                hodlQueue:           queue.NewConcurrentQueue(10),
                log:                 log.WithPrefix(logPrefix),
                flushHooks:          newHookMap(),
                outgoingCommitHooks: newHookMap(),
                incomingCommitHooks: newHookMap(),
                quiescer:            qsm,
                quiescenceReqs:      quiescenceReqs,
                cg:                  fn.NewContextGuard(),
        }
}

// A compile time check to ensure channelLink implements the ChannelLink
// interface.
var _ ChannelLink = (*channelLink)(nil)

// Start starts all helper goroutines required for the operation of the
// channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Start() error {
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
                err := fmt.Errorf("channel link(%v): already started", l)
                l.log.Warn("already started")
                return err
        }

        l.log.Info("starting")

        // If the config supplied a watchtower client, ensure the channel is
        // registered before trying to use it during operation.
        if l.cfg.TowerClient != nil {
                err := l.cfg.TowerClient.RegisterChannel(
                        l.ChanID(), l.channel.State().ChanType,
                )
                if err != nil {
                        return err
                }
        }

        l.mailBox.ResetMessages()
        l.hodlQueue.Start()

        // Before launching the htlcManager messages, revert any circuits
        // that were marked open in the switch's circuit map, but did not make
        // it into a commitment txn. We use the next local htlc index as the
        // cut off point, since all indexes below that are committed. This
        // action is only performed if the link's final short channel ID has
        // been assigned, otherwise we would try to trim the htlcs belonging
        // to the all-zero, hop.Source ID.
        if l.ShortChanID() != hop.Source {
                localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
                if err != nil {
                        return fmt.Errorf("unable to retrieve next local "+
                                "htlc index: %v", err)
                }

                // NOTE: This is automatically done by the switch when it
                // starts up, but is necessary to prevent inconsistencies in
                // the case that the link flaps. This is a result of a link's
                // life-cycle being shorter than that of the switch.
                chanID := l.ShortChanID()
                err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
                if err != nil {
                        return fmt.Errorf("unable to trim circuits above "+
                                "local htlc index %d: %v", localHtlcIndex, err)
                }

                // Since the link is live, before we start the link we'll
                // update the ChainArbitrator with the set of new channel
                // signals for this channel.
                //
                // TODO(roasbeef): split goroutines within channel arb to avoid
                go func() {
                        signals := &contractcourt.ContractSignals{
                                ShortChanID: l.channel.ShortChanID(),
                        }

                        err := l.cfg.UpdateContractSignals(signals)
                        if err != nil {
                                l.log.Errorf("unable to update signals")
                        }
                }()
        }

        l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())

        l.cg.WgAdd(1)
        go l.htlcManager(context.TODO())

        return nil
}
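
// Example (illustrative sketch, not from the original file; "link" is a
// hypothetical ChannelLink instance): the link follows the usual Start/Stop
// lifecycle, with Stop blocking until the htlcManager goroutine has exited:
//
//	if err := link.Start(); err != nil {
//	        // Handle startup failure, e.g. watchtower registration error.
//	}
//	defer link.Stop()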

// Stop gracefully stops all active helper goroutines, then waits until
// they've exited.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stop() {
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
                l.log.Warn("already stopped")
                return
        }

        l.log.Info("stopping")

        // As the link is stopping, we are no longer interested in htlc
        // resolutions coming from the invoice registry.
        l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())

        if l.cfg.ChainEvents.Cancel != nil {
                l.cfg.ChainEvents.Cancel()
        }

        // Ensure the channel for the timer is drained.
        if l.updateFeeTimer != nil {
                if !l.updateFeeTimer.Stop() {
                        select {
                        case <-l.updateFeeTimer.C:
                        default:
                        }
                }
        }

        if l.hodlQueue != nil {
                l.hodlQueue.Stop()
        }

        l.cg.Quit()
        l.cg.WgWait()

        // Now that the htlcManager has completely exited, reset the packet
        // courier. This allows the mailbox to re-evaluate any lingering Adds
        // that were delivered but didn't make it onto a commitment to be
        // failed back if the link is offline for an extended period of time.
        // The error is ignored since it can only fail when the daemon is
        // exiting.
        _ = l.mailBox.ResetPackets()

        // As a final precaution, we will attempt to flush any uncommitted
        // preimages to the preimage cache. The preimages should be
        // re-delivered after channel reestablishment, however this adds an
        // extra layer of protection in case the peer never returns. Without
        // this, we will be unable to settle any contracts depending on the
        // preimages even though we had learned them at some point.
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
        if err != nil {
                l.log.Errorf("unable to add preimages=%v to cache: %v",
                        l.uncommittedPreimages, err)
        }
}

// WaitForShutdown blocks until the link finishes shutting down, which
// includes termination of all dependent goroutines.
func (l *channelLink) WaitForShutdown() {
        l.cg.WgWait()
}

// EligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's
// if we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST NOT be called from the main event loop.
func (l *channelLink) EligibleToForward() bool {
        l.RLock()
        defer l.RUnlock()

        return l.eligibleToForward()
}

// eligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's
// if we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToForward() bool {
        return l.eligibleToUpdate() && !l.IsFlushing(Outgoing)
}

// eligibleToUpdate returns a bool indicating if the channel is able to
// update channel state. We're able to update channel state if we know the
// remote party's next revocation point. Otherwise, we can't initiate new
// channel state. We also require that the short channel ID not be the
// all-zero source ID, meaning that the channel has had its ID finalized.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToUpdate() bool {
        return l.channel.RemoteNextRevocation() != nil &&
                l.channel.ShortChanID() != hop.Source &&
                l.isReestablished() &&
                l.quiescer.CanSendUpdates()
}

// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns true if the state was changed and
// false if the desired state was already set before the method was called.
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Swap(false)
        }

        return l.isIncomingAddBlocked.Swap(false)
}

// DisableAdds sets the ChannelUpdateHandler state to disallow
// UpdateAddHtlc's in the specified direction. It returns true if the state
// was changed and false if the desired state was already set before the
// method was called.
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return !l.isOutgoingAddBlocked.Swap(true)
        }

        return !l.isIncomingAddBlocked.Swap(true)
}

// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction
// of the argument.
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Load()
        }

        return l.isIncomingAddBlocked.Load()
}
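
// Example (illustrative sketch, not from the original file; "link" is a
// hypothetical instance): the boolean returned by DisableAdds/EnableAdds
// reports whether the call actually changed state, which lets callers toggle
// flushing idempotently:
//
//	if link.DisableAdds(Outgoing) {
//	        // We were the caller that actually started the flush.
//	}
//	link.DisableAdds(Outgoing) // false: already disabled
//	link.IsFlushing(Outgoing)  // true
//	link.EnableAdds(Outgoing)  // true: state changed back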

// OnFlushedOnce adds a hook that will be called the next time the channel
// state reaches zero htlcs. This hook will only ever be called once. If the
// channel state already has zero htlcs, then this will be called
// immediately.
func (l *channelLink) OnFlushedOnce(hook func()) {
        select {
        case l.flushHooks.newTransients <- hook:
        case <-l.cg.Done():
        }
}

// OnCommitOnce adds a hook that will be called the next time a CommitSig
// message is sent in the argument's LinkDirection. This hook will only ever
// be called once. If no CommitSig is owed in the argument's LinkDirection,
// then the hook will be run immediately.
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
        var queue chan func()

        if direction == Outgoing {
                queue = l.outgoingCommitHooks.newTransients
        } else {
                queue = l.incomingCommitHooks.newTransients
        }

        select {
        case queue <- hook:
        case <-l.cg.Done():
        }
}

// InitStfu allows us to initiate quiescence on this link. It returns a
// receive only channel that will block until quiescence has been achieved,
// or definitively fails.
//
// This operation has been added to allow channels to be quiesced via RPC. It
// may be removed or reworked in the future as RPC initiated quiescence is a
// holdover until we have downstream protocols that use it.
func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
        req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]](
                fn.Unit{},
        )

        select {
        case l.quiescenceReqs <- req:
        case <-l.cg.Done():
                req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
        }

        return out
}

// isReestablished returns true if the link has successfully completed the
// channel reestablishment dance.
func (l *channelLink) isReestablished() bool {
        return atomic.LoadInt32(&l.reestablished) == 1
}

// markReestablished signals that the remote peer has successfully exchanged
// channel reestablish messages and that the channel is ready to process
// subsequent messages.
func (l *channelLink) markReestablished() {
        atomic.StoreInt32(&l.reestablished, 1)
}

// IsUnadvertised returns true if the underlying channel is unadvertised.
func (l *channelLink) IsUnadvertised() bool {
        state := l.channel.State()
        return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
}

// sampleNetworkFee samples the current fee rate on the network to get into
// the chain in a timely manner. The returned value is expressed in
// fee-per-kw, as this is the native rate used when computing the fee for
// commitment transactions, and the second-level HTLC transactions.
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
        // We'll first query for the sat/kw recommended to be confirmed
        // within 3 blocks.
        feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
        if err != nil {
                return 0, err
        }

        l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
                int64(feePerKw))

        return feePerKw, nil
}

// shouldAdjustCommitFee returns true if we should update our commitment fee
// to match that of the network fee. We'll only update our commitment fee if
// the network fee is +/- 10% to our commitment fee or if our current
// commitment fee is below the minimum relay fee.
func shouldAdjustCommitFee(netFee, chanFee,
        minRelayFee chainfee.SatPerKWeight) bool {

        switch {
        // If the network fee is greater than our current commitment fee and
        // our current commitment fee is below the minimum relay fee then
        // we should switch to it no matter if it is less than a 10% increase.
        case netFee > chanFee && chanFee < minRelayFee:
                return true

        // If the network fee is greater than the commitment fee, then we'll
        // switch to it if it's at least 10% greater than the commit fee.
        case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
                return true

        // If the network fee is less than our commitment fee, then we'll
        // switch to it if it's at least 10% less than the commitment fee.
        case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
                return true

        // Otherwise, we won't modify our fee.
        default:
                return false
        }
}
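
// Example (illustrative sketch, not from the original file): with a channel
// fee of 1,000 sat/kw and a minimum relay fee of 300 sat/kw, the 10% band
// works out as follows:
//
//	shouldAdjustCommitFee(1100, 1000, 300) // true: >= 10% increase
//	shouldAdjustCommitFee(1050, 1000, 300) // false: within the 10% band
//	shouldAdjustCommitFee(900, 1000, 300)  // true: <= 10% decrease
//	shouldAdjustCommitFee(350, 300, 400)   // true: chanFee below min relay fee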

// failCb is used to cut down on the argument verbosity.
type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage

// createFailureWithUpdate creates a ChannelUpdate when failing an incoming
// or outgoing HTLC. It may return a FailureMessage that references a
// channel's alias. If the channel does not have an alias, then the regular
// channel update from disk will be returned.
func (l *channelLink) createFailureWithUpdate(incoming bool,
        outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {

        // Determine which SCID to use in case we need to use aliases in the
        // ChannelUpdate.
        scid := outgoingScid
        if incoming {
                scid = l.ShortChanID()
        }

        // Try using the FailAliasUpdate function. If it returns nil,
        // fallback to the non-alias behavior.
        update := l.cfg.FailAliasUpdate(scid, incoming)
        if update == nil {
                // Fallback to the non-alias behavior.
                var err error
                update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
                if err != nil {
                        return &lnwire.FailTemporaryNodeFailure{}
                }
        }

        return cb(update)
}
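
// Example (illustrative sketch, not from the original file): a failCb
// receives the (possibly alias) channel update and wraps it in a concrete
// wire failure. The constructor name below is an assumption for
// illustration, as is the outgoingScid variable:
//
//	cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
//	        // Assumed helper; any FailureMessage embedding upd works here.
//	        return lnwire.NewTemporaryChannelFailure(upd)
//	}
//	msg := l.createFailureWithUpdate(false, outgoingScid, cb)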

// syncChanStates attempts to synchronize channel states with the remote
// party. This method is to be called upon reconnection after the initial
// funding flow. We'll compare our commitment chains with the remote party,
// and re-send either a dangling commit signature, a revocation, or both.
func (l *channelLink) syncChanStates(ctx context.Context) error {
        chanState := l.channel.State()

        l.log.Infof("Attempting to re-synchronize channel: %v", chanState)

        // First, we'll generate our ChanSync message to send to the other
        // side. Based on this message, the remote party will decide if they
        // need to retransmit any data or not.
        localChanSyncMsg, err := chanState.ChanSyncMsg()
        if err != nil {
                return fmt.Errorf("unable to generate chan sync message for "+
                        "ChannelPoint(%v)", l.channel.ChannelPoint())
        }
        if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
                return fmt.Errorf("unable to send chan sync message for "+
                        "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
        }

        var msgsToReSend []lnwire.Message

        // Next, we'll wait indefinitely to receive the ChanSync message. The
        // first message sent MUST be the ChanSync message.
        select {
        case msg := <-l.upstream:
                l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
                        l.cfg.Peer.PubKey())

                remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
                if !ok {
                        return fmt.Errorf("first message sent to sync "+
                                "should be ChannelReestablish, instead "+
                                "received: %T", msg)
                }

                // If the remote party indicates that they think we haven't
                // done any state updates yet, then we'll retransmit the
                // channel_ready message first. We do this, as at this point
                // we can't be sure if they've really received the
                // ChannelReady message.
                if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
                        localChanSyncMsg.NextLocalCommitHeight == 1 &&
                        !l.channel.IsPending() {

                        l.log.Infof("resending ChannelReady message to peer")

                        nextRevocation, err := l.channel.NextRevocationKey()
                        if err != nil {
                                return fmt.Errorf("unable to create next "+
                                        "revocation: %v", err)
                        }

                        channelReadyMsg := lnwire.NewChannelReady(
                                l.ChanID(), nextRevocation,
                        )

                        // If this is a taproot channel, then we'll send the
                        // very same nonce that we sent above, as they should
                        // take the latest verification nonce we send.
                        if chanState.ChanType.IsTaproot() {
                                //nolint:ll
                                channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
                        }

                        // For channels that negotiated the option-scid-alias
                        // feature bit, ensure that we send over the alias in
                        // the channel_ready message. We'll send the first
                        // alias we find for the channel since it does not
                        // matter which alias we send. We'll error out if no
                        // aliases are found.
                        if l.negotiatedAliasFeature() {
                                aliases := l.getAliases()
                                if len(aliases) == 0 {
                                        // This shouldn't happen since we
                                        // always add at least one alias before
                                        // the channel reaches the link.
                                        return fmt.Errorf("no aliases found")
                                }

                                // getAliases returns a copy of the alias
                                // slice so it is ok to use a pointer to the
                                // first entry.
                                channelReadyMsg.AliasScid = &aliases[0]
                        }

                        err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
                        if err != nil {
                                return fmt.Errorf("unable to re-send "+
                                        "ChannelReady: %v", err)
                        }
                }

                // In any case, we'll then process their ChanSync message.
                l.log.Info("received re-establishment message from remote side")

                var (
                        openedCircuits []CircuitKey
                        closedCircuits []CircuitKey
                )

                // We've just received a ChanSync message from the remote
                // party, so we'll process the message in order to determine
                // if we need to re-transmit any messages to the remote party.
                ctx, cancel := l.cg.Create(ctx)
                defer cancel()
                msgsToReSend, openedCircuits, closedCircuits, err =
                        l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
                if err != nil {
                        return err
                }

                // Repopulate any identifiers for circuits that may have been
                // opened or unclosed. This may happen if we needed to
                // retransmit a commitment signature message.
                l.openedCircuits = openedCircuits
                l.closedCircuits = closedCircuits

                // Ensure that all packets have been removed from the link's
                // mailbox.
                if err := l.ackDownStreamPackets(); err != nil {
                        return err
                }

                if len(msgsToReSend) > 0 {
                        l.log.Infof("sending %v updates to synchronize the "+
                                "state", len(msgsToReSend))
                }

                // If we have any messages to retransmit, we'll do so
                // immediately so we return to a synchronized state as soon
                // as possible.
                for _, msg := range msgsToReSend {
                        err := l.cfg.Peer.SendMessage(false, msg)
                        if err != nil {
                                l.log.Errorf("failed to send %v: %v",
                                        msg.MsgType(), err)
                        }
                }

        case <-l.cg.Done():
                return ErrLinkShuttingDown
        }

        return nil
}

// resolveFwdPkgs loads any forwarding packages for this link from disk, and
// reprocesses them in order. The primary goal is to make sure that any HTLCs
// we previously received are reinstated in memory, and forwarded to the
// switch if necessary. After a restart, this will also delete any previously
// completed packages.
func (l *channelLink) resolveFwdPkgs(ctx context.Context) error {
        fwdPkgs, err := l.channel.LoadFwdPkgs()
        if err != nil {
                return err
        }

        l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))

        for _, fwdPkg := range fwdPkgs {
                if err := l.resolveFwdPkg(fwdPkg); err != nil {
                        return err
                }
        }

        // If any of our reprocessing steps require an update to the
        // commitment txn, we initiate a state transition to capture all
        // relevant changes.
        if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
                return l.updateCommitTx(ctx)
        }

        return nil
}

// resolveFwdPkg interprets the FwdState of the provided package, either
// reprocesses any outstanding htlcs in the package, or performs garbage
// collection on the package.
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
        // Remove any completed packages to clear up space.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("removing completed fwd pkg for height=%d",
                        fwdPkg.Height)

                err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
                if err != nil {
                        l.log.Errorf("unable to remove fwd pkg for "+
                                "height=%d: %v", fwdPkg.Height, err)
                        return err
                }
        }

        // Otherwise this is either a new package or one has gone through
        // processing, but contains htlcs that need to be restored in memory.
        // We replay this forwarding package to make sure our local mem state
        // is resurrected, we mimic any original responses back to the remote
        // party, and re-forward the relevant HTLCs to the switch.

        // If the package is fully acked but not completed, it must still
        // have settles and fails to propagate.
        if !fwdPkg.SettleFailFilter.IsFull() {
                l.processRemoteSettleFails(fwdPkg)
        }

        // Finally, replay *ALL ADDS* in this forwarding package. The
        // downstream logic is able to filter out any duplicates, but we must
        // shove the entire, original set of adds down the pipeline so that
        // the batch of adds presented to the sphinx router does not ever
        // change.
        if !fwdPkg.AckFilter.IsFull() {
                l.processRemoteAdds(fwdPkg)

                // If the link failed during processing the adds, we must
                // return to ensure we won't attempt to update the state
                // further.
                if l.failed {
                        return fmt.Errorf("link failed while " +
                                "processing remote adds")
                }
        }

        return nil
}

// fwdPkgGarbager periodically reads all forwarding packages from disk and
// removes those that can be discarded. It is safe to do this entirely in the
// background, since all state is coordinated on disk. This also ensures the
// link can continue to process messages and interleave database accesses.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) fwdPkgGarbager() {
        defer l.cg.WgDone()

        l.cfg.FwdPkgGCTicker.Resume()
        defer l.cfg.FwdPkgGCTicker.Stop()

        if err := l.loadAndRemove(); err != nil {
                l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
        }

        for {
                select {
                case <-l.cfg.FwdPkgGCTicker.Ticks():
                        if err := l.loadAndRemove(); err != nil {
                                l.log.Warnf("unable to remove fwd pkgs: %v",
                                        err)
                                continue
                        }
                case <-l.cg.Done():
                        return
                }
        }
}

// loadAndRemove loads all the channel's forwarding packages and determines
// if they can be removed. It is called once before the FwdPkgGCTicker ticks
// so that a longer tick interval can be used.
func (l *channelLink) loadAndRemove() error {
        fwdPkgs, err := l.channel.LoadFwdPkgs()
        if err != nil {
                return err
        }

        var removeHeights []uint64
        for _, fwdPkg := range fwdPkgs {
                if fwdPkg.State != channeldb.FwdStateCompleted {
                        continue
                }

                removeHeights = append(removeHeights, fwdPkg.Height)
        }

        // If removeHeights is empty, return early so we don't use a db
        // transaction.
        if len(removeHeights) == 0 {
                return nil
        }

        return l.channel.RemoveFwdPkgs(removeHeights...)
}

// handleChanSyncErr performs the error handling logic in the case where we
// could not successfully syncChanStates with our channel peer.
func (l *channelLink) handleChanSyncErr(err error) {
        l.log.Warnf("error when syncing channel states: %v", err)

        var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss

        switch {
        case errors.Is(err, ErrLinkShuttingDown):
                l.log.Debugf("unable to sync channel states, link is " +
                        "shutting down")
                return

        // We failed syncing the commit chains, probably because the remote has
        // lost state. We should force close the channel.
        case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
                fallthrough

        // The remote sent us an invalid last commit secret, we should force
        // close the channel.
        // TODO(halseth): and permanently ban the peer?
        case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
                fallthrough

        // The remote sent us a commit point different from what they sent us
        // before.
        // TODO(halseth): ban peer?
        case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
                // We'll fail the link and tell the peer to force close the
                // channel. Note that the database state is not updated here;
                // it will only be updated once the close transaction is
                // ready, to avoid going down before the transaction is stored
                // in the db.
                l.failf(
                        LinkFailureError{
                                code:          ErrSyncError,
                                FailureAction: LinkFailureForceClose,
                        },
                        "unable to synchronize channel states: %v", err,
                )

        // We have lost state and cannot safely force close the channel. Fail
        // the channel and wait for the remote to hopefully force close it. The
        // remote has sent us its latest unrevoked commitment point, and we'll
        // store it in the database, such that we can attempt to recover the
        // funds if the remote force closes the channel.
        case errors.As(err, &errDataLoss):
                err := l.channel.MarkDataLoss(
                        errDataLoss.CommitPoint,
                )
                if err != nil {
                        l.log.Errorf("unable to mark channel data loss: %v",
                                err)
                }

        // We determined the commit chains were not possible to sync. We
        // cautiously fail the channel, but don't force close.
        // TODO(halseth): can we safely force close in any cases where this
        // error is returned?
        case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
                if err := l.channel.MarkBorked(); err != nil {
                        l.log.Errorf("unable to mark channel borked: %v", err)
                }

        // Other, unspecified error.
        default:
        }

        l.failf(
                LinkFailureError{
                        code:          ErrRecoveryError,
                        FailureAction: LinkFailureForceNone,
                },
                "unable to synchronize channel states: %v", err,
        )
}
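
// Editor's note: a minimal sketch (not part of lnd) of the error matching
// used above. errors.Is matches wrapped sentinel values, while errors.As
// additionally extracts the concrete error so that its fields (here the
// remote's unrevoked CommitPoint) can be used:
//
//	var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss
//	switch {
//	case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
//		// Sentinel match only; there is nothing to extract.
//	case errors.As(err, &errDataLoss):
//		// Typed match: errDataLoss now points at the wrapped value.
//		_ = errDataLoss.CommitPoint
//	}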

// htlcManager is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels.
// This goroutine reads messages from the upstream (remote) peer, and also from
// the downstream channel managed by the channel link. In the event that an
// htlc needs to be forwarded, then the send-only forward handler is used which
// sends htlc packets to the switch. Additionally, this goroutine handles
// acting upon all timeouts for any active HTLCs, manages the channel's
// revocation window, and also the htlc trickle queue+timer for this active
// channel.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) htlcManager(ctx context.Context) {
        defer func() {
                l.cfg.BatchTicker.Stop()
                l.cg.WgDone()
                l.log.Infof("exited")
        }()

        l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())

        // Notify any clients that the link is now in the switch via an
        // ActiveLinkEvent. We'll also defer an inactive link notification for
        // when the link exits to ensure that every active notification is
        // matched by an inactive one.
        l.cfg.NotifyActiveLink(l.ChannelPoint())
        defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())

        // If the link is not started for the first time, we need to take extra
        // steps to resume its state.
        err := l.resumeLink(ctx)
        if err != nil {
                l.log.Errorf("resuming link failed: %v", err)
                return
        }

        // Now that we've received both channel_ready and channel_reestablish,
        // we can go ahead and send the active channel notification. We'll also
        // defer the inactive notification for when the link exits to ensure
        // that every active notification is matched by an inactive one.
        l.cfg.NotifyActiveChannel(l.ChannelPoint())
        defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())

        for {
                // We must always check if we failed at some point processing
                // the last update before processing the next.
                if l.failed {
                        l.log.Errorf("link failed, exiting htlcManager")
                        return
                }

                // Pause or resume the batch ticker.
                l.toggleBatchTicker()

                select {
                // We have a new hook that needs to be run when we reach a
                // clean channel state.
                case hook := <-l.flushHooks.newTransients:
                        if l.channel.IsChannelClean() {
                                hook()
                        } else {
                                l.flushHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when we have
                // committed all of our updates.
                case hook := <-l.outgoingCommitHooks.newTransients:
                        if !l.channel.OweCommitment() {
                                hook()
                        } else {
                                l.outgoingCommitHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when our peer has
                // committed all of their updates.
                case hook := <-l.incomingCommitHooks.newTransients:
                        if !l.channel.NeedCommitment() {
                                hook()
                        } else {
                                l.incomingCommitHooks.alloc(hook)
                        }

                // Our update fee timer has fired, so we'll check the network
                // fee to see if we should adjust our commitment fee.
                case <-l.updateFeeTimer.C:
                        l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
                        err := l.handleUpdateFee(ctx)
                        if err != nil {
                                l.log.Errorf("failed to handle update fee: "+
                                        "%v", err)
                        }

                // The underlying channel has notified us of a unilateral close
                // carried out by the remote peer. In the case of such an
                // event, we'll wipe the channel state from the peer, and mark
                // the contract as fully settled. Afterwards we can exit.
                //
                // TODO(roasbeef): add force closure? also breach?
                case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
                        l.log.Warnf("remote peer has closed on-chain")

                        // TODO(roasbeef): remove all together
                        go func() {
                                chanPoint := l.channel.ChannelPoint()
                                l.cfg.Peer.WipeChannel(&chanPoint)
                        }()

                        return

                case <-l.cfg.BatchTicker.Ticks():
                        // Attempt to extend the remote commitment chain
                        // including all the currently pending entries. If the
                        // send was unsuccessful, then abandon the update,
                        // waiting for the revocation window to open up.
                        if !l.updateCommitTxOrFail(ctx) {
                                return
                        }

                case <-l.cfg.PendingCommitTicker.Ticks():
                        l.failf(
                                LinkFailureError{
                                        code:          ErrRemoteUnresponsive,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "unable to complete dance",
                        )
                        return

                // A message from the switch was just received. This indicates
                // that the link is an intermediate hop in a multi-hop HTLC
                // circuit.
                case pkt := <-l.downstream:
                        l.handleDownstreamPkt(ctx, pkt)

                // A message from the connected peer was just received. This
                // indicates that we have a new incoming HTLC, either directly
                // for us, or part of a multi-hop HTLC circuit.
                case msg := <-l.upstream:
                        l.handleUpstreamMsg(ctx, msg)

                // A htlc resolution is received. This means that we now have a
                // resolution for a previously accepted htlc.
                case hodlItem := <-l.hodlQueue.ChanOut():
                        err := l.handleHtlcResolution(ctx, hodlItem)
                        if err != nil {
                                l.log.Errorf("failed to handle htlc "+
                                        "resolution: %v", err)
                        }

                // A user-initiated quiescence request is received. We now
                // forward it to the quiescer.
                case qReq := <-l.quiescenceReqs:
                        err := l.handleQuiescenceReq(qReq)
                        if err != nil {
                                l.log.Errorf("failed to handle quiescence "+
                                        "req: %v", err)
                        }

                case <-l.cg.Done():
                        return
                }
        }
}

// processHodlQueue processes a received htlc resolution and continues reading
// from the hodl queue until no more resolutions remain. When this function
// returns without an error, the commit tx should be updated.
func (l *channelLink) processHodlQueue(ctx context.Context,
        firstResolution invoices.HtlcResolution) error {

        // Try to read all waiting resolution messages, so that they can all be
        // processed in a single commitment tx update.
        htlcResolution := firstResolution
loop:
        for {
                // Look up all hodl htlcs that can be failed or settled with
                // this event. The hodl htlc must be present in the map.
                circuitKey := htlcResolution.CircuitKey()
                hodlHtlc, ok := l.hodlMap[circuitKey]
                if !ok {
                        return fmt.Errorf("hodl htlc not found: %v", circuitKey)
                }

                err := l.processHtlcResolution(htlcResolution, hodlHtlc)
                if err != nil {
                        return err
                }

                // Clean up hodl map.
                delete(l.hodlMap, circuitKey)

                select {
                case item := <-l.hodlQueue.ChanOut():
                        htlcResolution = item.(invoices.HtlcResolution)

                // No need to process it if the link is broken.
                case <-l.cg.Done():
                        return ErrLinkShuttingDown

                default:
                        break loop
                }
        }

        // Update the commitment tx.
        if err := l.updateCommitTx(ctx); err != nil {
                return err
        }

        return nil
}

// processHtlcResolution applies a received htlc resolution to the provided
// htlc. When this function returns without an error, the commit tx should be
// updated.
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
        htlc hodlHtlc) error {

        circuitKey := resolution.CircuitKey()

        // Determine the required action for the resolution based on the type
        // of resolution we have received.
        switch res := resolution.(type) {
        // Settle htlcs that returned a settle resolution using the preimage
        // in the resolution.
        case *invoices.HtlcSettleResolution:
                l.log.Debugf("received settle resolution for %v "+
                        "with outcome: %v", circuitKey, res.Outcome)

                return l.settleHTLC(
                        res.Preimage, htlc.add.ID, htlc.sourceRef,
                )

        // For htlc failures, we get the relevant failure message based
        // on the failure resolution and then fail the htlc.
        case *invoices.HtlcFailResolution:
                l.log.Debugf("received cancel resolution for "+
                        "%v with outcome: %v", circuitKey, res.Outcome)

                // Get the lnwire failure message based on the resolution
                // result.
                failure := getResolutionFailure(res, htlc.add.Amount)

                l.sendHTLCError(
                        htlc.add, htlc.sourceRef, failure, htlc.obfuscator,
                        true,
                )
                return nil

        // Fail if we do not get a settle or fail resolution, since we
        // are only expecting to handle settles and fails.
        default:
                return fmt.Errorf("unknown htlc resolution type: %T",
                        resolution)
        }
}

// getResolutionFailure returns the wire message that a htlc resolution should
// be failed with.
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
        amount lnwire.MilliSatoshi) *LinkError {

        // If the resolution has been resolved as part of a MPP timeout,
        // we need to fail the htlc with lnwire.FailMppTimeout.
        if resolution.Outcome == invoices.ResultMppTimeout {
                return NewDetailedLinkError(
                        &lnwire.FailMPPTimeout{}, resolution.Outcome,
                )
        }

        // If the htlc is not a MPP timeout, we fail it with
        // FailIncorrectDetails. This error is sent for invoice payment
        // failures such as underpayment/expiry too soon, and for hodl
        // invoices (which return FailIncorrectDetails to avoid leaking
        // information).
        incorrectDetails := lnwire.NewFailIncorrectDetails(
                amount, uint32(resolution.AcceptHeight),
        )

        return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
}

// randomFeeUpdateTimeout returns a random timeout between the bounds defined
// within the link's configuration that will be used to determine when the link
// should propose an update to its commitment fee rate.
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
        lower := int64(l.cfg.MinUpdateTimeout)
        upper := int64(l.cfg.MaxUpdateTimeout)
        return time.Duration(prand.Int63n(upper-lower) + lower)
}
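
// Editor's note: a small worked example (not part of lnd) of the draw above,
// using the package defaults MinUpdateTimeout=10m and MaxUpdateTimeout=60m.
// prand.Int63n(upper-lower) yields a value in [0, 50m), so the resulting
// timeout is uniform over [10m, 60m):
//
//	lower := int64(10 * time.Minute)
//	upper := int64(60 * time.Minute)
//	timeout := time.Duration(prand.Int63n(upper-lower) + lower)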

// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
// downstream HTLC Switch.
func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context,
        pkt *htlcPacket) error {

        htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
        if !ok {
                return errors.New("not an UpdateAddHTLC packet")
        }

        // If we are flushing the link in the outgoing direction or we have
        // already sent Stfu, then we can't add new htlcs to the link and we
        // need to bounce it.
        if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() {
                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        &lnwire.FailTemporaryChannelFailure{},
                        OutgoingFailureLinkNotEligible,
                )
        }

        // If hodl.AddOutgoing mode is active, we exit early to simulate
        // arbitrary delays between the switch adding an ADD to the
        // mailbox, and the HTLC being added to the commitment state.
        if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
                l.log.Warnf(hodl.AddOutgoing.Warning())
                l.mailBox.AckPacket(pkt.inKey())
                return nil
        }

        // Check if we can add the HTLC here without exceeding the max fee
        // exposure threshold.
        if l.isOverexposedWithHtlc(htlc, false) {
                l.log.Debugf("Unable to handle downstream HTLC - max fee " +
                        "exposure exceeded")

                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        lnwire.NewTemporaryChannelFailure(nil),
                        OutgoingFailureDownstreamHtlcAdd,
                )
        }

        // A new payment has been initiated via the downstream channel,
        // so we add the new HTLC to our local log, then update the
        // commitment chains.
        htlc.ChanID = l.ChanID()
        openCircuitRef := pkt.inKey()

        // We enforce the fee buffer for the commitment transaction because
        // we are in control of adding this htlc. Nothing has locked-in yet so
        // we can securely enforce the fee buffer which is only relevant if we
        // are the initiator of the channel.
        index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
        if err != nil {
                // The HTLC was unable to be added to the state machine,
                // as a result, we'll signal the switch to cancel the
                // pending payment.
                l.log.Warnf("Unable to handle downstream add HTLC: %v",
                        err)

                // Remove this packet from the link's mailbox, this
                // prevents it from being reprocessed if the link
                // restarts and resets its mailbox. If this response
                // doesn't make it back to the originating link, it will
                // be rejected upon attempting to reforward the Add to
                // the switch, since the circuit was never fully opened,
                // and the forwarding package shows it as
                // unacknowledged.
                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        lnwire.NewTemporaryChannelFailure(nil),
                        OutgoingFailureDownstreamHtlcAdd,
                )
        }

        l.log.Tracef("received downstream htlc: payment_hash=%x, "+
                "local_log_index=%v, pend_updates=%v",
                htlc.PaymentHash[:], index,
                l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))

        pkt.outgoingChanID = l.ShortChanID()
        pkt.outgoingHTLCID = index
        htlc.ID = index

        l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
                pkt.inKey(), pkt.outKey())

        l.openedCircuits = append(l.openedCircuits, pkt.inKey())
        l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())

        err = l.cfg.Peer.SendMessage(false, htlc)
        if err != nil {
                l.log.Errorf("failed to send UpdateAddHTLC: %v", err)
        }

        // Send a forward event notification to htlcNotifier.
        l.cfg.HtlcNotifier.NotifyForwardingEvent(
                newHtlcKey(pkt),
                HtlcInfo{
                        IncomingTimeLock: pkt.incomingTimeout,
                        IncomingAmt:      pkt.incomingAmount,
                        OutgoingTimeLock: htlc.Expiry,
                        OutgoingAmt:      htlc.Amount,
                },
                getEventType(pkt),
        )

        l.tryBatchUpdateCommitTx(ctx)

        return nil
}

// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
// cleared HTLCs with the upstream peer.
//
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
func (l *channelLink) handleDownstreamPkt(ctx context.Context,
        pkt *htlcPacket) {

        if pkt.htlc.MsgType().IsChannelUpdate() &&
                !l.quiescer.CanSendUpdates() {

                l.log.Warnf("unable to process channel update. "+
                        "ChannelID=%v is quiescent.", l.ChanID())

                return
        }

        switch htlc := pkt.htlc.(type) {
        case *lnwire.UpdateAddHTLC:
                // Handle add message. The returned error can be ignored,
                // because it is also sent through the mailbox.
                _ = l.handleDownstreamUpdateAdd(ctx, pkt)

        case *lnwire.UpdateFulfillHTLC:
                l.processLocalUpdateFulfillHTLC(ctx, pkt, htlc)

        case *lnwire.UpdateFailHTLC:
                l.processLocalUpdateFailHTLC(ctx, pkt, htlc)
        }
}

// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
// full.
func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) {
        pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
        if pending < uint64(l.cfg.BatchSize) {
                return
        }

        l.updateCommitTxOrFail(ctx)
}
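
// Editor's note: the batching rule above in isolation (a sketch, not part of
// lnd). A new signature is only proposed once the number of local updates
// pending on the remote commitment reaches the configured batch size; smaller
// batches instead wait for the BatchTicker in htlcManager to fire:
//
//	pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
//	if pending >= uint64(l.cfg.BatchSize) {
//		l.updateCommitTxOrFail(ctx) // sign now rather than wait
//	}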

// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
// associated with this packet. If successful in doing so, it will also purge
// the open circuit from the circuit map and remove the packet from the link's
// mailbox.
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
        inKey := pkt.inKey()

        l.log.Debugf("cleaning up spurious response for incoming "+
                "circuit-key=%v", inKey)

        // If the htlc packet doesn't have a source reference, it is unsafe to
        // proceed, as skipping this ack may cause the htlc to be reforwarded.
        if pkt.sourceRef == nil {
                l.log.Errorf("unable to cleanup response for incoming "+
                        "circuit-key=%v, does not contain source reference",
                        inKey)
                return
        }

        // If the source reference is present, we will try to prevent this link
        // from resending the packet to the switch. To do so, we ack the AddRef
        // of the incoming HTLC belonging to this link.
        err := l.channel.AckAddHtlcs(*pkt.sourceRef)
        if err != nil {
                l.log.Errorf("unable to ack AddRef for incoming "+
                        "circuit-key=%v: %v", inKey, err)

                // If this operation failed, it is unsafe to attempt removal of
                // the destination reference or circuit, so we exit early. The
                // cleanup may proceed with a different packet in the future
                // that succeeds on this step.
                return
        }

        // Now that we know this link will stop retransmitting Adds to the
        // switch, we can begin to teardown the response reference and circuit
        // map.
        //
        // If the packet includes a destination reference, then a response for
        // this HTLC was locked into the outgoing channel. Attempt to remove
        // this reference, so we stop retransmitting the response internally.
        // Even if this fails, we will proceed in trying to delete the circuit.
        // When retransmitting responses, the destination references will be
        // cleaned up if an open circuit is not found in the circuit map.
        if pkt.destRef != nil {
                err := l.channel.AckSettleFails(*pkt.destRef)
                if err != nil {
                        l.log.Errorf("unable to ack SettleFailRef "+
                                "for incoming circuit-key=%v: %v",
                                inKey, err)
                }
        }

        l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)

        // With all known references acked, we can now safely delete the
        // circuit from the switch's circuit map, as the state is no longer
        // needed.
        err = l.cfg.Circuits.DeleteCircuits(inKey)
        if err != nil {
                l.log.Errorf("unable to delete circuit for "+
                        "circuit-key=%v: %v", inKey, err)
        }
}

// handleUpstreamMsg processes wire messages related to commitment state
// updates from the upstream peer. The upstream peer is the peer with whom we
// have a direct channel, updating our respective commitment chains.
func (l *channelLink) handleUpstreamMsg(ctx context.Context,
        msg lnwire.Message) {

        l.log.Tracef("receive upstream msg %v, handling now... ", msg.MsgType())
        defer l.log.Tracef("handled upstream msg %v", msg.MsgType())

        // First check if the message is an update and we are capable of
        // receiving updates right now.
        if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
                l.stfuFailf("update received after stfu: %T", msg)
                return
        }

        var err error

        switch msg := msg.(type) {
        case *lnwire.UpdateAddHTLC:
                err = l.processRemoteUpdateAddHTLC(msg)

        case *lnwire.UpdateFulfillHTLC:
                err = l.processRemoteUpdateFulfillHTLC(msg)

        case *lnwire.UpdateFailMalformedHTLC:
                err = l.processRemoteUpdateFailMalformedHTLC(msg)

        case *lnwire.UpdateFailHTLC:
                err = l.processRemoteUpdateFailHTLC(msg)

        case *lnwire.CommitSig:
                err = l.processRemoteCommitSig(ctx, msg)

        case *lnwire.RevokeAndAck:
                err = l.processRemoteRevokeAndAck(ctx, msg)

        case *lnwire.UpdateFee:
                err = l.processRemoteUpdateFee(msg)

        case *lnwire.Stfu:
                err = l.handleStfu(msg)
                if err != nil {
                        l.stfuFailf("handleStfu: %v", err)
                }

        // In the case where we receive a warning message from our peer, just
        // log it and move on. We choose not to disconnect from our peer,
        // although we "MAY" do so according to the specification.
        case *lnwire.Warning:
                l.log.Warnf("received warning message from peer: %v",
                        msg.Warning())

        case *lnwire.Error:
                l.processRemoteError(msg)

        default:
                l.log.Warnf("received unknown message of type %T", msg)
        }

        if err != nil {
                l.log.Errorf("failed to process remote %v: %v", msg.MsgType(),
                        err)
        }
}

// handleStfu implements the top-level logic for handling the Stfu message from
// our peer.
func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
        if !l.noDanglingUpdates(lntypes.Remote) {
                return ErrPendingRemoteUpdates
        }
        err := l.quiescer.RecvStfu(*stfu)
        if err != nil {
                return err
        }

        // If we can immediately send an Stfu response back, we will.
        if l.noDanglingUpdates(lntypes.Local) {
                return l.quiescer.SendOwedStfu()
        }

        return nil
}

// stfuFailf fails the link in the case where the requirements of the
// quiescence protocol are violated. In all cases we opt to drop the
// connection as only link state (as opposed to channel state) is affected.
func (l *channelLink) stfuFailf(format string, args ...interface{}) {
        l.failf(LinkFailureError{
                code:             ErrStfuViolation,
                FailureAction:    LinkFailureDisconnect,
                PermanentFailure: false,
                Warning:          true,
        }, format, args...)
}

// noDanglingUpdates returns true when there are no updates, originated by the
// given party, still pending on either the local or remote commitment
// transaction.
func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool {
        pendingOnLocal := l.channel.NumPendingUpdates(
                whose, lntypes.Local,
        )
        pendingOnRemote := l.channel.NumPendingUpdates(
                whose, lntypes.Remote,
        )

        return pendingOnLocal == 0 && pendingOnRemote == 0
}
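
// Editor's note: a sketch (not part of lnd) of the two counters consulted
// above for one party. NumPendingUpdates(A, B) counts updates originated by
// party A that are still pending on party B's commitment; party A has no
// dangling updates only when both of its counters are zero:
//
//	onLocal := l.channel.NumPendingUpdates(lntypes.Remote, lntypes.Local)
//	onRemote := l.channel.NumPendingUpdates(lntypes.Remote, lntypes.Remote)
//	remoteIsQuiesced := onLocal == 0 && onRemote == 0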

// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
// for packets delivered from the switch, and for cleaning up any circuits
// closed by signing a previous commitment txn. This method ensures that the
// circuits are removed from the circuit map before removing them from the
// link's mailbox, otherwise it could be possible for some circuit to be missed
// if this link flaps.
func (l *channelLink) ackDownStreamPackets() error {
        // First, remove the downstream Add packets that were included in the
        // previous commitment signature. This will prevent the Adds from being
        // replayed if this link disconnects.
        for _, inKey := range l.openedCircuits {
                // In order to test the sphinx replay logic of the remote
                // party, unsafe replay does not acknowledge the packets from
                // the mailbox. We can then force a replay of any Add packets
                // held in memory by disconnecting and reconnecting the link.
                if l.cfg.UnsafeReplay {
                        continue
                }

                l.log.Debugf("removing Add packet %s from mailbox", inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Now, we will delete all circuits closed by the previous commitment
        // signature, which is the result of downstream Settle/Fail packets. We
        // batch them here to ensure circuits are closed atomically and for
        // performance.
        err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
        switch err {
        case nil:
                // Successful deletion.

        default:
                l.log.Errorf("unable to delete %d circuits: %v",
                        len(l.closedCircuits), err)
                return err
        }

        // With the circuits removed from memory and disk, we now ack any
        // Settle/Fails in the mailbox to ensure they do not get redelivered
        // after startup. If forgive is enabled and we've reached this point,
        // the circuits must have been removed at some point, so it is now safe
        // to un-queue the corresponding Settle/Fails.
        for _, inKey := range l.closedCircuits {
                l.log.Debugf("removing Fail/Settle packet %s from mailbox",
                        inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Lastly, reset our buffers to be empty while keeping any acquired
        // growth in the backing array.
        l.openedCircuits = l.openedCircuits[:0]
        l.closedCircuits = l.closedCircuits[:0]

        return nil
}
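
// Editor's note: the trailing reset above uses the common Go re-slicing idiom
// (a general Go note, not lnd-specific). s[:0] drops the elements but keeps
// the backing array, so appends on the next commitment round can often
// proceed without reallocating:
//
//	buf := make([]int, 0, 8)
//	buf = append(buf, 1, 2, 3) // len 3, cap 8
//	buf = buf[:0]              // len 0, cap still 8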

// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
// the link.
func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool {
        err := l.updateCommitTx(ctx)
        switch {
        // No error encountered, success.
        case err == nil:

        // A duplicate keystone error should be resolved and is not fatal, so
        // we won't send an Error message to the peer.
        case errors.Is(err, ErrDuplicateKeystone):
                l.failf(LinkFailureError{code: ErrCircuitError},
                        "temporary circuit error: %v", err)
                return false

        // Any other error results in an Error message being sent to the peer.
        default:
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to update commitment: %v", err)
                return false
        }

        return true
}

// updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point.
func (l *channelLink) updateCommitTx(ctx context.Context) error {
        // Preemptively write all pending keystones to disk, just in case the
        // HTLCs we have in memory are included in the subsequent attempt to
        // sign a commitment state.
        err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
        if err != nil {
                // If ErrDuplicateKeystone is returned, the caller will catch
                // it.
                return err
        }

        // Reset the batch, but keep the backing buffer to avoid reallocating.
        l.keystoneBatch = l.keystoneBatch[:0]

        // If hodl.Commit mode is active, we will refrain from attempting to
        // commit any in-memory modifications to the channel state. Exiting
        // here permits testing of either the switch or link's ability to trim
        // circuits that have been opened, but unsuccessfully committed.
        if l.cfg.HodlMask.Active(hodl.Commit) {
                l.log.Warnf(hodl.Commit.Warning())
                return nil
        }

        ctx, done := l.cg.Create(ctx)
        defer done()

        newCommit, err := l.channel.SignNextCommitment(ctx)
        if err == lnwallet.ErrNoWindow {
                l.cfg.PendingCommitTicker.Resume()
                l.log.Trace("PendingCommitTicker resumed")

                n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
                l.log.Tracef("revocation window exhausted, unable to send: "+
                        "%v, pend_updates=%v, dangling_closes=%v", n,
                        lnutils.SpewLogClosure(l.openedCircuits),
                        lnutils.SpewLogClosure(l.closedCircuits))

                return nil
        } else if err != nil {
                return err
        }

        if err := l.ackDownStreamPackets(); err != nil {
                return err
        }

        l.cfg.PendingCommitTicker.Pause()
        l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")

        // The remote party now has a new pending commitment, so we'll update
        // the contract court to be aware of this new set (the prior old remote
        // pending).
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.RemotePendingHtlcSet,
                Htlcs:   newCommit.PendingHTLCs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                l.log.Errorf("unable to notify contract update: %v", err)
                return err
        }

        select {
        case <-l.cg.Done():
                return ErrLinkShuttingDown
        default:
        }

        auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob)
        if err != nil {
                return fmt.Errorf("error parsing aux sigs: %w", err)
        }

        commitSig := &lnwire.CommitSig{
                ChanID:        l.ChanID(),
                CommitSig:     newCommit.CommitSig,
                HtlcSigs:      newCommit.HtlcSigs,
                PartialSig:    newCommit.PartialSig,
                CustomRecords: auxBlobRecords,
        }
        err = l.cfg.Peer.SendMessage(false, commitSig)
        if err != nil {
                l.log.Errorf("failed to send CommitSig: %v", err)
        }

        // Now that we have sent out a new CommitSig, we invoke the outgoing
        // set of commit hooks.
        l.RWMutex.Lock()
        l.outgoingCommitHooks.invoke()
        l.RWMutex.Unlock()

        return nil
}
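
// Editor's note: the aux-sig parse above wraps its error with %w, the
// standard Go idiom for keeping the cause inspectable by callers (a general
// Go note, not lnd-specific):
//
//	wrapped := fmt.Errorf("error parsing aux sigs: %w", err)
//	_ = errors.Unwrap(wrapped) // recovers the original err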

// PeerPubKey returns the public key of the remote peer with which we have the
// channel link opened.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) PeerPubKey() [33]byte {
        return l.cfg.Peer.PubKey()
}

// ChannelPoint returns the channel outpoint for the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChannelPoint() wire.OutPoint {
        return l.channel.ChannelPoint()
}

// ShortChanID returns the short channel ID for the channel link. The short
// channel ID encodes the exact location in the main chain that the original
// funding output can be found.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
        l.RLock()
        defer l.RUnlock()

        return l.channel.ShortChanID()
}

// UpdateShortChanID updates the short channel ID for a link. This may be
// required in the event that a link is created before the short chan ID for it
// is known, or a re-org occurs, and the funding transaction changes location
// within the chain.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
        chanID := l.ChanID()

        // Refresh the channel state's short channel ID by loading it from
        // disk. This ensures that the channel state accurately reflects the
        // updated short channel ID.
        err := l.channel.State().Refresh()
        if err != nil {
                l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
                        "%v", chanID, err)
                return hop.Source, err
        }

        return hop.Source, nil
}

// ChanID returns the channel ID for the channel link. The channel ID is a more
// compact representation of a channel's full outpoint.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChanID() lnwire.ChannelID {
        return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
}

// Bandwidth returns the total amount that can flow through the channel link at
// this given instance. The value returned is expressed in millisatoshi and can
// be used by callers when making forwarding decisions to determine if a link
// can accept an HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
        // Get the balance available on the channel for new HTLCs. This takes
        // the channel reserve into account so HTLCs up to this value won't
        // violate it.
        return l.channel.AvailableBalance()
}

// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
// amount provided to the link. This check does not reserve a space, since
// forwards or other payments may use the available slot, so it should be
// considered best-effort.
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
        return l.channel.MayAddOutgoingHtlc(amt)
}

2128
// getDustSum is a wrapper method that calls the underlying channel's dust sum
2129
// method.
2130
//
2131
// NOTE: Part of the dustHandler interface.
2132
func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
2133
        dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {
2,526✔
2134

2,526✔
2135
        return l.channel.GetDustSum(whoseCommit, dryRunFee)
2,526✔
2136
}
2,526✔
2137

2138
// getFeeRate is a wrapper method that retrieves the underlying channel's
2139
// feerate.
2140
//
2141
// NOTE: Part of the dustHandler interface.
2142
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
672✔
2143
        return l.channel.CommitFeeRate()
672✔
2144
}
672✔
2145

2146
// getDustClosure returns a closure that can be used by the switch or mailbox
2147
// to evaluate whether a given HTLC is dust.
2148
//
2149
// NOTE: Part of the dustHandler interface.
2150
func (l *channelLink) getDustClosure() dustClosure {
1,602✔
2151
        localDustLimit := l.channel.State().LocalChanCfg.DustLimit
1,602✔
2152
        remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
1,602✔
2153
        chanType := l.channel.State().ChanType
1,602✔
2154

1,602✔
2155
        return dustHelper(chanType, localDustLimit, remoteDustLimit)
1,602✔
2156
}
1,602✔
2157

2158
// getCommitFee returns either the local or remote CommitFee in satoshis. This
2159
// is used so that the Switch can have access to the commitment fee without
2160
// needing to have a *LightningChannel. This doesn't include dust.
2161
//
2162
// NOTE: Part of the dustHandler interface.
2163
func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
1,875✔
2164
        if remote {
2,820✔
2165
                return l.channel.State().RemoteCommitment.CommitFee
945✔
2166
        }
945✔
2167

2168
        return l.channel.State().LocalCommitment.CommitFee
933✔
2169
}

// exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
// increases the total dust and fees within the channel past the configured
// fee threshold. It first calculates the dust sum over every update in the
// update log with the proposed fee-rate and taking into account both the local
// and remote dust limits. It uses every update in the update log instead of
// what is actually on the local and remote commitments because it is assumed
// that in a worst-case scenario, every update in the update log could
// theoretically be on either commitment transaction and this needs to be
// accounted for with this fee-rate. It then calculates the local and remote
// commitment fees given the proposed fee-rate. Finally, it tallies the results
// and determines if the fee threshold has been exceeded.
func (l *channelLink) exceedsFeeExposureLimit(
        feePerKw chainfee.SatPerKWeight) (bool, error) {

        dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)

        // Get the sum of dust for both the local and remote commitments using
        // this "dry-run" fee.
        localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
        remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)

        // Calculate the local and remote commitment fees using this dry-run
        // fee.
        localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
        if err != nil {
                return false, err
        }

        // Finally, check whether the max fee exposure was exceeded on either
        // future commitment transaction with the fee-rate.
        totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
        if totalLocalDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "local dust: %v, local fee: %v", l.ShortChanID(),
                        totalLocalDust, localFee)

                return true, nil
        }

        totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
                remoteFee,
        )

        if totalRemoteDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "remote dust: %v, remote fee: %v", l.ShortChanID(),
                        totalRemoteDust, remoteFee)

                return true, nil
        }

        return false, nil
}
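
// Editor's note: a worked example of the threshold check above with
// hypothetical numbers (not part of lnd). Suppose the dry-run fee-rate gives
// a local dust sum of 400,000 msat and a local commit fee of 300 sat, with a
// MaxFeeExposure of 500,000 msat:
//
//	localDustSum := lnwire.MilliSatoshi(400_000)
//	localFee := lnwire.NewMSatFromSatoshis(300)               // 300,000 msat
//	totalLocalDust := localDustSum + localFee                 // 700,000 msat
//	exceeded := totalLocalDust > lnwire.MilliSatoshi(500_000) // true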

// isOverexposedWithHtlc calculates whether the proposed HTLC will make the
// channel exceed the fee threshold. It first fetches the largest fee-rate that
// may be on any unrevoked commitment transaction. Then, using this fee-rate,
// it determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds
// to the overall dust sum. If it is not dust, it contributes to weight, which
// also adds to the overall dust sum by an increase in fees. If the dust sum on
// either commitment exceeds the configured fee threshold, this function
// returns true.
func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
        incoming bool) bool {

        dustClosure := l.getDustClosure()

        feeRate := l.channel.WorstCaseFeeRate()

        amount := htlc.Amount.ToSatoshis()

        // See if this HTLC is dust on both the local and remote commitments.
        isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
        isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)

        // Calculate the dust sum for the local and remote commitments.
        localDustSum := l.getDustSum(
                lntypes.Local, fn.None[chainfee.SatPerKWeight](),
        )
        remoteDustSum := l.getDustSum(
                lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
        )

        // Grab the larger of the local and remote commitment fees w/o dust.
        commitFee := l.getCommitFee(false)

        if l.getCommitFee(true) > commitFee {
                commitFee = l.getCommitFee(true)
        }

        commitFeeMSat := lnwire.NewMSatFromSatoshis(commitFee)

        localDustSum += commitFeeMSat
        remoteDustSum += commitFeeMSat

        // Calculate the additional fee increase if this is a non-dust HTLC.
        weight := lntypes.WeightUnit(input.HTLCWeight)
        additional := lnwire.NewMSatFromSatoshis(
                feeRate.FeeForWeight(weight),
        )

        if isLocalDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                localDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase in
                // weight.
                localDustSum += additional
        }

        if localDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total local dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, localDustSum)

                return true
        }

        if isRemoteDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase in
                // weight.
                remoteDustSum += additional
        }

        if remoteDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total remote dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, remoteDustSum)

                return true
        }

        return false
}

// dustClosure is a function that evaluates whether an HTLC is dust. It returns
// true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
// the HTLC is incoming (i.e. one that the remote sent), a ChannelParty
// denoting whether to evaluate on the local or remote commit, and finally an
// HTLC amount to test.
type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
        whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool

// dustHelper is used to construct the dustClosure.
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
        remoteDustLimit btcutil.Amount) dustClosure {

        isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
                whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {

                var dustLimit btcutil.Amount
                if whoseCommit.IsLocal() {
                        dustLimit = localDustLimit
                } else {
                        dustLimit = remoteDustLimit
                }

                return lnwallet.HtlcIsDust(
                        chantype, incoming, whoseCommit, feerate, amt,
                        dustLimit,
                )
        }

        return isDust
}
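
// Editor's note: a usage sketch (not part of lnd) of the closure returned
// above. The ChannelParty argument selects which dust limit applies; here we
// ask whether a 500 sat incoming HTLC would be trimmed from the local
// commitment at a 253 sat/kw fee-rate, with hypothetical 354 sat dust limits:
//
//	isDust := dustHelper(chanType, 354, 354)
//	trimmed := isDust(
//		chainfee.SatPerKWeight(253), true, lntypes.Local,
//		btcutil.Amount(500),
//	)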

// zeroConfConfirmed returns whether or not the zero-conf channel has
// confirmed on-chain.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) zeroConfConfirmed() bool {
        return l.channel.State().ZeroConfConfirmed()
}

// confirmedScid returns the confirmed SCID for a zero-conf channel. This
// should not be called for non-zero-conf channels.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
        return l.channel.State().ZeroConfRealScid()
}

// isZeroConf returns whether or not the underlying channel is a zero-conf
// channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) isZeroConf() bool {
        return l.channel.State().IsZeroConf()
}

// negotiatedAliasFeature returns whether or not the underlying channel has
// negotiated the option-scid-alias feature bit. This will be true for both
// option-scid-alias and zero-conf channel-types. It will also be true for
// channels with the feature bit but without the above channel-types.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) negotiatedAliasFeature() bool {
        return l.channel.State().NegotiatedAliasFeature()
}

// getAliases returns the set of aliases for the underlying channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
        return l.cfg.GetAliases(l.ShortChanID())
}
2384

2385
// attachFailAliasUpdate sets the link's FailAliasUpdate function.
2386
//
2387
// Part of the scidAliasHandler interface.
2388
func (l *channelLink) attachFailAliasUpdate(closure func(
2389
        sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) {
217✔
2390

217✔
2391
        l.Lock()
217✔
2392
        l.cfg.FailAliasUpdate = closure
217✔
2393
        l.Unlock()
217✔
2394
}
217✔
2395

2396
// AttachMailBox updates the current mailbox used by this link, and hooks up
2397
// the mailbox's message and packet outboxes to the link's upstream and
2398
// downstream chans, respectively.
2399
func (l *channelLink) AttachMailBox(mailbox MailBox) {
216✔
2400
        l.Lock()
216✔
2401
        l.mailBox = mailbox
216✔
2402
        l.upstream = mailbox.MessageOutBox()
216✔
2403
        l.downstream = mailbox.PacketOutBox()
216✔
2404
        l.Unlock()
216✔
2405

216✔
2406
        // Set the mailbox's fee rate. This may be refreshing a feerate that was
216✔
2407
        // never committed.
216✔
2408
        l.mailBox.SetFeeRate(l.getFeeRate())
216✔
2409

216✔
2410
        // Also set the mailbox's dust closure so that it can query whether HTLC's
216✔
2411
        // are dust given the current feerate.
216✔
2412
        l.mailBox.SetDustClosure(l.getDustClosure())
216✔
2413
}
216✔
2414

2415
// UpdateForwardingPolicy updates the forwarding policy for the target
2416
// ChannelLink. Once updated, the link will use the new forwarding policy to
2417
// govern if it an incoming HTLC should be forwarded or not. We assume that
2418
// fields that are zero are intentionally set to zero, so we'll use newPolicy to
2419
// update all of the link's FwrdingPolicy's values.
2420
//
2421
// NOTE: Part of the ChannelLink interface.
2422
func (l *channelLink) UpdateForwardingPolicy(
2423
        newPolicy models.ForwardingPolicy) {
15✔
2424

15✔
2425
        l.Lock()
15✔
2426
        defer l.Unlock()
15✔
2427

15✔
2428
        l.cfg.FwrdingPolicy = newPolicy
15✔
2429
}
15✔
2430

2431
// CheckHtlcForward should return a nil error if the passed HTLC details
// satisfy the current forwarding policy of the target link. Otherwise,
// a LinkError with a valid protocol failure message should be returned
// in order to signal the policy consistency issue to the source of the
// HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) CheckHtlcForward(payHash [32]byte, incomingHtlcAmt,
        amtToForward lnwire.MilliSatoshi, incomingTimeout,
        outgoingTimeout uint32, inboundFee models.InboundFee,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // Using the outgoing HTLC amount, we'll calculate the outgoing
        // fee this incoming HTLC must carry in order to satisfy the
        // constraints of the outgoing link.
        outFee := ExpectedFee(policy, amtToForward)

        // Then calculate the inbound fee that we charge based on the sum of
        // outgoing HTLC amount and outgoing fee.
        inFee := inboundFee.CalcFee(amtToForward + outFee)

        // Add up both fee components. It is important to calculate both fees
        // separately. An alternative way of calculating is to first determine
        // an aggregate fee and apply that to the outgoing HTLC amount.
        // However, rounding may cause the result to be slightly higher than
        // in the case of separately rounded fee components. This potentially
        // causes failed forwards for senders and is something to be avoided.
        expectedFee := inFee + int64(outFee)

        // If the actual fee is less than our expected fee, then we'll reject
        // this HTLC as it didn't provide a sufficient amount of fees, or the
        // values have been tampered with, or the send used incorrect/dated
        // information to construct the forwarding information for this hop.
        // In any case, we'll cancel this HTLC.
        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
        if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
                l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
                        "expected %v, got %v: incoming=%v, outgoing=%v, "+
                        "inboundFee=%v",
                        payHash[:], expectedFee, actualFee,
                        incomingHtlcAmt, amtToForward, inboundFee,
                )

                // As part of the returned error, we'll send our latest
                // routing policy so the sending node obtains the most up to
                // date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewFeeInsufficient(amtToForward, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check whether the outgoing htlc satisfies the channel policy.
        err := l.canSendHtlc(
                policy, payHash, amtToForward, outgoingTimeout, heightNow,
                originalScid, customRecords,
        )
        if err != nil {
                return err
        }

        // Finally, we'll ensure that the time-lock on the outgoing HTLC meets
        // the following constraint: the incoming time-lock minus our
        // time-lock delta should equal the outgoing time lock. Otherwise,
        // either the sender messed up or an intermediate node tampered with
        // the HTLC.
        timeDelta := policy.TimeLockDelta
        if incomingTimeout < outgoingTimeout+timeDelta {
                l.log.Warnf("incoming htlc(%x) has incorrect time-lock "+
                        "value: expected at least %v block delta, got %v "+
                        "block delta", payHash[:], timeDelta,
                        incomingTimeout-outgoingTimeout)

                // Grab the latest routing policy so the sending node is up to
                // date with our current policy.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewIncorrectCltvExpiry(
                                incomingTimeout, *upd,
                        )
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        return nil
}

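// exampleForwardingFee is an illustrative sketch (not part of the real
// codebase) of the fee check above: the outbound and inbound components are
// rounded separately and only then summed, and the math is done in int64
// because the inbound fee may be negative. The function is hypothetical; it
// reuses ExpectedFee and models.InboundFee.CalcFee from the code above.
func exampleForwardingFee(policy models.ForwardingPolicy,
        inboundFee models.InboundFee, incomingAmt,
        amtToForward lnwire.MilliSatoshi) bool {

        // Fee charged by the outgoing link on the forwarded amount.
        outFee := ExpectedFee(policy, amtToForward)

        // Inbound fee charged on the sum of outgoing amount and fee.
        inFee := inboundFee.CalcFee(amtToForward + outFee)

        expectedFee := inFee + int64(outFee)
        actualFee := int64(incomingAmt) - int64(amtToForward)

        // The HTLC pays enough only if it doesn't shrink in flight and the
        // fee it carries covers both components.
        return incomingAmt >= amtToForward && actualFee >= expectedFee
}
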
// CheckHtlcTransit should return a nil error if the passed HTLC details
// satisfy the current channel policy. Otherwise, a LinkError with a
// valid protocol failure message should be returned in order to signal
// the violation. This call is intended to be used for locally initiated
// payments for which there is no corresponding incoming htlc.
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
        amt lnwire.MilliSatoshi, timeout uint32, heightNow uint32,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // We pass in hop.Source here as this is only used in the Switch when
        // trying to send over a local link. This causes the fallback
        // mechanism to occur.
        return l.canSendHtlc(
                policy, payHash, amt, timeout, heightNow, hop.Source,
                customRecords,
        )
}

// canSendHtlc checks whether the given htlc parameters satisfy
// the channel's amount and time lock constraints.
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
        payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        // Validate HTLC amount against policy limits.
        linkErr := l.validateHtlcAmount(
                policy, payHash, amt, originalScid, customRecords,
        )
        if linkErr != nil {
                return linkErr
        }

        // We want to avoid offering an HTLC which will expire in the near
        // future, so we'll reject an HTLC if the outgoing expiration time is
        // too close to the current height.
        if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
                l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
                        "outgoing_expiry=%v, best_height=%v", payHash[:],
                        timeout, heightNow)

                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewExpiryTooSoon(*upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)

                return NewLinkError(failure)
        }

        // Check absolute max delta.
        if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
                l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
                        "the future: got %v, but maximum is %v", payHash[:],
                        timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)

                return NewLinkError(&lnwire.FailExpiryTooFar{})
        }

        // We now check the available bandwidth to see if this HTLC can be
        // forwarded.
        availableBandwidth := l.Bandwidth()

        auxBandwidth, externalErr := fn.MapOptionZ(
                l.cfg.AuxTrafficShaper,
                func(ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
                        var htlcBlob fn.Option[tlv.Blob]
                        blob, err := customRecords.Serialize()
                        if err != nil {
                                return fn.Err[OptionalBandwidth](
                                        fmt.Errorf("unable to serialize "+
                                                "custom records: %w", err))
                        }

                        if len(blob) > 0 {
                                htlcBlob = fn.Some(blob)
                        }

                        return l.AuxBandwidth(amt, originalScid, htlcBlob, ts)
                },
        ).Unpack()
        if externalErr != nil {
                l.log.Errorf("Unable to determine aux bandwidth: %v",
                        externalErr)

                return NewLinkError(&lnwire.FailTemporaryNodeFailure{})
        }

        if auxBandwidth.IsHandled && auxBandwidth.Bandwidth.IsSome() {
                auxBandwidth.Bandwidth.WhenSome(
                        func(bandwidth lnwire.MilliSatoshi) {
                                availableBandwidth = bandwidth
                        },
                )
        }

        // Check to see if there is enough balance in this channel.
        if amt > availableBandwidth {
                l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
                        "larger than %v", amt, availableBandwidth)
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)

                return NewDetailedLinkError(
                        failure, OutgoingFailureInsufficientBalance,
                )
        }

        return nil
}

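// exampleCltvWindow is an illustrative sketch (not part of the real
// codebase) of the two expiry checks in canSendHtlc: the outgoing timeout
// must clear the reject delta above the current height, yet stay within the
// maximum outgoing CLTV expiry. The parameter names are hypothetical.
func exampleCltvWindow(timeout, heightNow, rejectDelta,
        maxExpiry uint32) bool {

        // Too close to expiry: the HTLC could time out before it resolves.
        if timeout <= heightNow+rejectDelta {
                return false
        }

        // Too far in the future: funds would be locked unreasonably long.
        if timeout > heightNow+maxExpiry {
                return false
        }

        return true
}
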
// AuxBandwidth returns the bandwidth that can be used for a channel,
// expressed in milli-satoshi. This might be different from the regular BTC
// bandwidth for custom channels. For a regular (non-custom) channel this
// returns an OptionalBandwidth whose IsHandled field is false.
func (l *channelLink) AuxBandwidth(amount lnwire.MilliSatoshi,
        cid lnwire.ShortChannelID, htlcBlob fn.Option[tlv.Blob],
        ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {

        fundingBlob := l.FundingCustomBlob()
        shouldHandle, err := ts.ShouldHandleTraffic(cid, fundingBlob, htlcBlob)
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("traffic shaper "+
                        "failed to decide whether to handle traffic: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper is handling "+
                "traffic: %v", cid, shouldHandle)

        // If this channel isn't handled by the aux traffic shaper, we'll
        // return early.
        if !shouldHandle {
                return fn.Ok(OptionalBandwidth{
                        IsHandled: false,
                })
        }

        peerBytes := l.cfg.Peer.PubKey()

        peer, err := route.NewVertexFromBytes(peerBytes[:])
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to "+
                        "decode peer pub key: %w", err))
        }

        // Ask for a specific bandwidth to be used for the channel.
        commitmentBlob := l.CommitmentCustomBlob()
        auxBandwidth, err := ts.PaymentBandwidth(
                fundingBlob, htlcBlob, commitmentBlob, l.Bandwidth(), amount,
                l.channel.FetchLatestAuxHTLCView(), peer,
        )
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to get "+
                        "bandwidth from external traffic shaper: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper reported available "+
                "bandwidth: %v", cid, auxBandwidth)

        return fn.Ok(OptionalBandwidth{
                IsHandled: true,
                Bandwidth: fn.Some(auxBandwidth),
        })
}

// Stats returns the statistics of channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi,
        lnwire.MilliSatoshi) {

        snapshot := l.channel.StateSnapshot()

        return snapshot.ChannelCommitment.CommitHeight,
                snapshot.TotalMSatSent,
                snapshot.TotalMSatReceived
}

// String returns the string representation of channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) String() string {
        return l.channel.ChannelPoint().String()
}

// handleSwitchPacket handles switch packets. These are packets that might be
// forwarded to us from another channel link in case the htlc update came
// from another peer, or if the update was created by the user.
//
// NOTE: Part of the packetHandler interface.
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
        l.log.Tracef("received switch packet inkey=%v, outkey=%v",
                pkt.inKey(), pkt.outKey())

        return l.mailBox.AddPacket(pkt)
}

// HandleChannelUpdate handles htlc requests (settle/add/fail) sent to us
// from the remote peer we have a channel with.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
        select {
        case <-l.cg.Done():
                // Return early if the link is already in the process of
                // quitting. It doesn't make sense to hand the message to the
                // mailbox here.
                return
        default:
        }

        err := l.mailBox.AddMessage(message)
        if err != nil {
                l.log.Errorf("failed to add Message to mailbox: %v", err)
        }
}

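// exampleQuitGuard is an illustrative sketch (not part of the real codebase)
// of the non-blocking shutdown check used by HandleChannelUpdate: a select
// with a default arm returns immediately when the done channel is closed,
// instead of queueing work for a link that is quitting.
func exampleQuitGuard(done <-chan struct{}) bool {
        select {
        case <-done:
                // Already shutting down; drop the work.
                return false
        default:
        }

        return true
}
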
// updateChannelFee updates the commitment fee-per-kw on this channel by
// committing to an update_fee message.
func (l *channelLink) updateChannelFee(ctx context.Context,
        feePerKw chainfee.SatPerKWeight) error {

        l.log.Infof("updating commit fee to %v", feePerKw)

        // We skip sending the UpdateFee message if the channel is not
        // currently eligible to forward messages.
        if !l.eligibleToUpdate() {
                l.log.Debugf("skipping fee update for inactive channel")
                return nil
        }

        // Check and see if our proposed fee-rate would make us exceed the fee
        // threshold.
        thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
        if err != nil {
                // This shouldn't typically happen. If it does, it indicates
                // something is wrong with our channel state.
                return err
        }

        if thresholdExceeded {
                return fmt.Errorf("link fee threshold exceeded")
        }

        // First, we'll update the local fee on our commitment.
        if err := l.channel.UpdateFee(feePerKw); err != nil {
                return err
        }

        // The fee passed the channel's validation checks, so we update the
        // mailbox feerate.
        l.mailBox.SetFeeRate(feePerKw)

        // We'll then attempt to send a new UpdateFee message, and also lock
        // it in immediately by triggering a commitment update.
        msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                return err
        }

        return l.updateCommitTx(ctx)
}

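// exampleFeeRateConversion is an illustrative sketch (not part of the real
// codebase): updateChannelFee expects the commitment feerate in sat/kw, so a
// wallet-facing rate quoted in sat/kvB has to be converted first. The
// 2500 sat/kvB figure is an assumed value.
func exampleFeeRateConversion() chainfee.SatPerKWeight {
        rate := chainfee.SatPerKVByte(2500)

        // One virtual byte is four weight units, so the sat/kw rate is a
        // quarter of the sat/kvB rate.
        return rate.FeePerKWeight()
}
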
// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
// after receiving a revocation from the remote party, and reprocesses them in
// the context of the provided forwarding package. Any settles or fails that
// have already been acknowledged in the forwarding package will not be sent
// to the switch.
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
        if len(fwdPkg.SettleFails) == 0 {
                l.log.Trace("fwd package has no settle/fails to process, " +
                        "exiting early")

                return
        }

        // Exit early if the fwdPkg is already processed.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)

                return
        }

        l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)

        var switchPackets []*htlcPacket
        for i, update := range fwdPkg.SettleFails {
                destRef := fwdPkg.DestRef(uint16(i))

                // Skip any settles or fails that have already been
                // acknowledged by the incoming link that originated the
                // forwarded Add.
                if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
                        continue
                }

                // TODO(roasbeef): rework log entries to a shared
                // interface.

                switch msg := update.UpdateMsg.(type) {
                // A settle for an HTLC we previously forwarded has been
                // received. So we'll forward the HTLC to the switch which
                // will handle propagating the settle to the prior hop.
                case *lnwire.UpdateFulfillHTLC:
                        // If hodl.SettleIncoming is requested, we will not
                        // forward the SETTLE to the switch and will not
                        // signal a free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
                                l.log.Warnf(hodl.SettleIncoming.Warning())
                                continue
                        }

                        settlePacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has
                        // been freed up within the commitment state.
                        switchPackets = append(switchPackets, settlePacket)

                // A failure message for a previously forwarded HTLC has been
                // received. As a result a new slot will be freed up in our
                // commitment state, so we'll forward this to the switch so
                // the backwards undo can continue.
                case *lnwire.UpdateFailHTLC:
                        // If hodl.FailIncoming is requested, we will not
                        // forward the FAIL to the switch and will not signal
                        // a free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.FailIncoming) {
                                l.log.Warnf(hodl.FailIncoming.Warning())
                                continue
                        }

                        // Fetch the reason the HTLC was canceled so we can
                        // continue to propagate it. This failure originated
                        // from another node, so the linkFailure field is not
                        // set on the packet.
                        failPacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID)

                        // If the failure message lacks an HMAC (but includes
                        // the 4 bytes for encoding the message and padding
                        // lengths), then this means that we received it as an
                        // UpdateFailMalformedHTLC. As a result, we'll signal
                        // that we need to convert this error within the
                        // switch to an actual error, by encrypting it as if
                        // we were the originating hop.
                        convertedErrorSize := lnwire.FailureMessageLength + 4
                        if len(msg.Reason) == convertedErrorSize {
                                failPacket.convertedError = true
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has
                        // been freed up within the commitment state.
                        switchPackets = append(switchPackets, failPacket)
                }
        }

        // Only spawn the task to forward packets if we have a non-zero
        // number.
        if len(switchPackets) > 0 {
                go l.forwardBatch(false, switchPackets...)
        }
}

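// exampleConvertedError is an illustrative sketch (not part of the real
// codebase) of the heuristic above: a failure reason of exactly the failure
// message length plus the 4 length-prefix bytes carries no HMAC, which marks
// it as originating from an UpdateFailMalformedHTLC that the switch must
// re-encrypt before relaying it backwards.
func exampleConvertedError(reason []byte) bool {
        return len(reason) == lnwire.FailureMessageLength+4
}
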
// processRemoteAdds serially processes each of the Add payment descriptors
// which have been "locked-in" by receiving a revocation from the remote
// party. The forwarding package provided instructs how to process this
// batch, indicating whether this is the first time these Adds are being
// processed, or whether we are reprocessing as a result of a failure or
// restart. Adds that have already been acknowledged in the forwarding
// package will be ignored.
//
// NOTE: This function also needs to be called for fwd packages with no ADDs
// because it marks the fwdPkg as processed by writing the FwdFilter into the
// database.
//
//nolint:funlen
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
        // Exit early if the fwdPkg is already processed.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)

                return
        }

        l.log.Tracef("processing %d remote adds for height %d",
                len(fwdPkg.Adds), fwdPkg.Height)

        // decodeReqs is a list of requests sent to the onion decoder. We
        // expect the same length of responses to be returned.
        decodeReqs := make(
                []hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds),
        )

        // unackedAdds is a list of ADDs that's waiting for the remote's
        // settle/fail update.
        unackedAdds := make([]*lnwire.UpdateAddHTLC, 0, len(fwdPkg.Adds))

        for i, update := range fwdPkg.Adds {
                // If this index is already found in the ack filter, the
                // response to this forwarding decision has already been
                // committed by one of our commitment txns. ADDs in this state
                // are waiting for the rest of the fwding package to get acked
                // before being garbage collected.
                if fwdPkg.State == channeldb.FwdStateProcessed &&
                        fwdPkg.AckFilter.Contains(uint16(i)) {

                        continue
                }

                if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
                        // Before adding the new htlc to the state machine,
                        // parse the onion object in order to obtain the
                        // routing information with the DecodeHopIterator
                        // function which processes the Sphinx packet.
                        onionReader := bytes.NewReader(msg.OnionBlob[:])

                        req := hop.DecodeHopIteratorRequest{
                                OnionReader:    onionReader,
                                RHash:          msg.PaymentHash[:],
                                IncomingCltv:   msg.Expiry,
                                IncomingAmount: msg.Amount,
                                BlindingPoint:  msg.BlindingPoint,
                        }

                        decodeReqs = append(decodeReqs, req)
                        unackedAdds = append(unackedAdds, msg)
                }
        }

        // If the fwdPkg has already been processed, it means we are
        // reforwarding the packets again, which happens only on a restart.
        reforward := fwdPkg.State == channeldb.FwdStateProcessed

        // Atomically decode the incoming htlcs, simultaneously checking for
        // replay attempts. A particular index in the returned, spare list of
        // channel iterators should only be used if the failure code at the
        // same index is lnwire.FailCodeNone.
        decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
                fwdPkg.ID(), decodeReqs, reforward,
        )
        if sphinxErr != nil {
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to decode hop iterators: %v", sphinxErr)
                return
        }

        var switchPackets []*htlcPacket

        for i, update := range unackedAdds {
                idx := uint16(i)
                sourceRef := fwdPkg.SourceRef(idx)
                add := *update

                // An incoming HTLC add has been fully locked in. As a result
                // we can now examine the forwarding details of the HTLC, and
                // the HTLC itself to decide if: we should forward it, cancel
                // it, or are able to settle it (and it adheres to our fee
                // related constraints).

                // Before adding the new htlc to the state machine, parse the
                // onion object in order to obtain the routing information
                // with the DecodeHopIterator function which processes the
                // Sphinx packet.
                chanIterator, failureCode := decodeResps[i].Result()
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob, &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion hop iterator "+
                                "for htlc(id=%v, hash=%x): %v", add.ID,
                                add.PaymentHash, failureCode)

                        continue
                }

                heightNow := l.cfg.BestHeight()

                pld, routeRole, pldErr := chanIterator.HopPayload()
                if pldErr != nil {
                        // If we're unable to process the onion payload, or we
                        // received an invalid onion payload failure, then we
                        // should send an error back to the caller so the HTLC
                        // can be canceled.
                        var failedType uint64

                        // We need to get the underlying error value, so we
                        // can't use errors.As as suggested by the linter.
                        //nolint:errorlint
                        if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
                                failedType = uint64(e.Type)
                        }

                        // If we couldn't parse the payload, make our best
                        // effort at creating an error encrypter that knows
                        // what blinding type we were, but if we couldn't
                        // parse the payload we have no way of knowing whether
                        // we were the introduction node or not.
                        //
                        //nolint:ll
                        obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
                                l.cfg.ExtractErrorEncrypter,
                                // We need our route role here because we
                                // couldn't parse or validate the payload.
                                routeRole == hop.RouteRoleIntroduction,
                        )
                        if failCode != lnwire.CodeNone {
                                l.log.Errorf("could not extract error "+
                                        "encrypter: %v", pldErr)

                                // We can't process this htlc, send back
                                // malformed.
                                l.sendMalformedHTLCError(
                                        add.ID, failureCode, add.OnionBlob,
                                        &sourceRef,
                                )

                                continue
                        }

                        // TODO: currently none of the test unit
                        // infrastructure is set up to handle TLV payloads, so
                        // testing this would require implementing a separate
                        // mock iterator for TLV payloads that also supports
                        // injecting invalid payloads. Deferring this
                        // non-trivial effort till a later date.
                        failure := lnwire.NewInvalidOnionPayload(failedType, 0)

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Errorf("unable to decode forwarding "+
                                "instructions: %v", pldErr)

                        continue
                }

                // Retrieve the onion obfuscator from the onion blob in order
                // to produce the initial obfuscation of the onion
                // failureCode.
                obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
                        l.cfg.ExtractErrorEncrypter,
                        routeRole == hop.RouteRoleIntroduction,
                )
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob,
                                &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion "+
                                "obfuscator: %v", failureCode)

                        continue
                }

                fwdInfo := pld.ForwardingInfo()

                // Check whether the payload we've just processed uses our
                // node as the introduction point (gave us a blinding key in
                // the payload itself) and fail it back if we don't support
                // route blinding.
                if fwdInfo.NextBlinding.IsSome() &&
                        l.cfg.DisallowRouteBlinding {

                        failure := lnwire.NewInvalidBlinding(
                                fn.Some(add.OnionBlob),
                        )

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Error("rejected htlc that uses us as an " +
                                "introduction point when we do not support " +
                                "route blinding")

                        continue
                }

                switch fwdInfo.NextHop {
                case hop.Exit:
                        err := l.processExitHop(
                                add, sourceRef, obfuscator, fwdInfo,
                                heightNow, pld,
                        )
                        if err != nil {
                                l.failf(LinkFailureError{
                                        code: ErrInternalError,
                                }, "%v", err)

                                return
                        }

                // There are additional channels left within this route. So
                // we'll simply do some forwarding package book-keeping.
                default:
                        // If hodl.AddIncoming is requested, we will not
                        // validate the forwarded ADD, nor will we send the
                        // packet to the htlc switch.
                        if l.cfg.HodlMask.Active(hodl.AddIncoming) {
                                l.log.Warnf(hodl.AddIncoming.Warning())
                                continue
                        }

                        endorseValue := l.experimentalEndorsement(
                                record.CustomSet(add.CustomRecords),
                        )
                        endorseType := uint64(
                                lnwire.ExperimentalEndorsementType,
                        )

                        switch fwdPkg.State {
                        case channeldb.FwdStateProcessed:
                                // This add was not forwarded on the previous
                                // processing phase, run it through our
                                // validation pipeline to reproduce an error.
                                // This may trigger a different error due to
                                // expiring timelocks, but we expect that an
                                // error will be reproduced.
                                if !fwdPkg.FwdFilter.Contains(idx) {
                                        break
                                }

                                // Otherwise, it was already processed, so we
                                // can collect it and continue.
                                outgoingAdd := &lnwire.UpdateAddHTLC{
                                        Expiry:        fwdInfo.OutgoingCTLV,
                                        Amount:        fwdInfo.AmountToForward,
                                        PaymentHash:   add.PaymentHash,
                                        BlindingPoint: fwdInfo.NextBlinding,
                                }

                                endorseValue.WhenSome(func(e byte) {
                                        custRecords := map[uint64][]byte{
                                                endorseType: {e},
                                        }

                                        outgoingAdd.CustomRecords = custRecords
                                })

                                // Finally, we'll encode the onion packet for
                                // the _next_ hop using the hop iterator
                                // decoded for the current hop.
                                buf := bytes.NewBuffer(
                                        outgoingAdd.OnionBlob[0:0],
                                )

                                // We know this cannot fail, as this ADD
                                // was marked forwarded in a previous
                                // round of processing.
                                chanIterator.EncodeNextHop(buf)

                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               outgoingAdd.Amount,
                                        htlc:                 outgoingAdd,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }
                                switchPackets = append(
                                        switchPackets, updatePacket,
                                )

                                continue
                        }

                        // TODO(roasbeef): ensure don't accept outrageous
                        // timeout for htlc

                        // With all our forwarding constraints met, we'll
                        // create the outgoing HTLC using the parameters as
                        // specified in the forwarding info.
                        addMsg := &lnwire.UpdateAddHTLC{
                                Expiry:        fwdInfo.OutgoingCTLV,
                                Amount:        fwdInfo.AmountToForward,
                                PaymentHash:   add.PaymentHash,
                                BlindingPoint: fwdInfo.NextBlinding,
                        }

                        endorseValue.WhenSome(func(e byte) {
                                addMsg.CustomRecords = map[uint64][]byte{
                                        endorseType: {e},
                                }
                        })

                        // Finally, we'll encode the onion packet for the
                        // _next_ hop using the hop iterator decoded for the
                        // current hop.
                        buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
                        err := chanIterator.EncodeNextHop(buf)
                        if err != nil {
                                l.log.Errorf("unable to encode the "+
                                        "remaining route %v", err)

                                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:ll
                                        return lnwire.NewTemporaryChannelFailure(upd)
                                }

                                failure := l.createFailureWithUpdate(
                                        true, hop.Source, cb,
                                )

                                l.sendHTLCError(
                                        add, sourceRef, NewLinkError(failure),
                                        obfuscator, false,
                                )
                                continue
                        }

                        // Now that this add has been reprocessed, only append
                        // it to our list of packets to forward to the switch
                        // if this is the first time processing the add. If
                        // the fwd pkg has already been processed, then we
                        // entered the above section to recreate a previous
                        // error. If the packet had previously been forwarded,
                        // it would have been added to switchPackets at the
                        // top of this section.
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               addMsg.Amount,
                                        htlc:                 addMsg,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }

                                fwdPkg.FwdFilter.Set(idx)
                                switchPackets = append(switchPackets,
                                        updatePacket)
                        }
                }
        }

        // Commit the htlcs we are intending to forward if this package has
        // not been fully processed.
        if fwdPkg.State == channeldb.FwdStateLockedIn {
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
                if err != nil {
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "unable to set fwd filter: %v", err)
                        return
                }
        }

        if len(switchPackets) == 0 {
                return
        }

        l.log.Debugf("forwarding %d packets to switch: reforward=%v",
                len(switchPackets), reforward)

        // NOTE: This call is made synchronous so that we ensure all circuits
        // are committed in the exact order that they are processed in the
        // link. Failing to do this could cause reorderings/gaps in the range
        // of opened circuits, which violates assumptions made by the circuit
        // trimming.
        l.forwardBatch(reforward, switchPackets...)
}

// experimentalEndorsement returns the value to set for our outgoing
// experimental endorsement field as an option, which is None if the field
// should not be populated on the outgoing htlc.
func (l *channelLink) experimentalEndorsement(
        customUpdateAdd record.CustomSet) fn.Option[byte] {

        // Only relay the experimental signal if we are within the experiment
        // period.
        if !l.cfg.ShouldFwdExpEndorsement() {
                return fn.None[byte]()
        }

        // If we don't have any custom records or the experimental field is
        // not set, just forward a zero value.
        if len(customUpdateAdd) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        t := uint64(lnwire.ExperimentalEndorsementType)
        value, set := customUpdateAdd[t]
        if !set {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // We expect at least one byte for this field; consider it invalid if
        // it has no data and just forward a zero value.
        if len(value) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // Only forward endorsed if the incoming link is endorsed.
        if value[0] == lnwire.ExperimentalEndorsed {
                return fn.Some[byte](lnwire.ExperimentalEndorsed)
        }

        // Forward as unendorsed otherwise, including cases where we've
        // received an invalid value that uses more than 3 bits of
        // information.
        return fn.Some[byte](lnwire.ExperimentalUnendorsed)
}

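// exampleEndorsementRelay is an illustrative sketch (not part of the real
// codebase) of the mapping implemented by experimentalEndorsement: an
// endorsed incoming HTLC relays as endorsed, everything else (absent, empty
// or invalid values) relays as unendorsed, and outside the experiment period
// nothing is relayed at all.
func exampleEndorsementRelay(l *channelLink) (byte, byte) {
        endorsedIn := record.CustomSet{
                uint64(lnwire.ExperimentalEndorsementType): {
                        lnwire.ExperimentalEndorsed,
                },
        }

        // Within the experiment period these options hold
        // ExperimentalEndorsed and ExperimentalUnendorsed respectively.
        out := l.experimentalEndorsement(endorsedIn)
        outDefault := l.experimentalEndorsement(nil)

        return out.UnwrapOr(lnwire.ExperimentalUnendorsed),
                outDefault.UnwrapOr(lnwire.ExperimentalUnendorsed)
}
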
// processExitHop handles an htlc for which this link is the exit hop. It
3363
// returns a boolean indicating whether the commitment tx needs an update.
3364
func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC,
3365
        sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter,
3366
        fwdInfo hop.ForwardingInfo, heightNow uint32,
3367
        payload invoices.Payload) error {
413✔
3368

413✔
3369
        // If hodl.ExitSettle is requested, we will not validate the final hop's
413✔
3370
        // ADD, nor will we settle the corresponding invoice or respond with the
413✔
3371
        // preimage.
413✔
3372
        if l.cfg.HodlMask.Active(hodl.ExitSettle) {
523✔
3373
                l.log.Warnf("%s for htlc(rhash=%x,htlcIndex=%v)",
110✔
3374
                        hodl.ExitSettle.Warning(), add.PaymentHash, add.ID)
110✔
3375

110✔
3376
                return nil
110✔
3377
        }
110✔
3378

3379
        // In case the traffic shaper is active, we'll check if the HTLC has
3380
        // custom records and skip the amount check in the onion payload below.
3381
        isCustomHTLC := fn.MapOptionZ(
306✔
3382
                l.cfg.AuxTrafficShaper,
306✔
3383
                func(ts AuxTrafficShaper) bool {
306✔
3384
                        return ts.IsCustomHTLC(add.CustomRecords)
×
3385
                },
×
3386
        )
3387

3388
        // As we're the exit hop, we'll double check the hop-payload included in
3389
        // the HTLC to ensure that it was crafted correctly by the sender and
3390
        // is compatible with the HTLC we were extended. If an external
3391
        // validator is active we might bypass the amount check.
3392
        if !isCustomHTLC && add.Amount < fwdInfo.AmountToForward {
406✔
3393
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
100✔
3394
                        "incompatible value: expected <=%v, got %v",
100✔
3395
                        add.PaymentHash, add.Amount, fwdInfo.AmountToForward)
100✔
3396

100✔
3397
                failure := NewLinkError(
100✔
3398
                        lnwire.NewFinalIncorrectHtlcAmount(add.Amount),
100✔
3399
                )
100✔
3400
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)
100✔
3401

100✔
3402
                return nil
100✔
3403
        }
100✔
3404

3405
        // We'll also ensure that our time-lock value has been computed
3406
        // correctly.
3407
        if add.Expiry < fwdInfo.OutgoingCTLV {
207✔
3408
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
1✔
3409
                        "incompatible time-lock: expected <=%v, got %v",
1✔
3410
                        add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV)
1✔
3411

1✔
3412
                failure := NewLinkError(
1✔
3413
                        lnwire.NewFinalIncorrectCltvExpiry(add.Expiry),
1✔
3414
                )
1✔
3415

1✔
3416
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)
1✔
3417

1✔
3418
                return nil
1✔
3419
        }
1✔
3420

3421
        // Notify the invoiceRegistry of the exit hop htlc. If we crash right
3422
        // after this, this code will be re-executed after restart. We will
3423
        // receive back a resolution event.
3424
        invoiceHash := lntypes.Hash(add.PaymentHash)
205✔
3425

205✔
3426
        circuitKey := models.CircuitKey{
205✔
3427
                ChanID: l.ShortChanID(),
205✔
3428
                HtlcID: add.ID,
205✔
3429
        }
205✔
3430

205✔
3431
        event, err := l.cfg.Registry.NotifyExitHopHtlc(
205✔
3432
                invoiceHash, add.Amount, add.Expiry, int32(heightNow),
205✔
3433
                circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload,
205✔
3434
        )
205✔
3435
        if err != nil {
205✔
3436
                return err
×
3437
        }
×
3438

3439
        // Create a hodlHtlc struct and decide either resolved now or later.
3440
        htlc := hodlHtlc{
205✔
3441
                add:        add,
205✔
3442
                sourceRef:  sourceRef,
205✔
3443
                obfuscator: obfuscator,
205✔
3444
        }
205✔
3445

205✔
3446
        // If the event is nil, the invoice is being held, so we save payment
205✔
3447
        // descriptor for future reference.
205✔
3448
        if event == nil {
264✔
3449
                l.hodlMap[circuitKey] = htlc
59✔
3450
                return nil
59✔
3451
        }
59✔
3452

3453
        // Process the received resolution.
3454
        return l.processHtlcResolution(event, htlc)
149✔
3455
}
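
// A minimal sketch of the exit-hop decision above (illustrative only; the
// names are taken from this function, not from a public API):
//
//	event, err := registry.NotifyExitHopHtlc(...)
//	switch {
//	case err != nil:
//	        // Registry error: bubble up and let the caller decide.
//	case event == nil:
//	        // Hodl invoice: park the htlc in hodlMap until a resolution
//	        // arrives on hodlQueue.
//	default:
//	        // Immediate resolution: settle or fail the htlc now.
//	}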

// settleHTLC settles the HTLC on the channel.
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
        htlcIndex uint64, sourceRef channeldb.AddRef) error {

        hash := preimage.Hash()

        l.log.Infof("settling htlc %v as exit hop", hash)

        err := l.channel.SettleHTLC(
                preimage, htlcIndex, &sourceRef, nil, nil,
        )
        if err != nil {
                return fmt.Errorf("unable to settle htlc: %w", err)
        }

        // If the link is in hodl.BogusSettle mode, replace the preimage with a
        // fake one before sending it to the peer.
        if l.cfg.HodlMask.Active(hodl.BogusSettle) {
                l.log.Warnf(hodl.BogusSettle.Warning())
                preimage = [32]byte{}
                copy(preimage[:], bytes.Repeat([]byte{2}, 32))
        }

        // The HTLC was successfully settled locally; send a notification about
        // it to the remote peer.
        err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
                ChanID:          l.ChanID(),
                ID:              htlcIndex,
                PaymentPreimage: preimage,
        })
        if err != nil {
                l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err)
        }

        // Once we have successfully settled the htlc, notify a settle event.
        l.cfg.HtlcNotifier.NotifySettleEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: htlcIndex,
                        },
                },
                preimage,
                HtlcEventTypeReceive,
        )

        return nil
}
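
// For reference, the preimage/hash relationship relied on above is plain
// sha256 (a minimal sketch using lntypes; the sample value is made up):
//
//	var preimage lntypes.Preimage // [32]byte
//	copy(preimage[:], bytes.Repeat([]byte{1}, 32))
//	hash := preimage.Hash() // hash == sha256(preimage)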

// forwardBatch forwards the given htlcPackets to the switch, and waits on the
// err chan for the individual responses. This method is intended to be spawned
// as a goroutine so the responses can be handled in the background.
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
        // Don't forward packets for which we already have a response in our
        // mailbox. This could happen if a packet fails and is buffered in the
        // mailbox, and the incoming link flaps.
        filteredPkts := make([]*htlcPacket, 0, len(packets))
        for _, pkt := range packets {
                if l.mailBox.HasPacket(pkt.inKey()) {
                        continue
                }

                filteredPkts = append(filteredPkts, pkt)
        }

        err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...)
        if err != nil {
                log.Errorf("Unhandled error while reforwarding htlc "+
                        "settle/fail over htlcswitch: %v", err)
        }
}
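
// forwardBatch is always spawned as a goroutine by its callers, e.g. when
// pipelining a settle (see processRemoteUpdateFulfillHTLC below):
//
//	go l.forwardBatch(false, settlePacket)
//
// The mailbox check above makes the call idempotent for packets that already
// have a buffered response, so replays after a link flap are harmless.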

// sendHTLCError cancels the HTLC and sends a cancel message back to the peer
// from which the HTLC was received.
func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, failure *LinkError,
        e hop.ErrorEncrypter, isReceive bool) {

        reason, err := e.EncryptFirstHop(failure.WireMessage())
        if err != nil {
                l.log.Errorf("unable to obfuscate error: %v", err)
                return
        }

        err = l.channel.FailHTLC(add.ID, reason, &sourceRef, nil, nil)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        // Send the appropriate failure message depending on whether we're
        // in a blinded route or not.
        if err := l.sendIncomingHTLCFailureMsg(
                add.ID, e, reason,
        ); err != nil {
                l.log.Errorf("unable to send HTLC failure: %v", err)
                return
        }

        // Notify a link failure on our incoming link. Outgoing htlc information
        // is not available at this point, because we have not decrypted the
        // onion, so it is excluded.
        var eventType HtlcEventType
        if isReceive {
                eventType = HtlcEventTypeReceive
        } else {
                eventType = HtlcEventTypeForward
        }

        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: add.ID,
                        },
                },
                HtlcInfo{
                        IncomingTimeLock: add.Expiry,
                        IncomingAmt:      add.Amount,
                },
                eventType,
                failure,
                true,
        )
}

// sendIncomingHTLCFailureMsg handles sending an HTLC failure message back to
// the peer from which the HTLC was received. This function is primarily used
// to handle the special requirements of route blinding, specifically:
// - Forwarding nodes must switch out any errors with MalformedFailHTLC.
// - Introduction nodes should return regular HTLC failure messages.
//
// It accepts the original opaque failure, which will be used in the case
// that we're not part of a blinded route, and an error encrypter that'll be
// used if we are the introduction node and need to present an error as if
// we're the failing party.
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
        e hop.ErrorEncrypter,
        originalFailure lnwire.OpaqueReason) error {

        var msg lnwire.Message
        switch {
        // Our circuit's error encrypter will be nil if this was a locally
        // initiated payment. We can only hit a blinded error for a locally
        // initiated payment if we allow ourselves to be picked as the
        // introduction node for our own payments and in that case we
        // shouldn't reach this code. To prevent the HTLC getting stuck,
        // we fail it back and log an error.
        case e == nil:
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

                l.log.Errorf("Unexpected blinded failure when "+
                        "we are the sending node, incoming htlc: %v(%v)",
                        l.ShortChanID(), htlcIndex)

        // For cleartext hops (ie, non-blinded/normal) we don't need any
        // transformation on the error message and can just send the original.
        case !e.Type().IsBlinded():
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

        // When we're the introduction node, we need to convert the error to
        // a UpdateFailHTLC.
        case e.Type() == hop.EncrypterTypeIntroduction:
                l.log.Debugf("Introduction blinded node switching out failure "+
                        "error: %v", htlcIndex)

                // The specification does not require that we set the onion
                // blob.
                failureMsg := lnwire.NewInvalidBlinding(
                        fn.None[[lnwire.OnionPacketSize]byte](),
                )
                reason, err := e.EncryptFirstHop(failureMsg)
                if err != nil {
                        return err
                }

                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: reason,
                }

        // If we are a relaying node, we need to switch out any error that
        // we've received to a malformed HTLC error.
        case e.Type() == hop.EncrypterTypeRelaying:
                l.log.Debugf("Relaying blinded node switching out malformed "+
                        "error: %v", htlcIndex)

                msg = &lnwire.UpdateFailMalformedHTLC{
                        ChanID:      l.ChanID(),
                        ID:          htlcIndex,
                        FailureCode: lnwire.CodeInvalidBlinding,
                }

        default:
                return fmt.Errorf("unexpected encrypter: %v", e)
        }

        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                l.log.Warnf("Send update fail failed: %v", err)
        }

        return nil
}
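
// The case analysis above maps onto the wire as follows (a summary of the
// switch statement, not an external specification):
//
//	encrypter            | message sent to the incoming peer
//	---------------------+------------------------------------------------
//	nil (local payment)  | UpdateFailHTLC with the original reason
//	cleartext            | UpdateFailHTLC with the original reason
//	introduction node    | UpdateFailHTLC wrapping invalid_blinding
//	relaying node        | UpdateFailMalformedHTLC (CodeInvalidBlinding)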

// sendMalformedHTLCError is a helper function which sends the malformed HTLC
// update to the payment sender.
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
        code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte,
        sourceRef *channeldb.AddRef) {

        shaOnionBlob := sha256.Sum256(onionBlob[:])
        err := l.channel.MalformedFailHTLC(
                htlcIndex, code, shaOnionBlob, sourceRef,
        )
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        err = l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
                ChanID:       l.ChanID(),
                ID:           htlcIndex,
                ShaOnionBlob: shaOnionBlob,
                FailureCode:  code,
        })
        if err != nil {
                l.log.Errorf("failed to send UpdateFailMalformedHTLC: %v", err)
        }
}

// failf is a function which is used to encapsulate the action necessary for
// properly failing the link. It takes a LinkFailureError, which will be passed
// to the OnChannelFailure closure, in order for it to determine if we should
// force close the channel, and if we should send an error message to the
// remote peer.
func (l *channelLink) failf(linkErr LinkFailureError, format string,
        a ...interface{}) {

        reason := fmt.Errorf(format, a...)

        // Return if we have already notified about a failure.
        if l.failed {
                l.log.Warnf("ignoring link failure (%v), as link already "+
                        "failed", reason)
                return
        }

        l.log.Errorf("failing link: %s with error: %v", reason, linkErr)

        // Set failed, such that we won't process any more updates, and notify
        // the peer about the failure.
        l.failed = true
        l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
}
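
// A typical call site passes a failure code plus a format string, for example
// (adapted from the hodl-queue handling later in this file):
//
//	l.failf(
//	        LinkFailureError{code: ErrInternalError},
//	        "process hodl queue: unable to update commitment: %v", err,
//	)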

// FundingCustomBlob returns the custom funding blob of the channel that this
// link is associated with. The funding blob represents static information about
// the channel that was created at channel funding time.
func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        if l.channel.State() == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.State().CustomBlob
}

// CommitmentCustomBlob returns the custom blob of the current local commitment
// of the channel that this link is associated with.
func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.LocalCommitmentBlob()
}

// handleHtlcResolution takes an HTLC resolution and processes it by draining
// the hodlQueue. Once processed, a commit_sig is sent to the remote to update
// their commitment.
func (l *channelLink) handleHtlcResolution(ctx context.Context,
        hodlItem any) error {

        htlcResolution, ok := hodlItem.(invoices.HtlcResolution)
        if !ok {
                return fmt.Errorf("expect HtlcResolution, got %T", hodlItem)
        }

        err := l.processHodlQueue(ctx, htlcResolution)
        // No error, success.
        if err == nil {
                return nil
        }

        switch {
        // If the duplicate keystone error was encountered, fail back
        // gracefully.
        case errors.Is(err, ErrDuplicateKeystone):
                l.failf(
                        LinkFailureError{
                                code: ErrCircuitError,
                        },
                        "process hodl queue: temporary circuit error: %v", err,
                )

        // Send an Error message to the peer.
        default:
                l.failf(
                        LinkFailureError{
                                code: ErrInternalError,
                        },
                        "process hodl queue: unable to update commitment: %v",
                        err,
                )
        }

        return err
}
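
// A rough sketch of how resolutions reach this handler (the surrounding
// select loop lives in htlcManager and is abbreviated here):
//
//	case hodlItem := <-l.hodlQueue.ChanOut():
//	        if err := l.handleHtlcResolution(ctx, hodlItem); err != nil {
//	                return
//	        }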

// handleQuiescenceReq takes a locally initiated (RPC) quiescence request and
// forwards it to the quiescer for further processing.
func (l *channelLink) handleQuiescenceReq(req StfuReq) error {
        l.quiescer.InitStfu(req)

        if !l.noDanglingUpdates(lntypes.Local) {
                return nil
        }

        err := l.quiescer.SendOwedStfu()
        if err != nil {
                l.stfuFailf("SendOwedStfu: %v", err)
                res := fn.Err[lntypes.ChannelParty](err)
                req.Resolve(res)
        }

        return err
}

// handleUpdateFee is called whenever the `updateFeeTimer` ticks. It is used to
// decide whether we should send an `update_fee` msg to update the commitment's
// feerate.
func (l *channelLink) handleUpdateFee(ctx context.Context) error {
        // If we're not the initiator of the channel, we don't control the fees,
        // so we can ignore this.
        if !l.channel.IsInitiator() {
                return nil
        }

        // If we are the initiator, then we'll sample the current fee rate to
        // get into the chain within 3 blocks.
        netFee, err := l.sampleNetworkFee()
        if err != nil {
                return fmt.Errorf("unable to sample network fee: %w", err)
        }

        minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()

        newCommitFee := l.channel.IdealCommitFeeRate(
                netFee, minRelayFee,
                l.cfg.MaxAnchorsCommitFeeRate,
                l.cfg.MaxFeeAllocation,
        )

        // We determine if we should adjust the commitment fee based on the
        // current commitment fee, the suggested new commitment fee and the
        // current minimum relay fee rate.
        commitFee := l.channel.CommitFeeRate()
        if !shouldAdjustCommitFee(newCommitFee, commitFee, minRelayFee) {
                return nil
        }

        // If we do, then we'll send a new UpdateFee message to the remote
        // party, to be locked in with a new update.
        err = l.updateChannelFee(ctx, newCommitFee)
        if err != nil {
                return fmt.Errorf("unable to update fee rate: %w", err)
        }

        return nil
}

// toggleBatchTicker checks whether we need to resume or pause the batch ticker.
// When we have no pending updates, the ticker is paused, otherwise resumed.
func (l *channelLink) toggleBatchTicker() {
        // If the previous event resulted in a non-empty batch, resume the batch
        // ticker so that it can be cleared. Otherwise pause the ticker to
        // prevent waking up the htlcManager while the batch is empty.
        numUpdates := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
        if numUpdates > 0 {
                l.cfg.BatchTicker.Resume()
                l.log.Tracef("BatchTicker resumed, NumPendingUpdates(Local, "+
                        "Remote)=%d", numUpdates)

                return
        }

        l.cfg.BatchTicker.Pause()
        l.log.Trace("BatchTicker paused due to zero NumPendingUpdates" +
                "(Local, Remote)")
}
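
// The batch ticker itself is a plain resumable ticker; a minimal sketch of
// the contract assumed here (the 50ms interval is only an example value):
//
//	batchTicker := ticker.New(50 * time.Millisecond)
//	batchTicker.Pause()  // no ticks while the batch is empty
//	batchTicker.Resume() // ticks again once updates are pending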

// resumeLink is called when starting a previously created link. It runs
// through the reestablishment protocol and reforwards any packets that are
// not yet resolved.
func (l *channelLink) resumeLink(ctx context.Context) error {
        // If this isn't the first time that this channel link has been created,
        // then we'll need to check to see if we need to re-synchronize state
        // with the remote peer. settledHtlcs is a map of HTLC's that we
        // re-settled as part of the channel state sync.
        if l.cfg.SyncStates {
                err := l.syncChanStates(ctx)
                if err != nil {
                        l.handleChanSyncErr(err)

                        return err
                }
        }

        // If a shutdown message has previously been sent on this link, then we
        // need to make sure that we have disabled any HTLC adds on the outgoing
        // direction of the link and that we re-send the same shutdown message
        // that we previously sent.
        //
        // TODO(yy): we should either move this to chanCloser, or move all
        // shutdown handling logic to be managed by the link, but not a mix of
        // partial management by two subsystems.
        l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
                // Immediately disallow any new outgoing HTLCs.
                if !l.DisableAdds(Outgoing) {
                        l.log.Warnf("Outgoing link adds already disabled")
                }

                // Re-send the shutdown message to the peer. Since syncChanStates
                // would have sent any outstanding CommitSig, it is fine for us
                // to immediately queue the shutdown message now.
                err := l.cfg.Peer.SendMessage(false, &shutdown)
                if err != nil {
                        l.log.Warnf("Error sending shutdown message: %v", err)
                }
        })

        // We've successfully reestablished the channel, mark it as such to
        // allow the switch to forward HTLCs in the outbound direction.
        l.markReestablished()

        // With the channel states synced, we now reset the mailbox to ensure we
        // start processing all unacked packets in order. This is done here to
        // ensure that all acknowledgments that occur during channel
        // resynchronization have taken effect, causing us only to pull unacked
        // packets after starting to read from the downstream mailbox.
        err := l.mailBox.ResetPackets()
        if err != nil {
                l.log.Errorf("failed to reset packets: %v", err)
        }

        // If the channel is pending, there's no need to reforward packets.
        if l.ShortChanID() == hop.Source {
                return nil
        }

        // After cleaning up any memory pertaining to incoming packets, we now
        // replay our forwarding packages to handle any htlcs that can be
        // processed locally, or need to be forwarded out to the switch. We will
        // only attempt to resolve packages if our short chan id indicates that
        // the channel is not pending, otherwise we should have no htlcs to
        // reforward.
        err = l.resolveFwdPkgs(ctx)
        switch {
        // No error was encountered, success.
        case err == nil:
                // With our link's in-memory state fully reconstructed, spawn a
                // goroutine to manage the reclamation of disk space occupied by
                // completed forwarding packages.
                l.cg.WgAdd(1)
                go l.fwdPkgGarbager()

                return nil

        // If the duplicate keystone error was encountered, we'll fail without
        // sending an Error message to the peer.
        case errors.Is(err, ErrDuplicateKeystone):
                l.failf(LinkFailureError{code: ErrCircuitError},
                        "temporary circuit error: %v", err)

        // A non-nil error was encountered, send an Error message to
        // the peer.
        default:
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to resolve fwd pkgs: %v", err)
        }

        return err
}

// processRemoteUpdateAddHTLC takes an `UpdateAddHTLC` msg sent from the remote
// and processes it.
func (l *channelLink) processRemoteUpdateAddHTLC(
        msg *lnwire.UpdateAddHTLC) error {

        if l.IsFlushing(Incoming) {
                // This is forbidden by the protocol specification. The best
                // chance we have to deal with this is to drop the connection.
                // This should roll back the channel state to the last
                // CommitSig. If the remote has already sent a CommitSig we
                // haven't received yet, channel state will be re-synchronized
                // with a ChannelReestablish message upon reconnection and the
                // protocol state that caused us to flush the link will be
                // rolled back. In the event that there was some
                // non-deterministic behavior in the remote that caused them to
                // violate the protocol, we have a decent shot at correcting it
                // this way, since reconnecting will put us in the cleanest
                // possible state to try again.
                //
                // In addition to the above, it is possible for us to hit this
                // case in situations where we improperly handle message
                // ordering due to concurrency choices. An issue has been filed
                // to address this here:
                // https://github.com/lightningnetwork/lnd/issues/8393
                err := errors.New("received add while link is flushing")
                l.failf(
                        LinkFailureError{
                                code:             ErrInvalidUpdate,
                                FailureAction:    LinkFailureDisconnect,
                                PermanentFailure: false,
                                Warning:          true,
                        }, "%v", err,
                )

                return err
        }

        // Disallow htlcs with blinding points set if we haven't enabled the
        // feature. This saves us from having to process the onion at all, but
        // will only catch blinded payments where we are a relaying node (as the
        // blinding point will be in the payload when we're the introduction
        // node).
        if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
                err := errors.New("blinding point included when route " +
                        "blinding is disabled")

                l.failf(LinkFailureError{code: ErrInvalidUpdate}, "%v", err)

                return err
        }

        // We have to check the limit here rather than later in the switch
        // because the counterparty can keep sending HTLC's without sending a
        // revoke. This would mean that the switch check would only occur later.
        if l.isOverexposedWithHtlc(msg, true) {
                err := errors.New("peer sent us an HTLC that exceeded our " +
                        "max fee exposure")
                l.failf(LinkFailureError{code: ErrInternalError}, "%v", err)

                return err
        }

        // We just received an add request from an upstream peer, so we add it
        // to our state machine, then add the HTLC to our "settle" list in the
        // event that we know the preimage.
        index, err := l.channel.ReceiveHTLC(msg)
        if err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "unable to handle upstream add HTLC: %v", err)

                return err
        }

        l.log.Tracef("receive upstream htlc with payment hash(%x), "+
                "assigning index: %v", msg.PaymentHash[:], index)

        return nil
}

// processRemoteUpdateFulfillHTLC takes an `UpdateFulfillHTLC` msg sent from the
// remote and processes it.
func (l *channelLink) processRemoteUpdateFulfillHTLC(
        msg *lnwire.UpdateFulfillHTLC) error {

        pre := msg.PaymentPreimage
        idx := msg.ID

        // Before we pipeline the settle, we'll check the set of active htlc's
        // to see if the related UpdateAddHTLC has been fully locked-in.
        var lockedin bool
        htlcs := l.channel.ActiveHtlcs()
        for _, add := range htlcs {
                // The HTLC will be outgoing and match idx.
                if !add.Incoming && add.HtlcIndex == idx {
                        lockedin = true
                        break
                }
        }

        if !lockedin {
                err := errors.New("unable to handle upstream settle")
                l.failf(LinkFailureError{code: ErrInvalidUpdate}, "%v", err)

                return err
        }

        if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
                l.failf(
                        LinkFailureError{
                                code:          ErrInvalidUpdate,
                                FailureAction: LinkFailureForceClose,
                        },
                        "unable to handle upstream settle HTLC: %v", err,
                )

                return err
        }

        settlePacket := &htlcPacket{
                outgoingChanID: l.ShortChanID(),
                outgoingHTLCID: idx,
                htlc: &lnwire.UpdateFulfillHTLC{
                        PaymentPreimage: pre,
                },
        }

        // Add the newly discovered preimage to our growing list of uncommitted
        // preimages. These will be written to the witness cache just before
        // accepting the next commitment signature from the remote peer.
        l.uncommittedPreimages = append(l.uncommittedPreimages, pre)

        // Pipeline this settle, send it to the switch.
        go l.forwardBatch(false, settlePacket)

        return nil
}
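
// Note that the settle above is "pipelined": it is handed to the switch
// before the commitment dance for this update completes, shaving a round
// trip off the settle latency. The preimage itself is only persisted to the
// witness cache just before we accept the peer's next CommitSig (see
// processRemoteCommitSig).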

// processRemoteUpdateFailMalformedHTLC takes an `UpdateFailMalformedHTLC` msg
// sent from the remote and processes it.
func (l *channelLink) processRemoteUpdateFailMalformedHTLC(
        msg *lnwire.UpdateFailMalformedHTLC) error {

        // Convert the failure type encoded within the HTLC fail message to the
        // proper generic lnwire error code.
        var failure lnwire.FailureMessage
        switch msg.FailureCode {
        case lnwire.CodeInvalidOnionVersion:
                failure = &lnwire.FailInvalidOnionVersion{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        case lnwire.CodeInvalidOnionHmac:
                failure = &lnwire.FailInvalidOnionHmac{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        case lnwire.CodeInvalidOnionKey:
                failure = &lnwire.FailInvalidOnionKey{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        // Handle malformed errors that are part of a blinded route. This case
        // is slightly different, because we expect every relaying node in the
        // blinded portion of the route to send malformed errors. If we're also
        // a relaying node, we're likely going to switch this error out anyway
        // for our own malformed error, but we handle the case here for
        // completeness.
        case lnwire.CodeInvalidBlinding:
                failure = &lnwire.FailInvalidBlinding{
                        OnionSHA256: msg.ShaOnionBlob,
                }

        default:
                l.log.Warnf("unexpected failure code received in "+
                        "UpdateFailMalformedHTLC: %v", msg.FailureCode)

                // We don't just pass back the error we received from our
                // successor. Otherwise we might report a failure that penalizes
                // us more than needed. If the onion that we forwarded was
                // correct, the node should have been able to send back its own
                // failure. The node did not send back its own failure, so we
                // assume there was a problem with the onion and report that
                // back. We reuse the invalid onion key failure because there is
                // no specific error for this case.
                failure = &lnwire.FailInvalidOnionKey{
                        OnionSHA256: msg.ShaOnionBlob,
                }
        }

        // With the error parsed, we'll convert it into its opaque form.
        var b bytes.Buffer
        if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
                return fmt.Errorf("unable to encode malformed error: %w", err)
        }

        // If the remote side has been unable to parse the onion blob we sent
        // it, then we transform the malformed HTLC message into the usual HTLC
        // fail message.
        err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
        if err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "unable to handle upstream fail HTLC: %v", err)

                return err
        }

        return nil
}

// processRemoteUpdateFailHTLC takes an `UpdateFailHTLC` msg sent from the
// remote and processes it.
func (l *channelLink) processRemoteUpdateFailHTLC(
        msg *lnwire.UpdateFailHTLC) error {

        // Verify that the failure reason is at least 256 bytes plus overhead.
        const minimumFailReasonLength = lnwire.FailureMessageLength + 2 + 2 + 32

        if len(msg.Reason) < minimumFailReasonLength {
                // We've received a reason with a non-compliant length. Older
                // nodes happily relay back these failures that may originate
                // from a node further downstream. Therefore we can't just fail
                // the channel.
                //
                // We want to be compliant ourselves, so we also can't pass back
                // the reason unmodified. And we must make sure that we don't
                // hit the magic length check of 260 bytes in
                // processRemoteSettleFails either.
                //
                // Because the reason is unreadable for the payer anyway, we
                // just replace it by a compliant-length series of random bytes.
                msg.Reason = make([]byte, minimumFailReasonLength)
                _, err := crand.Read(msg.Reason[:])
                if err != nil {
                        return fmt.Errorf("random generation error: %w", err)
                }
        }

        // Add fail to the update log.
        idx := msg.ID
        err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
        if err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "unable to handle upstream fail HTLC: %v", err)

                return err
        }

        return nil
}
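
// For reference, the minimum length enforced above works out to 292 bytes:
// 256 (lnwire.FailureMessageLength) + 2 (message length prefix) + 2 (pad
// length prefix) + 32 (HMAC), matching the failure packet layout
//
//	hmac(32) || failuremsg_len(2) || failuremsg || pad_len(2) || pad
//
// where the message and padding together occupy the 256-byte budget.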

// processRemoteCommitSig takes a `CommitSig` msg sent from the remote and
// processes it.
func (l *channelLink) processRemoteCommitSig(ctx context.Context,
        msg *lnwire.CommitSig) error {

        // Since we may have learned new preimages for the first time, we'll add
        // them to our preimage cache. By doing this, we ensure any contested
        // contracts watched by any on-chain arbitrators can now sweep this HTLC
        // on-chain. We delay committing the preimages until just before
        // accepting the new remote commitment, as afterwards the peer won't
        // resend the Settle messages on the next channel reestablishment. Doing
        // so allows us to more effectively batch this operation, instead of
        // doing a single write per preimage.
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
        if err != nil {
                l.failf(
                        LinkFailureError{code: ErrInternalError},
                        "unable to add preimages=%v to cache: %v",
                        l.uncommittedPreimages, err,
                )

                return err
        }

        // Instead of truncating the slice to conserve memory allocations, we
        // simply set the uncommitted preimage slice to nil so that a new one
        // will be initialized if any more witnesses are discovered. We do this
        // because the maximum size that the slice can occupy is 15KB, and we
        // want to ensure we release that memory back to the runtime.
        l.uncommittedPreimages = nil

        // We just received new updates to our local commitment chain,
        // validate this new commitment, closing the link if invalid.
        auxSigBlob, err := msg.CustomRecords.Serialize()
        if err != nil {
                l.failf(
                        LinkFailureError{code: ErrInvalidCommitment},
                        "unable to serialize custom records: %v", err,
                )

                return err
        }
        err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
                CommitSig:  msg.CommitSig,
                HtlcSigs:   msg.HtlcSigs,
                PartialSig: msg.PartialSig,
                AuxSigBlob: auxSigBlob,
        })
        if err != nil {
                // If we were unable to reconstruct their proposed commitment,
                // then we'll examine the type of error. If it's an
                // InvalidCommitSigError, then we'll send a direct error.
                var sendData []byte
                switch {
                case lnutils.ErrorAs[*lnwallet.InvalidCommitSigError](err):
                        sendData = []byte(err.Error())
                case lnutils.ErrorAs[*lnwallet.InvalidHtlcSigError](err):
                        sendData = []byte(err.Error())
                }
                l.failf(
                        LinkFailureError{
                                code:          ErrInvalidCommitment,
                                FailureAction: LinkFailureForceClose,
                                SendData:      sendData,
                        },
                        "ChannelPoint(%v): unable to accept new "+
                                "commitment: %v",
                        l.channel.ChannelPoint(), err,
                )

                return err
        }

        // As we've just accepted a new state, we'll now immediately send the
        // remote peer a revocation for our prior state.
        nextRevocation, currentHtlcs, finalHTLCs, err :=
                l.channel.RevokeCurrentCommitment()
        if err != nil {
                l.log.Errorf("unable to revoke commitment: %v", err)

                // We need to fail the channel in case revoking our local
                // commitment does not succeed. We might have already advanced
                // our channel state which would lead us to proceed with an
                // unclean state.
                //
                // NOTE: We do not trigger a force close because this could
                // resolve itself in case our db was just busy not accepting new
                // transactions.
                l.failf(
                        LinkFailureError{
                                code:          ErrInternalError,
                                Warning:       true,
                                FailureAction: LinkFailureDisconnect,
                        },
                        "ChannelPoint(%v): unable to accept new "+
                                "commitment: %v",
                        l.channel.ChannelPoint(), err,
                )

                return err
        }

        // As soon as we are ready to send our next revocation, we can invoke
        // the incoming commit hooks.
        l.Lock()
        l.incomingCommitHooks.invoke()
        l.Unlock()

        err = l.cfg.Peer.SendMessage(false, nextRevocation)
        if err != nil {
                l.log.Errorf("failed to send RevokeAndAck: %v", err)
        }

        // Notify about the final resolutions that were just locked in for the
        // incoming htlcs.
        for id, settled := range finalHTLCs {
                l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
                        models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: id,
                        },
                        channeldb.FinalHtlcInfo{
                                Settled:  settled,
                                Offchain: true,
                        },
                )
        }

        // Since we just revoked our commitment, we may have a new set of HTLC's
        // on our commitment, so we'll send them using our function closure
        // NotifyContractUpdate.
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.LocalHtlcSet,
                Htlcs:   currentHtlcs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                return fmt.Errorf("unable to notify contract update: %w", err)
        }

        select {
        case <-l.cg.Done():
                return nil
        default:
        }

        // If the remote party initiated the state transition, we'll reply with
        // a signature to provide them with their version of the latest
        // commitment. Otherwise, if both commitment chains are fully synced
        // from our PoV, we don't need to reply with a signature as both sides
        // already have a commitment with the latest accepted state.
        if l.channel.OweCommitment() {
                if !l.updateCommitTxOrFail(ctx) {
                        return nil
                }
        }

        // If we need to send out an Stfu, this would be the time to do so.
        if l.noDanglingUpdates(lntypes.Local) {
                err = l.quiescer.SendOwedStfu()
                if err != nil {
                        l.stfuFailf("sendOwedStfu: %v", err)
                }
        }

        // Now that we have finished processing the incoming CommitSig and sent
        // out our RevokeAndAck, we invoke the flushHooks if the channel state
        // is clean.
        l.Lock()
        if l.channel.IsChannelClean() {
                l.flushHooks.invoke()
        }
        l.Unlock()

        return nil
}
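
// The exchange implemented above follows the usual BOLT #2 commitment dance,
// with this function handling the middle two steps (sketch):
//
//	remote ---- commit_sig ------> local  (validate + revoke prior state)
//	remote <--- revoke_and_ack --- local
//	remote <--- commit_sig ------- local  (only if we still owe a commitment)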

// processRemoteRevokeAndAck takes a `RevokeAndAck` msg sent from the remote and
// processes it.
func (l *channelLink) processRemoteRevokeAndAck(ctx context.Context,
        msg *lnwire.RevokeAndAck) error {

        // We've received a revocation from the remote chain. If valid, this
        // moves the remote chain forward and expands our revocation window.
        //
        // We now process the message and advance our remote commit chain.
        fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg)
        if err != nil {
                // TODO(halseth): force close?
                l.failf(
                        LinkFailureError{
                                code:          ErrInvalidRevocation,
                                FailureAction: LinkFailureDisconnect,
                        },
                        "unable to accept revocation: %v", err,
                )

                return err
        }

        // The remote party now has a new primary commitment, so we'll update
        // the contract court to be aware of this new set (the prior old remote
        // pending).
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.RemoteHtlcSet,
                Htlcs:   remoteHTLCs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                return fmt.Errorf("unable to notify contract update: %w", err)
        }

        select {
        case <-l.cg.Done():
                return nil
        default:
        }

        // If we have a tower client for this channel type, we'll create a
        // backup for the current state.
        if l.cfg.TowerClient != nil {
                state := l.channel.State()
                chanID := l.ChanID()

                err = l.cfg.TowerClient.BackupState(
                        &chanID, state.RemoteCommitment.CommitHeight-1,
                )
                if err != nil {
                        l.failf(LinkFailureError{
                                code: ErrInternalError,
                        }, "unable to queue breach backup: %v", err)

                        return err
                }
        }

        // If we can send updates then we can process adds in case we are the
        // exit hop and need to send back resolutions, or in case there are
        // validity issues with the packets. Otherwise we defer the action until
        // resume.
        //
        // We are free to process the settles and fails without this check since
        // processing those can't result in further updates to this channel
        // link.
        if l.quiescer.CanSendUpdates() {
                l.processRemoteAdds(fwdPkg)
        } else {
                l.quiescer.OnResume(func() {
                        l.processRemoteAdds(fwdPkg)
                })
        }
        l.processRemoteSettleFails(fwdPkg)

        // If the link failed during processing the adds, we must return to
        // ensure we won't attempt to update the state further.
        if l.failed {
                return nil
        }

        // The revocation window opened up. If there are pending local updates,
        // try to update the commit tx. Pending updates could already have been
        // present because of a previously failed update to the commit tx or
        // freshly added in by processRemoteAdds. Also in case there are no
        // local updates, but there are still remote updates that are not in the
        // remote commit tx yet, send out an update.
        if l.channel.OweCommitment() {
                if !l.updateCommitTxOrFail(ctx) {
                        return nil
                }
        }

        // Now that we have finished processing the RevokeAndAck, we can invoke
        // the flushHooks if the channel state is clean.
        l.Lock()
        if l.channel.IsChannelClean() {
                l.flushHooks.invoke()
        }
        l.Unlock()

        return nil
}

// processRemoteUpdateFee takes an `UpdateFee` msg sent from the remote and
// processes it.
func (l *channelLink) processRemoteUpdateFee(msg *lnwire.UpdateFee) error {
        // Check and see if their proposed fee-rate would make us exceed the fee
        // threshold.
        fee := chainfee.SatPerKWeight(msg.FeePerKw)

        isDust, err := l.exceedsFeeExposureLimit(fee)
        if err != nil {
                // This shouldn't typically happen. If it does, it indicates
                // something is wrong with our channel state.
                l.log.Errorf("Unable to determine if fee threshold "+
                        "exceeded: %v", err)
                l.failf(LinkFailureError{code: ErrInternalError},
                        "error calculating fee exposure: %v", err)

                return err
        }

        if isDust {
                // The proposed fee-rate makes us exceed the fee threshold.
                err := fmt.Errorf("fee threshold exceeded with feerate %v", fee)
                l.failf(LinkFailureError{code: ErrInternalError}, "%v", err)

                return err
        }

        // We received a fee update from the peer. If we are the initiator we
        // will fail the channel; if not, we will apply the update.
        if err := l.channel.ReceiveUpdateFee(fee); err != nil {
                l.failf(LinkFailureError{code: ErrInvalidUpdate},
                        "error receiving fee update: %v", err)
                return err
        }

        // Update the mailbox's feerate as well.
        l.mailBox.SetFeeRate(fee)

        return nil
}
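
// A note on units, since msg.FeePerKw above is denominated in sat/kw: rates
// here are per 1000 weight units, and 1 vbyte == 4 weight units, so
//
//	1 sat/vbyte == 250 sat/kw
//
// e.g. a 10 sat/vbyte estimate arrives at this link as 2500 sat/kw.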

// processRemoteError takes an `Error` msg sent from the remote and fails the
// channel link.
func (l *channelLink) processRemoteError(msg *lnwire.Error) {
        // Error received from remote, MUST fail channel, but should only print
        // the contents of the error message if all characters are printable
        // ASCII.
        l.failf(
                // TODO(halseth): we currently don't fail the channel
                // permanently, as there are some sync issues with other
                // implementations that will lead to them sending an
                // error message, but we can recover from on next
                // connection. See
                // https://github.com/ElementsProject/lightning/issues/4212
                LinkFailureError{
                        code:             ErrRemoteError,
                        PermanentFailure: false,
                },
                "ChannelPoint(%v): received error from peer: %v",
                l.channel.ChannelPoint(), msg.Error(),
        )
}

// processLocalUpdateFulfillHTLC takes an `UpdateFulfillHTLC` from the local
// switch and processes it.
func (l *channelLink) processLocalUpdateFulfillHTLC(ctx context.Context,
	pkt *htlcPacket, htlc *lnwire.UpdateFulfillHTLC) {

	// If hodl.SettleOutgoing mode is active, we exit early to simulate
	// arbitrary delays between the switch adding the SETTLE to the
	// mailbox, and the HTLC being added to the commitment state.
	if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
		l.log.Warnf(hodl.SettleOutgoing.Warning())
		l.mailBox.AckPacket(pkt.inKey())

		return
	}

	// An HTLC we forwarded to the switch has just settled somewhere
	// upstream. Therefore we settle the HTLC within our local state
	// machine.
	inKey := pkt.inKey()
	err := l.channel.SettleHTLC(
		htlc.PaymentPreimage, pkt.incomingHTLCID, pkt.sourceRef,
		pkt.destRef, &inKey,
	)
	if err != nil {
		l.log.Errorf("unable to settle incoming HTLC for "+
			"circuit-key=%v: %v", inKey, err)

		// If the HTLC index for the Settle response was not known to
		// our commitment state, it has already been cleaned up by a
		// prior response. We'll thus try to clean up any lingering
		// state to ensure we don't continue reforwarding.
		if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
			l.cleanupSpuriousResponse(pkt)
		}

		// Remove the packet from the link's mailbox to ensure it
		// doesn't get replayed after a reconnection.
		l.mailBox.AckPacket(inKey)

		return
	}

	l.log.Debugf("queueing removal of SETTLE closed circuit: %s->%s",
		pkt.inKey(), pkt.outKey())

	l.closedCircuits = append(l.closedCircuits, pkt.inKey())

	// With the HTLC settled, we'll need to populate the wire message to
	// target the specific channel and HTLC to be settled.
	htlc.ChanID = l.ChanID()
	htlc.ID = pkt.incomingHTLCID

	// Then we send the HTLC settle message to the connected peer so we
	// can continue the propagation of the settle message.
	err = l.cfg.Peer.SendMessage(false, htlc)
	if err != nil {
		l.log.Errorf("failed to send UpdateFulfillHTLC: %v", err)
	}

	// Send a settle event notification to the htlcNotifier.
	l.cfg.HtlcNotifier.NotifySettleEvent(
		newHtlcKey(pkt), htlc.PaymentPreimage, getEventType(pkt),
	)

	// Immediately update the commitment tx to minimize latency.
	l.updateCommitTxOrFail(ctx)
}
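
// The wire message populated above only needs three fields for the peer to
// match the settle against its commitment state: the channel, the HTLC index
// assigned when the HTLC was added, and the preimage proving the payment hash
// was satisfied. Below is a minimal standalone sketch of that message; the
// function and argument names are hypothetical, not part of the link's API.
func exampleBuildSettleMsg(chanID lnwire.ChannelID, htlcID uint64,
	preimage [32]byte) *lnwire.UpdateFulfillHTLC {

	// (ChanID, ID) uniquely identifies the HTLC on the channel; the
	// preimage lets the peer verify that SHA256(preimage) matches the
	// payment hash committed to when the HTLC was added.
	return &lnwire.UpdateFulfillHTLC{
		ChanID:          chanID,
		ID:              htlcID,
		PaymentPreimage: preimage,
	}
}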

// processLocalUpdateFailHTLC takes an `UpdateFailHTLC` from the local switch
// and processes it.
func (l *channelLink) processLocalUpdateFailHTLC(ctx context.Context,
	pkt *htlcPacket, htlc *lnwire.UpdateFailHTLC) {

	// If hodl.FailOutgoing mode is active, we exit early to simulate
	// arbitrary delays between the switch adding a FAIL to the mailbox,
	// and the HTLC being added to the commitment state.
	if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
		l.log.Warnf(hodl.FailOutgoing.Warning())
		l.mailBox.AckPacket(pkt.inKey())

		return
	}

	// An HTLC cancellation has been triggered somewhere upstream, so
	// we'll remove the HTLC from our local state machine.
	inKey := pkt.inKey()
	err := l.channel.FailHTLC(
		pkt.incomingHTLCID, htlc.Reason, pkt.sourceRef, pkt.destRef,
		&inKey,
	)
	if err != nil {
		l.log.Errorf("unable to cancel incoming HTLC for "+
			"circuit-key=%v: %v", inKey, err)

		// If the HTLC index for the Fail response was not known to
		// our commitment state, it has already been cleaned up by a
		// prior response. We'll thus try to clean up any lingering
		// state to ensure we don't continue reforwarding.
		if lnutils.ErrorAs[lnwallet.ErrUnknownHtlcIndex](err) {
			l.cleanupSpuriousResponse(pkt)
		}

		// Remove the packet from the link's mailbox to ensure it
		// doesn't get replayed after a reconnection.
		l.mailBox.AckPacket(inKey)

		return
	}

	l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
		pkt.inKey(), pkt.outKey())

	l.closedCircuits = append(l.closedCircuits, pkt.inKey())

	// With the HTLC removed, we'll need to populate the wire message to
	// target the specific channel and HTLC to be canceled. The "Reason"
	// field will have already been set within the switch.
	htlc.ChanID = l.ChanID()
	htlc.ID = pkt.incomingHTLCID

	// We send the HTLC message to the peer which initially created the
	// HTLC. If the incoming blinding point is non-nil, we know that we
	// are a relaying node in a blinded path. Otherwise, we're either an
	// introduction node or not part of a blinded path at all.
	err = l.sendIncomingHTLCFailureMsg(htlc.ID, pkt.obfuscator, htlc.Reason)
	if err != nil {
		l.log.Errorf("unable to send HTLC failure: %v", err)

		return
	}

	// If the packet does not have a link failure set, it failed further
	// down the route, so we notify a forwarding failure. Otherwise, we
	// notify a link failure because it failed at our node.
	if pkt.linkFailure != nil {
		l.cfg.HtlcNotifier.NotifyLinkFailEvent(
			newHtlcKey(pkt), newHtlcInfo(pkt), getEventType(pkt),
			pkt.linkFailure, false,
		)
	} else {
		l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
			newHtlcKey(pkt), getEventType(pkt),
		)
	}

	// Immediately update the commitment tx to minimize latency.
	l.updateCommitTxOrFail(ctx)
}
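
// The notifier branch above reduces to a single question: did the HTLC fail
// at this node, or somewhere further down the route? The sketch below
// restates that decision in isolation; the function name is hypothetical and
// the strings stand in for the two notifier calls.
func exampleFailEventKind(linkFailure *LinkError) string {
	if linkFailure != nil {
		// The failure originated at our node (e.g. a policy check on
		// this link failed), so a link-failure event is recorded
		// along with the failure detail itself.
		return "link failure"
	}

	// The failure happened downstream; only a forwarding failure is
	// recorded for this hop.
	return "forwarding failure"
}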

// validateHtlcAmount checks whether the HTLC amount is within the policy's
// minimum and maximum limits. It returns a LinkError if validation fails.
func (l *channelLink) validateHtlcAmount(policy models.ForwardingPolicy,
	payHash [32]byte, amt lnwire.MilliSatoshi,
	originalScid lnwire.ShortChannelID,
	customRecords lnwire.CustomRecords) *LinkError {

	// In case we are dealing with a custom HTLC, we don't need to
	// validate the HTLC constraints.
	//
	// NOTE: Custom HTLCs are only locally sourced and use custom
	// channels, which are not routable channels and should not have their
	// policy restricted in the first place. However, to be sure, we skip
	// this check; otherwise we might end up in a loop of sending to the
	// same route again and again, because link errors are not persisted
	// in mission control.
	if fn.MapOptionZ(
		l.cfg.AuxTrafficShaper,
		func(ts AuxTrafficShaper) bool {
			return ts.IsCustomHTLC(customRecords)
		},
	) {

		l.log.Debugf("Skipping htlc amount policy validation for " +
			"custom htlc")

		return nil
	}

	// As our first sanity check, we'll ensure that the passed HTLC isn't
	// too small for the next hop. If so, then we'll cancel the HTLC
	// directly.
	if amt < policy.MinHTLCOut {
		l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
			"htlc_value=%v", payHash[:], policy.MinHTLCOut,
			amt)

		// As part of the returned error, we'll send our latest
		// routing policy so the sending node obtains the most
		// up-to-date data.
		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
			return lnwire.NewAmountBelowMinimum(amt, *upd)
		}
		failure := l.createFailureWithUpdate(false, originalScid, cb)

		return NewLinkError(failure)
	}

	// Next, ensure that the passed HTLC isn't too large. If so, we'll
	// cancel the HTLC directly.
	if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
		l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
			"htlc_value=%v", payHash[:], policy.MaxHTLC, amt)

		// As part of the returned error, we'll send our latest
		// routing policy so the sending node obtains the most
		// up-to-date data.
		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
			return lnwire.NewTemporaryChannelFailure(upd)
		}
		failure := l.createFailureWithUpdate(false, originalScid, cb)

		return NewDetailedLinkError(
			failure, OutgoingFailureHTLCExceedsMax,
		)
	}

	return nil
}
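
// The policy bounds check above comes down to two comparisons, with
// MaxHTLC == 0 acting as a sentinel for "no maximum configured". Below is a
// standalone sketch of that logic; the function and parameter names are
// hypothetical and not used by the link.
func exampleAmtWithinPolicy(amt, minHTLCOut,
	maxHTLC lnwire.MilliSatoshi) bool {

	// Too small for the next hop: the link would fail the HTLC with
	// AmountBelowMinimum, attaching a fresh channel update.
	if amt < minHTLCOut {
		return false
	}

	// A zero MaxHTLC means no ceiling was set; otherwise exceeding it
	// would map to TemporaryChannelFailure.
	if maxHTLC != 0 && amt > maxHTLC {
		return false
	}

	return true
}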