lightningnetwork / lnd / build 15858991938
24 Jun 2025 06:51PM UTC coverage: 55.808% (-2.4%) from 58.173%

Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*
Merge 0e921d6a5 into 29ff13d83 (github, web-flow)

232 of 267 new or added lines in 5 files covered (86.89%)
24606 existing lines in 281 files now uncovered.
108380 of 194201 relevant lines covered (55.81%)
22488.12 hits per line

Source File: /htlcswitch/link.go (67.74% covered)
package htlcswitch

import (
        "bytes"
        "context"
        crand "crypto/rand"
        "crypto/sha256"
        "errors"
        "fmt"
        prand "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/wire"
        "github.com/btcsuite/btclog/v2"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/contractcourt"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/invoices"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lntypes"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/queue"
        "github.com/lightningnetwork/lnd/record"
        "github.com/lightningnetwork/lnd/ticker"
        "github.com/lightningnetwork/lnd/tlv"
)

func init() {
        prand.Seed(time.Now().UnixNano())
}

const (
        // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
        // the node accepts for forwarded payments. The value is relative to
        // the current block height. The reason to have a maximum is to
        // prevent funds from getting locked up for an unreasonably long time.
        // Otherwise, an attacker willing to lock up its own funds as well
        // could force the funds of this node to be locked up for an
        // indefinite (max int32) number of blocks.
        //
        // The value 2016 corresponds to, on average, two weeks' worth of
        // blocks and is based on the maximum number of hops (20), the default
        // CLTV delta (40), and some extra margin to account for the other
        // lightning implementations and past lnd versions which used to have
        // a default CLTV delta of 144.
        DefaultMaxOutgoingCltvExpiry = 2016

        // DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute

        // DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute

        // DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
        // a channel's commitment fee to be of its balance. This only applies
        // to the initiator of the channel.
        DefaultMaxLinkFeeAllocation float64 = 0.5
)
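
// Worked example (illustrative; not part of the original file): with at most
// 20 hops and a default CLTV delta of 40 blocks per hop, a route consumes up
// to 20 * 40 = 800 blocks of time lock. 2016 blocks is roughly two weeks at
// ~144 blocks per day, leaving over 1,200 blocks of margin for peers that use
// larger deltas, such as the historical lnd default of 144.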

// ExpectedFee computes the expected fee for a given htlc amount. The value
// returned from this function is to be used as a sanity check when forwarding
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
// forwarding policy.
//
// TODO(roasbeef): also add in current available channel bandwidth, inverse
// func
func ExpectedFee(f models.ForwardingPolicy,
        htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {

        return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
}
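
// Worked example (illustrative; not part of the original file): FeeRate here
// acts as a proportional fee in parts per million. With BaseFee = 1,000 msat
// and FeeRate = 100, an htlcAmt of 1,000,000 msat yields an expected fee of
// 1000 + (1000000*100)/1000000 = 1,100 msat.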

// ChannelLinkConfig defines the configuration for the channel link. ALL
// elements within the configuration MUST be non-nil for channel link to carry
// out its duties.
type ChannelLinkConfig struct {
        // FwrdingPolicy is the initial forwarding policy to be used when
        // deciding whether to forward incoming HTLC's or not. This value
        // can be updated with subsequent calls to UpdateForwardingPolicy
        // targeted at a given ChannelLink concrete interface implementation.
        FwrdingPolicy models.ForwardingPolicy

        // Circuits provides restricted access to the switch's circuit map,
        // allowing the link to open and close circuits.
        Circuits CircuitModifier

        // BestHeight returns the best known height.
        BestHeight func() uint32

        // ForwardPackets attempts to forward the batch of htlcs through the
        // switch. The function returns an error in case it fails to send one
        // or more packets. The link's quit signal should be provided to allow
        // cancellation of forwarding during link shutdown.
        ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error

        // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
        // blobs, which are then used to inform how to forward an HTLC.
        //
        // NOTE: This function assumes the same set of readers and preimages
        // are always presented for the same identifier. The last boolean is
        // used to decide whether this is a reforwarding or not - when it's
        // reforwarding, we skip the replay check enforced in our decay log.
        DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest, bool) (
                []hop.DecodeHopIteratorResponse, error)

        // ExtractErrorEncrypter is the function responsible for decoding the
        // HTLC Sphinx onion blob, and creating the onion failure obfuscator.
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter

        // FetchLastChannelUpdate retrieves the latest routing policy for a
        // target channel. This channel will typically be the outgoing channel
        // specified when we receive an incoming HTLC. This will be used to
        // provide payment senders our latest policy when sending encrypted
        // error messages.
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
                *lnwire.ChannelUpdate1, error)

        // Peer is a lightning network node with which we have the channel
        // link opened.
        Peer lnpeer.Peer

        // Registry is a sub-system which is responsible for managing the
        // invoices in a thread-safe manner.
        Registry InvoiceDatabase

        // PreimageCache is a global witness beacon that houses any new
        // preimages discovered by other links. We'll use this to add new
        // witnesses that we discover which will notify any sub-systems
        // subscribed to new events.
        PreimageCache contractcourt.WitnessBeacon

        // OnChannelFailure is a function closure that we'll call if the
        // channel failed for some reason. Depending on the severity of the
        // error, the closure potentially must force close this channel and
        // disconnect the peer.
        //
        // NOTE: The method must return in order for the ChannelLink to be
        // able to shut down properly.
        OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
                LinkFailureError)

        // UpdateContractSignals is a function closure that we'll use to update
        // outside sub-systems with this channel's latest ShortChannelID.
        UpdateContractSignals func(*contractcourt.ContractSignals) error

        // NotifyContractUpdate is a function closure that we'll use to update
        // the contractcourt and more specifically the ChannelArbitrator of the
        // latest channel state.
        NotifyContractUpdate func(*contractcourt.ContractUpdate) error

        // ChainEvents is an active subscription to the chain watcher for this
        // channel to be notified of any on-chain activity related to this
        // channel.
        ChainEvents *contractcourt.ChainEventSubscription

        // FeeEstimator is an instance of a live fee estimator which will be
        // used to dynamically regulate the current fee of the commitment
        // transaction to ensure timely confirmation.
        FeeEstimator chainfee.Estimator

        // HodlMask is a bitvector composed of hodl.Flags, specifying
        // breakpoints for HTLC forwarding internal to the switch.
        //
        // NOTE: This should only be used for testing.
        HodlMask hodl.Mask

        // SyncStates is used to indicate that we need to send the channel
        // reestablishment message to the remote peer. It should be done if
        // our client has been restarted, or the remote peer has reconnected.
        SyncStates bool

        // BatchTicker is the ticker that determines the interval that we'll
        // use to check the batch to see if there are any updates we should
        // flush out. By batching updates into a single commit, we attempt to
        // increase throughput by maximizing the number of updates coalesced
        // into a single commit.
        BatchTicker ticker.Ticker

        // FwdPkgGCTicker is the ticker determining the frequency at which
        // garbage collection of forwarding packages occurs. We use a
        // time-based approach, as opposed to block epochs, as to not hinder
        // syncing.
        FwdPkgGCTicker ticker.Ticker

        // PendingCommitTicker is a ticker that allows the link to determine
        // if a locally initiated commitment dance gets stuck waiting for the
        // remote party to revoke.
        PendingCommitTicker ticker.Ticker

        // BatchSize is the max size of a batch of updates done to the link
        // before we do a state update.
        BatchSize uint32

        // UnsafeReplay will cause a link to replay the adds in its latest
        // commitment txn after the link is restarted. This should only be
        // used in testing; it is here to ensure the sphinx replay detection
        // on the receiving node is persistent.
        UnsafeReplay bool

        // MinUpdateTimeout represents the minimum interval in which a link
        // will propose to update its commitment fee rate. A random timeout
        // will be selected between this and MaxUpdateTimeout.
        MinUpdateTimeout time.Duration

        // MaxUpdateTimeout represents the maximum interval in which a link
        // will propose to update its commitment fee rate. A random timeout
        // will be selected between this and MinUpdateTimeout.
        MaxUpdateTimeout time.Duration

        // OutgoingCltvRejectDelta defines the number of blocks before the
        // expiry of an htlc at which we no longer offer an htlc. This should
        // be at least the outgoing broadcast delta, because in any case we
        // don't want to risk offering an htlc that triggers channel closure.
        OutgoingCltvRejectDelta uint32

        // TowerClient is an optional engine that manages the signing,
        // encrypting, and uploading of justice transactions to the daemon's
        // configured set of watchtowers for legacy channels.
        TowerClient TowerClient

        // MaxOutgoingCltvExpiry is the maximum outgoing timelock that the
        // link should accept for a forwarded HTLC. The value is relative to
        // the current block height.
        MaxOutgoingCltvExpiry uint32

        // MaxFeeAllocation is the highest allocation we'll allow a channel's
        // commitment fee to be of its balance. This only applies to the
        // initiator of the channel.
        MaxFeeAllocation float64

        // MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
        // the initiator for channels of the anchor type.
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight

        // NotifyActiveLink allows the link to tell the ChannelNotifier when a
        // link is first started.
        NotifyActiveLink func(wire.OutPoint)

        // NotifyActiveChannel allows the link to tell the ChannelNotifier
        // when a channel becomes active.
        NotifyActiveChannel func(wire.OutPoint)

        // NotifyInactiveChannel allows the switch to tell the ChannelNotifier
        // when channels become inactive.
        NotifyInactiveChannel func(wire.OutPoint)

        // NotifyInactiveLinkEvent allows the switch to tell the
        // ChannelNotifier when a channel link becomes inactive.
        NotifyInactiveLinkEvent func(wire.OutPoint)

        // HtlcNotifier is an instance of an htlcNotifier which we will pipe
        // htlc events through.
        HtlcNotifier htlcNotifier

        // FailAliasUpdate is a function used to fail an HTLC for an
        // option_scid_alias channel.
        FailAliasUpdate func(sid lnwire.ShortChannelID,
                incoming bool) *lnwire.ChannelUpdate1

        // GetAliases is used by the link and switch to fetch the set of
        // aliases for a given link.
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID

        // PreviouslySentShutdown is an optional value that is set if, at the
        // time of the link being started, persisted shutdown info was found
        // for the channel. This value being set means that we previously sent
        // a Shutdown message to our peer, and so we should do so again on
        // re-establish and should not allow any more HTLC adds on the
        // outgoing direction of the link.
        PreviouslySentShutdown fn.Option[lnwire.Shutdown]

        // DisallowRouteBlinding adds the option to disable forwarding
        // payments in blinded routes by failing back any blinding-related
        // payloads as if they were invalid.
        DisallowRouteBlinding bool

        // DisallowQuiescence is a flag that can be used to disable the
        // quiescence protocol.
        DisallowQuiescence bool

        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
        // restrict the flow of HTLCs and fee updates.
        MaxFeeExposure lnwire.MilliSatoshi

        // ShouldFwdExpEndorsement is a closure that indicates whether the
        // link should forward experimental endorsement signals.
        ShouldFwdExpEndorsement func() bool

        // AuxTrafficShaper is an optional auxiliary traffic shaper that can
        // be used to manage the bandwidth of the link.
        AuxTrafficShaper fn.Option[AuxTrafficShaper]
}

// channelLink is the service which drives a channel's commitment update
// state-machine. In the event that an HTLC needs to be propagated to another
// link, the forward handler from config is used which sends the HTLC to the
// switch. Additionally, the link encapsulates the logic of commitment
// protocol message ordering and updates.
type channelLink struct {
        // The following fields are only meant to be used *atomically*.
        started       int32
        reestablished int32
        shutdown      int32

        // failed should be set to true in case a link error happens, making
        // sure we don't process any more updates.
        failed bool

        // keystoneBatch represents a volatile list of keystones that must be
        // written before attempting to sign the next commitment txn. These
        // represent all the HTLC's forwarded to the link from the switch.
        // Once we lock them into our outgoing commitment, then the circuit
        // has a keystone, and is fully opened.
        keystoneBatch []Keystone

        // openedCircuits is the set of all payment circuits that will be open
        // once we make our next commitment. After making the commitment we'll
        // ACK all these from our mailbox to ensure that they don't get
        // re-delivered if we reconnect.
        openedCircuits []CircuitKey

        // closedCircuits is the set of all payment circuits that will be
        // closed once we make our next commitment. After taking the
        // commitment we'll ACK all these to ensure that they don't get
        // re-delivered if we reconnect.
        closedCircuits []CircuitKey

        // channel is a lightning network channel to which we apply htlc
        // updates.
        channel *lnwallet.LightningChannel

        // cfg is a structure which carries all dependable fields/handlers
        // which may affect the behaviour of the service.
        cfg ChannelLinkConfig

        // mailBox is the main interface between the outside world and the
        // link. All incoming messages will be sent over this mailBox.
        // Messages include new updates from our connected peer, and new
        // packets to be forwarded, sent by the switch.
        mailBox MailBox

        // upstream is a channel that new messages sent from the remote peer
        // to the local peer will be sent across.
        upstream chan lnwire.Message

        // downstream is a channel in which new multi-hop HTLC's to be
        // forwarded will be sent across. Messages from this channel are sent
        // by the HTLC switch.
        downstream chan *htlcPacket

        // updateFeeTimer is the timer responsible for updating the link's
        // commitment fee every time it fires.
        updateFeeTimer *time.Timer

        // uncommittedPreimages stores a list of all preimages that have been
        // learned since receiving the last CommitSig from the remote peer.
        // The batch will be flushed just before accepting the subsequent
        // CommitSig or on shutdown to avoid doing a write for each preimage
        // received.
        uncommittedPreimages []lntypes.Preimage

        sync.RWMutex

        // hodlQueue is used to receive exit hop htlc resolutions from the
        // invoice registry.
        hodlQueue *queue.ConcurrentQueue

        // hodlMap stores related htlc data for a circuit key. It allows
        // resolving those htlcs when we receive a message on hodlQueue.
        hodlMap map[models.CircuitKey]hodlHtlc

        // log is a link-specific logging instance.
        log btclog.Logger

        // isOutgoingAddBlocked tracks whether the channelLink can send an
        // UpdateAddHTLC.
        isOutgoingAddBlocked atomic.Bool

        // isIncomingAddBlocked tracks whether the channelLink can receive an
        // UpdateAddHTLC.
        isIncomingAddBlocked atomic.Bool

        // flushHooks is a hookMap that is triggered when we reach a channel
        // state with no live HTLCs.
        flushHooks hookMap

        // outgoingCommitHooks is a hookMap that is triggered after we send
        // our next CommitSig.
        outgoingCommitHooks hookMap

        // incomingCommitHooks is a hookMap that is triggered after we receive
        // our next CommitSig.
        incomingCommitHooks hookMap

        // quiescer is the state machine that tracks where this channel is
        // with respect to the quiescence protocol.
        quiescer Quiescer

        // quiescenceReqs is a queue of requests to quiesce this link. The
        // members of the queue are send-only channels we should call back
        // with the result.
        quiescenceReqs chan StfuReq

        // cg is a helper that encapsulates a wait group and quit channel and
        // allows contexts that either block or cancel on those depending on
        // the use case.
        cg *fn.ContextGuard
}

// hookMap is a data structure that is used to track the hooks that need to be
// called in various parts of the channelLink's lifecycle.
//
// WARNING: NOT thread-safe.
type hookMap struct {
        // allocIdx keeps track of the next id we haven't yet allocated.
        allocIdx atomic.Uint64

        // transient is a map of hooks that are only called the next time
        // invoke is called. These hooks are deleted during invoke.
        transient map[uint64]func()

        // newTransients is a channel that we use to accept new hooks into the
        // hookMap.
        newTransients chan func()
}

// newHookMap initializes a new empty hookMap.
func newHookMap() hookMap {
        return hookMap{
                allocIdx:      atomic.Uint64{},
                transient:     make(map[uint64]func()),
                newTransients: make(chan func()),
        }
}

// alloc allocates space in the hook map for the supplied hook, placing it in
// the transient part of the hookMap, and returns the ID that identifies it.
func (m *hookMap) alloc(hook func()) uint64 {
        // We assume we never overflow a uint64. Seems OK.
        hookID := m.allocIdx.Add(1)
        if hookID == 0 {
                panic("hookMap allocIdx overflow")
        }
        m.transient[hookID] = hook

        return hookID
}

// invoke is used on a hook map to call all the registered hooks and then
// clear out the transient hooks so they are not called again.
func (m *hookMap) invoke() {
        for _, hook := range m.transient {
                hook()
        }

        m.transient = make(map[uint64]func())
}

// hodlHtlc contains htlc data that is required for resolution.
type hodlHtlc struct {
        add        lnwire.UpdateAddHTLC
        sourceRef  channeldb.AddRef
        obfuscator hop.ErrorEncrypter
}

// NewChannelLink creates a new instance of a ChannelLink given a configuration
// and active channel that will be used to verify/apply updates to.
func NewChannelLink(cfg ChannelLinkConfig,
        channel *lnwallet.LightningChannel) ChannelLink {

        logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())

        // If the max fee exposure isn't set, use the default.
        if cfg.MaxFeeExposure == 0 {
                cfg.MaxFeeExposure = DefaultMaxFeeExposure
        }

        var qsm Quiescer
        if !cfg.DisallowQuiescence {
                qsm = NewQuiescer(QuiescerCfg{
                        chanID: lnwire.NewChanIDFromOutPoint(
                                channel.ChannelPoint(),
                        ),
                        channelInitiator: channel.Initiator(),
                        sendMsg: func(s lnwire.Stfu) error {
                                return cfg.Peer.SendMessage(false, &s)
                        },
                        timeoutDuration: defaultQuiescenceTimeout,
                        onTimeout: func() {
                                cfg.Peer.Disconnect(ErrQuiescenceTimeout)
                        },
                })
        } else {
                qsm = &quiescerNoop{}
        }

        quiescenceReqs := make(
                chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1,
        )

        return &channelLink{
                cfg:                 cfg,
                channel:             channel,
                hodlMap:             make(map[models.CircuitKey]hodlHtlc),
                hodlQueue:           queue.NewConcurrentQueue(10),
                log:                 log.WithPrefix(logPrefix),
                flushHooks:          newHookMap(),
                outgoingCommitHooks: newHookMap(),
                incomingCommitHooks: newHookMap(),
                quiescer:            qsm,
                quiescenceReqs:      quiescenceReqs,
                cg:                  fn.NewContextGuard(),
        }
}

// A compile time check to ensure channelLink implements the ChannelLink
// interface.
var _ ChannelLink = (*channelLink)(nil)

// Start starts all helper goroutines required for the operation of the
// channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Start() error {
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
                err := fmt.Errorf("channel link(%v): already started", l)
                l.log.Warn("already started")
                return err
        }

        l.log.Info("starting")

        // If the config supplied a watchtower client, ensure the channel is
        // registered before trying to use it during operation.
        if l.cfg.TowerClient != nil {
                err := l.cfg.TowerClient.RegisterChannel(
                        l.ChanID(), l.channel.State().ChanType,
                )
                if err != nil {
                        return err
                }
        }

        l.mailBox.ResetMessages()
        l.hodlQueue.Start()

        // Before launching the htlcManager goroutine, revert any circuits
        // that were marked open in the switch's circuit map, but did not make
        // it into a commitment txn. We use the next local htlc index as the
        // cut off point, since all indexes below that are committed. This
        // action is only performed if the link's final short channel ID has
        // been assigned, otherwise we would try to trim the htlcs belonging
        // to the all-zero, hop.Source ID.
        if l.ShortChanID() != hop.Source {
                localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
                if err != nil {
                        return fmt.Errorf("unable to retrieve next local "+
                                "htlc index: %v", err)
                }

                // NOTE: This is automatically done by the switch when it
                // starts up, but is necessary to prevent inconsistencies in
                // the case that the link flaps. This is a result of a link's
                // life-cycle being shorter than that of the switch.
                chanID := l.ShortChanID()
                err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
                if err != nil {
                        return fmt.Errorf("unable to trim circuits above "+
                                "local htlc index %d: %v", localHtlcIndex, err)
                }

                // Since the link is live, before we start the link we'll
                // update the ChainArbitrator with the set of new channel
                // signals for this channel.
                //
                // TODO(roasbeef): split goroutines within channel arb to avoid
                go func() {
                        signals := &contractcourt.ContractSignals{
                                ShortChanID: l.channel.ShortChanID(),
                        }

                        err := l.cfg.UpdateContractSignals(signals)
                        if err != nil {
                                l.log.Errorf("unable to update signals")
                        }
                }()
        }

        l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())

        l.cg.WgAdd(1)
        go l.htlcManager(context.TODO())

        return nil
}

// Stop gracefully stops all active helper goroutines, then waits until
// they've exited.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stop() {
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
                l.log.Warn("already stopped")
                return
        }

        l.log.Info("stopping")

        // As the link is stopping, we are no longer interested in htlc
        // resolutions coming from the invoice registry.
        l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())

        if l.cfg.ChainEvents.Cancel != nil {
                l.cfg.ChainEvents.Cancel()
        }

        // Ensure the channel for the timer is drained.
        if l.updateFeeTimer != nil {
                if !l.updateFeeTimer.Stop() {
                        select {
                        case <-l.updateFeeTimer.C:
                        default:
                        }
                }
        }

        if l.hodlQueue != nil {
                l.hodlQueue.Stop()
        }

        l.cg.Quit()
        l.cg.WgWait()

        // Now that the htlcManager has completely exited, reset the packet
        // courier. This allows the mailbox to re-evaluate any lingering Adds
        // that were delivered but didn't make it onto a commitment to be
        // failed back if the link is offline for an extended period of time.
        // The error is ignored since it can only fail when the daemon is
        // exiting.
        _ = l.mailBox.ResetPackets()

        // As a final precaution, we will attempt to flush any uncommitted
        // preimages to the preimage cache. The preimages should be
        // re-delivered after channel reestablishment; however, this adds an
        // extra layer of protection in case the peer never returns. Without
        // this, we will be unable to settle any contracts depending on the
        // preimages even though we had learned them at some point.
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
        if err != nil {
                l.log.Errorf("unable to add preimages=%v to cache: %v",
                        l.uncommittedPreimages, err)
        }
}

// WaitForShutdown blocks until the link finishes shutting down, which
// includes termination of all dependent goroutines.
func (l *channelLink) WaitForShutdown() {
        l.cg.WgWait()
}

// EligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST NOT be called from the main event loop.
func (l *channelLink) EligibleToForward() bool {
        l.RLock()
        defer l.RUnlock()

        return l.eligibleToForward()
}

// eligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToForward() bool {
        return l.eligibleToUpdate() && !l.IsFlushing(Outgoing)
}

// eligibleToUpdate returns a bool indicating if the channel is able to update
// channel state. We're able to update channel state if we know the remote
// party's next revocation point. Otherwise, we can't initiate new channel
// state. We also require that the short channel ID not be the all-zero source
// ID, meaning that the channel has had its ID finalized.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToUpdate() bool {
        return l.channel.RemoteNextRevocation() != nil &&
                l.channel.ShortChanID() != hop.Source &&
                l.isReestablished() &&
                l.quiescer.CanSendUpdates()
}

// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns true if the state was changed and false
// if the desired state was already set before the method was called.
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Swap(false)
        }

        return l.isIncomingAddBlocked.Swap(false)
}

// DisableAdds sets the ChannelUpdateHandler state to disallow UpdateAddHtlc's
// in the specified direction. It returns true if the state was changed and
// false if the desired state was already set before the method was called.
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return !l.isOutgoingAddBlocked.Swap(true)
        }

        return !l.isIncomingAddBlocked.Swap(true)
}

// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction
// of the argument.
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Load()
        }

        return l.isIncomingAddBlocked.Load()
}

// OnFlushedOnce adds a hook that will be called the next time the channel
// state reaches zero htlcs. This hook will only ever be called once. If the
// channel state already has zero htlcs, then this will be called immediately.
func (l *channelLink) OnFlushedOnce(hook func()) {
        select {
        case l.flushHooks.newTransients <- hook:
        case <-l.cg.Done():
        }
}
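
// Illustrative usage (assumption; not part of the original file): a caller
// that needs to run a one-shot action once the channel has no live HTLCs can
// register it like so:
//
//	link.OnFlushedOnce(func() {
//	        // Runs at most once, the next time the channel is flushed.
//	})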

// OnCommitOnce adds a hook that will be called the next time a CommitSig
// message is sent in the argument's LinkDirection. This hook will only ever
// be called once. If no CommitSig is owed in the argument's LinkDirection,
// then this hook will be run immediately.
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
        var queue chan func()

        if direction == Outgoing {
                queue = l.outgoingCommitHooks.newTransients
        } else {
                queue = l.incomingCommitHooks.newTransients
        }

        select {
        case queue <- hook:
        case <-l.cg.Done():
        }
}

// InitStfu allows us to initiate quiescence on this link. It returns a
// receive only channel that will block until quiescence has been achieved, or
// definitively fails.
//
// This operation has been added to allow channels to be quiesced via RPC. It
// may be removed or reworked in the future as RPC initiated quiescence is a
// holdover until we have downstream protocols that use it.
func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
        req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]](
                fn.Unit{},
        )

        select {
        case l.quiescenceReqs <- req:
        case <-l.cg.Done():
                req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
        }

        return out
}

// isReestablished returns true if the link has successfully completed the
// channel reestablishment dance.
func (l *channelLink) isReestablished() bool {
        return atomic.LoadInt32(&l.reestablished) == 1
}

// markReestablished signals that the remote peer has successfully exchanged
// channel reestablish messages and that the channel is ready to process
// subsequent messages.
func (l *channelLink) markReestablished() {
        atomic.StoreInt32(&l.reestablished, 1)
}

// IsUnadvertised returns true if the underlying channel is unadvertised.
func (l *channelLink) IsUnadvertised() bool {
        state := l.channel.State()
        return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
}

// sampleNetworkFee samples the current fee rate on the network to get into
// the chain in a timely manner. The returned value is expressed in
// fee-per-kw, as this is the native rate used when computing the fee for
// commitment transactions, and the second-level HTLC transactions.
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
        // We'll first query for the sat/kw recommended to be confirmed within
        // 3 blocks.
        feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
        if err != nil {
                return 0, err
        }

        l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
                int64(feePerKw))

        return feePerKw, nil
}
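
// Editorial note (standard unit conversion; not part of the original file):
// fee-per-kw is fee per 1,000 weight units. One virtual byte equals four
// weight units, so 1 sat/vbyte corresponds to 250 sat/kw; e.g. an estimate of
// 2,500 sat/kw is equivalent to 10 sat/vbyte.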

// shouldAdjustCommitFee returns true if we should update our commitment fee
// to match that of the network fee. We'll only update our commitment fee if
// the network fee is +/- 10% to our commitment fee or if our current
// commitment fee is below the minimum relay fee.
func shouldAdjustCommitFee(netFee, chanFee,
        minRelayFee chainfee.SatPerKWeight) bool {

        switch {
        // If the network fee is greater than our current commitment fee and
        // our current commitment fee is below the minimum relay fee then
        // we should switch to it no matter if it is less than a 10% increase.
        case netFee > chanFee && chanFee < minRelayFee:
                return true

        // If the network fee is greater than the commitment fee, then we'll
        // switch to it if it's at least 10% greater than the commit fee.
        case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
                return true

        // If the network fee is less than our commitment fee, then we'll
        // switch to it if it's at least 10% less than the commitment fee.
        case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
                return true

        // Otherwise, we won't modify our fee.
        default:
                return false
        }
}
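
// Worked example (illustrative; not part of the original file): with
// chanFee = 1000 sat/kw and minRelayFee = 253 sat/kw, netFee = 1100 (exactly
// +10%) and netFee = 900 (exactly -10%) both trigger an update, while 1050 or
// 950 do not. If chanFee were 200 sat/kw (below minRelayFee), any higher
// netFee would trigger an update via the first case.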

// failCb is used to cut down on the argument verbosity.
type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage

// createFailureWithUpdate creates a ChannelUpdate when failing an incoming or
// outgoing HTLC. It may return a FailureMessage that references a channel's
// alias. If the channel does not have an alias, then the regular channel
// update from disk will be returned.
func (l *channelLink) createFailureWithUpdate(incoming bool,
        outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {

        // Determine which SCID to use in case we need to use aliases in the
        // ChannelUpdate.
        scid := outgoingScid
        if incoming {
                scid = l.ShortChanID()
        }

        // Try using the FailAliasUpdate function. If it returns nil, fall
        // back to the non-alias behavior.
        update := l.cfg.FailAliasUpdate(scid, incoming)
        if update == nil {
                // Fall back to the non-alias behavior.
                var err error
                update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
                if err != nil {
                        return &lnwire.FailTemporaryNodeFailure{}
                }
        }

        return cb(update)
}

// syncChanStates attempts to synchronize channel states with the remote
// party. This method is to be called upon reconnection after the initial
// funding flow. We'll compare our commitment chains with the remote party,
// and re-send either a dangling commit signature, a revocation, or both.
func (l *channelLink) syncChanStates(ctx context.Context) error {
        chanState := l.channel.State()

        l.log.Infof("Attempting to re-synchronize channel: %v", chanState)

        // First, we'll generate our ChanSync message to send to the other
        // side. Based on this message, the remote party will decide if they
        // need to retransmit any data or not.
        localChanSyncMsg, err := chanState.ChanSyncMsg()
        if err != nil {
                return fmt.Errorf("unable to generate chan sync message for "+
                        "ChannelPoint(%v)", l.channel.ChannelPoint())
        }
        if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
                return fmt.Errorf("unable to send chan sync message for "+
                        "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
        }

        var msgsToReSend []lnwire.Message

        // Next, we'll wait indefinitely to receive the ChanSync message. The
        // first message sent MUST be the ChanSync message.
        select {
        case msg := <-l.upstream:
                l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
                        l.cfg.Peer.PubKey())

                remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
                if !ok {
                        return fmt.Errorf("first message sent to sync "+
                                "should be ChannelReestablish, instead "+
                                "received: %T", msg)
                }

                // If the remote party indicates that they think we haven't
                // done any state updates yet, then we'll retransmit the
                // channel_ready message first. We do this, as at this point
                // we can't be sure if they've really received the
                // ChannelReady message.
                if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
                        localChanSyncMsg.NextLocalCommitHeight == 1 &&
                        !l.channel.IsPending() {

                        l.log.Infof("resending ChannelReady message to peer")

                        nextRevocation, err := l.channel.NextRevocationKey()
                        if err != nil {
                                return fmt.Errorf("unable to create next "+
                                        "revocation: %v", err)
                        }

                        channelReadyMsg := lnwire.NewChannelReady(
                                l.ChanID(), nextRevocation,
                        )

                        // If this is a taproot channel, then we'll send the
                        // very same nonce that we sent above, as they should
                        // take the latest verification nonce we send.
                        if chanState.ChanType.IsTaproot() {
                                //nolint:ll
                                channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
                        }

                        // For channels that negotiated the option-scid-alias
                        // feature bit, ensure that we send over the alias in
                        // the channel_ready message. We'll send the first
                        // alias we find for the channel since it does not
                        // matter which alias we send. We'll error out if no
                        // aliases are found.
                        if l.negotiatedAliasFeature() {
                                aliases := l.getAliases()
                                if len(aliases) == 0 {
                                        // This shouldn't happen since we
                                        // always add at least one alias before
                                        // the channel reaches the link.
                                        return fmt.Errorf("no aliases found")
                                }

                                // getAliases returns a copy of the alias slice
                                // so it is ok to use a pointer to the first
                                // entry.
                                channelReadyMsg.AliasScid = &aliases[0]
                        }

                        err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
                        if err != nil {
                                return fmt.Errorf("unable to re-send "+
                                        "ChannelReady: %v", err)
                        }
                }

                // In any case, we'll then process their ChanSync message.
                l.log.Info("received re-establishment message from remote side")

                var (
                        openedCircuits []CircuitKey
                        closedCircuits []CircuitKey
                )

                // We've just received a ChanSync message from the remote
                // party, so we'll process the message in order to determine
                // if we need to re-transmit any messages to the remote party.
                ctx, cancel := l.cg.Create(ctx)
                defer cancel()
                msgsToReSend, openedCircuits, closedCircuits, err =
                        l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
                if err != nil {
                        return err
                }

                // Repopulate any identifiers for circuits that may have been
                // opened or unclosed. This may happen if we needed to
                // retransmit a commitment signature message.
                l.openedCircuits = openedCircuits
                l.closedCircuits = closedCircuits

                // Ensure that all packets have been removed from the link's
                // mailbox.
                if err := l.ackDownStreamPackets(); err != nil {
                        return err
                }

                if len(msgsToReSend) > 0 {
                        l.log.Infof("sending %v updates to synchronize the "+
                                "state", len(msgsToReSend))
                }

                // If we have any messages to retransmit, we'll do so
                // immediately so we return to a synchronized state as soon as
                // possible.
                for _, msg := range msgsToReSend {
                        l.cfg.Peer.SendMessage(false, msg)
                }

        case <-l.cg.Done():
                return ErrLinkShuttingDown
        }

        return nil
}

1034
// resolveFwdPkgs loads any forwarding packages for this link from disk, and
1035
// reprocesses them in order. The primary goal is to make sure that any HTLCs
1036
// we previously received are reinstated in memory, and forwarded to the switch
1037
// if necessary. After a restart, this will also delete any previously
1038
// completed packages.
1039
func (l *channelLink) resolveFwdPkgs(ctx context.Context) error {
213✔
1040
        fwdPkgs, err := l.channel.LoadFwdPkgs()
213✔
1041
        if err != nil {
213✔
1042
                return err
×
1043
        }
×
1044

1045
        l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))
213✔
1046

213✔
1047
        for _, fwdPkg := range fwdPkgs {
219✔
1048
                if err := l.resolveFwdPkg(fwdPkg); err != nil {
6✔
1049
                        return err
×
1050
                }
×
1051
        }
1052

1053
        // If any of our reprocessing steps require an update to the commitment
1054
        // txn, we initiate a state transition to capture all relevant changes.
1055
        if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
213✔
UNCOV
1056
                return l.updateCommitTx(ctx)
×
UNCOV
1057
        }
×
1058

1059
        return nil
213✔
1060
}
1061

1062
// resolveFwdPkg interprets the FwdState of the provided package, either
1063
// reprocesses any outstanding htlcs in the package, or performs garbage
1064
// collection on the package.
1065
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
6✔
1066
        // Remove any completed packages to clear up space.
6✔
1067
        if fwdPkg.State == channeldb.FwdStateCompleted {
6✔
UNCOV
1068
                l.log.Debugf("removing completed fwd pkg for height=%d",
×
UNCOV
1069
                        fwdPkg.Height)
×
UNCOV
1070

×
UNCOV
1071
                err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
×
UNCOV
1072
                if err != nil {
×
1073
                        l.log.Errorf("unable to remove fwd pkg for height=%d: "+
×
1074
                                "%v", fwdPkg.Height, err)
×
1075
                        return err
×
1076
                }
×
1077
        }
1078

1079
        // Otherwise this is either a new package or one has gone through
1080
        // processing, but contains htlcs that need to be restored in memory.
1081
        // We replay this forwarding package to make sure our local mem state
1082
        // is resurrected, we mimic any original responses back to the remote
1083
        // party, and re-forward the relevant HTLCs to the switch.
1084

1085
        // If the package is fully acked but not completed, it must still have
1086
        // settles and fails to propagate.
1087
        if !fwdPkg.SettleFailFilter.IsFull() {
6✔
UNCOV
1088
                l.processRemoteSettleFails(fwdPkg)
×
UNCOV
1089
        }
×
1090

1091
        // Finally, replay *ALL ADDS* in this forwarding package. The
1092
        // downstream logic is able to filter out any duplicates, but we must
1093
        // shove the entire, original set of adds down the pipeline so that the
1094
        // batch of adds presented to the sphinx router does not ever change.
1095
        if !fwdPkg.AckFilter.IsFull() {
9✔
1096
                l.processRemoteAdds(fwdPkg)
3✔
1097

3✔
1098
                // If the link failed during processing the adds, we must
3✔
1099
                // return to ensure we won't attempted to update the state
3✔
1100
                // further.
3✔
1101
                if l.failed {
3✔
1102
                        return fmt.Errorf("link failed while " +
×
1103
                                "processing remote adds")
×
1104
                }
×
1105
        }
1106

1107
        return nil
6✔
1108
}
1109

1110
// fwdPkgGarbager periodically reads all forwarding packages from disk and
1111
// removes those that can be discarded. It is safe to do this entirely in the
1112
// background, since all state is coordinated on disk. This also ensures the
1113
// link can continue to process messages and interleave database accesses.
1114
//
1115
// NOTE: This MUST be run as a goroutine.
1116
func (l *channelLink) fwdPkgGarbager() {
213✔
1117
        defer l.cg.WgDone()
213✔
1118

213✔
1119
        l.cfg.FwdPkgGCTicker.Resume()
213✔
1120
        defer l.cfg.FwdPkgGCTicker.Stop()
213✔
1121

213✔
1122
        if err := l.loadAndRemove(); err != nil {
213✔
1123
                l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
×
1124
        }
×
1125

1126
        for {
441✔
1127
                select {
228✔
1128
                case <-l.cfg.FwdPkgGCTicker.Ticks():
15✔
1129
                        if err := l.loadAndRemove(); err != nil {
30✔
1130
                                l.log.Warnf("unable to remove fwd pkgs: %v",
15✔
1131
                                        err)
15✔
1132
                                continue
15✔
1133
                        }
1134
                case <-l.cg.Done():
202✔
1135
                        return
202✔
1136
                }
1137
        }
1138
}
1139

1140
// loadAndRemove loads all the channels forwarding packages and determines if
1141
// they can be removed. It is called once before the FwdPkgGCTicker ticks so that
1142
// a longer tick interval can be used.
1143
func (l *channelLink) loadAndRemove() error {
228✔
1144
        fwdPkgs, err := l.channel.LoadFwdPkgs()
228✔
1145
        if err != nil {
243✔
1146
                return err
15✔
1147
        }
15✔
1148

1149
        var removeHeights []uint64
213✔
1150
        for _, fwdPkg := range fwdPkgs {
219✔
1151
                if fwdPkg.State != channeldb.FwdStateCompleted {
12✔
1152
                        continue
6✔
1153
                }
1154

UNCOV
1155
                removeHeights = append(removeHeights, fwdPkg.Height)
×
1156
        }
1157

1158
        // If removeHeights is empty, return early so we don't use a db
1159
        // transaction.
1160
        if len(removeHeights) == 0 {
426✔
1161
                return nil
213✔
1162
        }
213✔
1163

UNCOV
1164
        return l.channel.RemoveFwdPkgs(removeHeights...)
×
1165
}
1166

1167
// handleChanSyncErr performs the error handling logic in the case where we
1168
// could not successfully syncChanStates with our channel peer.
UNCOV
1169
func (l *channelLink) handleChanSyncErr(err error) {
×
UNCOV
1170
        l.log.Warnf("error when syncing channel states: %v", err)
×
UNCOV
1171

×
UNCOV
1172
        var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss
×
UNCOV
1173

×
UNCOV
1174
        switch {
×
UNCOV
1175
        case errors.Is(err, ErrLinkShuttingDown):
×
UNCOV
1176
                l.log.Debugf("unable to sync channel states, link is " +
×
UNCOV
1177
                        "shutting down")
×
UNCOV
1178
                return
×
1179

1180
        // We failed syncing the commit chains, probably because the remote has
1181
        // lost state. We should force close the channel.
UNCOV
1182
        case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
×
UNCOV
1183
                fallthrough
×
1184

1185
        // The remote sent us an invalid last commit secret, we should force
1186
        // close the channel.
1187
        // TODO(halseth): and permanently ban the peer?
UNCOV
1188
        case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
×
UNCOV
1189
                fallthrough
×
1190

1191
        // The remote sent us a commit point different from what they sent us
1192
        // before.
1193
        // TODO(halseth): ban peer?
UNCOV
1194
        case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
×
UNCOV
1195
                // We'll fail the link and tell the peer to force close the
×
UNCOV
1196
                // channel. Note that the database state is not updated here;
×
UNCOV
1197
                // it will only be updated once the close transaction is
×
UNCOV
1198
                // ready, to avoid going down before the transaction has been
×
UNCOV
1199
                // stored in the db.
×
UNCOV
1200
                l.failf(
×
UNCOV
1201
                        LinkFailureError{
×
UNCOV
1202
                                code:          ErrSyncError,
×
UNCOV
1203
                                FailureAction: LinkFailureForceClose,
×
UNCOV
1204
                        },
×
UNCOV
1205
                        "unable to synchronize channel states: %v", err,
×
UNCOV
1206
                )
×
1207

1208
        // We have lost state and cannot safely force close the channel. Fail
1209
        // the channel and wait for the remote to hopefully force close it. The
1210
        // remote has sent us its latest unrevoked commitment point, and we'll
1211
        // store it in the database, such that we can attempt to recover the
1212
        // funds if the remote force closes the channel.
UNCOV
1213
        case errors.As(err, &errDataLoss):
×
UNCOV
1214
                err := l.channel.MarkDataLoss(
×
UNCOV
1215
                        errDataLoss.CommitPoint,
×
UNCOV
1216
                )
×
UNCOV
1217
                if err != nil {
×
1218
                        l.log.Errorf("unable to mark channel data loss: %v",
×
1219
                                err)
×
1220
                }
×
1221

1222
        // We determined the commit chains were not possible to sync. We
1223
        // cautiously fail the channel, but don't force close.
1224
        // TODO(halseth): can we safely force close in any cases where this
1225
        // error is returned?
1226
        case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
×
1227
                if err := l.channel.MarkBorked(); err != nil {
×
1228
                        l.log.Errorf("unable to mark channel borked: %v", err)
×
1229
                }
×
1230

1231
        // Other, unspecified error.
1232
        default:
×
1233
        }
1234

UNCOV
1235
        l.failf(
×
UNCOV
1236
                LinkFailureError{
×
UNCOV
1237
                        code:          ErrRecoveryError,
×
UNCOV
1238
                        FailureAction: LinkFailureForceNone,
×
UNCOV
1239
                },
×
UNCOV
1240
                "unable to synchronize channel states: %v", err,
×
UNCOV
1241
        )
×
1242
}
1243
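// Illustrative sketch, not part of upstream lnd: handleChanSyncErr leans on
// Go 1.13 error wrapping to classify sync failures. errors.Is matches
// sentinel errors, while errors.As extracts a typed error that carries extra
// data (above, the remote's unrevoked commit point). A hypothetical, minimal
// version of that dispatch:
func exampleClassifySyncErr(err error) string {
        var dataLoss *lnwallet.ErrCommitSyncLocalDataLoss
        switch {
        case errors.Is(err, ErrLinkShuttingDown):
                return "link shutting down"

        case errors.As(err, &dataLoss):
                // Typed match: dataLoss.CommitPoint is now available, e.g.
                // for MarkDataLoss.
                return "local data loss"

        default:
                return "unclassified"
        }
}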

1244
// htlcManager is the primary goroutine which drives a channel's commitment
1245
// update state-machine in response to messages received via several channels.
1246
// This goroutine reads messages from the upstream (remote) peer, and also from
1247
// the downstream channel managed by the channel link. In the event that an
1248
// htlc needs to be forwarded, a send-only forward handler is used which sends
1249
// htlc packets to the switch. Additionally, this goroutine handles acting upon
1250
// all timeouts for any active HTLCs, manages the channel's revocation window,
1251
// and also the htlc trickle queue+timer for this active channel.
1252
//
1253
// NOTE: This MUST be run as a goroutine.
1254
//
1255
//nolint:funlen
1256
func (l *channelLink) htlcManager(ctx context.Context) {
213✔
1257
        defer func() {
416✔
1258
                l.cfg.BatchTicker.Stop()
203✔
1259
                l.cg.WgDone()
203✔
1260
                l.log.Infof("exited")
203✔
1261
        }()
203✔
1262

1263
        l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())
213✔
1264

213✔
1265
        // Notify any clients that the link is now in the switch via an
213✔
1266
        // ActiveLinkEvent. We'll also defer an inactive link notification for
213✔
1267
        // when the link exits to ensure that every active notification is
213✔
1268
        // matched by an inactive one.
213✔
1269
        l.cfg.NotifyActiveLink(l.ChannelPoint())
213✔
1270
        defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())
213✔
1271

213✔
1272
        // TODO(roasbeef): need to call wipe chan whenever D/C?
213✔
1273

213✔
1274
        // If this isn't the first time that this channel link has been
213✔
1275
        // created, then we'll need to check to see if we need to
213✔
1276
        // re-synchronize state with the remote peer. settledHtlcs is a map of
213✔
1277
        // HTLC's that we re-settled as part of the channel state sync.
213✔
1278
        if l.cfg.SyncStates {
383✔
1279
                err := l.syncChanStates(ctx)
170✔
1280
                if err != nil {
170✔
UNCOV
1281
                        l.handleChanSyncErr(err)
×
UNCOV
1282
                        return
×
UNCOV
1283
                }
×
1284
        }
1285

1286
        // If a shutdown message has previously been sent on this link, then we
1287
        // need to make sure that we have disabled any HTLC adds on the outgoing
1288
        // direction of the link and that we re-send the same shutdown message
1289
        // that we previously sent.
1290
        l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
213✔
UNCOV
1291
                // Immediately disallow any new outgoing HTLCs.
×
UNCOV
1292
                if !l.DisableAdds(Outgoing) {
×
1293
                        l.log.Warnf("Outgoing link adds already disabled")
×
1294
                }
×
1295

1296
                // Re-send the shutdown message to the peer. Since syncChanStates
1297
                // would have sent any outstanding CommitSig, it is fine for us
1298
                // to immediately queue the shutdown message now.
UNCOV
1299
                err := l.cfg.Peer.SendMessage(false, &shutdown)
×
UNCOV
1300
                if err != nil {
×
1301
                        l.log.Warnf("Error sending shutdown message: %v", err)
×
1302
                }
×
1303
        })
1304

1305
        // We've successfully reestablished the channel, mark it as such to
1306
        // allow the switch to forward HTLCs in the outbound direction.
1307
        l.markReestablished()
213✔
1308

213✔
1309
        // Now that we've received both channel_ready and channel reestablish,
213✔
1310
        // we can go ahead and send the active channel notification. We'll also
213✔
1311
        // defer the inactive notification for when the link exits to ensure
213✔
1312
        // that every active notification is matched by an inactive one.
213✔
1313
        l.cfg.NotifyActiveChannel(l.ChannelPoint())
213✔
1314
        defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())
213✔
1315

213✔
1316
        // With the channel states synced, we now reset the mailbox to ensure
213✔
1317
        // we start processing all unacked packets in order. This is done here
213✔
1318
        // to ensure that all acknowledgments that occur during channel
213✔
1319
        // resynchronization have taken effect, causing us only to pull unacked
213✔
1320
        // packets after starting to read from the downstream mailbox.
213✔
1321
        l.mailBox.ResetPackets()
213✔
1322

213✔
1323
        // After cleaning up any memory pertaining to incoming packets, we now
213✔
1324
        // replay our forwarding packages to handle any htlcs that can be
213✔
1325
        // processed locally, or need to be forwarded out to the switch. We will
213✔
1326
        // only attempt to resolve packages if our short chan id indicates that
213✔
1327
        // the channel is not pending, otherwise we should have no htlcs to
213✔
1328
        // reforward.
213✔
1329
        if l.ShortChanID() != hop.Source {
426✔
1330
                err := l.resolveFwdPkgs(ctx)
213✔
1331
                switch err {
213✔
1332
                // No error was encountered, success.
1333
                case nil:
213✔
1334

1335
                // If the duplicate keystone error was encountered, we'll fail
1336
                // without sending an Error message to the peer.
1337
                case ErrDuplicateKeystone:
×
1338
                        l.failf(LinkFailureError{code: ErrCircuitError},
×
1339
                                "temporary circuit error: %v", err)
×
1340
                        return
×
1341

1342
                // A non-nil error was encountered, send an Error message to
1343
                // the peer.
1344
                default:
×
1345
                        l.failf(LinkFailureError{code: ErrInternalError},
×
1346
                                "unable to resolve fwd pkgs: %v", err)
×
1347
                        return
×
1348
                }
1349

1350
                // With our link's in-memory state fully reconstructed, spawn a
1351
                // goroutine to manage the reclamation of disk space occupied by
1352
                // completed forwarding packages.
1353
                l.cg.WgAdd(1)
213✔
1354
                go l.fwdPkgGarbager()
213✔
1355
        }
1356

1357
        for {
4,351✔
1358
                // We must always check if we failed at some point processing
4,138✔
1359
                // the last update before processing the next.
4,138✔
1360
                if l.failed {
4,151✔
1361
                        l.log.Errorf("link failed, exiting htlcManager")
13✔
1362
                        return
13✔
1363
                }
13✔
1364

1365
                // If the previous event resulted in a non-empty batch, resume
1366
                // the batch ticker so that it can be cleared. Otherwise pause
1367
                // the ticker to prevent waking up the htlcManager while the
1368
                // batch is empty.
1369
                numUpdates := l.channel.NumPendingUpdates(
4,125✔
1370
                        lntypes.Local, lntypes.Remote,
4,125✔
1371
                )
4,125✔
1372
                if numUpdates > 0 {
4,631✔
1373
                        l.cfg.BatchTicker.Resume()
506✔
1374
                        l.log.Tracef("BatchTicker resumed, "+
506✔
1375
                                "NumPendingUpdates(Local, Remote)=%d",
506✔
1376
                                numUpdates,
506✔
1377
                        )
506✔
1378
                } else {
4,125✔
1379
                        l.cfg.BatchTicker.Pause()
3,619✔
1380
                        l.log.Trace("BatchTicker paused due to zero " +
3,619✔
1381
                                "NumPendingUpdates(Local, Remote)")
3,619✔
1382
                }
3,619✔
1383

1384
                select {
4,125✔
1385
                // We have a new hook that needs to be run when we reach a clean
1386
                // channel state.
1387
                case hook := <-l.flushHooks.newTransients:
1✔
1388
                        if l.channel.IsChannelClean() {
1✔
UNCOV
1389
                                hook()
×
1390
                        } else {
1✔
1391
                                l.flushHooks.alloc(hook)
1✔
1392
                        }
1✔
1393

1394
                // We have a new hook that needs to be run when we have
1395
                // committed all of our updates.
1396
                case hook := <-l.outgoingCommitHooks.newTransients:
1✔
1397
                        if !l.channel.OweCommitment() {
1✔
UNCOV
1398
                                hook()
×
1399
                        } else {
1✔
1400
                                l.outgoingCommitHooks.alloc(hook)
1✔
1401
                        }
1✔
1402

1403
                // We have a new hook that needs to be run when our peer has
1404
                // committed all of their updates.
1405
                case hook := <-l.incomingCommitHooks.newTransients:
×
1406
                        if !l.channel.NeedCommitment() {
×
1407
                                hook()
×
1408
                        } else {
×
1409
                                l.incomingCommitHooks.alloc(hook)
×
1410
                        }
×
1411

1412
                // Our update fee timer has fired, so we'll check the network
1413
                // fee to see if we should adjust our commitment fee.
1414
                case <-l.updateFeeTimer.C:
4✔
1415
                        l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
4✔
1416

4✔
1417
                        // If we're not the initiator of the channel, we
4✔
1418
                        // don't control the fees, so we can ignore this.
4✔
1419
                        if !l.channel.IsInitiator() {
4✔
1420
                                continue
×
1421
                        }
1422

1423
                        // If we are the initiator, then we'll sample the
1424
                        // current fee rate to get into the chain within 3
1425
                        // blocks.
1426
                        netFee, err := l.sampleNetworkFee()
4✔
1427
                        if err != nil {
4✔
1428
                                l.log.Errorf("unable to sample network fee: %v",
×
1429
                                        err)
×
1430
                                continue
×
1431
                        }
1432

1433
                        minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()
4✔
1434

4✔
1435
                        newCommitFee := l.channel.IdealCommitFeeRate(
4✔
1436
                                netFee, minRelayFee,
4✔
1437
                                l.cfg.MaxAnchorsCommitFeeRate,
4✔
1438
                                l.cfg.MaxFeeAllocation,
4✔
1439
                        )
4✔
1440

4✔
1441
                        // We determine if we should adjust the commitment fee
4✔
1442
                        // based on the current commitment fee, the suggested
4✔
1443
                        // new commitment fee and the current minimum relay fee
4✔
1444
                        // rate.
4✔
1445
                        commitFee := l.channel.CommitFeeRate()
4✔
1446
                        if !shouldAdjustCommitFee(
4✔
1447
                                newCommitFee, commitFee, minRelayFee,
4✔
1448
                        ) {
5✔
1449

1✔
1450
                                continue
1✔
1451
                        }
1452

1453
                        // If we do, then we'll send a new UpdateFee message to
1454
                        // the remote party, to be locked in with a new update.
1455
                        err = l.updateChannelFee(ctx, newCommitFee)
3✔
1456
                        if err != nil {
3✔
1457
                                l.log.Errorf("unable to update fee rate: %v",
×
1458
                                        err)
×
1459
                                continue
×
1460
                        }
1461

1462
                // The underlying channel has notified us of a unilateral close
1463
                // carried out by the remote peer. In the case of such an
1464
                // event, we'll wipe the channel state from the peer, and mark
1465
                // the contract as fully settled. Afterwards we can exit.
1466
                //
1467
                // TODO(roasbeef): add force closure? also breach?
UNCOV
1468
                case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
×
UNCOV
1469
                        l.log.Warnf("remote peer has closed on-chain")
×
UNCOV
1470

×
UNCOV
1471
                        // TODO(roasbeef): remove all together
×
UNCOV
1472
                        go func() {
×
UNCOV
1473
                                chanPoint := l.channel.ChannelPoint()
×
UNCOV
1474
                                l.cfg.Peer.WipeChannel(&chanPoint)
×
UNCOV
1475
                        }()
×
1476

UNCOV
1477
                        return
×
1478

1479
                case <-l.cfg.BatchTicker.Ticks():
197✔
1480
                        // Attempt to extend the remote commitment chain
197✔
1481
                        // including all the currently pending entries. If the
197✔
1482
                        // send was unsuccessful, then abandon the update,
197✔
1483
                        // waiting for the revocation window to open up.
197✔
1484
                        if !l.updateCommitTxOrFail(ctx) {
197✔
1485
                                return
×
1486
                        }
×
1487

1488
                case <-l.cfg.PendingCommitTicker.Ticks():
1✔
1489
                        l.failf(
1✔
1490
                                LinkFailureError{
1✔
1491
                                        code:          ErrRemoteUnresponsive,
1✔
1492
                                        FailureAction: LinkFailureDisconnect,
1✔
1493
                                },
1✔
1494
                                "unable to complete dance",
1✔
1495
                        )
1✔
1496
                        return
1✔
1497

1498
                // A message from the switch was just received. This indicates
1499
                // that the link is an intermediate hop in a multi-hop HTLC
1500
                // circuit.
1501
                case pkt := <-l.downstream:
521✔
1502
                        l.handleDownstreamPkt(ctx, pkt)
521✔
1503

1504
                // A message from the connected peer was just received. This
1505
                // indicates that we have a new incoming HTLC, either directly
1506
                // for us, or part of a multi-hop HTLC circuit.
1507
                case msg := <-l.upstream:
3,145✔
1508
                        l.handleUpstreamMsg(ctx, msg)
3,145✔
1509

1510
                // A htlc resolution is received. This means that we now have a
1511
                // resolution for a previously accepted htlc.
1512
                case hodlItem := <-l.hodlQueue.ChanOut():
55✔
1513
                        htlcResolution := hodlItem.(invoices.HtlcResolution)
55✔
1514
                        err := l.processHodlQueue(ctx, htlcResolution)
55✔
1515
                        switch err {
55✔
1516
                        // No error, success.
1517
                        case nil:
54✔
1518

1519
                        // If the duplicate keystone error was encountered,
1520
                        // fail back gracefully.
1521
                        case ErrDuplicateKeystone:
×
1522
                                l.failf(LinkFailureError{
×
1523
                                        code: ErrCircuitError,
×
1524
                                }, "process hodl queue: "+
×
1525
                                        "temporary circuit error: %v",
×
1526
                                        err,
×
1527
                                )
×
1528

1529
                        // Send an Error message to the peer.
1530
                        default:
1✔
1531
                                l.failf(LinkFailureError{
1✔
1532
                                        code: ErrInternalError,
1✔
1533
                                }, "process hodl queue: unable to update "+
1✔
1534
                                        "commitment: %v", err,
1✔
1535
                                )
1✔
1536
                        }
1537

1538
                case qReq := <-l.quiescenceReqs:
1✔
1539
                        l.quiescer.InitStfu(qReq)
1✔
1540

1✔
1541
                        if l.noDanglingUpdates(lntypes.Local) {
2✔
1542
                                err := l.quiescer.SendOwedStfu()
1✔
1543
                                if err != nil {
1✔
1544
                                        l.stfuFailf(
×
1545
                                                "SendOwedStfu: %s", err.Error(),
×
1546
                                        )
×
1547
                                        res := fn.Err[lntypes.ChannelParty](err)
×
1548
                                        qReq.Resolve(res)
×
1549
                                }
×
1550
                        }
1551

1552
                case <-l.cg.Done():
189✔
1553
                        return
189✔
1554
                }
1555
        }
1556
}
1557
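// Illustrative sketch, not part of upstream lnd: the three hook arms in
// htlcManager's select loop share one "run now or park" shape: if the
// awaited channel condition already holds, the callback fires immediately,
// otherwise it is parked to be replayed once the condition flips. A
// hypothetical, minimal version:
func exampleRunOrPark(ready bool, hook func(), park func(func())) {
        if ready {
                hook()
        } else {
                park(hook)
        }
}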

1558
// processHodlQueue processes a received htlc resolution and continues reading
1559
// from the hodl queue until no more resolutions remain. When this function
1560
// returns without an error, the commit tx should be updated.
1561
func (l *channelLink) processHodlQueue(ctx context.Context,
1562
        firstResolution invoices.HtlcResolution) error {
55✔
1563

55✔
1564
        // Try to read all waiting resolution messages, so that they can all be
55✔
1565
        // processed in a single commitment tx update.
55✔
1566
        htlcResolution := firstResolution
55✔
1567
loop:
55✔
1568
        for {
110✔
1569
                // Look up all hodl htlcs that can be failed or settled with this event.
55✔
1570
                // The hodl htlc must be present in the map.
55✔
1571
                circuitKey := htlcResolution.CircuitKey()
55✔
1572
                hodlHtlc, ok := l.hodlMap[circuitKey]
55✔
1573
                if !ok {
55✔
1574
                        return fmt.Errorf("hodl htlc not found: %v", circuitKey)
×
1575
                }
×
1576

1577
                if err := l.processHtlcResolution(htlcResolution, hodlHtlc); err != nil {
55✔
1578
                        return err
×
1579
                }
×
1580

1581
                // Clean up hodl map.
1582
                delete(l.hodlMap, circuitKey)
55✔
1583

55✔
1584
                select {
55✔
UNCOV
1585
                case item := <-l.hodlQueue.ChanOut():
×
UNCOV
1586
                        htlcResolution = item.(invoices.HtlcResolution)
×
1587

1588
                // No need to process it if the link is broken.
1589
                case <-l.cg.Done():
×
1590
                        return ErrLinkShuttingDown
×
1591

1592
                default:
55✔
1593
                        break loop
55✔
1594
                }
1595
        }
1596

1597
        // Update the commitment tx.
1598
        if err := l.updateCommitTx(ctx); err != nil {
56✔
1599
                return err
1✔
1600
        }
1✔
1601

1602
        return nil
54✔
1603
}
1604
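// Illustrative sketch, not part of upstream lnd: processHodlQueue drains
// every resolution that is already queued before committing once, using a
// non-blocking select whose default arm ends the drain. The same batching
// shape over a plain channel, with hypothetical names:
func exampleDrainThenCommit(first string, queue <-chan string,
        handle func(string), commit func() error) error {

        item := first
loop:
        for {
                handle(item)

                select {
                case item = <-queue:
                        // More work was already waiting; fold it into the
                        // same batch.

                default:
                        // Queue momentarily empty: stop draining so the
                        // whole batch is committed in a single update.
                        break loop
                }
        }

        return commit()
}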

1605
// processHtlcResolution applies a received htlc resolution to the provided
1606
// htlc. When this function returns without an error, the commit tx should be
1607
// updated.
1608
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
1609
        htlc hodlHtlc) error {
201✔
1610

201✔
1611
        circuitKey := resolution.CircuitKey()
201✔
1612

201✔
1613
        // Determine required action for the resolution based on the type of
201✔
1614
        // resolution we have received.
201✔
1615
        switch res := resolution.(type) {
201✔
1616
        // Settle htlcs that returned a settle resolution using the preimage
1617
        // in the resolution.
1618
        case *invoices.HtlcSettleResolution:
197✔
1619
                l.log.Debugf("received settle resolution for %v "+
197✔
1620
                        "with outcome: %v", circuitKey, res.Outcome)
197✔
1621

197✔
1622
                return l.settleHTLC(
197✔
1623
                        res.Preimage, htlc.add.ID, htlc.sourceRef,
197✔
1624
                )
197✔
1625

1626
        // For htlc failures, we get the relevant failure message based
1627
        // on the failure resolution and then fail the htlc.
1628
        case *invoices.HtlcFailResolution:
4✔
1629
                l.log.Debugf("received cancel resolution for "+
4✔
1630
                        "%v with outcome: %v", circuitKey, res.Outcome)
4✔
1631

4✔
1632
                // Get the lnwire failure message based on the resolution
4✔
1633
                // result.
4✔
1634
                failure := getResolutionFailure(res, htlc.add.Amount)
4✔
1635

4✔
1636
                l.sendHTLCError(
4✔
1637
                        htlc.add, htlc.sourceRef, failure, htlc.obfuscator,
4✔
1638
                        true,
4✔
1639
                )
4✔
1640
                return nil
4✔
1641

1642
        // Fail if we do not get a settle or fail resolution, since we
1643
        // are only expecting to handle settles and fails.
1644
        default:
×
1645
                return fmt.Errorf("unknown htlc resolution type: %T",
×
1646
                        resolution)
×
1647
        }
1648
}
1649

1650
// getResolutionFailure returns the wire message that a htlc resolution should
1651
// be failed with.
1652
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
1653
        amount lnwire.MilliSatoshi) *LinkError {
4✔
1654

4✔
1655
        // If the resolution has been resolved as part of a MPP timeout,
4✔
1656
        // we need to fail the htlc with lnwire.FailMppTimeout.
4✔
1657
        if resolution.Outcome == invoices.ResultMppTimeout {
4✔
1658
                return NewDetailedLinkError(
×
1659
                        &lnwire.FailMPPTimeout{}, resolution.Outcome,
×
1660
                )
×
1661
        }
×
1662

1663
        // If the htlc is not a MPP timeout, we fail it with
1664
        // FailIncorrectDetails. This error is sent for invoice payment
1665
        // failures such as underpayment/expiry too soon and hodl invoices
1666
        // (which return FailIncorrectDetails to avoid leaking information).
1667
        incorrectDetails := lnwire.NewFailIncorrectDetails(
4✔
1668
                amount, uint32(resolution.AcceptHeight),
4✔
1669
        )
4✔
1670

4✔
1671
        return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
4✔
1672
}
1673
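// Illustrative usage note, not part of upstream lnd: the mapping above is
// deliberately coarse. Only an MPP set timeout is reported as
// FailMPPTimeout; every other invoice-level outcome collapses into
// FailIncorrectDetails, so a probing sender cannot tell, for example, an
// underpayment from a canceled hodl invoice. Assuming an accept height of
// 100:
//
//	failure := getResolutionFailure(res, 1000)
//	// res.Outcome == invoices.ResultMppTimeout => &lnwire.FailMPPTimeout{}
//	// any other outcome => lnwire.NewFailIncorrectDetails(1000, 100)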

1674
// randomFeeUpdateTimeout returns a random timeout between the bounds defined
1675
// within the link's configuration that will be used to determine when the link
1676
// should propose an update to its commitment fee rate.
1677
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
217✔
1678
        lower := int64(l.cfg.MinUpdateTimeout)
217✔
1679
        upper := int64(l.cfg.MaxUpdateTimeout)
217✔
1680
        return time.Duration(prand.Int63n(upper-lower) + lower)
217✔
1681
}
217✔
1682
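// Illustrative sketch, not part of upstream lnd: with the default bounds of
// 10 and 60 minutes, randomFeeUpdateTimeout draws uniformly from [10m, 60m);
// prand.Int63n(n) returns a value in [0, n), so the sum stays strictly below
// the upper bound. Jittering the proposal time keeps links from falling into
// lock-step fee updates. Hypothetical stand-alone form:
func exampleRandomTimeout(minT, maxT time.Duration) time.Duration {
        // Assumes minT < maxT; Int63n panics on a non-positive argument.
        return time.Duration(prand.Int63n(int64(maxT-minT)) + int64(minT))
}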

1683
// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
1684
// downstream HTLC Switch.
1685
func (l *channelLink) handleDownstreamUpdateAdd(ctx context.Context,
1686
        pkt *htlcPacket) error {
480✔
1687

480✔
1688
        htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
480✔
1689
        if !ok {
480✔
1690
                return errors.New("not an UpdateAddHTLC packet")
×
1691
        }
×
1692

1693
        // If we are flushing the link in the outgoing direction or we have
1694
        // already sent Stfu, then we can't add new htlcs to the link and we
1695
        // need to bounce it.
1696
        if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() {
480✔
1697
                l.mailBox.FailAdd(pkt)
×
1698

×
1699
                return NewDetailedLinkError(
×
1700
                        &lnwire.FailTemporaryChannelFailure{},
×
1701
                        OutgoingFailureLinkNotEligible,
×
1702
                )
×
1703
        }
×
1704

1705
        // If hodl.AddOutgoing mode is active, we exit early to simulate
1706
        // arbitrary delays between the switch adding an ADD to the
1707
        // mailbox, and the HTLC being added to the commitment state.
1708
        if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
480✔
1709
                l.log.Warnf(hodl.AddOutgoing.Warning())
×
1710
                l.mailBox.AckPacket(pkt.inKey())
×
1711
                return nil
×
1712
        }
×
1713

1714
        // Check if we can add the HTLC here without exceeding the max fee
1715
        // exposure threshold.
1716
        if l.isOverexposedWithHtlc(htlc, false) {
484✔
1717
                l.log.Debugf("Unable to handle downstream HTLC - max fee " +
4✔
1718
                        "exposure exceeded")
4✔
1719

4✔
1720
                l.mailBox.FailAdd(pkt)
4✔
1721

4✔
1722
                return NewDetailedLinkError(
4✔
1723
                        lnwire.NewTemporaryChannelFailure(nil),
4✔
1724
                        OutgoingFailureDownstreamHtlcAdd,
4✔
1725
                )
4✔
1726
        }
4✔
1727

1728
        // A new payment has been initiated via the downstream channel,
1729
        // so we add the new HTLC to our local log, then update the
1730
        // commitment chains.
1731
        htlc.ChanID = l.ChanID()
476✔
1732
        openCircuitRef := pkt.inKey()
476✔
1733

476✔
1734
        // We enforce the fee buffer for the commitment transaction because
476✔
1735
        // we are in control of adding this htlc. Nothing has locked-in yet so
476✔
1736
        // we can securely enforce the fee buffer which is only relevant if we
476✔
1737
        // are the initiator of the channel.
476✔
1738
        index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
476✔
1739
        if err != nil {
477✔
1740
                // The HTLC was unable to be added to the state machine,
1✔
1741
                // as a result, we'll signal the switch to cancel the
1✔
1742
                // pending payment.
1✔
1743
                l.log.Warnf("Unable to handle downstream add HTLC: %v",
1✔
1744
                        err)
1✔
1745

1✔
1746
                // Remove this packet from the link's mailbox; this
1✔
1747
                // prevents it from being reprocessed if the link
1✔
1748
                // restarts and resets its mailbox. If this response
1✔
1749
                // doesn't make it back to the originating link, it will
1✔
1750
                // be rejected upon attempting to reforward the Add to
1✔
1751
                // the switch, since the circuit was never fully opened,
1✔
1752
                // and the forwarding package shows it as
1✔
1753
                // unacknowledged.
1✔
1754
                l.mailBox.FailAdd(pkt)
1✔
1755

1✔
1756
                return NewDetailedLinkError(
1✔
1757
                        lnwire.NewTemporaryChannelFailure(nil),
1✔
1758
                        OutgoingFailureDownstreamHtlcAdd,
1✔
1759
                )
1✔
1760
        }
1✔
1761

1762
        l.log.Tracef("received downstream htlc: payment_hash=%x, "+
475✔
1763
                "local_log_index=%v, pend_updates=%v",
475✔
1764
                htlc.PaymentHash[:], index,
475✔
1765
                l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))
475✔
1766

475✔
1767
        pkt.outgoingChanID = l.ShortChanID()
475✔
1768
        pkt.outgoingHTLCID = index
475✔
1769
        htlc.ID = index
475✔
1770

475✔
1771
        l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
475✔
1772
                pkt.inKey(), pkt.outKey())
475✔
1773

475✔
1774
        l.openedCircuits = append(l.openedCircuits, pkt.inKey())
475✔
1775
        l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
475✔
1776

475✔
1777
        _ = l.cfg.Peer.SendMessage(false, htlc)
475✔
1778

475✔
1779
        // Send a forward event notification to htlcNotifier.
475✔
1780
        l.cfg.HtlcNotifier.NotifyForwardingEvent(
475✔
1781
                newHtlcKey(pkt),
475✔
1782
                HtlcInfo{
475✔
1783
                        IncomingTimeLock: pkt.incomingTimeout,
475✔
1784
                        IncomingAmt:      pkt.incomingAmount,
475✔
1785
                        OutgoingTimeLock: htlc.Expiry,
475✔
1786
                        OutgoingAmt:      htlc.Amount,
475✔
1787
                },
475✔
1788
                getEventType(pkt),
475✔
1789
        )
475✔
1790

475✔
1791
        l.tryBatchUpdateCommitTx(ctx)
475✔
1792

475✔
1793
        return nil
475✔
1794
}
1795
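// Illustrative sketch, not part of upstream lnd: the inKey/outKey pair queued
// above is what the switch persists as a "keystone": the mapping from the
// incoming (channel, HTLC id) to the outgoing one, which is what lets a
// settle or fail travel back along the same circuit later. A toy picture of
// that mapping, with hypothetical values:
//
//	// keystones[incoming circuit key] = outgoing circuit key
//	keystones := map[string]string{
//	        "chan=123:htlc=7": "chan=456:htlc=2",
//	}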

1796
// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
1797
// Switch. Possible messages sent by the switch include requests to forward new
1798
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
1799
// cleared HTLCs with the upstream peer.
1800
//
1801
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
1802
func (l *channelLink) handleDownstreamPkt(ctx context.Context,
1803
        pkt *htlcPacket) {
521✔
1804

521✔
1805
        if pkt.htlc.MsgType().IsChannelUpdate() &&
521✔
1806
                !l.quiescer.CanSendUpdates() {
521✔
1807

×
1808
                l.log.Warnf("unable to process channel update. "+
×
1809
                        "ChannelID=%v is quiescent.", l.ChanID)
×
1810

×
1811
                return
×
1812
        }
×
1813

1814
        switch htlc := pkt.htlc.(type) {
521✔
1815
        case *lnwire.UpdateAddHTLC:
480✔
1816
                // Handle add message. The returned error can be ignored,
480✔
1817
                // because it is also sent through the mailbox.
480✔
1818
                _ = l.handleDownstreamUpdateAdd(ctx, pkt)
480✔
1819

1820
        case *lnwire.UpdateFulfillHTLC:
23✔
1821
                // If hodl.SettleOutgoing mode is active, we exit early to
23✔
1822
                // simulate arbitrary delays between the switch adding the
23✔
1823
                // SETTLE to the mailbox, and the HTLC being added to the
23✔
1824
                // commitment state.
23✔
1825
                if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
23✔
1826
                        l.log.Warnf(hodl.SettleOutgoing.Warning())
×
1827
                        l.mailBox.AckPacket(pkt.inKey())
×
1828
                        return
×
1829
                }
×
1830

1831
                // An HTLC we forward to the switch has just settled somewhere
1832
                // upstream. Therefore we settle the HTLC within our local
1833
                // state machine.
1834
                inKey := pkt.inKey()
23✔
1835
                err := l.channel.SettleHTLC(
23✔
1836
                        htlc.PaymentPreimage,
23✔
1837
                        pkt.incomingHTLCID,
23✔
1838
                        pkt.sourceRef,
23✔
1839
                        pkt.destRef,
23✔
1840
                        &inKey,
23✔
1841
                )
23✔
1842
                if err != nil {
23✔
1843
                        l.log.Errorf("unable to settle incoming HTLC for "+
×
1844
                                "circuit-key=%v: %v", inKey, err)
×
1845

×
1846
                        // If the HTLC index for Settle response was not known
×
1847
                        // to our commitment state, it has already been
×
1848
                        // cleaned up by a prior response. We'll thus try to
×
1849
                        // clean up any lingering state to ensure we don't
×
1850
                        // continue reforwarding.
×
1851
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
×
1852
                                l.cleanupSpuriousResponse(pkt)
×
1853
                        }
×
1854

1855
                        // Remove the packet from the link's mailbox to ensure
1856
                        // it doesn't get replayed after a reconnection.
1857
                        l.mailBox.AckPacket(inKey)
×
1858

×
1859
                        return
×
1860
                }
1861

1862
                l.log.Debugf("queueing removal of SETTLE closed circuit: "+
23✔
1863
                        "%s->%s", pkt.inKey(), pkt.outKey())
23✔
1864

23✔
1865
                l.closedCircuits = append(l.closedCircuits, pkt.inKey())
23✔
1866

23✔
1867
                // With the HTLC settled, we'll need to populate the wire
23✔
1868
                // message to target the specific channel and HTLC to be
23✔
1869
                // canceled.
23✔
1870
                htlc.ChanID = l.ChanID()
23✔
1871
                htlc.ID = pkt.incomingHTLCID
23✔
1872

23✔
1873
                // Then we send the HTLC settle message to the connected peer
23✔
1874
                // so we can continue the propagation of the settle message.
23✔
1875
                l.cfg.Peer.SendMessage(false, htlc)
23✔
1876

23✔
1877
                // Send a settle event notification to htlcNotifier.
23✔
1878
                l.cfg.HtlcNotifier.NotifySettleEvent(
23✔
1879
                        newHtlcKey(pkt),
23✔
1880
                        htlc.PaymentPreimage,
23✔
1881
                        getEventType(pkt),
23✔
1882
                )
23✔
1883

23✔
1884
                // Immediately update the commitment tx to minimize latency.
23✔
1885
                l.updateCommitTxOrFail(ctx)
23✔
1886

1887
        case *lnwire.UpdateFailHTLC:
18✔
1888
                // If hodl.FailOutgoing mode is active, we exit early to
18✔
1889
                // simulate arbitrary delays between the switch adding a FAIL to
18✔
1890
                // the mailbox, and the HTLC being added to the commitment
18✔
1891
                // state.
18✔
1892
                if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
18✔
1893
                        l.log.Warnf(hodl.FailOutgoing.Warning())
×
1894
                        l.mailBox.AckPacket(pkt.inKey())
×
1895
                        return
×
1896
                }
×
1897

1898
                // An HTLC cancellation has been triggered somewhere upstream,
1899
                // so we'll remove the HTLC from our local state machine.
1900
                inKey := pkt.inKey()
18✔
1901
                err := l.channel.FailHTLC(
18✔
1902
                        pkt.incomingHTLCID,
18✔
1903
                        htlc.Reason,
18✔
1904
                        pkt.sourceRef,
18✔
1905
                        pkt.destRef,
18✔
1906
                        &inKey,
18✔
1907
                )
18✔
1908
                if err != nil {
20✔
1909
                        l.log.Errorf("unable to cancel incoming HTLC for "+
2✔
1910
                                "circuit-key=%v: %v", inKey, err)
2✔
1911

2✔
1912
                        // If the HTLC index for Fail response was not known to
2✔
1913
                        // our commitment state, it has already been cleaned up
2✔
1914
                        // by a prior response. We'll thus try to clean up any
2✔
1915
                        // lingering state to ensure we don't continue
2✔
1916
                        // reforwarding.
2✔
1917
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
4✔
1918
                                l.cleanupSpuriousResponse(pkt)
2✔
1919
                        }
2✔
1920

1921
                        // Remove the packet from the link's mailbox to ensure
1922
                        // it doesn't get replayed after a reconnection.
1923
                        l.mailBox.AckPacket(inKey)
2✔
1924

2✔
1925
                        return
2✔
1926
                }
1927

1928
                l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
16✔
1929
                        pkt.inKey(), pkt.outKey())
16✔
1930

16✔
1931
                l.closedCircuits = append(l.closedCircuits, pkt.inKey())
16✔
1932

16✔
1933
                // With the HTLC removed, we'll need to populate the wire
16✔
1934
                // message to target the specific channel and HTLC to be
16✔
1935
                // canceled. The "Reason" field will have already been set
16✔
1936
                // within the switch.
16✔
1937
                htlc.ChanID = l.ChanID()
16✔
1938
                htlc.ID = pkt.incomingHTLCID
16✔
1939

16✔
1940
                // We send the HTLC message to the peer which initially created
16✔
1941
                // the HTLC. If the incoming blinding point is non-nil, we
16✔
1942
                // know that we are a relaying node in a blinded path.
16✔
1943
                // Otherwise, we're either an introduction node or not part of
16✔
1944
                // a blinded path at all.
16✔
1945
                if err := l.sendIncomingHTLCFailureMsg(
16✔
1946
                        htlc.ID,
16✔
1947
                        pkt.obfuscator,
16✔
1948
                        htlc.Reason,
16✔
1949
                ); err != nil {
16✔
1950
                        l.log.Errorf("unable to send HTLC failure: %v",
×
1951
                                err)
×
1952

×
1953
                        return
×
1954
                }
×
1955

1956
                // If the packet does not have a link failure set, it failed
1957
                // further down the route so we notify a forwarding failure.
1958
                // Otherwise, we notify a link failure because it failed at our
1959
                // node.
1960
                if pkt.linkFailure != nil {
26✔
1961
                        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
10✔
1962
                                newHtlcKey(pkt),
10✔
1963
                                newHtlcInfo(pkt),
10✔
1964
                                getEventType(pkt),
10✔
1965
                                pkt.linkFailure,
10✔
1966
                                false,
10✔
1967
                        )
10✔
1968
                } else {
16✔
1969
                        l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
6✔
1970
                                newHtlcKey(pkt), getEventType(pkt),
6✔
1971
                        )
6✔
1972
                }
6✔
1973

1974
                // Immediately update the commitment tx to minimize latency.
1975
                l.updateCommitTxOrFail(ctx)
16✔
1976
        }
1977
}
1978
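// Illustrative usage note, not part of upstream lnd: both packet handlers
// gate on the quiescer before touching channel state. The guard at the top
// of handleDownstreamPkt is the downstream mirror of the check in
// handleUpstreamMsg: once Stfu has been exchanged, no further channel
// updates may be produced or consumed until quiescence ends.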

1979
// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
1980
// full.
1981
func (l *channelLink) tryBatchUpdateCommitTx(ctx context.Context) {
475✔
1982
        pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
475✔
1983
        if pending < uint64(l.cfg.BatchSize) {
922✔
1984
                return
447✔
1985
        }
447✔
1986

1987
        l.updateCommitTxOrFail(ctx)
28✔
1988
}
1989
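// Illustrative usage note, not part of upstream lnd: tryBatchUpdateCommitTx
// is a plain threshold check. With cfg.BatchSize = 10, the first nine queued
// updates only accumulate and the tenth triggers a signature; a batch that
// never fills up is flushed by the BatchTicker arm of htlcManager instead,
// so neither path can starve the other:
//
//	if pending < uint64(batchSize) {
//	        return // keep batching; the BatchTicker is the fallback
//	}
//	sign()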

1990
// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
1991
// associated with this packet. If successful in doing so, it will also purge
1992
// the open circuit from the circuit map and remove the packet from the link's
1993
// mailbox.
1994
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
2✔
1995
        inKey := pkt.inKey()
2✔
1996

2✔
1997
        l.log.Debugf("cleaning up spurious response for incoming "+
2✔
1998
                "circuit-key=%v", inKey)
2✔
1999

2✔
2000
        // If the htlc packet doesn't have a source reference, it is unsafe to
2✔
2001
        // proceed, as skipping this ack may cause the htlc to be reforwarded.
2✔
2002
        if pkt.sourceRef == nil {
3✔
2003
                l.log.Errorf("unable to cleanup response for incoming "+
1✔
2004
                        "circuit-key=%v, does not contain source reference",
1✔
2005
                        inKey)
1✔
2006
                return
1✔
2007
        }
1✔
2008

2009
        // If the source reference is present, we will try to prevent this link
2010
        // from resending the packet to the switch. To do so, we ack the AddRef
2011
        // of the incoming HTLC belonging to this link.
2012
        err := l.channel.AckAddHtlcs(*pkt.sourceRef)
1✔
2013
        if err != nil {
1✔
2014
                l.log.Errorf("unable to ack AddRef for incoming "+
×
2015
                        "circuit-key=%v: %v", inKey, err)
×
2016

×
2017
                // If this operation failed, it is unsafe to attempt removal of
×
2018
                // the destination reference or circuit, so we exit early. The
×
2019
                // cleanup may proceed with a different packet in the future
×
2020
                // that succeeds on this step.
×
2021
                return
×
2022
        }
×
2023

2024
        // Now that we know this link will stop retransmitting Adds to the
2025
        // switch, we can begin to tear down the response reference and circuit
2026
        // map.
2027
        //
2028
        // If the packet includes a destination reference, then a response for
2029
        // this HTLC was locked into the outgoing channel. Attempt to remove
2030
        // this reference, so we stop retransmitting the response internally.
2031
        // Even if this fails, we will proceed in trying to delete the circuit.
2032
        // When retransmitting responses, the destination references will be
2033
        // cleaned up if an open circuit is not found in the circuit map.
2034
        if pkt.destRef != nil {
1✔
2035
                err := l.channel.AckSettleFails(*pkt.destRef)
×
2036
                if err != nil {
×
2037
                        l.log.Errorf("unable to ack SettleFailRef "+
×
2038
                                "for incoming circuit-key=%v: %v",
×
2039
                                inKey, err)
×
2040
                }
×
2041
        }
2042

2043
        l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)
1✔
2044

1✔
2045
        // With all known references acked, we can now safely delete the circuit
1✔
2046
        // from the switch's circuit map, as the state is no longer needed.
1✔
2047
        err = l.cfg.Circuits.DeleteCircuits(inKey)
1✔
2048
        if err != nil {
1✔
2049
                l.log.Errorf("unable to delete circuit for "+
×
2050
                        "circuit-key=%v: %v", inKey, err)
×
2051
        }
×
2052
}
2053
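// Illustrative usage note, not part of upstream lnd: the teardown in
// cleanupSpuriousResponse is strictly ordered. Acking the AddRef must
// succeed first, since that is what stops the link from re-forwarding the
// Add; the destination reference and the circuit are only removed
// afterwards, and failures there are tolerated as best effort:
//
//	1. AckAddHtlcs(sourceRef)  // abort on error
//	2. AckSettleFails(destRef) // best effort
//	3. DeleteCircuits(inKey)   // best effort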

2054
// handleUpstreamMsg processes wire messages related to commitment state
2055
// updates from the upstream peer. The upstream peer is the peer with whom we
2056
// have a direct channel, updating our respective commitment chains.
2057
//
2058
//nolint:funlen
2059
func (l *channelLink) handleUpstreamMsg(ctx context.Context,
2060
        msg lnwire.Message) {
3,145✔
2061

3,145✔
2062
        l.log.Tracef("receive upstream msg %v, handling now... ", msg.MsgType())
3,145✔
2063
        defer l.log.Tracef("handled upstream msg %v", msg.MsgType())
3,145✔
2064

3,145✔
2065
        // First check if the message is an update and we are capable of
3,145✔
2066
        // receiving updates right now.
3,145✔
2067
        if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
3,145✔
2068
                l.stfuFailf("update received after stfu: %T", msg)
×
2069
                return
×
2070
        }
×
2071

2072
        switch msg := msg.(type) {
3,145✔
2073
        case *lnwire.UpdateAddHTLC:
450✔
2074
                if l.IsFlushing(Incoming) {
450✔
2075
                        // This is forbidden by the protocol specification.
×
2076
                        // The best chance we have to deal with this is to drop
×
2077
                        // the connection. This should roll back the channel
×
2078
                        // state to the last CommitSig. If the remote has
×
2079
                        // already sent a CommitSig we haven't received yet,
×
2080
                        // channel state will be re-synchronized with a
×
2081
                        // ChannelReestablish message upon reconnection and the
×
2082
                        // protocol state that caused us to flush the link will
×
2083
                        // be rolled back. In the event that there was some
×
2084
                        // non-deterministic behavior in the remote that caused
×
2085
                        // them to violate the protocol, we have a decent shot
×
2086
                        // at correcting it this way, since reconnecting will
×
2087
                        // put us in the cleanest possible state to try again.
×
2088
                        //
×
2089
                        // In addition to the above, it is possible for us to
×
2090
                        // hit this case in situations where we improperly
×
2091
                        // handle message ordering due to concurrency choices.
×
2092
                        // An issue has been filed to address this here:
×
2093
                        // https://github.com/lightningnetwork/lnd/issues/8393
×
2094
                        l.failf(
×
2095
                                LinkFailureError{
×
2096
                                        code:             ErrInvalidUpdate,
×
2097
                                        FailureAction:    LinkFailureDisconnect,
×
2098
                                        PermanentFailure: false,
×
2099
                                        Warning:          true,
×
2100
                                },
×
2101
                                "received add while link is flushing",
×
2102
                        )
×
2103

×
2104
                        return
×
2105
                }
×
2106

2107
                // Disallow htlcs with blinding points set if we haven't
2108
                // enabled the feature. This saves us from having to process
2109
                // the onion at all, but will only catch blinded payments
2110
                // where we are a relaying node (as the blinding point will
2111
                // be in the payload when we're the introduction node).
2112
                if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
450✔
2113
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2114
                                "blinding point included when route blinding "+
×
2115
                                        "is disabled")
×
2116

×
2117
                        return
×
2118
                }
×
2119

2120
                // We have to check the limit here rather than later in the
2121
                // switch because the counterparty can keep sending HTLC's
2122
                // without sending a revoke. This would mean that the switch
2123
                // check would only occur later.
2124
                if l.isOverexposedWithHtlc(msg, true) {
450✔
2125
                        l.failf(LinkFailureError{code: ErrInternalError},
×
2126
                                "peer sent us an HTLC that exceeded our max "+
×
2127
                                        "fee exposure")
×
2128

×
2129
                        return
×
2130
                }
×
2131

2132
                // We just received an add request from an upstream peer, so we
2133
                // add it to our state machine, then add the HTLC to our
2134
                // "settle" list in the event that we know the preimage.
2135
                index, err := l.channel.ReceiveHTLC(msg)
450✔
2136
                if err != nil {
450✔
2137
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2138
                                "unable to handle upstream add HTLC: %v", err)
×
2139
                        return
×
2140
                }
×
2141

2142
                l.log.Tracef("receive upstream htlc with payment hash(%x), "+
450✔
2143
                        "assigning index: %v", msg.PaymentHash[:], index)
450✔
2144

2145
        case *lnwire.UpdateFulfillHTLC:
227✔
2146
                pre := msg.PaymentPreimage
227✔
2147
                idx := msg.ID
227✔
2148

227✔
2149
                // Before we pipeline the settle, we'll check the set of active
227✔
2150
                // htlc's to see if the related UpdateAddHTLC has been fully
227✔
2151
                // locked-in.
227✔
2152
                var lockedin bool
227✔
2153
                htlcs := l.channel.ActiveHtlcs()
227✔
2154
                for _, add := range htlcs {
661✔
2155
                        // The HTLC will be outgoing and match idx.
434✔
2156
                        if !add.Incoming && add.HtlcIndex == idx {
659✔
2157
                                lockedin = true
225✔
2158
                                break
225✔
2159
                        }
2160
                }
2161

2162
                if !lockedin {
229✔
2163
                        l.failf(
2✔
2164
                                LinkFailureError{code: ErrInvalidUpdate},
2✔
2165
                                "unable to handle upstream settle",
2✔
2166
                        )
2✔
2167
                        return
2✔
2168
                }
2✔
2169

2170
                if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
225✔
UNCOV
2171
                        l.failf(
×
UNCOV
2172
                                LinkFailureError{
×
UNCOV
2173
                                        code:          ErrInvalidUpdate,
×
UNCOV
2174
                                        FailureAction: LinkFailureForceClose,
×
UNCOV
2175
                                },
×
UNCOV
2176
                                "unable to handle upstream settle HTLC: %v", err,
×
UNCOV
2177
                        )
×
UNCOV
2178
                        return
×
UNCOV
2179
                }
×
2180

2181
                settlePacket := &htlcPacket{
225✔
2182
                        outgoingChanID: l.ShortChanID(),
225✔
2183
                        outgoingHTLCID: idx,
225✔
2184
                        htlc: &lnwire.UpdateFulfillHTLC{
225✔
2185
                                PaymentPreimage: pre,
225✔
2186
                        },
225✔
2187
                }
225✔
2188

225✔
2189
                // Add the newly discovered preimage to our growing list of
225✔
2190
                // uncommitted preimages. These will be written to the witness
225✔
2191
                // cache just before accepting the next commitment signature
225✔
2192
                // from the remote peer.
225✔
2193
                l.uncommittedPreimages = append(l.uncommittedPreimages, pre)
225✔
2194

225✔
2195
                // Pipeline this settle, send it to the switch.
225✔
2196
                go l.forwardBatch(false, settlePacket)
225✔
2197

2198
        case *lnwire.UpdateFailMalformedHTLC:
3✔
2199
                // Convert the failure type encoded within the HTLC fail
3✔
2200
                // message to the proper generic lnwire error code.
3✔
2201
                var failure lnwire.FailureMessage
3✔
2202
                switch msg.FailureCode {
3✔
2203
                case lnwire.CodeInvalidOnionVersion:
1✔
2204
                        failure = &lnwire.FailInvalidOnionVersion{
1✔
2205
                                OnionSHA256: msg.ShaOnionBlob,
1✔
2206
                        }
1✔
2207
                case lnwire.CodeInvalidOnionHmac:
×
2208
                        failure = &lnwire.FailInvalidOnionHmac{
×
2209
                                OnionSHA256: msg.ShaOnionBlob,
×
2210
                        }
×
2211

2212
                case lnwire.CodeInvalidOnionKey:
×
2213
                        failure = &lnwire.FailInvalidOnionKey{
×
2214
                                OnionSHA256: msg.ShaOnionBlob,
×
2215
                        }
×
2216

2217
                // Handle malformed errors that are part of a blinded route.
2218
                // This case is slightly different, because we expect every
2219
                // relaying node in the blinded portion of the route to send
2220
                // malformed errors. If we're also a relaying node, we're
2221
                // likely going to switch this error out anyway for our own
2222
                // malformed error, but we handle the case here for
2223
                // completeness.
UNCOV
2224
                case lnwire.CodeInvalidBlinding:
×
UNCOV
2225
                        failure = &lnwire.FailInvalidBlinding{
×
UNCOV
2226
                                OnionSHA256: msg.ShaOnionBlob,
×
UNCOV
2227
                        }
×
2228

2229
                default:
2✔
2230
                        l.log.Warnf("unexpected failure code received in "+
2✔
2231
                                "UpdateFailMailformedHTLC: %v", msg.FailureCode)
2✔
2232

2✔
2233
                        // We don't just pass back the error we received from
2✔
2234
                        // our successor. Otherwise we might report a failure
2✔
2235
                        // that penalizes us more than needed. If the onion that
2✔
2236
                        // we forwarded was correct, the node should have been
2✔
2237
                        // able to send back its own failure. The node did not
2✔
2238
                        // send back its own failure, so we assume there was a
2✔
2239
                        // problem with the onion and report that back. We reuse
2✔
2240
                        // the invalid onion key failure because there is no
2✔
2241
                        // specific error for this case.
2✔
2242
                        failure = &lnwire.FailInvalidOnionKey{
2✔
2243
                                OnionSHA256: msg.ShaOnionBlob,
2✔
2244
                        }
2✔
2245
                }
2246

2247
                // With the error parsed, we'll convert the into it's opaque
2248
                // form.
2249
                var b bytes.Buffer
3✔
2250
                if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
3✔
2251
                        l.log.Errorf("unable to encode malformed error: %v", err)
×
2252
                        return
×
2253
                }
×
2254

2255
                // If remote side have been unable to parse the onion blob we
2256
                // have sent to it, than we should transform the malformed HTLC
2257
                // message to the usual HTLC fail message.
2258
                err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
3✔
2259
                if err != nil {
3✔
2260
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2261
                                "unable to handle upstream fail HTLC: %v", err)
×
2262
                        return
×
2263
                }
×
2264

2265
        case *lnwire.UpdateFailHTLC:
                // Verify that the failure reason is at least 256 bytes plus
                // overhead.
                const minimumFailReasonLength = lnwire.FailureMessageLength +
                        2 + 2 + 32

                if len(msg.Reason) < minimumFailReasonLength {
                        // We've received a reason with a non-compliant length.
                        // Older nodes happily relay back these failures that
                        // may originate from a node further downstream.
                        // Therefore we can't just fail the channel.
                        //
                        // We want to be compliant ourselves, so we also can't
                        // pass back the reason unmodified. And we must make
                        // sure that we don't hit the magic length check of
                        // 260 bytes in processRemoteSettleFails either.
                        //
                        // Because the reason is unreadable for the payer
                        // anyway, we just replace it by a compliant-length
                        // series of random bytes.
                        msg.Reason = make([]byte, minimumFailReasonLength)
                        _, err := crand.Read(msg.Reason[:])
                        if err != nil {
                                l.log.Errorf("Random generation error: %v",
                                        err)

                                return
                        }
                }

                // Add fail to the update log.
                idx := msg.ID
                err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
                if err != nil {
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
                                "unable to handle upstream fail HTLC: %v", err)
                        return
                }

        case *lnwire.CommitSig:
                // Since we may have learned new preimages for the first time,
                // we'll add them to our preimage cache. By doing this, we
                // ensure any contested contracts watched by any on-chain
                // arbitrators can now sweep this HTLC on-chain. We delay
                // committing the preimages until just before accepting the
                // new remote commitment, as afterwards the peer won't resend
                // the Settle messages on the next channel reestablishment.
                // Doing so allows us to more effectively batch this
                // operation, instead of doing a single write per preimage.
                err := l.cfg.PreimageCache.AddPreimages(
                        l.uncommittedPreimages...,
                )
                if err != nil {
                        l.failf(
                                LinkFailureError{code: ErrInternalError},
                                "unable to add preimages=%v to cache: %v",
                                l.uncommittedPreimages, err,
                        )
                        return
                }

                // Instead of truncating the slice to conserve memory
                // allocations, we simply set the uncommitted preimage slice
                // to nil so that a new one will be initialized if any more
                // witnesses are discovered. We do this because the maximum
                // size that the slice can occupy is 15KB, and we want to
                // ensure we release that memory back to the runtime.
                l.uncommittedPreimages = nil

                // We just received new updates to our local commitment chain,
                // so we'll validate this new commitment, closing the link if
                // it is invalid.
                auxSigBlob, err := msg.CustomRecords.Serialize()
                if err != nil {
                        l.failf(
                                LinkFailureError{code: ErrInvalidCommitment},
                                "unable to serialize custom records: %v", err,
                        )

                        return
                }
                err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
                        CommitSig:  msg.CommitSig,
                        HtlcSigs:   msg.HtlcSigs,
                        PartialSig: msg.PartialSig,
                        AuxSigBlob: auxSigBlob,
                })
                if err != nil {
                        // If we were unable to reconstruct their proposed
                        // commitment, then we'll examine the type of error.
                        // If it's an InvalidCommitSigError, then we'll send a
                        // direct error.
                        var sendData []byte
                        switch err.(type) {
                        case *lnwallet.InvalidCommitSigError:
                                sendData = []byte(err.Error())
                        case *lnwallet.InvalidHtlcSigError:
                                sendData = []byte(err.Error())
                        }
                        l.failf(
                                LinkFailureError{
                                        code:          ErrInvalidCommitment,
                                        FailureAction: LinkFailureForceClose,
                                        SendData:      sendData,
                                },
                                "ChannelPoint(%v): unable to accept new "+
                                        "commitment: %v",
                                l.channel.ChannelPoint(), err,
                        )
                        return
                }

                // As we've just accepted a new state, we'll now immediately
                // send the remote peer a revocation for our prior state.
                nextRevocation, currentHtlcs, finalHTLCs, err :=
                        l.channel.RevokeCurrentCommitment()
                if err != nil {
                        l.log.Errorf("unable to revoke commitment: %v", err)

                        // We need to fail the channel in case revoking our
                        // local commitment does not succeed. We might have
                        // already advanced our channel state which would lead
                        // us to proceed with an unclean state.
                        //
                        // NOTE: We do not trigger a force close because this
                        // could resolve itself in case our db was just busy
                        // not accepting new transactions.
                        l.failf(
                                LinkFailureError{
                                        code:          ErrInternalError,
                                        Warning:       true,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "ChannelPoint(%v): unable to accept new "+
                                        "commitment: %v",
                                l.channel.ChannelPoint(), err,
                        )
                        return
                }

                // As soon as we are ready to send our next revocation, we can
                // invoke the incoming commit hooks.
                l.RWMutex.Lock()
                l.incomingCommitHooks.invoke()
                l.RWMutex.Unlock()

                l.cfg.Peer.SendMessage(false, nextRevocation)

                // Notify the incoming HTLCs for which the resolutions were
                // locked in.
                for id, settled := range finalHTLCs {
                        l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
                                models.CircuitKey{
                                        ChanID: l.ShortChanID(),
                                        HtlcID: id,
                                },
                                channeldb.FinalHtlcInfo{
                                        Settled:  settled,
                                        Offchain: true,
                                },
                        )
                }

                // Since we just revoked our commitment, we may have a new set
                // of HTLCs on our commitment, so we'll send them using our
                // function closure NotifyContractUpdate.
                newUpdate := &contractcourt.ContractUpdate{
                        HtlcKey: contractcourt.LocalHtlcSet,
                        Htlcs:   currentHtlcs,
                }
                err = l.cfg.NotifyContractUpdate(newUpdate)
                if err != nil {
                        l.log.Errorf("unable to notify contract update: %v",
                                err)
                        return
                }

                select {
                case <-l.cg.Done():
                        return
                default:
                }

                // If the remote party initiated the state transition, we'll
                // reply with a signature to provide them with their version
                // of the latest commitment. Otherwise, if both commitment
                // chains are fully synced from our PoV, we don't need to
                // reply with a signature as both sides already have a
                // commitment with the latest accepted state.
                if l.channel.OweCommitment() {
                        if !l.updateCommitTxOrFail(ctx) {
                                return
                        }
                }

                // If we need to send out an Stfu, this would be the time to
                // do so.
                if l.noDanglingUpdates(lntypes.Local) {
                        err = l.quiescer.SendOwedStfu()
                        if err != nil {
                                l.stfuFailf("sendOwedStfu: %v", err.Error())
                        }
                }

                // Now that we have finished processing the incoming CommitSig
                // and sent out our RevokeAndAck, we invoke the flushHooks if
                // the channel state is clean.
                l.RWMutex.Lock()
                if l.channel.IsChannelClean() {
                        l.flushHooks.invoke()
                }
                l.RWMutex.Unlock()

1,164✔
2479
                // We've received a revocation from the remote chain, if valid,
1,164✔
2480
                // this moves the remote chain forward, and expands our
1,164✔
2481
                // revocation window.
1,164✔
2482

1,164✔
2483
                // We now process the message and advance our remote commit
1,164✔
2484
                // chain.
1,164✔
2485
                fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg)
1,164✔
2486
                if err != nil {
1,164✔
2487
                        // TODO(halseth): force close?
×
2488
                        l.failf(
×
2489
                                LinkFailureError{
×
2490
                                        code:          ErrInvalidRevocation,
×
2491
                                        FailureAction: LinkFailureDisconnect,
×
2492
                                },
×
2493
                                "unable to accept revocation: %v", err,
×
2494
                        )
×
2495
                        return
×
2496
                }
×
2497

2498
                // The remote party now has a new primary commitment, so we'll
2499
                // update the contract court to be aware of this new set (the
2500
                // prior old remote pending).
2501
                newUpdate := &contractcourt.ContractUpdate{
1,164✔
2502
                        HtlcKey: contractcourt.RemoteHtlcSet,
1,164✔
2503
                        Htlcs:   remoteHTLCs,
1,164✔
2504
                }
1,164✔
2505
                err = l.cfg.NotifyContractUpdate(newUpdate)
1,164✔
2506
                if err != nil {
1,164✔
2507
                        l.log.Errorf("unable to notify contract update: %v",
×
2508
                                err)
×
2509
                        return
×
2510
                }
×
2511

2512
                select {
1,164✔
2513
                case <-l.cg.Done():
×
2514
                        return
×
2515
                default:
1,164✔
2516
                }
2517

2518
                // If we have a tower client for this channel type, we'll
2519
                // create a backup for the current state.
2520
                if l.cfg.TowerClient != nil {
1,164✔
UNCOV
2521
                        state := l.channel.State()
×
UNCOV
2522
                        chanID := l.ChanID()
×
UNCOV
2523

×
UNCOV
2524
                        err = l.cfg.TowerClient.BackupState(
×
UNCOV
2525
                                &chanID, state.RemoteCommitment.CommitHeight-1,
×
UNCOV
2526
                        )
×
UNCOV
2527
                        if err != nil {
×
2528
                                l.failf(LinkFailureError{
×
2529
                                        code: ErrInternalError,
×
2530
                                }, "unable to queue breach backup: %v", err)
×
2531
                                return
×
2532
                        }
×
2533
                }
2534

2535
                // If we can send updates then we can process adds in case we
2536
                // are the exit hop and need to send back resolutions, or in
2537
                // case there are validity issues with the packets. Otherwise
2538
                // we defer the action until resume.
2539
                //
2540
                // We are free to process the settles and fails without this
2541
                // check since processing those can't result in further updates
2542
                // to this channel link.
2543
                if l.quiescer.CanSendUpdates() {
2,327✔
2544
                        l.processRemoteAdds(fwdPkg)
1,163✔
2545
                } else {
1,164✔
2546
                        l.quiescer.OnResume(func() {
1✔
2547
                                l.processRemoteAdds(fwdPkg)
×
2548
                        })
×
2549
                }
2550
                l.processRemoteSettleFails(fwdPkg)
1,164✔
2551

1,164✔
2552
                // If the link failed during processing the adds, we must
1,164✔
2553
                // return to ensure we won't attempted to update the state
1,164✔
2554
                // further.
1,164✔
2555
                if l.failed {
1,164✔
2556
                        return
×
2557
                }
×
2558

2559
                // The revocation window opened up. If there are pending local
2560
                // updates, try to update the commit tx. Pending updates could
2561
                // already have been present because of a previously failed
2562
                // update to the commit tx or freshly added in by
2563
                // processRemoteAdds. Also in case there are no local updates,
2564
                // but there are still remote updates that are not in the remote
2565
                // commit tx yet, send out an update.
2566
                if l.channel.OweCommitment() {
1,458✔
2567
                        if !l.updateCommitTxOrFail(ctx) {
301✔
2568
                                return
7✔
2569
                        }
7✔
2570
                }
2571

2572
                // Now that we have finished processing the RevokeAndAck, we
2573
                // can invoke the flushHooks if the channel state is clean.
2574
                l.RWMutex.Lock()
1,157✔
2575
                if l.channel.IsChannelClean() {
1,318✔
2576
                        l.flushHooks.invoke()
161✔
2577
                }
161✔
2578
                l.RWMutex.Unlock()
1,157✔
2579

2580
        case *lnwire.UpdateFee:
                // Check and see if their proposed fee-rate would make us
                // exceed the fee threshold.
                fee := chainfee.SatPerKWeight(msg.FeePerKw)

                isDust, err := l.exceedsFeeExposureLimit(fee)
                if err != nil {
                        // This shouldn't typically happen. If it does, it
                        // indicates something is wrong with our channel
                        // state.
                        l.log.Errorf("Unable to determine if fee threshold " +
                                "exceeded")
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "error calculating fee exposure: %v", err)

                        return
                }

                if isDust {
                        // The proposed fee-rate makes us exceed the fee
                        // threshold. Note that err is nil on this path, so we
                        // report the offending fee-rate instead.
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "fee threshold exceeded with fee-rate: %v",
                                fee)
                        return
                }

                // We received a fee update from the peer. If we are the
                // initiator we will fail the channel, if not we will apply
                // the update.
                if err := l.channel.ReceiveUpdateFee(fee); err != nil {
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
                                "error receiving fee update: %v", err)
                        return
                }

                // Update the mailbox's feerate as well.
                l.mailBox.SetFeeRate(fee)

        case *lnwire.Stfu:
                err := l.handleStfu(msg)
                if err != nil {
                        l.stfuFailf("handleStfu: %v", err.Error())
                }

        // In the case where we receive a warning message from our peer, just
        // log it and move on. We choose not to disconnect from our peer,
        // although we "MAY" do so according to the specification.
        case *lnwire.Warning:
                l.log.Warnf("received warning message from peer: %v",
                        msg.Warning())

        case *lnwire.Error:
                // Error received from remote, MUST fail channel, but should
                // only print the contents of the error message if all
                // characters are printable ASCII.
                l.failf(
                        LinkFailureError{
                                code: ErrRemoteError,

                                // TODO(halseth): we currently don't fail the
                                // channel permanently, as there are some sync
                                // issues with other implementations that will
                                // lead to them sending an error message, but
                                // we can recover from on next connection. See
                                // https://github.com/ElementsProject/lightning/issues/4212
                                PermanentFailure: false,
                        },
                        "ChannelPoint(%v): received error from peer: %v",
                        l.channel.ChannelPoint(), msg.Error(),
                )
        default:
                l.log.Warnf("received unknown message of type %T", msg)
        }
}
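
// Editorial sketch (not part of lnd): a worked expansion of the
// minimumFailReasonLength constant used in the *lnwire.UpdateFailHTLC case
// above. Per the BOLT 4 failure packet layout, an encrypted failure reason
// carries a 32-byte HMAC, a 2-byte failure_len field, the failure message
// itself, a 2-byte pad_len field, and padding, where message plus padding
// must total lnwire.FailureMessageLength (256 bytes). The constant
// therefore works out to 256 + 2 + 2 + 32 = 292 bytes.
func exampleMinimumFailReasonLength() int {
        const (
                hmacLen  = 32 // HMAC over the failure packet.
                lenField = 2  // Size of each of failure_len and pad_len.
        )

        return lnwire.FailureMessageLength + lenField + lenField + hmacLen
}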

// handleStfu implements the top-level logic for handling the Stfu message
// from our peer.
func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
        if !l.noDanglingUpdates(lntypes.Remote) {
                return ErrPendingRemoteUpdates
        }
        err := l.quiescer.RecvStfu(*stfu)
        if err != nil {
                return err
        }

        // If we can immediately send an Stfu response back, we will.
        if l.noDanglingUpdates(lntypes.Local) {
                return l.quiescer.SendOwedStfu()
        }

        return nil
}

// stfuFailf fails the link in the case where the requirements of the
// quiescence protocol are violated. In all cases we opt to drop the
// connection as only link state (as opposed to channel state) is affected.
func (l *channelLink) stfuFailf(format string, args ...interface{}) {
        l.failf(LinkFailureError{
                code:             ErrStfuViolation,
                FailureAction:    LinkFailureDisconnect,
                PermanentFailure: false,
                Warning:          true,
        }, format, args...)
}

// noDanglingUpdates returns true when there are no updates that were
// originally issued by the given party on either the local or remote
// commitment transaction.
func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool {
        pendingOnLocal := l.channel.NumPendingUpdates(
                whose, lntypes.Local,
        )
        pendingOnRemote := l.channel.NumPendingUpdates(
                whose, lntypes.Remote,
        )

        return pendingOnLocal == 0 && pendingOnRemote == 0
}
2697

2698
// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
2699
// for packets delivered from server, and cleaning up any circuits closed by
2700
// signing a previous commitment txn. This method ensures that the circuits are
2701
// removed from the circuit map before removing them from the link's mailbox,
2702
// otherwise it could be possible for some circuit to be missed if this link
2703
// flaps.
2704
func (l *channelLink) ackDownStreamPackets() error {
1,354✔
2705
        // First, remove the downstream Add packets that were included in the
1,354✔
2706
        // previous commitment signature. This will prevent the Adds from being
1,354✔
2707
        // replayed if this link disconnects.
1,354✔
2708
        for _, inKey := range l.openedCircuits {
1,818✔
2709
                // In order to test the sphinx replay logic of the remote
464✔
2710
                // party, unsafe replay does not acknowledge the packets from
464✔
2711
                // the mailbox. We can then force a replay of any Add packets
464✔
2712
                // held in memory by disconnecting and reconnecting the link.
464✔
2713
                if l.cfg.UnsafeReplay {
464✔
UNCOV
2714
                        continue
×
2715
                }
2716

2717
                l.log.Debugf("removing Add packet %s from mailbox", inKey)
464✔
2718
                l.mailBox.AckPacket(inKey)
464✔
2719
        }
2720

2721
        // Now, we will delete all circuits closed by the previous commitment
2722
        // signature, which is the result of downstream Settle/Fail packets. We
2723
        // batch them here to ensure circuits are closed atomically and for
2724
        // performance.
2725
        err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
1,354✔
2726
        switch err {
1,354✔
2727
        case nil:
1,354✔
2728
                // Successful deletion.
2729

2730
        default:
×
2731
                l.log.Errorf("unable to delete %d circuits: %v",
×
2732
                        len(l.closedCircuits), err)
×
2733
                return err
×
2734
        }
2735

2736
        // With the circuits removed from memory and disk, we now ack any
2737
        // Settle/Fails in the mailbox to ensure they do not get redelivered
2738
        // after startup. If forgive is enabled and we've reached this point,
2739
        // the circuits must have been removed at some point, so it is now safe
2740
        // to un-queue the corresponding Settle/Fails.
2741
        for _, inKey := range l.closedCircuits {
1,393✔
2742
                l.log.Debugf("removing Fail/Settle packet %s from mailbox",
39✔
2743
                        inKey)
39✔
2744
                l.mailBox.AckPacket(inKey)
39✔
2745
        }
39✔
2746

2747
        // Lastly, reset our buffers to be empty while keeping any acquired
2748
        // growth in the backing array.
2749
        l.openedCircuits = l.openedCircuits[:0]
1,354✔
2750
        l.closedCircuits = l.closedCircuits[:0]
1,354✔
2751

1,354✔
2752
        return nil
1,354✔
2753
}

// updateCommitTxOrFail updates the commitment tx and if that fails, it
// fails the link.
func (l *channelLink) updateCommitTxOrFail(ctx context.Context) bool {
        err := l.updateCommitTx(ctx)
        switch err {
        // No error encountered, success.
        case nil:

        // A duplicate keystone error should be resolved and is not fatal, so
        // we won't send an Error message to the peer.
        case ErrDuplicateKeystone:
                l.failf(LinkFailureError{code: ErrCircuitError},
                        "temporary circuit error: %v", err)
                return false

        // Any other error results in an Error message being sent to the
        // peer.
        default:
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to update commitment: %v", err)
                return false
        }

        return true
}

// updateCommitTx signs, then sends an update to the remote peer adding a
// new commitment to their commitment chain which includes all the latest
// updates we've received+processed up to this point.
func (l *channelLink) updateCommitTx(ctx context.Context) error {
        // Preemptively write all pending keystones to disk, just in case the
        // HTLCs we have in memory are included in the subsequent attempt to
        // sign a commitment state.
        err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
        if err != nil {
                // If ErrDuplicateKeystone is returned, the caller will catch
                // it.
                return err
        }

        // Reset the batch, but keep the backing buffer to avoid
        // reallocating.
        l.keystoneBatch = l.keystoneBatch[:0]

        // If hodl.Commit mode is active, we will refrain from attempting to
        // commit any in-memory modifications to the channel state. Exiting
        // here permits testing of either the switch or link's ability to
        // trim circuits that have been opened, but unsuccessfully committed.
        if l.cfg.HodlMask.Active(hodl.Commit) {
                l.log.Warnf(hodl.Commit.Warning())
                return nil
        }

        ctx, done := l.cg.Create(ctx)
        defer done()

        newCommit, err := l.channel.SignNextCommitment(ctx)
        if err == lnwallet.ErrNoWindow {
                l.cfg.PendingCommitTicker.Resume()
                l.log.Trace("PendingCommitTicker resumed")

                n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
                l.log.Tracef("revocation window exhausted, unable to send: "+
                        "%v, pend_updates=%v, dangling_closes=%v", n,
                        lnutils.SpewLogClosure(l.openedCircuits),
                        lnutils.SpewLogClosure(l.closedCircuits))

                return nil
        } else if err != nil {
                return err
        }

        if err := l.ackDownStreamPackets(); err != nil {
                return err
        }

        l.cfg.PendingCommitTicker.Pause()
        l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")

        // The remote party now has a new pending commitment, so we'll update
        // the contract court to be aware of this new set (the prior old
        // remote pending).
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.RemotePendingHtlcSet,
                Htlcs:   newCommit.PendingHTLCs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                l.log.Errorf("unable to notify contract update: %v", err)
                return err
        }

        select {
        case <-l.cg.Done():
                return ErrLinkShuttingDown
        default:
        }

        auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob)
        if err != nil {
                return fmt.Errorf("error parsing aux sigs: %w", err)
        }

        commitSig := &lnwire.CommitSig{
                ChanID:        l.ChanID(),
                CommitSig:     newCommit.CommitSig,
                HtlcSigs:      newCommit.HtlcSigs,
                PartialSig:    newCommit.PartialSig,
                CustomRecords: auxBlobRecords,
        }
        l.cfg.Peer.SendMessage(false, commitSig)

        // Now that we have sent out a new CommitSig, we invoke the outgoing
        // set of commit hooks.
        l.RWMutex.Lock()
        l.outgoingCommitHooks.invoke()
        l.RWMutex.Unlock()

        return nil
}
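
// Editorial sketch (not part of lnd): the write-ahead ordering that
// updateCommitTx relies on, reduced to its skeleton. Circuit keystones are
// persisted before signing, so a crash between the two steps leaves at
// worst an orphaned circuit that can be trimmed on restart, never a signed
// commitment referencing a circuit we have no record of. The closures
// stand in for the OpenCircuits and SignNextCommitment calls above.
func writeAheadThenSign(persistKeystones, sign func() error) error {
        if err := persistKeystones(); err != nil {
                return err
        }

        return sign()
}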

// PeerPubKey returns the public key of the remote peer with which we have
// the channel link opened.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) PeerPubKey() [33]byte {
        return l.cfg.Peer.PubKey()
}

// ChannelPoint returns the channel outpoint for the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChannelPoint() wire.OutPoint {
        return l.channel.ChannelPoint()
}

// ShortChanID returns the short channel ID for the channel link. The short
// channel ID encodes the exact location in the main chain at which the
// original funding output can be found.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
        l.RLock()
        defer l.RUnlock()

        return l.channel.ShortChanID()
}

// UpdateShortChanID updates the short channel ID for a link. This may be
// required in the event that a link is created before the short chan ID
// for it is known, or a re-org occurs, and the funding transaction changes
// location within the chain.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
        chanID := l.ChanID()

        // Refresh the channel state's short channel ID by loading it from
        // disk. This ensures that the channel state accurately reflects the
        // updated short channel ID.
        err := l.channel.State().Refresh()
        if err != nil {
                l.log.Errorf("unable to refresh short_chan_id for "+
                        "chan_id=%v: %v", chanID, err)
                return hop.Source, err
        }

        return hop.Source, nil
}

// ChanID returns the channel ID for the channel link. The channel ID is a
// more compact representation of a channel's full outpoint.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChanID() lnwire.ChannelID {
        return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
}

// Bandwidth returns the total amount that can flow through the channel
// link at this given instance. The value returned is expressed in
// millisatoshi and can be used by callers when making forwarding decisions
// to determine if a link can accept an HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
        // Get the balance available on the channel for new HTLCs. This takes
        // the channel reserve into account so HTLCs up to this value won't
        // violate it.
        return l.channel.AvailableBalance()
}

// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
// amount provided to the link. This check does not reserve a space, since
// forwards or other payments may use the available slot, so it should be
// considered best-effort.
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
        return l.channel.MayAddOutgoingHtlc(amt)
}

// getDustSum is a wrapper method that calls the underlying channel's dust
// sum method.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
        dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {

        return l.channel.GetDustSum(whoseCommit, dryRunFee)
}

// getFeeRate is a wrapper method that retrieves the underlying channel's
// feerate.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
        return l.channel.CommitFeeRate()
}

// getDustClosure returns a closure that can be used by the switch or
// mailbox to evaluate whether a given HTLC is dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustClosure() dustClosure {
        localDustLimit := l.channel.State().LocalChanCfg.DustLimit
        remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
        chanType := l.channel.State().ChanType

        return dustHelper(chanType, localDustLimit, remoteDustLimit)
}

// getCommitFee returns either the local or remote CommitFee in satoshis.
// This is used so that the Switch can have access to the commitment fee
// without needing to have a *LightningChannel. This doesn't include dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
        if remote {
                return l.channel.State().RemoteCommitment.CommitFee
        }

        return l.channel.State().LocalCommitment.CommitFee
}

// exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
// increases the total dust and fees within the channel past the configured
// fee threshold. It first calculates the dust sum over every update in the
// update log with the proposed fee-rate and taking into account both the
// local and remote dust limits. It uses every update in the update log
// instead of what is actually on the local and remote commitments because
// it is assumed that in a worst-case scenario, every update in the update
// log could theoretically be on either commitment transaction and this
// needs to be accounted for with this fee-rate. It then calculates the
// local and remote commitment fees given the proposed fee-rate. Finally, it
// tallies the results and determines if the fee threshold has been
// exceeded.
func (l *channelLink) exceedsFeeExposureLimit(
        feePerKw chainfee.SatPerKWeight) (bool, error) {

        dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)

        // Get the sum of dust for both the local and remote commitments
        // using this "dry-run" fee.
        localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
        remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)

        // Calculate the local and remote commitment fees using this dry-run
        // fee.
        localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
        if err != nil {
                return false, err
        }

        // Finally, check whether the max fee exposure was exceeded on either
        // future commitment transaction with the fee-rate.
        totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
        if totalLocalDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "local dust: %v, local fee: %v", l.ShortChanID(),
                        totalLocalDust, localFee)

                return true, nil
        }

        totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
                remoteFee,
        )

        if totalRemoteDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "remote dust: %v, remote fee: %v", l.ShortChanID(),
                        totalRemoteDust, remoteFee)

                return true, nil
        }

        return false, nil
}
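
// Editorial sketch (not part of lnd): the threshold comparison above with
// illustrative numbers. A dust sum of 80,000 sats plus a commitment fee of
// 30,000 sats yields 110,000 sats of exposure, which would exceed a
// hypothetical 100,000-sat MaxFeeExposure and cause the proposed fee-rate
// to be rejected.
func exampleFeeExposureExceeded() bool {
        var (
                dustSum   = lnwire.NewMSatFromSatoshis(80_000)
                commitFee = lnwire.NewMSatFromSatoshis(30_000)
                limit     = lnwire.NewMSatFromSatoshis(100_000)
        )

        return dustSum+commitFee > limit
}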

// isOverexposedWithHtlc calculates whether the proposed HTLC will make the
// channel exceed the fee threshold. It first fetches the largest fee-rate
// that may be on any unrevoked commitment transaction. Then, using this
// fee-rate, it determines if the to-be-added HTLC is dust. If the HTLC is
// dust, it adds to the overall dust sum. If it is not dust, it contributes
// to weight, which also adds to the overall dust sum by an increase in
// fees. If the dust sum on either commitment exceeds the configured fee
// threshold, this function returns true.
func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
        incoming bool) bool {

        dustClosure := l.getDustClosure()

        feeRate := l.channel.WorstCaseFeeRate()

        amount := htlc.Amount.ToSatoshis()

        // See if this HTLC is dust on both the local and remote commitments.
        isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
        isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)

        // Calculate the dust sum for the local and remote commitments.
        localDustSum := l.getDustSum(
                lntypes.Local, fn.None[chainfee.SatPerKWeight](),
        )
        remoteDustSum := l.getDustSum(
                lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
        )

        // Grab the larger of the local and remote commitment fees w/o dust.
        commitFee := l.getCommitFee(false)

        if l.getCommitFee(true) > commitFee {
                commitFee = l.getCommitFee(true)
        }

        commitFeeMSat := lnwire.NewMSatFromSatoshis(commitFee)

        localDustSum += commitFeeMSat
        remoteDustSum += commitFeeMSat

        // Calculate the additional fee increase if this is a non-dust HTLC.
        weight := lntypes.WeightUnit(input.HTLCWeight)
        additional := lnwire.NewMSatFromSatoshis(
                feeRate.FeeForWeight(weight),
        )

        if isLocalDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                localDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase
                // in weight.
                localDustSum += additional
        }

        if localDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total local dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, localDustSum)

                return true
        }

        if isRemoteDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase
                // in weight.
                remoteDustSum += additional
        }

        if remoteDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total remote dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, remoteDustSum)

                return true
        }

        return false
}
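
// Editorial sketch (not part of lnd): the two ways an HTLC can raise fee
// exposure in isOverexposedWithHtlc. A dust HTLC contributes its full
// amount to the dust sum, while a non-dust HTLC contributes only the extra
// commitment fee for its weight (input.HTLCWeight), e.g. roughly 43 sats
// at a fee-rate of 253 sat/kw.
func exampleHtlcExposureDelta(isDust bool, amt btcutil.Amount,
        feeRate chainfee.SatPerKWeight) lnwire.MilliSatoshi {

        if isDust {
                return lnwire.NewMSatFromSatoshis(amt)
        }

        weight := lntypes.WeightUnit(input.HTLCWeight)

        return lnwire.NewMSatFromSatoshis(feeRate.FeeForWeight(weight))
}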

// dustClosure is a function that evaluates whether an HTLC is dust. It
// returns true if the HTLC is dust. It takes in a feerate, a boolean
// denoting whether the HTLC is incoming (i.e. one that the remote sent), a
// ChannelParty denoting whether to evaluate on the local or remote commit,
// and finally an HTLC amount to test.
type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
        whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool

// dustHelper is used to construct the dustClosure.
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
        remoteDustLimit btcutil.Amount) dustClosure {

        isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
                whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {

                var dustLimit btcutil.Amount
                if whoseCommit.IsLocal() {
                        dustLimit = localDustLimit
                } else {
                        dustLimit = remoteDustLimit
                }

                return lnwallet.HtlcIsDust(
                        chantype, incoming, whoseCommit, feerate, amt,
                        dustLimit,
                )
        }

        return isDust
}
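
// Editorial sketch (not part of lnd): constructing and querying a
// dustClosure directly. The channel type, dust limits, and fee-rate below
// are arbitrary illustration values; lnwallet.HtlcIsDust accounts for the
// second-level HTLC transaction fee before comparing against the limit.
func exampleDustCheck() bool {
        isDust := dustHelper(
                channeldb.SingleFunderTweaklessBit, // channel type
                354,                                // local dust limit, sats
                354,                                // remote dust limit, sats
        )

        // Is a 1,000-sat incoming HTLC dust on our local commitment at a
        // fee-rate of 253 sat/kw?
        return isDust(chainfee.SatPerKWeight(253), true, lntypes.Local, 1_000)
}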

// zeroConfConfirmed returns whether or not the zero-conf channel has
// confirmed on-chain.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) zeroConfConfirmed() bool {
        return l.channel.State().ZeroConfConfirmed()
}

// confirmedScid returns the confirmed SCID for a zero-conf channel. This
// should not be called for non-zero-conf channels.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
        return l.channel.State().ZeroConfRealScid()
}

// isZeroConf returns whether or not the underlying channel is a zero-conf
// channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) isZeroConf() bool {
        return l.channel.State().IsZeroConf()
}

// negotiatedAliasFeature returns whether or not the underlying channel has
// negotiated the option-scid-alias feature bit. This will be true for both
// option-scid-alias and zero-conf channel-types. It will also be true for
// channels with the feature bit but without the above channel-types.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) negotiatedAliasFeature() bool {
        return l.channel.State().NegotiatedAliasFeature()
}

// getAliases returns the set of aliases for the underlying channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
        return l.cfg.GetAliases(l.ShortChanID())
}

// attachFailAliasUpdate sets the link's FailAliasUpdate function.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) attachFailAliasUpdate(closure func(
        sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) {

        l.Lock()
        l.cfg.FailAliasUpdate = closure
        l.Unlock()
}

// AttachMailBox updates the current mailbox used by this link, and hooks up
// the mailbox's message and packet outboxes to the link's upstream and
// downstream chans, respectively.
func (l *channelLink) AttachMailBox(mailbox MailBox) {
        l.Lock()
        l.mailBox = mailbox
        l.upstream = mailbox.MessageOutBox()
        l.downstream = mailbox.PacketOutBox()
        l.Unlock()

        // Set the mailbox's fee rate. This may be refreshing a feerate that
        // was never committed.
        l.mailBox.SetFeeRate(l.getFeeRate())

        // Also set the mailbox's dust closure so that it can query whether
        // HTLCs are dust given the current feerate.
        l.mailBox.SetDustClosure(l.getDustClosure())
}

// UpdateForwardingPolicy updates the forwarding policy for the target
// ChannelLink. Once updated, the link will use the new forwarding policy
// to govern whether an incoming HTLC should be forwarded or not. We assume
// that fields that are zero are intentionally set to zero, so we'll use
// newPolicy to update all of the link's FwrdingPolicy's values.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateForwardingPolicy(
        newPolicy models.ForwardingPolicy) {

        l.Lock()
        defer l.Unlock()

        l.cfg.FwrdingPolicy = newPolicy
}

// CheckHtlcForward should return a nil error if the passed HTLC details
// satisfy the current forwarding policy of the target link. Otherwise, a
// LinkError with a valid protocol failure message should be returned in
// order to signal to the source of the HTLC the policy consistency issue.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) CheckHtlcForward(payHash [32]byte, incomingHtlcAmt,
        amtToForward lnwire.MilliSatoshi, incomingTimeout,
        outgoingTimeout uint32, inboundFee models.InboundFee,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // Using the outgoing HTLC amount, we'll calculate the outgoing fee
        // this incoming HTLC must carry in order to satisfy the constraints
        // of the outgoing link.
        outFee := ExpectedFee(policy, amtToForward)

        // Then calculate the inbound fee that we charge based on the sum of
        // outgoing HTLC amount and outgoing fee.
        inFee := inboundFee.CalcFee(amtToForward + outFee)

        // Add up both fee components. It is important to calculate both fees
        // separately. An alternative way of calculating is to first
        // determine an aggregate fee and apply that to the outgoing HTLC
        // amount. However, rounding may cause the result to be slightly
        // higher than in the case of separately rounded fee components. This
        // potentially causes failed forwards for senders and is something to
        // be avoided.
        expectedFee := inFee + int64(outFee)

        // If the actual fee is less than our expected fee, then we'll reject
        // this HTLC as it didn't provide a sufficient amount of fees, or the
        // values have been tampered with, or the send used incorrect/dated
        // information to construct the forwarding information for this hop.
        // In any case, we'll cancel this HTLC.
        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
        if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
                l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
                        "expected %v, got %v: incoming=%v, outgoing=%v, "+
                        "inboundFee=%v",
                        payHash[:], expectedFee, actualFee,
                        incomingHtlcAmt, amtToForward, inboundFee,
                )

                // As part of the returned error, we'll send our latest
                // routing policy so the sending node obtains the most up to
                // date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewFeeInsufficient(amtToForward, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check whether the outgoing htlc satisfies the channel policy.
        err := l.canSendHtlc(
                policy, payHash, amtToForward, outgoingTimeout, heightNow,
                originalScid, customRecords,
        )
        if err != nil {
                return err
        }

        // Finally, we'll ensure that the time-lock on the outgoing HTLC
        // meets the following constraint: the incoming time-lock minus our
        // time-lock delta should equal the outgoing time lock. Otherwise,
        // either the sender messed up or an intermediate node tampered with
        // the HTLC.
        timeDelta := policy.TimeLockDelta
        if incomingTimeout < outgoingTimeout+timeDelta {
                l.log.Warnf("incoming htlc(%x) has incorrect time-lock "+
                        "value: expected at least %v block delta, got %v "+
                        "block delta", payHash[:], timeDelta,
                        incomingTimeout-outgoingTimeout)

                // Grab the latest routing policy so the sending node is up
                // to date with our current policy.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewIncorrectCltvExpiry(
                                incomingTimeout, *upd,
                        )
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        return nil
}
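
// Editorial sketch (not part of lnd): the fee comparison in
// CheckHtlcForward with hypothetical policy numbers and inbound fees
// ignored. A policy of 1,000 msat base fee plus 1 ppm on a 1,000,000 msat
// outgoing amount expects 1,001 msat in fees; an incoming amount of
// 1,001,500 msat carries an actual fee of 1,500 msat and passes the check.
func exampleForwardFeeCheck() bool {
        var (
                amtToForward    lnwire.MilliSatoshi = 1_000_000
                incomingHtlcAmt lnwire.MilliSatoshi = 1_001_500
                baseFee         lnwire.MilliSatoshi = 1_000
                feeRatePPM      lnwire.MilliSatoshi = 1
        )

        // Mirrors ExpectedFee: base fee plus a proportional fee expressed in
        // parts per million.
        outFee := baseFee + amtToForward*feeRatePPM/1_000_000

        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)

        return actualFee >= int64(outFee)
}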

// CheckHtlcTransit should return a nil error if the passed HTLC details
// satisfy the current channel policy. Otherwise, a LinkError with a valid
// protocol failure message should be returned in order to signal the
// violation. This call is intended to be used for locally initiated
// payments for which there is no corresponding incoming htlc.
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
        amt lnwire.MilliSatoshi, timeout uint32, heightNow uint32,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // We pass in hop.Source here as this is only used in the Switch when
        // trying to send over a local link. This causes the fallback
        // mechanism to occur.
        return l.canSendHtlc(
                policy, payHash, amt, timeout, heightNow, hop.Source,
                customRecords,
        )
}
406✔
3366

// canSendHtlc checks whether the given htlc parameters satisfy
// the channel's amount and time lock constraints.
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
        payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        // As our first sanity check, we'll ensure that the passed HTLC isn't
        // too small for the next hop. If so, then we'll cancel the HTLC
        // directly.
        if amt < policy.MinHTLCOut {
                l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
                        "htlc_value=%v", payHash[:], policy.MinHTLCOut,
                        amt)

                // As part of the returned error, we'll send our latest routing
                // policy so the sending node obtains the most up to date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewAmountBelowMinimum(amt, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Next, ensure that the passed HTLC isn't too large. If so, we'll
        // cancel the HTLC directly.
        if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
                l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
                        "htlc_value=%v", payHash[:], policy.MaxHTLC, amt)

                // As part of the returned error, we'll send our latest routing
                // policy so the sending node obtains the most up-to-date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewDetailedLinkError(failure, OutgoingFailureHTLCExceedsMax)
        }

        // We want to avoid offering an HTLC which will expire in the near
        // future, so we'll reject an HTLC if the outgoing expiration time is
        // too close to the current height.
        if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
                l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
                        "outgoing_expiry=%v, best_height=%v", payHash[:],
                        timeout, heightNow)

                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewExpiryTooSoon(*upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check absolute max delta.
        if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
                l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
                        "the future: got %v, but maximum is %v", payHash[:],
                        timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)

                return NewLinkError(&lnwire.FailExpiryTooFar{})
        }

        // We now check the available bandwidth to see if this HTLC can be
        // forwarded.
        availableBandwidth := l.Bandwidth()
        auxBandwidth, err := fn.MapOptionZ(
                l.cfg.AuxTrafficShaper,
                func(ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
                        var htlcBlob fn.Option[tlv.Blob]
                        blob, err := customRecords.Serialize()
                        if err != nil {
                                return fn.Err[OptionalBandwidth](
                                        fmt.Errorf("unable to serialize "+
                                                "custom records: %w", err))
                        }

                        if len(blob) > 0 {
                                htlcBlob = fn.Some(blob)
                        }

                        return l.AuxBandwidth(amt, originalScid, htlcBlob, ts)
                },
        ).Unpack()
        if err != nil {
                l.log.Errorf("Unable to determine aux bandwidth: %v", err)
                return NewLinkError(&lnwire.FailTemporaryNodeFailure{})
        }

        if auxBandwidth.IsHandled && auxBandwidth.Bandwidth.IsSome() {
                auxBandwidth.Bandwidth.WhenSome(
                        func(bandwidth lnwire.MilliSatoshi) {
                                availableBandwidth = bandwidth
                        },
                )
        }

        // Check to see if there is enough balance in this channel.
        if amt > availableBandwidth {
                l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
                        "larger than %v", amt, availableBandwidth)
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewDetailedLinkError(
                        failure, OutgoingFailureInsufficientBalance,
                )
        }

        return nil
}
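
// For illustration only, the amount constraints enforced at the top of
// canSendHtlc reduce to a pure predicate like the hypothetical helper
// below (not part of this file; a zero MaxHTLC means "no maximum
// configured"):
//
//        func amtWithinPolicy(p models.ForwardingPolicy,
//                amt lnwire.MilliSatoshi) bool {
//
//                if amt < p.MinHTLCOut {
//                        return false
//                }
//                return p.MaxHTLC == 0 || amt <= p.MaxHTLC
//        }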

// AuxBandwidth returns the bandwidth that can be used for a channel,
// expressed in milli-satoshi. This might be different from the regular BTC
// bandwidth for custom channels. For a regular (non-custom) channel, the
// returned OptionalBandwidth reports IsHandled as false and carries no
// bandwidth override.
func (l *channelLink) AuxBandwidth(amount lnwire.MilliSatoshi,
        cid lnwire.ShortChannelID, htlcBlob fn.Option[tlv.Blob],
        ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {

        fundingBlob := l.FundingCustomBlob()
        shouldHandle, err := ts.ShouldHandleTraffic(cid, fundingBlob, htlcBlob)
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("traffic shaper "+
                        "failed to decide whether to handle traffic: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper is handling "+
                "traffic: %v", cid, shouldHandle)

        // If this channel isn't handled by the aux traffic shaper, we'll
        // return early.
        if !shouldHandle {
                return fn.Ok(OptionalBandwidth{
                        IsHandled: false,
                })
        }

        // Ask for a specific bandwidth to be used for the channel.
        commitmentBlob := l.CommitmentCustomBlob()
        auxBandwidth, err := ts.PaymentBandwidth(
                fundingBlob, htlcBlob, commitmentBlob, l.Bandwidth(), amount,
                l.channel.FetchLatestAuxHTLCView(),
        )
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to get "+
                        "bandwidth from external traffic shaper: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper reported available "+
                "bandwidth: %v", cid, auxBandwidth)

        return fn.Ok(OptionalBandwidth{
                IsHandled: true,
                Bandwidth: fn.Some(auxBandwidth),
        })
}
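
// Callers typically unpack the fn.Result and only override the regular
// bandwidth when the shaper actually handled the channel, mirroring the
// logic in canSendHtlc above (sketch):
//
//        bw, err := l.AuxBandwidth(amt, scid, htlcBlob, ts).Unpack()
//        if err == nil && bw.IsHandled {
//                bw.Bandwidth.WhenSome(func(b lnwire.MilliSatoshi) {
//                        availableBandwidth = b
//                })
//        }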

// Stats returns the statistics of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
        snapshot := l.channel.StateSnapshot()

        return snapshot.ChannelCommitment.CommitHeight,
                snapshot.TotalMSatSent,
                snapshot.TotalMSatReceived
}

// String returns the string representation of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) String() string {
        return l.channel.ChannelPoint().String()
}

// handleSwitchPacket handles switch packets. These packets might be forwarded
// to us from another channel link in case the htlc update came from another
// peer, or if the update was created by a user.
//
// NOTE: Part of the packetHandler interface.
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
        l.log.Tracef("received switch packet inkey=%v, outkey=%v",
                pkt.inKey(), pkt.outKey())

        return l.mailBox.AddPacket(pkt)
}

// HandleChannelUpdate handles htlc requests such as settle/add/fail which are
// sent to us from a remote peer we have a channel with.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
        select {
        case <-l.cg.Done():
                // Return early if the link is already in the process of
                // quitting. It doesn't make sense to hand the message to the
                // mailbox here.
                return
        default:
        }

        err := l.mailBox.AddMessage(message)
        if err != nil {
                l.log.Errorf("failed to add Message to mailbox: %v", err)
        }
}

// updateChannelFee updates the commitment fee-per-kw on this channel by
// committing to an update_fee message.
func (l *channelLink) updateChannelFee(ctx context.Context,
        feePerKw chainfee.SatPerKWeight) error {

        l.log.Infof("updating commit fee to %v", feePerKw)

        // We skip sending the UpdateFee message if the channel is not
        // currently eligible to forward messages.
        if !l.eligibleToUpdate() {
                l.log.Debugf("skipping fee update for inactive channel")
                return nil
        }

        // Check and see if our proposed fee-rate would make us exceed the fee
        // threshold.
        thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
        if err != nil {
                // This shouldn't typically happen. If it does, it indicates
                // something is wrong with our channel state.
                return err
        }

        if thresholdExceeded {
                return fmt.Errorf("link fee threshold exceeded")
        }

        // First, we'll update the local fee on our commitment.
        if err := l.channel.UpdateFee(feePerKw); err != nil {
                return err
        }

        // The fee passed the channel's validation checks, so we update the
        // mailbox feerate.
        l.mailBox.SetFeeRate(feePerKw)

        // We'll then attempt to send a new UpdateFee message, and also lock it
        // in immediately by triggering a commitment update.
        msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                return err
        }

        return l.updateCommitTx(ctx)
}
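
// Note the ordering enforced above: the proposed rate is first checked
// against the fee-exposure threshold, then applied to our local commitment
// via UpdateFee, and only once both succeed do we update the mailbox rate,
// send update_fee, and lock the change in with a commitment update. This
// way we never advertise a fee rate on the wire that our own channel state
// machine has rejected.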

// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
// after receiving a revocation from the remote party, and reprocesses them in
// the context of the provided forwarding package. Any settles or fails that
// have already been acknowledged in the forwarding package will not be sent to
// the switch.
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
        if len(fwdPkg.SettleFails) == 0 {
                l.log.Trace("fwd package has no settle/fails to process, " +
                        "exiting early")

                return
        }

        // Exit early if the fwdPkg is already processed.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)

                return
        }

        l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)

        var switchPackets []*htlcPacket
        for i, update := range fwdPkg.SettleFails {
                destRef := fwdPkg.DestRef(uint16(i))

                // Skip any settles or fails that have already been
                // acknowledged by the incoming link that originated the
                // forwarded Add.
                if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
                        continue
                }

                // TODO(roasbeef): rework log entries to a shared
                // interface.

                switch msg := update.UpdateMsg.(type) {
                // A settle for an HTLC we previously forwarded has been
                // received. So we'll forward the HTLC to the switch which will
                // handle propagating the settle to the prior hop.
                case *lnwire.UpdateFulfillHTLC:
                        // If hodl.SettleIncoming is requested, we will not
                        // forward the SETTLE to the switch and will not signal
                        // a free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
                                l.log.Warnf(hodl.SettleIncoming.Warning())
                                continue
                        }

                        settlePacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has been
                        // freed up within the commitment state.
                        switchPackets = append(switchPackets, settlePacket)

                // A failure message for a previously forwarded HTLC has
                // been received. As a result a new slot will be freed up in
                // our commitment state, so we'll forward this to the switch so
                // the backwards undo can continue.
                case *lnwire.UpdateFailHTLC:
                        // If hodl.FailIncoming is requested, we will not
                        // forward the FAIL to the switch and will not signal a
                        // free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.FailIncoming) {
                                l.log.Warnf(hodl.FailIncoming.Warning())
                                continue
                        }

                        // Fetch the reason the HTLC was canceled so we can
                        // continue to propagate it. This failure originated
                        // from another node, so the linkFailure field is not
                        // set on the packet.
                        failPacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID)

                        // If the failure message lacks an HMAC (but includes
                        // the 4 bytes for encoding the message and padding
                        // lengths), then this means that we received it as an
                        // UpdateFailMalformedHTLC. As a result, we'll signal
                        // that we need to convert this error within the switch
                        // to an actual error, by encrypting it as if we were
                        // the originating hop.
                        convertedErrorSize := lnwire.FailureMessageLength + 4
                        if len(msg.Reason) == convertedErrorSize {
                                failPacket.convertedError = true
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has been
                        // freed up within the commitment state.
                        switchPackets = append(switchPackets, failPacket)
                }
        }

        // Only spawn the task to forward packets if we have a non-zero number
        // of them.
        if len(switchPackets) > 0 {
                go l.forwardBatch(false, switchPackets...)
        }
}
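
// The convertedErrorSize check above relies on the BOLT #4 failure
// encoding: a failure that arrived as update_fail_malformed_htlc is
// re-encoded locally as the plaintext failure message plus its two 2-byte
// length prefixes (message length and pad length), with no HMAC attached.
// Assuming the standard padded failure message size:
//
//        // [2: msg_len][msg ... pad] => FailureMessageLength + 2 + 2
//        convertedErrorSize := lnwire.FailureMessageLength + 4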

// processRemoteAdds serially processes each of the Add payment descriptors
// which have been "locked-in" by receiving a revocation from the remote party.
// The forwarding package provided instructs how to process this batch,
// indicating whether this is the first time these Adds are being processed, or
// whether we are reprocessing as a result of a failure or restart. Adds that
// have already been acknowledged in the forwarding package will be ignored.
//
//nolint:funlen
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
        // Exit early if there are no adds to process.
        if len(fwdPkg.Adds) == 0 {
                l.log.Trace("fwd package has no adds to process, exiting early")

                return
        }

        // Exit early if the fwdPkg is already processed.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("skipped processing completed fwdPkg %v", fwdPkg)

                return
        }

        l.log.Tracef("processing %d remote adds for height %d",
                len(fwdPkg.Adds), fwdPkg.Height)

        // decodeReqs is a list of requests sent to the onion decoder. We expect
        // the same length of responses to be returned.
        decodeReqs := make([]hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds))

        // unackedAdds is a list of ADDs that are waiting for the remote's
        // settle/fail update.
        unackedAdds := make([]*lnwire.UpdateAddHTLC, 0, len(fwdPkg.Adds))

        for i, update := range fwdPkg.Adds {
                // If this index is already found in the ack filter, the
                // response to this forwarding decision has already been
                // committed by one of our commitment txns. ADDs in this state
                // are waiting for the rest of the fwding package to get acked
                // before being garbage collected.
                if fwdPkg.State == channeldb.FwdStateProcessed &&
                        fwdPkg.AckFilter.Contains(uint16(i)) {

                        continue
                }

                if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
                        // Before adding the new htlc to the state machine,
                        // parse the onion object in order to obtain the
                        // routing information with the DecodeHopIterator
                        // function which processes the Sphinx packet.
                        onionReader := bytes.NewReader(msg.OnionBlob[:])

                        req := hop.DecodeHopIteratorRequest{
                                OnionReader:    onionReader,
                                RHash:          msg.PaymentHash[:],
                                IncomingCltv:   msg.Expiry,
                                IncomingAmount: msg.Amount,
                                BlindingPoint:  msg.BlindingPoint,
                        }

                        decodeReqs = append(decodeReqs, req)
                        unackedAdds = append(unackedAdds, msg)
                }
        }

        // If the fwdPkg has already been processed, it means we are
        // reforwarding the packets again, which happens only on a restart.
        reforward := fwdPkg.State == channeldb.FwdStateProcessed

        // Atomically decode the incoming htlcs, simultaneously checking for
        // replay attempts. A particular index in the returned, spare list of
        // channel iterators should only be used if the failure code at the
        // same index is lnwire.FailCodeNone.
        decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
                fwdPkg.ID(), decodeReqs, reforward,
        )
        if sphinxErr != nil {
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to decode hop iterators: %v", sphinxErr)
                return
        }

        var switchPackets []*htlcPacket

        for i, update := range unackedAdds {
                idx := uint16(i)
                sourceRef := fwdPkg.SourceRef(idx)
                add := *update

                // An incoming HTLC add has been fully locked in. As a result
                // we can now examine the forwarding details of the HTLC, and
                // the HTLC itself to decide if: we should forward it, cancel
                // it, or are able to settle it (and it adheres to our fee
                // related constraints).

                // Before adding the new htlc to the state machine, parse the
                // onion object in order to obtain the routing information with
                // the DecodeHopIterator function which processes the Sphinx
                // packet.
                chanIterator, failureCode := decodeResps[i].Result()
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to the payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob, &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion hop iterator "+
                                "for htlc(id=%v, hash=%x): %v", add.ID,
                                add.PaymentHash, failureCode)

                        continue
                }

                heightNow := l.cfg.BestHeight()

                pld, routeRole, pldErr := chanIterator.HopPayload()
                if pldErr != nil {
                        // If we're unable to process the onion payload, or we
                        // received an invalid onion payload failure, then we
                        // should send an error back to the caller so the HTLC
                        // can be canceled.
                        var failedType uint64

                        // We need to get the underlying error value, so we
                        // can't use errors.As as suggested by the linter.
                        //nolint:errorlint
                        if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
                                failedType = uint64(e.Type)
                        }

                        // If we couldn't parse the payload, make our best
                        // effort at creating an error encrypter that knows
                        // what blinding type we were, but if we couldn't
                        // parse the payload we have no way of knowing whether
                        // we were the introduction node or not.
                        //
                        //nolint:ll
                        obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
                                l.cfg.ExtractErrorEncrypter,
                                // We need our route role here because we
                                // couldn't parse or validate the payload.
                                routeRole == hop.RouteRoleIntroduction,
                        )
                        if failCode != lnwire.CodeNone {
                                l.log.Errorf("could not extract error "+
                                        "encrypter: %v", pldErr)

                                // We can't process this htlc, send back
                                // malformed.
                                l.sendMalformedHTLCError(
                                        add.ID, failureCode, add.OnionBlob,
                                        &sourceRef,
                                )

                                continue
                        }

                        // TODO: currently none of the test unit infrastructure
                        // is setup to handle TLV payloads, so testing this
                        // would require implementing a separate mock iterator
                        // for TLV payloads that also supports injecting invalid
                        // payloads. Deferring this non-trivial effort till a
                        // later date.
                        failure := lnwire.NewInvalidOnionPayload(failedType, 0)

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Errorf("unable to decode forwarding "+
                                "instructions: %v", pldErr)

                        continue
                }

                // Retrieve the onion obfuscator from the onion blob in order
                // to produce the initial obfuscation of the onion failureCode.
                obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
                        l.cfg.ExtractErrorEncrypter,
                        routeRole == hop.RouteRoleIntroduction,
                )
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to the payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob,
                                &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion "+
                                "obfuscator: %v", failureCode)

                        continue
                }

                fwdInfo := pld.ForwardingInfo()

                // Check whether the payload we've just processed uses our
                // node as the introduction point (gave us a blinding key in
                // the payload itself) and fail it back if we don't support
                // route blinding.
                if fwdInfo.NextBlinding.IsSome() &&
                        l.cfg.DisallowRouteBlinding {

                        failure := lnwire.NewInvalidBlinding(
                                fn.Some(add.OnionBlob),
                        )

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Error("rejected htlc that uses us as an " +
                                "introduction point when we do not support " +
                                "route blinding")

                        continue
                }

                switch fwdInfo.NextHop {
                case hop.Exit:
                        err := l.processExitHop(
                                add, sourceRef, obfuscator, fwdInfo,
                                heightNow, pld,
                        )
                        if err != nil {
                                l.failf(LinkFailureError{
                                        code: ErrInternalError,
                                }, err.Error()) //nolint

                                return
                        }

                // There are additional channels left within this route. So
                // we'll simply do some forwarding package book-keeping.
                default:
                        // If hodl.AddIncoming is requested, we will not
                        // validate the forwarded ADD, nor will we send the
                        // packet to the htlc switch.
                        if l.cfg.HodlMask.Active(hodl.AddIncoming) {
                                l.log.Warnf(hodl.AddIncoming.Warning())
                                continue
                        }

                        endorseValue := l.experimentalEndorsement(
                                record.CustomSet(add.CustomRecords),
                        )
                        endorseType := uint64(
                                lnwire.ExperimentalEndorsementType,
                        )

                        switch fwdPkg.State {
                        case channeldb.FwdStateProcessed:
                                // This add was not forwarded on the previous
                                // processing phase, run it through our
                                // validation pipeline to reproduce an error.
                                // This may trigger a different error due to
                                // expiring timelocks, but we expect that an
                                // error will be reproduced.
                                if !fwdPkg.FwdFilter.Contains(idx) {
                                        break
                                }

                                // Otherwise, it was already processed, so we
                                // can collect it and continue.
                                outgoingAdd := &lnwire.UpdateAddHTLC{
                                        Expiry:        fwdInfo.OutgoingCTLV,
                                        Amount:        fwdInfo.AmountToForward,
                                        PaymentHash:   add.PaymentHash,
                                        BlindingPoint: fwdInfo.NextBlinding,
                                }

                                endorseValue.WhenSome(func(e byte) {
                                        custRecords := map[uint64][]byte{
                                                endorseType: {e},
                                        }

                                        outgoingAdd.CustomRecords = custRecords
                                })

                                // Finally, we'll encode the onion packet for
                                // the _next_ hop using the hop iterator
                                // decoded for the current hop.
                                buf := bytes.NewBuffer(
                                        outgoingAdd.OnionBlob[0:0],
                                )

                                // We know this cannot fail, as this ADD
                                // was marked forwarded in a previous
                                // round of processing.
                                chanIterator.EncodeNextHop(buf)

                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               outgoingAdd.Amount,
                                        htlc:                 outgoingAdd,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }
                                switchPackets = append(
                                        switchPackets, updatePacket,
                                )

                                continue
                        }

                        // TODO(roasbeef): ensure don't accept outrageous
                        // timeout for htlc

                        // With all our forwarding constraints met, we'll
                        // create the outgoing HTLC using the parameters as
                        // specified in the forwarding info.
                        addMsg := &lnwire.UpdateAddHTLC{
                                Expiry:        fwdInfo.OutgoingCTLV,
                                Amount:        fwdInfo.AmountToForward,
                                PaymentHash:   add.PaymentHash,
                                BlindingPoint: fwdInfo.NextBlinding,
                        }

                        endorseValue.WhenSome(func(e byte) {
                                addMsg.CustomRecords = map[uint64][]byte{
                                        endorseType: {e},
                                }
                        })

                        // Finally, we'll encode the onion packet for the
                        // _next_ hop using the hop iterator decoded for the
                        // current hop.
                        buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
                        err := chanIterator.EncodeNextHop(buf)
                        if err != nil {
                                l.log.Errorf("unable to encode the "+
                                        "remaining route %v", err)

                                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:ll
                                        return lnwire.NewTemporaryChannelFailure(upd)
                                }

                                failure := l.createFailureWithUpdate(
                                        true, hop.Source, cb,
                                )

                                l.sendHTLCError(
                                        add, sourceRef, NewLinkError(failure),
                                        obfuscator, false,
                                )
                                continue
                        }

                        // Now that this add has been reprocessed, only append
                        // it to our list of packets to forward to the switch
                        // if this is the first time processing the add. If the
                        // fwd pkg has already been processed, then we entered
                        // the above section to recreate a previous error.  If
                        // the packet had previously been forwarded, it would
                        // have been added to switchPackets at the top of this
                        // section.
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               addMsg.Amount,
                                        htlc:                 addMsg,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }

                                fwdPkg.FwdFilter.Set(idx)
                                switchPackets = append(switchPackets,
                                        updatePacket)
                        }
                }
        }

        // Commit the htlcs we are intending to forward if this package has not
        // been fully processed.
        if fwdPkg.State == channeldb.FwdStateLockedIn {
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
                if err != nil {
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "unable to set fwd filter: %v", err)
                        return
                }
        }

        if len(switchPackets) == 0 {
                return
        }

        l.log.Debugf("forwarding %d packets to switch: reforward=%v",
                len(switchPackets), reforward)

        // NOTE: This call is made synchronous so that we ensure all circuits
        // are committed in the exact order that they are processed in the link.
        // Failing to do this could cause reorderings/gaps in the range of
        // opened circuits, which violates assumptions made by the circuit
        // trimming.
        l.forwardBatch(reforward, switchPackets...)
}
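
// The control flow above hinges on the forwarding package state. A rough
// sketch of the state handling (states as used in this function):
//
//        switch fwdPkg.State {
//        case channeldb.FwdStateLockedIn:
//                // First pass: validate each add, record forwards in
//                // FwdFilter, then hand the batch to the switch.
//        case channeldb.FwdStateProcessed:
//                // Restart/reforward: replay adds already marked in
//                // FwdFilter verbatim; re-run the rest only to reproduce
//                // their original failure.
//        case channeldb.FwdStateCompleted:
//                // Fully acked earlier; nothing to do.
//        }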

// experimentalEndorsement returns the value to set for our outgoing
// experimental endorsement field, or fn.None if it should not be populated
// on the outgoing htlc.
func (l *channelLink) experimentalEndorsement(
        customUpdateAdd record.CustomSet) fn.Option[byte] {

        // Only relay the experimental signal if we are within the experiment
        // period.
        if !l.cfg.ShouldFwdExpEndorsement() {
                return fn.None[byte]()
        }

        // If we don't have any custom records or the experimental field is
        // not set, just forward a zero value.
        if len(customUpdateAdd) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        t := uint64(lnwire.ExperimentalEndorsementType)
        value, set := customUpdateAdd[t]
        if !set {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // We expect at least one byte for this field, consider it invalid if
        // it has no data and just forward a zero value.
        if len(value) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // Only forward endorsed if the incoming link is endorsed.
        if value[0] == lnwire.ExperimentalEndorsed {
                return fn.Some[byte](lnwire.ExperimentalEndorsed)
        }

        // Forward as unendorsed otherwise, including cases where we've
        // received an invalid value that uses more than 3 bits of information.
        return fn.Some[byte](lnwire.ExperimentalUnendorsed)
}
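
// The mapping implemented above collapses to a small decision table:
//
//        signal period inactive           -> fn.None (field omitted)
//        record absent or empty           -> ExperimentalUnendorsed
//        record[0] == Endorsed            -> ExperimentalEndorsed
//        anything else (invalid encoding) -> ExperimentalUnendorsed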

// processExitHop handles an htlc for which this link is the exit hop. It
// returns an error only for failures that the caller treats as an internal
// link failure; policy violations are instead failed back to the sender.
func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter,
        fwdInfo hop.ForwardingInfo, heightNow uint32,
        payload invoices.Payload) error {

        // If hodl.ExitSettle is requested, we will not validate the final hop's
        // ADD, nor will we settle the corresponding invoice or respond with the
        // preimage.
        if l.cfg.HodlMask.Active(hodl.ExitSettle) {
                l.log.Warnf("%s for htlc(rhash=%x,htlcIndex=%v)",
                        hodl.ExitSettle.Warning(), add.PaymentHash, add.ID)

                return nil
        }

        // In case the traffic shaper is active, we'll check if the HTLC has
        // custom records and skip the amount check in the onion payload below.
        isCustomHTLC := fn.MapOptionZ(
                l.cfg.AuxTrafficShaper,
                func(ts AuxTrafficShaper) bool {
                        return ts.IsCustomHTLC(add.CustomRecords)
                },
        )

        // As we're the exit hop, we'll double check the hop-payload included in
        // the HTLC to ensure that it was crafted correctly by the sender and
        // is compatible with the HTLC we were extended. If an external
        // validator is active we might bypass the amount check.
        if !isCustomHTLC && add.Amount < fwdInfo.AmountToForward {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible value: expected <=%v, got %v",
                        add.PaymentHash, add.Amount, fwdInfo.AmountToForward)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectHtlcAmount(add.Amount),
                )
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

                return nil
        }

        // We'll also ensure that our time-lock value has been computed
        // correctly.
        if add.Expiry < fwdInfo.OutgoingCTLV {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible time-lock: expected <=%v, got %v",
                        add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectCltvExpiry(add.Expiry),
                )

                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

                return nil
        }

        // Notify the invoiceRegistry of the exit hop htlc. If we crash right
        // after this, this code will be re-executed after restart. We will
        // receive back a resolution event.
        invoiceHash := lntypes.Hash(add.PaymentHash)

        circuitKey := models.CircuitKey{
                ChanID: l.ShortChanID(),
                HtlcID: add.ID,
        }

        event, err := l.cfg.Registry.NotifyExitHopHtlc(
                invoiceHash, add.Amount, add.Expiry, int32(heightNow),
                circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload,
        )
        if err != nil {
                return err
        }

        // Create a hodlHtlc struct and decide whether to resolve it now or
        // later.
        htlc := hodlHtlc{
                add:        add,
                sourceRef:  sourceRef,
                obfuscator: obfuscator,
        }

        // If the event is nil, the invoice is being held, so we save the
        // payment descriptor for future reference.
        if event == nil {
                l.hodlMap[circuitKey] = htlc
                return nil
        }

        // Process the received resolution.
        return l.processHtlcResolution(event, htlc)
}
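
// Exit-hop htlcs therefore take one of two paths once the invoice registry
// has been notified: an immediate resolution event (settle or fail), or a
// nil event for a hold invoice, in which case the htlc is parked in
// l.hodlMap keyed by its circuit and resolved later when the registry
// pushes an event onto l.hodlQueue.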

// settleHTLC settles the HTLC on the channel.
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
        htlcIndex uint64, sourceRef channeldb.AddRef) error {

        hash := preimage.Hash()

        l.log.Infof("settling htlc %v as exit hop", hash)

        err := l.channel.SettleHTLC(
                preimage, htlcIndex, &sourceRef, nil, nil,
        )
        if err != nil {
                return fmt.Errorf("unable to settle htlc: %w", err)
        }

        // If the link is in hodl.BogusSettle mode, replace the preimage with a
        // fake one before sending it to the peer.
        if l.cfg.HodlMask.Active(hodl.BogusSettle) {
                l.log.Warnf(hodl.BogusSettle.Warning())
                preimage = [32]byte{}
                copy(preimage[:], bytes.Repeat([]byte{2}, 32))
        }

        // The HTLC was successfully settled locally, so we now send a
        // notification about it to the remote peer.
        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
                ChanID:          l.ChanID(),
                ID:              htlcIndex,
                PaymentPreimage: preimage,
        })

        // Once we have successfully settled the htlc, notify a settle event.
        l.cfg.HtlcNotifier.NotifySettleEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: htlcIndex,
                        },
                },
                preimage,
                HtlcEventTypeReceive,
        )

        return nil
}

// forwardBatch forwards the given htlcPackets to the switch, and waits on the
// err chan for the individual responses. This method is intended to be spawned
// as a goroutine so the responses can be handled in the background.
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
        // Don't forward packets for which we already have a response in our
        // mailbox. This could happen if a packet fails and is buffered in the
        // mailbox, and the incoming link flaps.
        var filteredPkts = make([]*htlcPacket, 0, len(packets))
        for _, pkt := range packets {
                if l.mailBox.HasPacket(pkt.inKey()) {
                        continue
                }

                filteredPkts = append(filteredPkts, pkt)
        }

        err := l.cfg.ForwardPackets(l.cg.Done(), replay, filteredPkts...)
        if err != nil {
                log.Errorf("Unhandled error while reforwarding htlc "+
                        "settle/fail over htlcswitch: %v", err)
        }
}
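
// The mailbox check above makes forwardBatch idempotent across link flaps
// and restarts: a packet whose response is already buffered in the mailbox
// is filtered out by HasPacket(pkt.inKey()) rather than being forwarded to
// the switch a second time.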

// sendHTLCError cancels the HTLC and sends a cancel message back to the
// peer from which the HTLC was received.
func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, failure *LinkError,
        e hop.ErrorEncrypter, isReceive bool) {

        reason, err := e.EncryptFirstHop(failure.WireMessage())
        if err != nil {
                l.log.Errorf("unable to obfuscate error: %v", err)
                return
        }

        err = l.channel.FailHTLC(add.ID, reason, &sourceRef, nil, nil)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        // Send the appropriate failure message depending on whether we're
        // in a blinded route or not.
        if err := l.sendIncomingHTLCFailureMsg(
                add.ID, e, reason,
        ); err != nil {
                l.log.Errorf("unable to send HTLC failure: %v", err)
                return
        }

        // Notify a link failure on our incoming link. Outgoing htlc information
        // is not available at this point, because we have not decrypted the
        // onion, so it is excluded.
        var eventType HtlcEventType
        if isReceive {
                eventType = HtlcEventTypeReceive
        } else {
                eventType = HtlcEventTypeForward
        }

        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: add.ID,
                        },
                },
                HtlcInfo{
                        IncomingTimeLock: add.Expiry,
                        IncomingAmt:      add.Amount,
                },
                eventType,
                failure,
                true,
        )
}

// sendIncomingHTLCFailureMsg sends an HTLC failure message back to the peer
// from which the HTLC was received. This function is primarily used to
// handle the special requirements of route blinding, specifically:
// - Forwarding nodes must switch out any errors with MalformedFailHTLC.
// - Introduction nodes should return regular HTLC failure messages.
//
// It accepts the original opaque failure, which will be used in the case
// that we're not part of a blinded route, and an error encrypter that'll be
// used if we are the introduction node and need to present an error as if
// we're the failing party.
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
        e hop.ErrorEncrypter,
        originalFailure lnwire.OpaqueReason) error {

        var msg lnwire.Message
        switch {
        // Our circuit's error encrypter will be nil if this was a locally
        // initiated payment. We can only hit a blinded error for a locally
        // initiated payment if we allow ourselves to be picked as the
        // introduction node for our own payments, and in that case we
        // shouldn't reach this code. To prevent the HTLC getting stuck, we
        // fail it back and log an error.
        case e == nil:
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

                l.log.Errorf("Unexpected blinded failure when "+
                        "we are the sending node, incoming htlc: %v(%v)",
                        l.ShortChanID(), htlcIndex)

        // For cleartext hops (ie, non-blinded/normal) we don't need any
        // transformation on the error message and can just send the
        // original.
        case !e.Type().IsBlinded():
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

        // When we're the introduction node, we need to convert the error to
        // an invalid_blinding failure, encrypted as if we were the failing
        // party and wrapped in a regular UpdateFailHTLC.
        case e.Type() == hop.EncrypterTypeIntroduction:
                l.log.Debugf("Introduction blinded node switching out "+
                        "failure error: %v", htlcIndex)

                // The specification does not require that we set the onion
                // blob.
                failureMsg := lnwire.NewInvalidBlinding(
                        fn.None[[lnwire.OnionPacketSize]byte](),
                )
                reason, err := e.EncryptFirstHop(failureMsg)
                if err != nil {
                        return err
                }

                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: reason,
                }

        // If we are a relaying node, we need to switch out any error that
        // we've received for a malformed HTLC error.
        case e.Type() == hop.EncrypterTypeRelaying:
                l.log.Debugf("Relaying blinded node switching out malformed "+
                        "error: %v", htlcIndex)

                msg = &lnwire.UpdateFailMalformedHTLC{
                        ChanID:      l.ChanID(),
                        ID:          htlcIndex,
                        FailureCode: lnwire.CodeInvalidBlinding,
                }

        default:
                return fmt.Errorf("unexpected encrypter: %v", e)
        }

        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                l.log.Warnf("Send update fail failed: %v", err)
        }

        return nil
}
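
// The function below is an illustrative sketch, not part of the original
// link.go: it shows how the blinded-route cases above map onto concrete
// wire messages, assuming the caller already knows its channel ID, HTLC
// index, and error encrypter. All parameters are hypothetical stand-ins for
// values the link derives from its own state.
func exampleBlindedFailureMsg(chanID lnwire.ChannelID, htlcID uint64,
        e hop.ErrorEncrypter) (lnwire.Message, error) {

        // A relaying node within the blinded route must not reveal failure
        // details, so it substitutes an update_fail_malformed_htlc carrying
        // the invalid_blinding code.
        if e.Type() == hop.EncrypterTypeRelaying {
                return &lnwire.UpdateFailMalformedHTLC{
                        ChanID:      chanID,
                        ID:          htlcID,
                        FailureCode: lnwire.CodeInvalidBlinding,
                }, nil
        }

        // An introduction node instead presents the failure as its own: it
        // encrypts an invalid_blinding failure (the onion blob is optional
        // per the specification) and sends it as a regular update_fail_htlc.
        reason, err := e.EncryptFirstHop(lnwire.NewInvalidBlinding(
                fn.None[[lnwire.OnionPacketSize]byte](),
        ))
        if err != nil {
                return nil, err
        }

        return &lnwire.UpdateFailHTLC{
                ChanID: chanID,
                ID:     htlcID,
                Reason: reason,
        }, nil
}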

// sendMalformedHTLCError is a helper function which sends a malformed HTLC
// update to the payment sender.
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
        code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte,
        sourceRef *channeldb.AddRef) {

        shaOnionBlob := sha256.Sum256(onionBlob[:])
        err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
                ChanID:       l.ChanID(),
                ID:           htlcIndex,
                ShaOnionBlob: shaOnionBlob,
                FailureCode:  code,
        })
}
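
// The helper below is an illustrative sketch, not part of the original
// file: per BOLT #2, update_fail_malformed_htlc carries the SHA-256 of the
// received onion blob rather than the blob itself, which is exactly the
// hash sendMalformedHTLCError computes above. All parameters are
// hypothetical stand-ins.
func exampleMalformedFailMsg(chanID lnwire.ChannelID, htlcID uint64,
        onionBlob [lnwire.OnionPacketSize]byte,
        code lnwire.FailCode) *lnwire.UpdateFailMalformedHTLC {

        // Hash the full onion packet; the upstream peer can check this
        // digest against the onion it forwarded.
        return &lnwire.UpdateFailMalformedHTLC{
                ChanID:       chanID,
                ID:           htlcID,
                ShaOnionBlob: sha256.Sum256(onionBlob[:]),
                FailureCode:  code,
        }
}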

// failf encapsulates the actions necessary to properly fail the link. It
// takes a LinkFailureError, which is passed to the OnChannelFailure closure
// so that it can determine whether we should force close the channel and
// whether we should send an error message to the remote peer.
func (l *channelLink) failf(linkErr LinkFailureError, format string,
        a ...interface{}) {

        reason := fmt.Errorf(format, a...)

        // Return if we have already notified about a failure.
        if l.failed {
                l.log.Warnf("ignoring link failure (%v), as link already "+
                        "failed", reason)
                return
        }

        l.log.Errorf("failing link: %s with error: %v", reason, linkErr)

        // Set failed, such that we won't process any more updates, and
        // notify the peer about the failure.
        l.failed = true
        l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
}

// FundingCustomBlob returns the custom funding blob of the channel that this
// link is associated with. The funding blob represents static information
// about the channel that was created at channel funding time.
func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        if l.channel.State() == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.State().CustomBlob
}

// CommitmentCustomBlob returns the custom blob of the current local
// commitment of the channel that this link is associated with.
func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.LocalCommitmentBlob()
}
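
// The snippet below is an illustrative sketch, not part of the original
// file: it assumes fn.Option's WhenSome helper to act on a custom blob only
// when one is present. The function name is hypothetical.
func exampleLogCustomBlobs(l *channelLink) {
        l.FundingCustomBlob().WhenSome(func(blob tlv.Blob) {
                l.log.Debugf("funding custom blob: %x", blob)
        })

        l.CommitmentCustomBlob().WhenSome(func(blob tlv.Blob) {
                l.log.Debugf("commitment custom blob: %x", blob)
        })
}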