lightningnetwork / lnd / 12428593038

20 Dec 2024 09:02AM UTC coverage: 58.33% (-0.2%) from 58.576%

Pull #9382 (github) by guggero:
.golangci.yml: speed up linter by updating start commit

With this we allow the linter to only look at recent changes, since
everything between that old commit and this most recent one has been
linted correctly anyway.

Pull Request #9382: lint: deprecate old linters, use new ref commit

133769 of 229330 relevant lines covered (58.33%)

19284.53 hits per line

Source File: /htlcswitch/link.go (78.42% covered)
package htlcswitch

import (
        "bytes"
        crand "crypto/rand"
        "crypto/sha256"
        "errors"
        "fmt"
        prand "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/wire"
        "github.com/btcsuite/btclog/v2"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/contractcourt"
        "github.com/lightningnetwork/lnd/fn/v2"
        "github.com/lightningnetwork/lnd/graph/db/models"
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/invoices"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lntypes"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/queue"
        "github.com/lightningnetwork/lnd/record"
        "github.com/lightningnetwork/lnd/ticker"
        "github.com/lightningnetwork/lnd/tlv"
)

func init() {
        prand.Seed(time.Now().UnixNano())
}

const (
        // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
        // the node accepts for forwarded payments. The value is relative to the
        // current block height. The reason to have a maximum is to prevent
        // funds getting locked up unreasonably long. Otherwise, an attacker
        // willing to lock its own funds too, could force the funds of this node
        // to be locked up for an indefinite (max int32) number of blocks.
        //
        // The value 2016 corresponds to on average two weeks worth of blocks
        // and is based on the maximum number of hops (20), the default CLTV
        // delta (40), and some extra margin to account for the other lightning
        // implementations and past lnd versions which used to have a default
        // CLTV delta of 144.
        DefaultMaxOutgoingCltvExpiry = 2016

        // DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute

        // DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute

        // DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
        // a channel's commitment fee to be of its balance. This only applies to
        // the initiator of the channel.
        DefaultMaxLinkFeeAllocation float64 = 0.5
)
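
// Worked arithmetic for DefaultMaxOutgoingCltvExpiry (illustrative, not part
// of the original file): at one block per ten minutes a day is 144 blocks,
// so 2016 = 14 * 144 blocks, roughly two weeks. The maximum of 20 hops at
// the default 40-block CLTV delta consumes 20 * 40 = 800 blocks; the
// remaining ~1200 blocks are margin for peers that use larger deltas, such
// as the 144-block default of past lnd versions.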

// ExpectedFee computes the expected fee for a given htlc amount. The value
// returned from this function is to be used as a sanity check when forwarding
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
// forwarding policy.
//
// TODO(roasbeef): also add in current available channel bandwidth, inverse
// func
func ExpectedFee(f models.ForwardingPolicy,
        htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {

        return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
}
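
// Worked example (illustrative, not part of the original file): the division
// by 1000000 means FeeRate is expressed in millionths (parts per million).
// With BaseFee = 1,000 msat and FeeRate = 100, a 1,000,000 msat HTLC is
// expected to pay 1000 + (1000000*100)/1000000 = 1,100 msat in fees.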

// ChannelLinkConfig defines the configuration for the channel link. ALL
// elements within the configuration MUST be non-nil for channel link to carry
// out its duties.
type ChannelLinkConfig struct {
        // FwrdingPolicy is the initial forwarding policy to be used when
        // deciding whether to forward incoming HTLC's or not. This value
        // can be updated with subsequent calls to UpdateForwardingPolicy
        // targeted at a given ChannelLink concrete interface implementation.
        FwrdingPolicy models.ForwardingPolicy

        // Circuits provides restricted access to the switch's circuit map,
        // allowing the link to open and close circuits.
        Circuits CircuitModifier

        // BestHeight returns the best known height.
        BestHeight func() uint32

        // ForwardPackets attempts to forward the batch of htlcs through the
        // switch. The function returns an error in case it fails to send one
        // or more packets. The link's quit signal should be provided to allow
        // cancellation of forwarding during link shutdown.
        ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error

        // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
        // blobs, which are then used to inform how to forward an HTLC.
        //
        // NOTE: This function assumes the same set of readers and preimages
        // are always presented for the same identifier.
        DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest) (
                []hop.DecodeHopIteratorResponse, error)

        // ExtractErrorEncrypter function is responsible for decoding HTLC
        // Sphinx onion blob, and creating onion failure obfuscator.
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter

        // FetchLastChannelUpdate retrieves the latest routing policy for a
        // target channel. This channel will typically be the outgoing channel
        // specified when we receive an incoming HTLC. This will be used to
        // provide payment senders our latest policy when sending encrypted
        // error messages.
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
                *lnwire.ChannelUpdate1, error)

        // Peer is a lightning network node with which we have the channel link
        // opened.
        Peer lnpeer.Peer

        // Registry is a sub-system which is responsible for managing the
        // invoices in a thread-safe manner.
        Registry InvoiceDatabase

        // PreimageCache is a global witness beacon that houses any new
        // preimages discovered by other links. We'll use this to add new
        // witnesses that we discover which will notify any sub-systems
        // subscribed to new events.
        PreimageCache contractcourt.WitnessBeacon

        // OnChannelFailure is a function closure that we'll call if the
        // channel failed for some reason. Depending on the severity of the
        // error, the closure potentially must force close this channel and
        // disconnect the peer.
        //
        // NOTE: The method must return in order for the ChannelLink to be able
        // to shut down properly.
        OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
                LinkFailureError)

        // UpdateContractSignals is a function closure that we'll use to update
        // outside sub-systems with this channel's latest ShortChannelID.
        UpdateContractSignals func(*contractcourt.ContractSignals) error

        // NotifyContractUpdate is a function closure that we'll use to update
        // the contractcourt and more specifically the ChannelArbitrator of the
        // latest channel state.
        NotifyContractUpdate func(*contractcourt.ContractUpdate) error

        // ChainEvents is an active subscription to the chain watcher for this
        // channel to be notified of any on-chain activity related to this
        // channel.
        ChainEvents *contractcourt.ChainEventSubscription

        // FeeEstimator is an instance of a live fee estimator which will be
        // used to dynamically regulate the current fee of the commitment
        // transaction to ensure timely confirmation.
        FeeEstimator chainfee.Estimator

        // HodlMask is a bitvector composed of hodl.Flags, specifying
        // breakpoints for HTLC forwarding internal to the switch.
        //
        // NOTE: This should only be used for testing.
        HodlMask hodl.Mask

        // SyncStates is used to indicate that we need to send the channel
        // reestablishment message to the remote peer. It should be done if our
        // client has been restarted, or the remote peer has reconnected.
        SyncStates bool

        // BatchTicker is the ticker that determines the interval that we'll
        // use to check the batch to see if there are any updates we should
        // flush out. By batching updates into a single commit, we attempt to
        // increase throughput by maximizing the number of updates coalesced
        // into a single commit.
        BatchTicker ticker.Ticker

        // FwdPkgGCTicker is the ticker determining the frequency at which
        // garbage collection of forwarding packages occurs. We use a
        // time-based approach, as opposed to block epochs, as to not hinder
        // syncing.
        FwdPkgGCTicker ticker.Ticker

        // PendingCommitTicker is a ticker that allows the link to determine if
        // a locally initiated commitment dance gets stuck waiting for the
        // remote party to revoke.
        PendingCommitTicker ticker.Ticker

        // BatchSize is the max size of a batch of updates done to the link
        // before we do a state update.
        BatchSize uint32

        // UnsafeReplay will cause a link to replay the adds in its latest
        // commitment txn after the link is restarted. This should only be used
        // in testing, it is here to ensure the sphinx replay detection on the
        // receiving node is persistent.
        UnsafeReplay bool

        // MinUpdateTimeout represents the minimum interval in which a link
        // will propose to update its commitment fee rate. A random timeout will
        // be selected between this and MaxUpdateTimeout.
        MinUpdateTimeout time.Duration

        // MaxUpdateTimeout represents the maximum interval in which a link
        // will propose to update its commitment fee rate. A random timeout will
        // be selected between this and MinUpdateTimeout.
        MaxUpdateTimeout time.Duration

        // OutgoingCltvRejectDelta defines the number of blocks before expiry of
        // an htlc where we don't offer an htlc anymore. This should be at least
        // the outgoing broadcast delta, because in any case we don't want to
        // risk offering an htlc that triggers channel closure.
        OutgoingCltvRejectDelta uint32

        // TowerClient is an optional engine that manages the signing,
        // encrypting, and uploading of justice transactions to the daemon's
        // configured set of watchtowers for legacy channels.
        TowerClient TowerClient

        // MaxOutgoingCltvExpiry is the maximum outgoing timelock that the link
        // should accept for a forwarded HTLC. The value is relative to the
        // current block height.
        MaxOutgoingCltvExpiry uint32

        // MaxFeeAllocation is the highest allocation we'll allow a channel's
        // commitment fee to be of its balance. This only applies to the
        // initiator of the channel.
        MaxFeeAllocation float64

        // MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
        // the initiator for channels of the anchor type.
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight

        // NotifyActiveLink allows the link to tell the ChannelNotifier when a
        // link is first started.
        NotifyActiveLink func(wire.OutPoint)

        // NotifyActiveChannel allows the link to tell the ChannelNotifier when
        // a channel becomes active.
        NotifyActiveChannel func(wire.OutPoint)

        // NotifyInactiveChannel allows the switch to tell the ChannelNotifier
        // when channels become inactive.
        NotifyInactiveChannel func(wire.OutPoint)

        // NotifyInactiveLinkEvent allows the switch to tell the
        // ChannelNotifier when a channel link becomes inactive.
        NotifyInactiveLinkEvent func(wire.OutPoint)

        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
        // events through.
        HtlcNotifier htlcNotifier

        // FailAliasUpdate is a function used to fail an HTLC for an
        // option_scid_alias channel.
        FailAliasUpdate func(sid lnwire.ShortChannelID,
                incoming bool) *lnwire.ChannelUpdate1

        // GetAliases is used by the link and switch to fetch the set of
        // aliases for a given link.
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID

        // PreviouslySentShutdown is an optional value that is set if, at the
        // time of the link being started, persisted shutdown info was found for
        // the channel. This value being set means that we previously sent a
        // Shutdown message to our peer, and so we should do so again on
        // re-establish and should not allow any more HTLC adds on the outgoing
        // direction of the link.
        PreviouslySentShutdown fn.Option[lnwire.Shutdown]

        // DisallowRouteBlinding adds the option to disable forwarding payments
        // in blinded routes by failing back any blinding-related payloads as
        // if they were invalid.
        DisallowRouteBlinding bool

        // DisallowQuiescence is a flag that can be used to disable the
        // quiescence protocol.
        DisallowQuiescence bool

        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
        // restrict the flow of HTLCs and fee updates.
        MaxFeeExposure lnwire.MilliSatoshi

        // ShouldFwdExpEndorsement is a closure that indicates whether the link
        // should forward experimental endorsement signals.
        ShouldFwdExpEndorsement func() bool

        // AuxTrafficShaper is an optional auxiliary traffic shaper that can be
        // used to manage the bandwidth of the link.
        AuxTrafficShaper fn.Option[AuxTrafficShaper]
}
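
// Illustrative partial construction (hypothetical values, not from the
// original file), showing a few of the simpler knobs above:
//
//	cfg := ChannelLinkConfig{
//		BatchSize:        50,
//		MinUpdateTimeout: DefaultMinLinkFeeUpdateTimeout,
//		MaxUpdateTimeout: DefaultMaxLinkFeeUpdateTimeout,
//		MaxFeeAllocation: DefaultMaxLinkFeeAllocation,
//	}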

// channelLink is the service which drives a channel's commitment update
// state-machine. In the event that an HTLC needs to be propagated to another
// link, the forward handler from config is used which sends HTLC to the
// switch. Additionally, the link encapsulates the logic of commitment protocol
// message ordering and updates.
type channelLink struct {
        // The following fields are only meant to be used *atomically*.
        started       int32
        reestablished int32
        shutdown      int32

        // failed should be set to true in case a link error happens, making
        // sure we don't process any more updates.
        failed bool

        // keystoneBatch represents a volatile list of keystones that must be
        // written before attempting to sign the next commitment txn. These
        // represent all the HTLC's forwarded to the link from the switch. Once
        // we lock them into our outgoing commitment, then the circuit has a
        // keystone, and is fully opened.
        keystoneBatch []Keystone

        // openedCircuits is the set of all payment circuits that will be open
        // once we make our next commitment. After making the commitment we'll
        // ACK all these from our mailbox to ensure that they don't get
        // re-delivered if we reconnect.
        openedCircuits []CircuitKey

        // closedCircuits is the set of all payment circuits that will be
        // closed once we make our next commitment. After taking the commitment
        // we'll ACK all these to ensure that they don't get re-delivered if we
        // reconnect.
        closedCircuits []CircuitKey

        // channel is a lightning network channel to which we apply htlc
        // updates.
        channel *lnwallet.LightningChannel

        // cfg is a structure which carries all dependable fields/handlers
        // which may affect behaviour of the service.
        cfg ChannelLinkConfig

        // mailBox is the main interface between the outside world and the
        // link. All incoming messages will be sent over this mailBox. Messages
        // include new updates from our connected peer, and new packets to be
        // forwarded sent by the switch.
        mailBox MailBox

        // upstream is a channel that new messages sent from the remote peer to
        // the local peer will be sent across.
        upstream chan lnwire.Message

        // downstream is a channel in which new multi-hop HTLC's to be
        // forwarded will be sent across. Messages from this channel are sent
        // by the HTLC switch.
        downstream chan *htlcPacket

        // updateFeeTimer is the timer responsible for updating the link's
        // commitment fee every time it fires.
        updateFeeTimer *time.Timer

        // uncommittedPreimages stores a list of all preimages that have been
        // learned since receiving the last CommitSig from the remote peer. The
        // batch will be flushed just before accepting the subsequent CommitSig
        // or on shutdown to avoid doing a write for each preimage received.
        uncommittedPreimages []lntypes.Preimage

        sync.RWMutex

        // hodlQueue is used to receive exit hop htlc resolutions from invoice
        // registry.
        hodlQueue *queue.ConcurrentQueue

        // hodlMap stores related htlc data for a circuit key. It allows
        // resolving those htlcs when we receive a message on hodlQueue.
        hodlMap map[models.CircuitKey]hodlHtlc

        // log is a link-specific logging instance.
        log btclog.Logger

        // isOutgoingAddBlocked tracks whether the channelLink can send an
        // UpdateAddHTLC.
        isOutgoingAddBlocked atomic.Bool

        // isIncomingAddBlocked tracks whether the channelLink can receive an
        // UpdateAddHTLC.
        isIncomingAddBlocked atomic.Bool

        // flushHooks is a hookMap that is triggered when we reach a channel
        // state with no live HTLCs.
        flushHooks hookMap

        // outgoingCommitHooks is a hookMap that is triggered after we send our
        // next CommitSig.
        outgoingCommitHooks hookMap

        // incomingCommitHooks is a hookMap that is triggered after we receive
        // our next CommitSig.
        incomingCommitHooks hookMap

        // quiescer is the state machine that tracks where this channel is with
        // respect to the quiescence protocol.
        quiescer Quiescer

        // quiescenceReqs is a queue of requests to quiesce this link. The
        // members of the queue are send-only channels we should call back with
        // the result.
        quiescenceReqs chan StfuReq

        // ContextGuard is a helper that encapsulates a wait group and quit
        // channel and allows contexts that either block or cancel on those
        // depending on the use case.
        *fn.ContextGuard
}

// hookMap is a data structure that is used to track the hooks that need to be
// called in various parts of the channelLink's lifecycle.
//
// WARNING: NOT thread-safe.
type hookMap struct {
        // allocIdx keeps track of the next id we haven't yet allocated.
        allocIdx atomic.Uint64

        // transient is a map of hooks that are only called the next time invoke
        // is called. These hooks are deleted during invoke.
        transient map[uint64]func()

        // newTransients is a channel that we use to accept new hooks into the
        // hookMap.
        newTransients chan func()
}

// newHookMap initializes a new empty hookMap.
func newHookMap() hookMap {
        return hookMap{
                allocIdx:      atomic.Uint64{},
                transient:     make(map[uint64]func()),
                newTransients: make(chan func()),
        }
}

// alloc allocates space in the hook map for the supplied hook and returns the
// id under which it was registered.
func (m *hookMap) alloc(hook func()) uint64 {
        // We assume we never overflow a uint64. Seems OK.
        hookID := m.allocIdx.Add(1)
        if hookID == 0 {
                panic("hookMap allocIdx overflow")
        }
        m.transient[hookID] = hook

        return hookID
}

// invoke is used on a hook map to call all the registered hooks and then clear
// out the transient hooks so they are not called again.
func (m *hookMap) invoke() {
        for _, hook := range m.transient {
                hook()
        }

        m.transient = make(map[uint64]func())
}
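
// Illustrative usage (not part of the original file):
//
//	m := newHookMap()
//	id := m.alloc(func() { fmt.Println("fired") })
//	_ = id
//	m.invoke() // prints "fired" once; the hook is then cleared
//	m.invoke() // no-op, the transient map is empty again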

// hodlHtlc contains htlc data that is required for resolution.
type hodlHtlc struct {
        add        lnwire.UpdateAddHTLC
        sourceRef  channeldb.AddRef
        obfuscator hop.ErrorEncrypter
}

// NewChannelLink creates a new instance of a ChannelLink given a configuration
// and active channel that will be used to verify/apply updates to.
func NewChannelLink(cfg ChannelLinkConfig,
        channel *lnwallet.LightningChannel) ChannelLink {

        logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())

        // If the max fee exposure isn't set, use the default.
        if cfg.MaxFeeExposure == 0 {
                cfg.MaxFeeExposure = DefaultMaxFeeExposure
        }

        var qsm Quiescer
        if !cfg.DisallowQuiescence {
                qsm = NewQuiescer(QuiescerCfg{
                        chanID: lnwire.NewChanIDFromOutPoint(
                                channel.ChannelPoint(),
                        ),
                        channelInitiator: channel.Initiator(),
                        sendMsg: func(s lnwire.Stfu) error {
                                return cfg.Peer.SendMessage(false, &s)
                        },
                        timeoutDuration: defaultQuiescenceTimeout,
                        onTimeout: func() {
                                cfg.Peer.Disconnect(ErrQuiescenceTimeout)
                        },
                })
        } else {
                qsm = &quiescerNoop{}
        }

        quiescenceReqs := make(
                chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1,
        )

        return &channelLink{
                cfg:                 cfg,
                channel:             channel,
                hodlMap:             make(map[models.CircuitKey]hodlHtlc),
                hodlQueue:           queue.NewConcurrentQueue(10),
                log:                 log.WithPrefix(logPrefix),
                flushHooks:          newHookMap(),
                outgoingCommitHooks: newHookMap(),
                incomingCommitHooks: newHookMap(),
                quiescer:            qsm,
                quiescenceReqs:      quiescenceReqs,
                ContextGuard:        fn.NewContextGuard(),
        }
}

// A compile time check to ensure channelLink implements the ChannelLink
// interface.
var _ ChannelLink = (*channelLink)(nil)

// Start starts all helper goroutines required for the operation of the channel
// link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Start() error {
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
                err := fmt.Errorf("channel link(%v): already started", l)
                l.log.Warn("already started")
                return err
        }

        l.log.Info("starting")

        // If the config supplied a watchtower client, ensure the channel is
        // registered before trying to use it during operation.
        if l.cfg.TowerClient != nil {
                err := l.cfg.TowerClient.RegisterChannel(
                        l.ChanID(), l.channel.State().ChanType,
                )
                if err != nil {
                        return err
                }
        }

        l.mailBox.ResetMessages()
        l.hodlQueue.Start()

        // Before launching the htlcManager goroutine, revert any circuits that
        // were marked open in the switch's circuit map, but did not make it
        // into a commitment txn. We use the next local htlc index as the cut
        // off point, since all indexes below that are committed. This action
        // is only performed if the link's final short channel ID has been
        // assigned, otherwise we would try to trim the htlcs belonging to the
        // all-zero, hop.Source ID.
        if l.ShortChanID() != hop.Source {
                localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
                if err != nil {
                        return fmt.Errorf("unable to retrieve next local "+
                                "htlc index: %v", err)
                }

                // NOTE: This is automatically done by the switch when it
                // starts up, but is necessary to prevent inconsistencies in
                // the case that the link flaps. This is a result of a link's
                // life-cycle being shorter than that of the switch.
                chanID := l.ShortChanID()
                err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
                if err != nil {
                        return fmt.Errorf("unable to trim circuits above "+
                                "local htlc index %d: %v", localHtlcIndex, err)
                }

                // Since the link is live, before we start the link we'll update
                // the ChainArbitrator with the set of new channel signals for
                // this channel.
                //
                // TODO(roasbeef): split goroutines within channel arb to avoid
                go func() {
                        signals := &contractcourt.ContractSignals{
                                ShortChanID: l.channel.ShortChanID(),
                        }

                        err := l.cfg.UpdateContractSignals(signals)
                        if err != nil {
                                l.log.Errorf("unable to update signals")
                        }
                }()
        }

        l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())

        l.Wg.Add(1)
        go l.htlcManager()

        return nil
}

// Stop gracefully stops all active helper goroutines, then waits until they've
// exited.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stop() {
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
                l.log.Warn("already stopped")
                return
        }

        l.log.Info("stopping")

        // As the link is stopping, we are no longer interested in htlc
        // resolutions coming from the invoice registry.
        l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())

        if l.cfg.ChainEvents.Cancel != nil {
                l.cfg.ChainEvents.Cancel()
        }

        // Ensure the channel for the timer is drained.
        if l.updateFeeTimer != nil {
                if !l.updateFeeTimer.Stop() {
                        select {
                        case <-l.updateFeeTimer.C:
                        default:
                        }
                }
        }

        if l.hodlQueue != nil {
                l.hodlQueue.Stop()
        }

        close(l.Quit)
        l.Wg.Wait()

        // Now that the htlcManager has completely exited, reset the packet
        // courier. This allows the mailbox to re-evaluate any lingering Adds
        // that were delivered but didn't make it on a commitment to be failed
        // back if the link is offline for an extended period of time. The
        // error is ignored since it can only fail when the daemon is exiting.
        _ = l.mailBox.ResetPackets()

        // As a final precaution, we will attempt to flush any uncommitted
        // preimages to the preimage cache. The preimages should be re-delivered
        // after channel reestablishment, however this adds an extra layer of
        // protection in case the peer never returns. Without this, we will be
        // unable to settle any contracts depending on the preimages even though
        // we had learned them at some point.
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
        if err != nil {
                l.log.Errorf("unable to add preimages=%v to cache: %v",
                        l.uncommittedPreimages, err)
        }
}

// WaitForShutdown blocks until the link finishes shutting down, which includes
// termination of all dependent goroutines.
func (l *channelLink) WaitForShutdown() {
        l.Wg.Wait()
}

// EligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST NOT be called from the main event loop.
func (l *channelLink) EligibleToForward() bool {
        l.RLock()
        defer l.RUnlock()

        return l.eligibleToForward()
}

// eligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToForward() bool {
        return l.eligibleToUpdate() && !l.IsFlushing(Outgoing)
}

// eligibleToUpdate returns a bool indicating if the channel is able to update
// channel state. We're able to update channel state if we know the remote
// party's next revocation point. Otherwise, we can't initiate new channel
// state. We also require that the short channel ID not be the all-zero source
// ID, meaning that the channel has had its ID finalized.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToUpdate() bool {
        return l.channel.RemoteNextRevocation() != nil &&
                l.channel.ShortChanID() != hop.Source &&
                l.isReestablished() &&
                l.quiescer.CanSendUpdates()
}

// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns true if the state was changed and false
// if the desired state was already set before the method was called.
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Swap(false)
        }

        return l.isIncomingAddBlocked.Swap(false)
}

// DisableAdds sets the ChannelUpdateHandler state to disallow UpdateAddHtlc's
// in the specified direction. It returns true if the state was changed and
// false if the desired state was already set before the method was called.
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return !l.isOutgoingAddBlocked.Swap(true)
        }

        return !l.isIncomingAddBlocked.Swap(true)
}
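
// Note on the Swap idiom above (illustrative, not part of the original
// file): atomic.Bool.Swap stores the new value and returns the old one, so
// EnableAdds reports true exactly when adds were previously blocked, and
// DisableAdds reports true exactly when they were previously allowed, i.e.
// when the call actually changed the state.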

// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction
// of the argument.
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Load()
        }

        return l.isIncomingAddBlocked.Load()
}

// OnFlushedOnce adds a hook that will be called the next time the channel
// state reaches zero htlcs. This hook will only ever be called once. If the
// channel state already has zero htlcs, then this will be called immediately.
func (l *channelLink) OnFlushedOnce(hook func()) {
        select {
        case l.flushHooks.newTransients <- hook:
        case <-l.Quit:
        }
}

// OnCommitOnce adds a hook that will be called the next time a CommitSig
// message is sent in the argument's LinkDirection. This hook will only ever be
// called once. If no CommitSig is owed in the argument's LinkDirection, then
// this hook will be run immediately.
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
        var queue chan func()

        if direction == Outgoing {
                queue = l.outgoingCommitHooks.newTransients
        } else {
                queue = l.incomingCommitHooks.newTransients
        }

        select {
        case queue <- hook:
        case <-l.Quit:
        }
}

766
// InitStfu allows us to initiate quiescence on this link. It returns a receive
767
// only channel that will block until quiescence has been achieved, or
768
// definitively fails.
769
//
770
// This operation has been added to allow channels to be quiesced via RPC. It
771
// may be removed or reworked in the future as RPC initiated quiescence is a
772
// holdover until we have downstream protocols that use it.
773
func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
2✔
774
        req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]](
2✔
775
                fn.Unit{},
2✔
776
        )
2✔
777

2✔
778
        select {
2✔
779
        case l.quiescenceReqs <- req:
2✔
780
        case <-l.Quit:
×
781
                req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
×
782
        }
783

784
        return out
2✔
785
}
786

787
// isReestablished returns true if the link has successfully completed the
788
// channel reestablishment dance.
789
func (l *channelLink) isReestablished() bool {
617✔
790
        return atomic.LoadInt32(&l.reestablished) == 1
617✔
791
}
617✔

// markReestablished signals that the remote peer has successfully exchanged
// channel reestablish messages and that the channel is ready to process
// subsequent messages.
func (l *channelLink) markReestablished() {
        atomic.StoreInt32(&l.reestablished, 1)
}

// IsUnadvertised returns true if the underlying channel is unadvertised.
func (l *channelLink) IsUnadvertised() bool {
        state := l.channel.State()
        return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
}

// sampleNetworkFee samples the current fee rate on the network to get into the
// chain in a timely manner. The returned value is expressed in fee-per-kw, as
// this is the native rate used when computing the fee for commitment
// transactions, and the second-level HTLC transactions.
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
        // We'll first query for the sat/kw recommended to be confirmed within 3
        // blocks.
        feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
        if err != nil {
                return 0, err
        }

        l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
                int64(feePerKw))

        return feePerKw, nil
}
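
// Unit note (illustrative, not part of the original file): a kiloweight is
// 1,000 weight units and a virtual byte is 4 weight units, so 1 sat/vbyte
// corresponds to 250 sat/kw. A commonly used fee floor of 253 sat/kw thus
// sits just above the typical 1 sat/vbyte relay minimum.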

// shouldAdjustCommitFee returns true if we should update our commitment fee to
// match that of the network fee. We'll only update our commitment fee if the
// network fee differs from our commitment fee by at least 10%, or if our
// current commitment fee is below the minimum relay fee.
func shouldAdjustCommitFee(netFee, chanFee,
        minRelayFee chainfee.SatPerKWeight) bool {

        switch {
        // If the network fee is greater than our current commitment fee and
        // our current commitment fee is below the minimum relay fee then
        // we should switch to it no matter if it is less than a 10% increase.
        case netFee > chanFee && chanFee < minRelayFee:
                return true

        // If the network fee is greater than the commitment fee, then we'll
        // switch to it if it's at least 10% greater than the commit fee.
        case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
                return true

        // If the network fee is less than our commitment fee, then we'll
        // switch to it if it's at least 10% less than the commitment fee.
        case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
                return true

        // Otherwise, we won't modify our fee.
        default:
                return false
        }
}
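
// Worked cases (illustrative, not part of the original file): with
// chanFee = 1000 sat/kw and minRelayFee = 253 sat/kw, netFee = 1100
// triggers an update (a full 10% increase), netFee = 1050 does not, and
// netFee = 900 triggers one (a full 10% decrease). If chanFee were below
// minRelayFee, any higher netFee would trigger an update regardless of
// the 10% band.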

// failCb is used to cut down on the argument verbosity.
type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage

// createFailureWithUpdate creates a ChannelUpdate when failing an incoming or
// outgoing HTLC. It may return a FailureMessage that references a channel's
// alias. If the channel does not have an alias, then the regular channel
// update from disk will be returned.
func (l *channelLink) createFailureWithUpdate(incoming bool,
        outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {

        // Determine which SCID to use in case we need to use aliases in the
        // ChannelUpdate.
        scid := outgoingScid
        if incoming {
                scid = l.ShortChanID()
        }

        // Try using the FailAliasUpdate function. If it returns nil, fallback
        // to the non-alias behavior.
        update := l.cfg.FailAliasUpdate(scid, incoming)
        if update == nil {
                // Fallback to the non-alias behavior.
                var err error
                update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
                if err != nil {
                        return &lnwire.FailTemporaryNodeFailure{}
                }
        }

        return cb(update)
}

// syncChanStates attempts to synchronize channel states with the remote party.
// This method is to be called upon reconnection after the initial funding
// flow. We'll compare our commitment chains with the remote party, and re-send
// either a dangling commit signature, a revocation, or both.
func (l *channelLink) syncChanStates() error {
        chanState := l.channel.State()

        l.log.Infof("Attempting to re-synchronize channel: %v", chanState)

        // First, we'll generate our ChanSync message to send to the other
        // side. Based on this message, the remote party will decide if they
        // need to retransmit any data or not.
        localChanSyncMsg, err := chanState.ChanSyncMsg()
        if err != nil {
                return fmt.Errorf("unable to generate chan sync message for "+
                        "ChannelPoint(%v)", l.channel.ChannelPoint())
        }
        if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
                return fmt.Errorf("unable to send chan sync message for "+
                        "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
        }

        var msgsToReSend []lnwire.Message

        // Next, we'll wait indefinitely to receive the ChanSync message. The
        // first message sent MUST be the ChanSync message.
        select {
        case msg := <-l.upstream:
                l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
                        l.cfg.Peer.PubKey())

                remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
                if !ok {
                        return fmt.Errorf("first message sent to sync "+
                                "should be ChannelReestablish, instead "+
                                "received: %T", msg)
                }

                // If the remote party indicates that they think we haven't
                // done any state updates yet, then we'll retransmit the
                // channel_ready message first. We do this, as at this point
                // we can't be sure if they've really received the
                // ChannelReady message.
                if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
                        localChanSyncMsg.NextLocalCommitHeight == 1 &&
                        !l.channel.IsPending() {

                        l.log.Infof("resending ChannelReady message to peer")

                        nextRevocation, err := l.channel.NextRevocationKey()
                        if err != nil {
                                return fmt.Errorf("unable to create next "+
                                        "revocation: %v", err)
                        }

                        channelReadyMsg := lnwire.NewChannelReady(
                                l.ChanID(), nextRevocation,
                        )

                        // If this is a taproot channel, then we'll send the
                        // very same nonce that we sent above, as they should
                        // take the latest verification nonce we send.
                        if chanState.ChanType.IsTaproot() {
                                //nolint:ll
                                channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
                        }

                        // For channels that negotiated the option-scid-alias
                        // feature bit, ensure that we send over the alias in
                        // the channel_ready message. We'll send the first
                        // alias we find for the channel since it does not
                        // matter which alias we send. We'll error out if no
                        // aliases are found.
                        if l.negotiatedAliasFeature() {
                                aliases := l.getAliases()
                                if len(aliases) == 0 {
                                        // This shouldn't happen since we
                                        // always add at least one alias before
                                        // the channel reaches the link.
                                        return fmt.Errorf("no aliases found")
                                }

                                // getAliases returns a copy of the alias slice
                                // so it is ok to use a pointer to the first
                                // entry.
                                channelReadyMsg.AliasScid = &aliases[0]
                        }

                        err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
                        if err != nil {
                                return fmt.Errorf("unable to re-send "+
                                        "ChannelReady: %v", err)
                        }
                }

                // In any case, we'll then process their ChanSync message.
                l.log.Info("received re-establishment message from remote side")

                var (
                        openedCircuits []CircuitKey
                        closedCircuits []CircuitKey
                )

                // We've just received a ChanSync message from the remote
                // party, so we'll process the message in order to determine
                // if we need to re-transmit any messages to the remote party.
                ctx, cancel := l.WithCtxQuitNoTimeout()
                defer cancel()
                msgsToReSend, openedCircuits, closedCircuits, err =
                        l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
                if err != nil {
                        return err
                }

                // Repopulate any identifiers for circuits that may have been
                // opened or unclosed. This may happen if we needed to
                // retransmit a commitment signature message.
                l.openedCircuits = openedCircuits
                l.closedCircuits = closedCircuits

                // Ensure that all packets have been removed from the link's
                // mailbox.
                if err := l.ackDownStreamPackets(); err != nil {
                        return err
                }

                if len(msgsToReSend) > 0 {
                        l.log.Infof("sending %v updates to synchronize the "+
                                "state", len(msgsToReSend))
                }

                // If we have any messages to retransmit, we'll do so
                // immediately so we return to a synchronized state as soon as
                // possible.
                for _, msg := range msgsToReSend {
                        l.cfg.Peer.SendMessage(false, msg)
                }

        case <-l.Quit:
                return ErrLinkShuttingDown
        }

        return nil
}

// resolveFwdPkgs loads any forwarding packages for this link from disk, and
// reprocesses them in order. The primary goal is to make sure that any HTLCs
// we previously received are reinstated in memory, and forwarded to the switch
// if necessary. After a restart, this will also delete any previously
// completed packages.
func (l *channelLink) resolveFwdPkgs() error {
        fwdPkgs, err := l.channel.LoadFwdPkgs()
        if err != nil {
                return err
        }

        l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))

        for _, fwdPkg := range fwdPkgs {
                if err := l.resolveFwdPkg(fwdPkg); err != nil {
                        return err
                }
        }

        // If any of our reprocessing steps require an update to the commitment
        // txn, we initiate a state transition to capture all relevant changes.
        if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
                return l.updateCommitTx()
        }

        return nil
}

// resolveFwdPkg interprets the FwdState of the provided package, either
// reprocesses any outstanding htlcs in the package, or performs garbage
// collection on the package.
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
        // Remove any completed packages to clear up space.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("removing completed fwd pkg for height=%d",
                        fwdPkg.Height)

                err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
                if err != nil {
                        l.log.Errorf("unable to remove fwd pkg for height=%d: "+
                                "%v", fwdPkg.Height, err)
                        return err
                }
        }

        // Otherwise this is either a new package or one that has gone through
        // processing, but contains htlcs that need to be restored in memory.
        // We replay this forwarding package to make sure our local mem state
        // is resurrected, we mimic any original responses back to the remote
        // party, and re-forward the relevant HTLCs to the switch.

        // If the package is fully acked but not completed, it must still have
        // settles and fails to propagate.
        if !fwdPkg.SettleFailFilter.IsFull() {
                l.processRemoteSettleFails(fwdPkg)
        }

        // Finally, replay *ALL ADDS* in this forwarding package. The
        // downstream logic is able to filter out any duplicates, but we must
        // shove the entire, original set of adds down the pipeline so that the
        // batch of adds presented to the sphinx router does not ever change.
        if !fwdPkg.AckFilter.IsFull() {
                l.processRemoteAdds(fwdPkg)

                // If the link failed during processing the adds, we must
                // return to ensure we won't attempt to update the state
                // further.
                if l.failed {
                        return fmt.Errorf("link failed while " +
                                "processing remote adds")
                }
        }

        return nil
}

// fwdPkgGarbager periodically reads all forwarding packages from disk and
// removes those that can be discarded. It is safe to do this entirely in the
// background, since all state is coordinated on disk. This also ensures the
// link can continue to process messages and interleave database accesses.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) fwdPkgGarbager() {
        defer l.Wg.Done()

        l.cfg.FwdPkgGCTicker.Resume()
        defer l.cfg.FwdPkgGCTicker.Stop()

        if err := l.loadAndRemove(); err != nil {
                l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
        }

        for {
                select {
                case <-l.cfg.FwdPkgGCTicker.Ticks():
                        if err := l.loadAndRemove(); err != nil {
                                l.log.Warnf("unable to remove fwd pkgs: %v",
                                        err)
                                continue
                        }
                case <-l.Quit:
                        return
                }
        }
}

// loadAndRemove loads all the channel's forwarding packages and determines if
// they can be removed. It is called once before the FwdPkgGCTicker ticks so
// that a longer tick interval can be used.
func (l *channelLink) loadAndRemove() error {
        fwdPkgs, err := l.channel.LoadFwdPkgs()
        if err != nil {
                return err
        }

        var removeHeights []uint64
        for _, fwdPkg := range fwdPkgs {
                if fwdPkg.State != channeldb.FwdStateCompleted {
                        continue
                }

                removeHeights = append(removeHeights, fwdPkg.Height)
        }

        // If removeHeights is empty, return early so we don't use a db
        // transaction.
        if len(removeHeights) == 0 {
                return nil
        }

        return l.channel.RemoveFwdPkgs(removeHeights...)
}
1163

1164
// handleChanSyncErr performs the error handling logic in the case where we
// could not successfully syncChanStates with our channel peer.
func (l *channelLink) handleChanSyncErr(err error) {
        l.log.Warnf("error when syncing channel states: %v", err)

        var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss

        switch {
        case errors.Is(err, ErrLinkShuttingDown):
                l.log.Debugf("unable to sync channel states, link is " +
                        "shutting down")
                return

        // We failed syncing the commit chains, probably because the remote
        // has lost state. We should force close the channel.
        case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
                fallthrough

        // The remote sent us an invalid last commit secret, we should force
        // close the channel.
        // TODO(halseth): and permanently ban the peer?
        case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
                fallthrough

        // The remote sent us a commit point different from what they sent us
        // before.
        // TODO(halseth): ban peer?
        case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
                // We'll fail the link and tell the peer to force close the
                // channel. Note that the database state is not updated here;
                // it will be updated when the close transaction is ready, to
                // avoid the case where we go down before storing the
                // transaction in the db.
                l.failf(
                        LinkFailureError{
                                code:          ErrSyncError,
                                FailureAction: LinkFailureForceClose,
                        },
                        "unable to synchronize channel states: %v", err,
                )

        // We have lost state and cannot safely force close the channel. Fail
        // the channel and wait for the remote to hopefully force close it.
        // The remote has sent us its latest unrevoked commitment point, and
        // we'll store it in the database, such that we can attempt to recover
        // the funds if the remote force closes the channel.
        case errors.As(err, &errDataLoss):
                err := l.channel.MarkDataLoss(
                        errDataLoss.CommitPoint,
                )
                if err != nil {
                        l.log.Errorf("unable to mark channel data loss: %v",
                                err)
                }

        // We determined the commit chains were not possible to sync. We
        // cautiously fail the channel, but don't force close.
        // TODO(halseth): can we safely force close in any cases where this
        // error is returned?
        case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
                if err := l.channel.MarkBorked(); err != nil {
                        l.log.Errorf("unable to mark channel borked: %v", err)
                }

        // Other, unspecified error.
        default:
        }

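        // Cases that don't return early above all end by failing the link
        // here with ErrRecoveryError; LinkFailureForceNone means this final
        // call itself does not initiate a force close.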
        l.failf(
                LinkFailureError{
                        code:          ErrRecoveryError,
                        FailureAction: LinkFailureForceNone,
                },
                "unable to synchronize channel states: %v", err,
        )
}

// htlcManager is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels.
// This goroutine reads messages from the upstream (remote) peer, and also
// from the downstream channel managed by the channel link. In the event that
// an htlc needs to be forwarded, then the send-only forward handler is used
// which sends htlc packets to the switch. Additionally, this goroutine
// handles acting upon all timeouts for any active HTLCs, manages the
// channel's revocation window, and also the htlc trickle queue+timer for this
// active channel.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) htlcManager() {
        defer func() {
                l.cfg.BatchTicker.Stop()
                l.Wg.Done()
                l.log.Infof("exited")
        }()

        l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())

        // Notify any clients that the link is now in the switch via an
        // ActiveLinkEvent. We'll also defer an inactive link notification for
        // when the link exits to ensure that every active notification is
        // matched by an inactive one.
        l.cfg.NotifyActiveLink(l.ChannelPoint())
        defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())

        // TODO(roasbeef): need to call wipe chan whenever D/C?

        // If this isn't the first time that this channel link has been
        // created, then we'll need to check to see if we need to
        // re-synchronize state with the remote peer. settledHtlcs is a map of
        // HTLC's that we re-settled as part of the channel state sync.
        if l.cfg.SyncStates {
                err := l.syncChanStates()
                if err != nil {
                        l.handleChanSyncErr(err)
                        return
                }
        }

        // If a shutdown message has previously been sent on this link, then
        // we need to make sure that we have disabled any HTLC adds on the
        // outgoing direction of the link, and that we re-send the same
        // shutdown message that we previously sent.
        l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
                // Immediately disallow any new outgoing HTLCs.
                if !l.DisableAdds(Outgoing) {
                        l.log.Warnf("Outgoing link adds already disabled")
                }

                // Re-send the shutdown message to the peer. Since
                // syncChanStates would have sent any outstanding CommitSig,
                // it is fine for us to immediately queue the shutdown message
                // now.
                err := l.cfg.Peer.SendMessage(false, &shutdown)
                if err != nil {
                        l.log.Warnf("Error sending shutdown message: %v", err)
                }
        })

        // We've successfully reestablished the channel, mark it as such to
        // allow the switch to forward HTLCs in the outbound direction.
        l.markReestablished()

        // Now that we've received both channel_ready and channel reestablish,
        // we can go ahead and send the active channel notification. We'll
        // also defer the inactive notification for when the link exits to
        // ensure that every active notification is matched by an inactive
        // one.
        l.cfg.NotifyActiveChannel(l.ChannelPoint())
        defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())

        // With the channel states synced, we now reset the mailbox to ensure
        // we start processing all unacked packets in order. This is done here
        // to ensure that all acknowledgments that occur during channel
        // resynchronization have taken effect, causing us only to pull
        // unacked packets after starting to read from the downstream mailbox.
        l.mailBox.ResetPackets()

        // After cleaning up any memory pertaining to incoming packets, we now
        // replay our forwarding packages to handle any htlcs that can be
        // processed locally, or need to be forwarded out to the switch. We
        // will only attempt to resolve packages if our short chan id
        // indicates that the channel is not pending, otherwise we should have
        // no htlcs to reforward.
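        // (hop.Source is the sentinel zero short channel ID; a link that
        // still reports it has no confirmed short channel ID to forward
        // with, so the guard below skips pending channels.)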
        if l.ShortChanID() != hop.Source {
                err := l.resolveFwdPkgs()
                switch err {
                // No error was encountered, success.
                case nil:

                // If the duplicate keystone error was encountered, we'll fail
                // without sending an Error message to the peer.
                case ErrDuplicateKeystone:
                        l.failf(LinkFailureError{code: ErrCircuitError},
                                "temporary circuit error: %v", err)
                        return

                // A non-nil error was encountered, send an Error message to
                // the peer.
                default:
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "unable to resolve fwd pkgs: %v", err)
                        return
                }

                // With our link's in-memory state fully reconstructed, spawn
                // a goroutine to manage the reclamation of disk space
                // occupied by completed forwarding packages.
                l.Wg.Add(1)
                go l.fwdPkgGarbager()
        }

        for {
                // We must always check if we failed at some point processing
                // the last update before processing the next.
                if l.failed {
                        l.log.Errorf("link failed, exiting htlcManager")
                        return
                }

                // If the previous event resulted in a non-empty batch, resume
                // the batch ticker so that it can be cleared. Otherwise pause
                // the ticker to prevent waking up the htlcManager while the
                // batch is empty.
                numUpdates := l.channel.NumPendingUpdates(
                        lntypes.Local, lntypes.Remote,
                )
                if numUpdates > 0 {
                        l.cfg.BatchTicker.Resume()
                        l.log.Tracef("BatchTicker resumed, "+
                                "NumPendingUpdates(Local, Remote)=%d",
                                numUpdates,
                        )
                } else {
                        l.cfg.BatchTicker.Pause()
                        l.log.Trace("BatchTicker paused due to zero " +
                                "NumPendingUpdates(Local, Remote)")
                }

                select {
                // We have a new hook that needs to be run when we reach a
                // clean channel state.
                case hook := <-l.flushHooks.newTransients:
                        if l.channel.IsChannelClean() {
                                hook()
                        } else {
                                l.flushHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when we have
                // committed all of our updates.
                case hook := <-l.outgoingCommitHooks.newTransients:
                        if !l.channel.OweCommitment() {
                                hook()
                        } else {
                                l.outgoingCommitHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when our peer has
                // committed all of their updates.
                case hook := <-l.incomingCommitHooks.newTransients:
                        if !l.channel.NeedCommitment() {
                                hook()
                        } else {
                                l.incomingCommitHooks.alloc(hook)
                        }

                // Our update fee timer has fired, so we'll check the network
                // fee to see if we should adjust our commitment fee.
                case <-l.updateFeeTimer.C:
                        l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())

                        // If we're not the initiator of the channel, we don't
                        // control the fees, so we can ignore this.
                        if !l.channel.IsInitiator() {
                                continue
                        }

                        // If we are the initiator, then we'll sample the
                        // current fee rate to get into the chain within 3
                        // blocks.
                        netFee, err := l.sampleNetworkFee()
                        if err != nil {
                                l.log.Errorf("unable to sample network fee: %v",
                                        err)
                                continue
                        }

                        minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()

                        newCommitFee := l.channel.IdealCommitFeeRate(
                                netFee, minRelayFee,
                                l.cfg.MaxAnchorsCommitFeeRate,
                                l.cfg.MaxFeeAllocation,
                        )

                        // We determine if we should adjust the commitment fee
                        // based on the current commitment fee, the suggested
                        // new commitment fee and the current minimum relay
                        // fee rate.
                        commitFee := l.channel.CommitFeeRate()
                        if !shouldAdjustCommitFee(
                                newCommitFee, commitFee, minRelayFee,
                        ) {

                                continue
                        }

                        // If we do, then we'll send a new UpdateFee message
                        // to the remote party, to be locked in with a new
                        // update.
                        if err := l.updateChannelFee(newCommitFee); err != nil {
                                l.log.Errorf("unable to update fee rate: %v",
                                        err)
                                continue
                        }

                // The underlying channel has notified us of a unilateral
                // close carried out by the remote peer. In the case of such
                // an event, we'll wipe the channel state from the peer, and
                // mark the contract as fully settled. Afterwards we can exit.
                //
                // TODO(roasbeef): add force closure? also breach?
                case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
                        l.log.Warnf("remote peer has closed on-chain")

                        // TODO(roasbeef): remove all together
                        go func() {
                                chanPoint := l.channel.ChannelPoint()
                                l.cfg.Peer.WipeChannel(&chanPoint)
                        }()

                        return

                case <-l.cfg.BatchTicker.Ticks():
                        // Attempt to extend the remote commitment chain
                        // including all the currently pending entries. If the
                        // send was unsuccessful, then abandon the update,
                        // waiting for the revocation window to open up.
                        if !l.updateCommitTxOrFail() {
                                return
                        }

                case <-l.cfg.PendingCommitTicker.Ticks():
                        l.failf(
                                LinkFailureError{
                                        code:          ErrRemoteUnresponsive,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "unable to complete dance",
                        )
                        return

                // A message from the switch was just received. This indicates
                // that the link is an intermediate hop in a multi-hop HTLC
                // circuit.
                case pkt := <-l.downstream:
                        l.handleDownstreamPkt(pkt)

                // A message from the connected peer was just received. This
                // indicates that we have a new incoming HTLC, either directly
                // for us, or part of a multi-hop HTLC circuit.
                case msg := <-l.upstream:
                        l.handleUpstreamMsg(msg)

                // An htlc resolution is received. This means that we now have
                // a resolution for a previously accepted htlc.
                case hodlItem := <-l.hodlQueue.ChanOut():
                        htlcResolution := hodlItem.(invoices.HtlcResolution)
                        err := l.processHodlQueue(htlcResolution)
                        switch err {
                        // No error, success.
                        case nil:

                        // If the duplicate keystone error was encountered,
                        // fail back gracefully.
                        case ErrDuplicateKeystone:
                                l.failf(LinkFailureError{
                                        code: ErrCircuitError,
                                }, "process hodl queue: "+
                                        "temporary circuit error: %v",
                                        err,
                                )

                        // Send an Error message to the peer.
                        default:
                                l.failf(LinkFailureError{
                                        code: ErrInternalError,
                                }, "process hodl queue: unable to update "+
                                        "commitment: %v", err,
                                )
                        }

                case qReq := <-l.quiescenceReqs:
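                        // (Quiescence sketch: InitStfu records the request;
                        // once we owe no further local updates we send our
                        // stfu, and the quiescer resolves qReq once both
                        // sides have gone quiet.)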
                        l.quiescer.InitStfu(qReq)

                        if l.noDanglingUpdates(lntypes.Local) {
                                err := l.quiescer.SendOwedStfu()
                                if err != nil {
                                        l.stfuFailf(
                                                "SendOwedStfu: %s", err.Error(),
                                        )
                                        res := fn.Err[lntypes.ChannelParty](err)
                                        qReq.Resolve(res)
                                }
                        }

                case <-l.Quit:
                        return
                }
        }
}

// processHodlQueue processes a received htlc resolution and continues reading
// from the hodl queue until no more resolutions remain. When this function
// returns without an error, the commit tx should be updated.
func (l *channelLink) processHodlQueue(
        firstResolution invoices.HtlcResolution) error {

        // Try to read all waiting resolution messages, so that they can all be
        // processed in a single commitment tx update.
        htlcResolution := firstResolution
loop:
        for {
                // Lookup all hodl htlcs that can be failed or settled with
                // this event. The hodl htlc must be present in the map.
                circuitKey := htlcResolution.CircuitKey()
                hodlHtlc, ok := l.hodlMap[circuitKey]
                if !ok {
                        return fmt.Errorf("hodl htlc not found: %v", circuitKey)
                }

                if err := l.processHtlcResolution(htlcResolution, hodlHtlc); err != nil {
                        return err
                }

                // Clean up hodl map.
                delete(l.hodlMap, circuitKey)

                select {
                case item := <-l.hodlQueue.ChanOut():
                        htlcResolution = item.(invoices.HtlcResolution)
                default:
                        break loop
                }
        }

        // Update the commitment tx.
        if err := l.updateCommitTx(); err != nil {
                return err
        }

        return nil
}

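// Note that the select with a default case in processHodlQueue implements a
// non-blocking drain: queued resolutions are pulled until none are
// immediately available, so a single commitment update can cover the whole
// batch.
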
// processHtlcResolution applies a received htlc resolution to the provided
// htlc. When this function returns without an error, the commit tx should be
// updated.
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
        htlc hodlHtlc) error {

        circuitKey := resolution.CircuitKey()

        // Determine required action for the resolution based on the type of
        // resolution we have received.
        switch res := resolution.(type) {
        // Settle htlcs that returned a settle resolution using the preimage
        // in the resolution.
        case *invoices.HtlcSettleResolution:
                l.log.Debugf("received settle resolution for %v "+
                        "with outcome: %v", circuitKey, res.Outcome)

                return l.settleHTLC(
                        res.Preimage, htlc.add.ID, htlc.sourceRef,
                )

        // For htlc failures, we get the relevant failure message based
        // on the failure resolution and then fail the htlc.
        case *invoices.HtlcFailResolution:
                l.log.Debugf("received cancel resolution for "+
                        "%v with outcome: %v", circuitKey, res.Outcome)

                // Get the lnwire failure message based on the resolution
                // result.
                failure := getResolutionFailure(res, htlc.add.Amount)

                l.sendHTLCError(
                        htlc.add, htlc.sourceRef, failure, htlc.obfuscator,
                        true,
                )
                return nil

        // Fail if we do not get a settle or fail resolution, since we
        // are only expecting to handle settles and fails.
        default:
                return fmt.Errorf("unknown htlc resolution type: %T",
                        resolution)
        }
}

// getResolutionFailure returns the wire message that a htlc resolution should
// be failed with.
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
        amount lnwire.MilliSatoshi) *LinkError {

        // If the resolution has been resolved as part of a MPP timeout,
        // we need to fail the htlc with lnwire.FailMppTimeout.
        if resolution.Outcome == invoices.ResultMppTimeout {
                return NewDetailedLinkError(
                        &lnwire.FailMPPTimeout{}, resolution.Outcome,
                )
        }

        // If the htlc is not a MPP timeout, we fail it with
        // FailIncorrectDetails. This error is sent for invoice payment
        // failures such as underpayment/expiry too soon and hodl invoices
        // (which return FailIncorrectDetails to avoid leaking information).
        incorrectDetails := lnwire.NewFailIncorrectDetails(
                amount, uint32(resolution.AcceptHeight),
        )

        return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
}

// randomFeeUpdateTimeout returns a random timeout between the bounds defined
// within the link's configuration that will be used to determine when the
// link should propose an update to its commitment fee rate.
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
        lower := int64(l.cfg.MinUpdateTimeout)
        upper := int64(l.cfg.MaxUpdateTimeout)
        return time.Duration(prand.Int63n(upper-lower) + lower)
}

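// Since prand.Int63n(upper-lower) yields a value in [0, upper-lower), the
// timeout above is drawn uniformly from the half-open interval
// [MinUpdateTimeout, MaxUpdateTimeout).
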
// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
// downstream HTLC Switch.
func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error {
        htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
        if !ok {
                return errors.New("not an UpdateAddHTLC packet")
        }

        // If we are flushing the link in the outgoing direction or we have
        // already sent Stfu, then we can't add new htlcs to the link and we
        // need to bounce it.
        if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() {
                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        &lnwire.FailTemporaryChannelFailure{},
                        OutgoingFailureLinkNotEligible,
                )
        }

        // If hodl.AddOutgoing mode is active, we exit early to simulate
        // arbitrary delays between the switch adding an ADD to the
        // mailbox, and the HTLC being added to the commitment state.
        if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
                l.log.Warnf(hodl.AddOutgoing.Warning())
                l.mailBox.AckPacket(pkt.inKey())
                return nil
        }

        // Check if we can add the HTLC here without exceeding the max fee
        // exposure threshold.
        if l.isOverexposedWithHtlc(htlc, false) {
                l.log.Debugf("Unable to handle downstream HTLC - max fee " +
                        "exposure exceeded")

                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        lnwire.NewTemporaryChannelFailure(nil),
                        OutgoingFailureDownstreamHtlcAdd,
                )
        }

        // A new payment has been initiated via the downstream channel,
        // so we add the new HTLC to our local log, then update the
        // commitment chains.
        htlc.ChanID = l.ChanID()
        openCircuitRef := pkt.inKey()

        // We enforce the fee buffer for the commitment transaction because
        // we are in control of adding this htlc. Nothing has locked-in yet so
        // we can securely enforce the fee buffer which is only relevant if we
        // are the initiator of the channel.
        index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
        if err != nil {
                // The HTLC was unable to be added to the state machine,
                // as a result, we'll signal the switch to cancel the
                // pending payment.
                l.log.Warnf("Unable to handle downstream add HTLC: %v",
                        err)

                // Remove this packet from the link's mailbox, this
                // prevents it from being reprocessed if the link
                // restarts and resets its mailbox. If this response
                // doesn't make it back to the originating link, it will
                // be rejected upon attempting to reforward the Add to
                // the switch, since the circuit was never fully opened,
                // and the forwarding package shows it as
                // unacknowledged.
                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        lnwire.NewTemporaryChannelFailure(nil),
                        OutgoingFailureDownstreamHtlcAdd,
                )
        }

        l.log.Tracef("received downstream htlc: payment_hash=%x, "+
                "local_log_index=%v, pend_updates=%v",
                htlc.PaymentHash[:], index,
                l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))

        pkt.outgoingChanID = l.ShortChanID()
        pkt.outgoingHTLCID = index
        htlc.ID = index

        l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
                pkt.inKey(), pkt.outKey())

        l.openedCircuits = append(l.openedCircuits, pkt.inKey())
        l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())

        _ = l.cfg.Peer.SendMessage(false, htlc)

        // Send a forward event notification to htlcNotifier.
        l.cfg.HtlcNotifier.NotifyForwardingEvent(
                newHtlcKey(pkt),
                HtlcInfo{
                        IncomingTimeLock: pkt.incomingTimeout,
                        IncomingAmt:      pkt.incomingAmount,
                        OutgoingTimeLock: htlc.Expiry,
                        OutgoingAmt:      htlc.Amount,
                },
                getEventType(pkt),
        )

        l.tryBatchUpdateCommitTx()

        return nil
}

// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward
// new HTLCs, timeout previously cleared HTLCs, and finally to settle
// currently cleared HTLCs with the upstream peer.
//
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
        if pkt.htlc.MsgType().IsChannelUpdate() &&
                !l.quiescer.CanSendUpdates() {

                l.log.Warnf("unable to process channel update. "+
                        "ChannelID=%v is quiescent.", l.ChanID)

                return
        }

        switch htlc := pkt.htlc.(type) {
        case *lnwire.UpdateAddHTLC:
                // Handle add message. The returned error can be ignored,
                // because it is also sent through the mailbox.
                _ = l.handleDownstreamUpdateAdd(pkt)

        case *lnwire.UpdateFulfillHTLC:
                // If hodl.SettleOutgoing mode is active, we exit early to
                // simulate arbitrary delays between the switch adding the
                // SETTLE to the mailbox, and the HTLC being added to the
                // commitment state.
                if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
                        l.log.Warnf(hodl.SettleOutgoing.Warning())
                        l.mailBox.AckPacket(pkt.inKey())
                        return
                }

                // An HTLC we forward to the switch has just settled somewhere
                // upstream. Therefore we settle the HTLC within our local
                // state machine.
                inKey := pkt.inKey()
                err := l.channel.SettleHTLC(
                        htlc.PaymentPreimage,
                        pkt.incomingHTLCID,
                        pkt.sourceRef,
                        pkt.destRef,
                        &inKey,
                )
                if err != nil {
                        l.log.Errorf("unable to settle incoming HTLC for "+
                                "circuit-key=%v: %v", inKey, err)

                        // If the HTLC index for Settle response was not known
                        // to our commitment state, it has already been
                        // cleaned up by a prior response. We'll thus try to
                        // clean up any lingering state to ensure we don't
                        // continue reforwarding.
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
                                l.cleanupSpuriousResponse(pkt)
                        }

                        // Remove the packet from the link's mailbox to ensure
                        // it doesn't get replayed after a reconnection.
                        l.mailBox.AckPacket(inKey)

                        return
                }

                l.log.Debugf("queueing removal of SETTLE closed circuit: "+
                        "%s->%s", pkt.inKey(), pkt.outKey())

                l.closedCircuits = append(l.closedCircuits, pkt.inKey())

                // With the HTLC settled, we'll need to populate the wire
                // message to target the specific channel and HTLC to be
                // canceled.
                htlc.ChanID = l.ChanID()
                htlc.ID = pkt.incomingHTLCID

                // Then we send the HTLC settle message to the connected peer
                // so we can continue the propagation of the settle message.
                l.cfg.Peer.SendMessage(false, htlc)

                // Send a settle event notification to htlcNotifier.
                l.cfg.HtlcNotifier.NotifySettleEvent(
                        newHtlcKey(pkt),
                        htlc.PaymentPreimage,
                        getEventType(pkt),
                )

                // Immediately update the commitment tx to minimize latency.
                l.updateCommitTxOrFail()

        case *lnwire.UpdateFailHTLC:
                // If hodl.FailOutgoing mode is active, we exit early to
                // simulate arbitrary delays between the switch adding a FAIL
                // to the mailbox, and the HTLC being added to the commitment
                // state.
                if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
                        l.log.Warnf(hodl.FailOutgoing.Warning())
                        l.mailBox.AckPacket(pkt.inKey())
                        return
                }

                // An HTLC cancellation has been triggered somewhere upstream,
                // so we'll remove the HTLC from our local state machine.
                inKey := pkt.inKey()
                err := l.channel.FailHTLC(
                        pkt.incomingHTLCID,
                        htlc.Reason,
                        pkt.sourceRef,
                        pkt.destRef,
                        &inKey,
                )
                if err != nil {
                        l.log.Errorf("unable to cancel incoming HTLC for "+
                                "circuit-key=%v: %v", inKey, err)

                        // If the HTLC index for Fail response was not known
                        // to our commitment state, it has already been
                        // cleaned up by a prior response. We'll thus try to
                        // clean up any lingering state to ensure we don't
                        // continue reforwarding.
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
                                l.cleanupSpuriousResponse(pkt)
                        }

                        // Remove the packet from the link's mailbox to ensure
                        // it doesn't get replayed after a reconnection.
                        l.mailBox.AckPacket(inKey)

                        return
                }

                l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
                        pkt.inKey(), pkt.outKey())

                l.closedCircuits = append(l.closedCircuits, pkt.inKey())

                // With the HTLC removed, we'll need to populate the wire
                // message to target the specific channel and HTLC to be
                // canceled. The "Reason" field will have already been set
                // within the switch.
                htlc.ChanID = l.ChanID()
                htlc.ID = pkt.incomingHTLCID

                // We send the HTLC message to the peer which initially
                // created the HTLC. If the incoming blinding point is
                // non-nil, we know that we are a relaying node in a blinded
                // path. Otherwise, we're either an introduction node or not
                // part of a blinded path at all.
                if err := l.sendIncomingHTLCFailureMsg(
                        htlc.ID,
                        pkt.obfuscator,
                        htlc.Reason,
                ); err != nil {
                        l.log.Errorf("unable to send HTLC failure: %v",
                                err)

                        return
                }

                // If the packet does not have a link failure set, it failed
                // further down the route so we notify a forwarding failure.
                // Otherwise, we notify a link failure because it failed at
                // our node.
                if pkt.linkFailure != nil {
                        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
                                newHtlcKey(pkt),
                                newHtlcInfo(pkt),
                                getEventType(pkt),
                                pkt.linkFailure,
                                false,
                        )
                } else {
                        l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
                                newHtlcKey(pkt), getEventType(pkt),
                        )
                }

                // Immediately update the commitment tx to minimize latency.
                l.updateCommitTxOrFail()
        }
}

// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
// full.
func (l *channelLink) tryBatchUpdateCommitTx() {
        pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
        if pending < uint64(l.cfg.BatchSize) {
                return
        }

        l.updateCommitTxOrFail()
}

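// For example, with BatchSize set to 10, the first nine pending updates only
// accumulate in the update log; the tenth triggers a commitment signature
// covering all of them. Smaller batches are still cleared by the BatchTicker
// in htlcManager.
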
// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
// associated with this packet. If successful in doing so, it will also purge
// the open circuit from the circuit map and remove the packet from the link's
// mailbox.
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
        inKey := pkt.inKey()

        l.log.Debugf("cleaning up spurious response for incoming "+
                "circuit-key=%v", inKey)

        // If the htlc packet doesn't have a source reference, it is unsafe to
        // proceed, as skipping this ack may cause the htlc to be reforwarded.
        if pkt.sourceRef == nil {
                l.log.Errorf("unable to cleanup response for incoming "+
                        "circuit-key=%v, does not contain source reference",
                        inKey)
                return
        }

        // If the source reference is present, we will try to prevent this
        // link from resending the packet to the switch. To do so, we ack the
        // AddRef of the incoming HTLC belonging to this link.
        err := l.channel.AckAddHtlcs(*pkt.sourceRef)
        if err != nil {
                l.log.Errorf("unable to ack AddRef for incoming "+
                        "circuit-key=%v: %v", inKey, err)

                // If this operation failed, it is unsafe to attempt removal
                // of the destination reference or circuit, so we exit early.
                // The cleanup may proceed with a different packet in the
                // future that succeeds on this step.
                return
        }

        // Now that we know this link will stop retransmitting Adds to the
        // switch, we can begin to teardown the response reference and circuit
        // map.
        //
        // If the packet includes a destination reference, then a response for
        // this HTLC was locked into the outgoing channel. Attempt to remove
        // this reference, so we stop retransmitting the response internally.
        // Even if this fails, we will proceed in trying to delete the
        // circuit. When retransmitting responses, the destination references
        // will be cleaned up if an open circuit is not found in the circuit
        // map.
        if pkt.destRef != nil {
                err := l.channel.AckSettleFails(*pkt.destRef)
                if err != nil {
                        l.log.Errorf("unable to ack SettleFailRef "+
                                "for incoming circuit-key=%v: %v",
                                inKey, err)
                }
        }

        l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)

        // With all known references acked, we can now safely delete the
        // circuit from the switch's circuit map, as the state is no longer
        // needed.
        err = l.cfg.Circuits.DeleteCircuits(inKey)
        if err != nil {
                l.log.Errorf("unable to delete circuit for "+
                        "circuit-key=%v: %v", inKey, err)
        }
}

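// The ordering in cleanupSpuriousResponse is deliberate: the AckAddHtlcs call
// must succeed before the destination reference or the circuit is touched,
// since that ack is the only step whose failure could cause the HTLC to be
// reforwarded to the switch.
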
// handleUpstreamMsg processes wire messages related to commitment state
// updates from the upstream peer. The upstream peer is the peer whom we have
// a direct channel with, updating our respective commitment chains.
func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
        // First check if the message is an update and we are capable of
        // receiving updates right now.
        if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
                l.stfuFailf("update received after stfu: %T", msg)
                return
        }

        switch msg := msg.(type) {
        case *lnwire.UpdateAddHTLC:
                if l.IsFlushing(Incoming) {
                        // This is forbidden by the protocol specification.
                        // The best chance we have to deal with this is to
                        // drop the connection. This should roll back the
                        // channel state to the last CommitSig. If the remote
                        // has already sent a CommitSig we haven't received
                        // yet, channel state will be re-synchronized with a
                        // ChannelReestablish message upon reconnection and
                        // the protocol state that caused us to flush the link
                        // will be rolled back. In the event that there was
                        // some non-deterministic behavior in the remote that
                        // caused them to violate the protocol, we have a
                        // decent shot at correcting it this way, since
                        // reconnecting will put us in the cleanest possible
                        // state to try again.
                        //
                        // In addition to the above, it is possible for us to
                        // hit this case in situations where we improperly
                        // handle message ordering due to concurrency choices.
                        // An issue has been filed to address this here:
                        // https://github.com/lightningnetwork/lnd/issues/8393
                        l.failf(
                                LinkFailureError{
                                        code:             ErrInvalidUpdate,
                                        FailureAction:    LinkFailureDisconnect,
                                        PermanentFailure: false,
                                        Warning:          true,
                                },
                                "received add while link is flushing",
                        )

                        return
                }

                // Disallow htlcs with blinding points set if we haven't
                // enabled the feature. This saves us from having to process
                // the onion at all, but will only catch blinded payments
                // where we are a relaying node (as the blinding point will
                // be in the payload when we're the introduction node).
                if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
                                "blinding point included when route blinding "+
                                        "is disabled")

                        return
                }

                // We have to check the limit here rather than later in the
                // switch because the counterparty can keep sending HTLC's
                // without sending a revoke. This would mean that the switch
                // check would only occur later.
                if l.isOverexposedWithHtlc(msg, true) {
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "peer sent us an HTLC that exceeded our max "+
                                        "fee exposure")

                        return
                }

                // We just received an add request from an upstream peer, so
                // we add it to our state machine, then add the HTLC to our
                // "settle" list in the event that we know the preimage.
                index, err := l.channel.ReceiveHTLC(msg)
                if err != nil {
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
                                "unable to handle upstream add HTLC: %v", err)
                        return
                }

                l.log.Tracef("receive upstream htlc with payment hash(%x), "+
                        "assigning index: %v", msg.PaymentHash[:], index)

        case *lnwire.UpdateFulfillHTLC:
                pre := msg.PaymentPreimage
                idx := msg.ID

                // Before we pipeline the settle, we'll check the set of
                // active htlc's to see if the related UpdateAddHTLC has been
                // fully locked-in.
                var lockedin bool
                htlcs := l.channel.ActiveHtlcs()
                for _, add := range htlcs {
                        // The HTLC will be outgoing and match idx.
                        if !add.Incoming && add.HtlcIndex == idx {
                                lockedin = true
                                break
                        }
                }

                if !lockedin {
                        l.failf(
                                LinkFailureError{code: ErrInvalidUpdate},
                                "unable to handle upstream settle",
                        )
                        return
                }

                if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
                        l.failf(
                                LinkFailureError{
                                        code:          ErrInvalidUpdate,
                                        FailureAction: LinkFailureForceClose,
                                },
                                "unable to handle upstream settle HTLC: %v", err,
                        )
                        return
                }

                settlePacket := &htlcPacket{
                        outgoingChanID: l.ShortChanID(),
                        outgoingHTLCID: idx,
                        htlc: &lnwire.UpdateFulfillHTLC{
                                PaymentPreimage: pre,
                        },
                }

                // Add the newly discovered preimage to our growing list of
                // uncommitted preimages. These will be written to the witness
                // cache just before accepting the next commitment signature
                // from the remote peer.
                l.uncommittedPreimages = append(l.uncommittedPreimages, pre)

                // Pipeline this settle, send it to the switch.
                go l.forwardBatch(false, settlePacket)

        case *lnwire.UpdateFailMalformedHTLC:
                // Convert the failure type encoded within the HTLC fail
                // message to the proper generic lnwire error code.
                var failure lnwire.FailureMessage
                switch msg.FailureCode {
                case lnwire.CodeInvalidOnionVersion:
                        failure = &lnwire.FailInvalidOnionVersion{
                                OnionSHA256: msg.ShaOnionBlob,
                        }
                case lnwire.CodeInvalidOnionHmac:
                        failure = &lnwire.FailInvalidOnionHmac{
                                OnionSHA256: msg.ShaOnionBlob,
                        }

                case lnwire.CodeInvalidOnionKey:
                        failure = &lnwire.FailInvalidOnionKey{
                                OnionSHA256: msg.ShaOnionBlob,
                        }

                // Handle malformed errors that are part of a blinded route.
                // This case is slightly different, because we expect every
                // relaying node in the blinded portion of the route to send
                // malformed errors. If we're also a relaying node, we're
                // likely going to switch this error out anyway for our own
                // malformed error, but we handle the case here for
                // completeness.
                case lnwire.CodeInvalidBlinding:
                        failure = &lnwire.FailInvalidBlinding{
                                OnionSHA256: msg.ShaOnionBlob,
                        }

                default:
                        l.log.Warnf("unexpected failure code received in "+
                                "UpdateFailMalformedHTLC: %v", msg.FailureCode)

                        // We don't just pass back the error we received from
                        // our successor. Otherwise we might report a failure
                        // that penalizes us more than needed. If the onion
                        // that we forwarded was correct, the node should have
                        // been able to send back its own failure. The node
                        // did not send back its own failure, so we assume
                        // there was a problem with the onion and report that
                        // back. We reuse the invalid onion key failure
                        // because there is no specific error for this case.
                        failure = &lnwire.FailInvalidOnionKey{
                                OnionSHA256: msg.ShaOnionBlob,
                        }
                }

                // With the error parsed, we'll convert it into its opaque
                // form.
                var b bytes.Buffer
                if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
                        l.log.Errorf("unable to encode malformed error: %v", err)
                        return
                }

                // If the remote side has been unable to parse the onion blob
                // we sent it, then we should transform the malformed HTLC
                // message into the usual HTLC fail message.
                err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
                if err != nil {
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
                                "unable to handle upstream fail HTLC: %v", err)
                        return
                }

        case *lnwire.UpdateFailHTLC:
                // Verify that the failure reason is at least 256 bytes plus
                // overhead.
                const minimumFailReasonLength = lnwire.FailureMessageLength +
                        2 + 2 + 32

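                // (That is lnwire.FailureMessageLength (256 bytes) plus the
                // two 2-byte length prefixes and the 32-byte MAC that make up
                // the encrypted failure packet described in BOLT #4.)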
                if len(msg.Reason) < minimumFailReasonLength {
122✔
2250
                        // We've received a reason with a non-compliant length.
1✔
2251
                        // Older nodes happily relay back these failures that
1✔
2252
                        // may originate from a node further downstream.
1✔
2253
                        // Therefore we can't just fail the channel.
1✔
2254
                        //
1✔
2255
                        // We want to be compliant ourselves, so we also can't
1✔
2256
                        // pass back the reason unmodified. And we must make
1✔
2257
                        // sure that we don't hit the magic length check of 260
1✔
2258
                        // bytes in processRemoteSettleFails either.
1✔
2259
                        //
1✔
2260
                        // Because the reason is unreadable for the payer
1✔
2261
                        // anyway, we just replace it by a compliant-length
1✔
2262
                        // series of random bytes.
1✔
2263
                        msg.Reason = make([]byte, minimumFailReasonLength)
1✔
2264
                        _, err := crand.Read(msg.Reason[:])
1✔
2265
                        if err != nil {
1✔
2266
                                l.log.Errorf("Random generation error: %v", err)
×
2267

×
2268
                                return
×
2269
                        }
×
2270
                }
2271

2272
                // Add fail to the update log.
2273
                idx := msg.ID
121✔
2274
                err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
121✔
2275
                if err != nil {
121✔
2276
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2277
                                "unable to handle upstream fail HTLC: %v", err)
×
2278
                        return
×
2279
                }
×
2280

2281
        case *lnwire.CommitSig:
                // Since we may have learned new preimages for the first time,
                // we'll add them to our preimage cache. By doing this, we
                // ensure any contested contracts watched by any on-chain
                // arbitrators can now sweep this HTLC on-chain. We delay
                // committing the preimages until just before accepting the new
                // remote commitment, as afterwards the peer won't resend the
                // Settle messages on the next channel reestablishment. Doing so
                // allows us to more effectively batch this operation, instead
                // of doing a single write per preimage.
                err := l.cfg.PreimageCache.AddPreimages(
                        l.uncommittedPreimages...,
                )
                if err != nil {
                        l.failf(
                                LinkFailureError{code: ErrInternalError},
                                "unable to add preimages=%v to cache: %v",
                                l.uncommittedPreimages, err,
                        )
                        return
                }

                // Instead of truncating the slice to conserve memory
                // allocations, we simply set the uncommitted preimage slice to
                // nil so that a new one will be initialized if any more
                // witnesses are discovered. We do this because the maximum size
                // that the slice can occupy is 15KB, and we want to ensure we
                // release that memory back to the runtime.
                l.uncommittedPreimages = nil

                // We just received new updates to our local commitment
                // chain. Validate this new commitment, closing the link if
                // it is invalid.
                auxSigBlob, err := msg.CustomRecords.Serialize()
                if err != nil {
                        l.failf(
                                LinkFailureError{code: ErrInvalidCommitment},
                                "unable to serialize custom records: %v", err,
                        )

                        return
                }
                err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
                        CommitSig:  msg.CommitSig,
                        HtlcSigs:   msg.HtlcSigs,
                        PartialSig: msg.PartialSig,
                        AuxSigBlob: auxSigBlob,
                })
                if err != nil {
                        // If we were unable to reconstruct their proposed
                        // commitment, then we'll examine the type of error. If
                        // it's an InvalidCommitSigError, then we'll send a
                        // direct error.
                        var sendData []byte
                        switch err.(type) {
                        case *lnwallet.InvalidCommitSigError:
                                sendData = []byte(err.Error())
                        case *lnwallet.InvalidHtlcSigError:
                                sendData = []byte(err.Error())
                        }
                        l.failf(
                                LinkFailureError{
                                        code:          ErrInvalidCommitment,
                                        FailureAction: LinkFailureForceClose,
                                        SendData:      sendData,
                                },
                                "ChannelPoint(%v): unable to accept new "+
                                        "commitment: %v",
                                l.channel.ChannelPoint(), err,
                        )
                        return
                }

                // As we've just accepted a new state, we'll now
                // immediately send the remote peer a revocation for our prior
                // state.
                nextRevocation, currentHtlcs, finalHTLCs, err :=
                        l.channel.RevokeCurrentCommitment()
                if err != nil {
                        l.log.Errorf("unable to revoke commitment: %v", err)

                        // We need to fail the channel in case revoking our
                        // local commitment does not succeed. We might have
                        // already advanced our channel state which would lead
                        // us to proceed with an unclean state.
                        //
                        // NOTE: We do not trigger a force close because this
                        // could resolve itself in case our db was just busy
                        // not accepting new transactions.
                        l.failf(
                                LinkFailureError{
                                        code:          ErrInternalError,
                                        Warning:       true,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "ChannelPoint(%v): unable to accept new "+
                                        "commitment: %v",
                                l.channel.ChannelPoint(), err,
                        )
                        return
                }

                // As soon as we are ready to send our next revocation, we can
                // invoke the incoming commit hooks.
                l.RWMutex.Lock()
                l.incomingCommitHooks.invoke()
                l.RWMutex.Unlock()

                l.cfg.Peer.SendMessage(false, nextRevocation)

                // Notify which of the incoming htlcs had their resolutions
                // locked in.
                for id, settled := range finalHTLCs {
                        l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
                                models.CircuitKey{
                                        ChanID: l.ShortChanID(),
                                        HtlcID: id,
                                },
                                channeldb.FinalHtlcInfo{
                                        Settled:  settled,
                                        Offchain: true,
                                },
                        )
                }

                // Since we just revoked our commitment, we may have a new set
                // of HTLCs on our commitment, so we'll send them using our
                // function closure NotifyContractUpdate.
                newUpdate := &contractcourt.ContractUpdate{
                        HtlcKey: contractcourt.LocalHtlcSet,
                        Htlcs:   currentHtlcs,
                }
                err = l.cfg.NotifyContractUpdate(newUpdate)
                if err != nil {
                        l.log.Errorf("unable to notify contract update: %v",
                                err)
                        return
                }

                select {
                case <-l.Quit:
                        return
                default:
                }

                // If the remote party initiated the state transition,
                // we'll reply with a signature to provide them with their
                // version of the latest commitment. Otherwise, if both
                // commitment chains are fully synced from our PoV, we don't
                // need to reply with a signature as both sides already have a
                // commitment with the latest accepted state.
                if l.channel.OweCommitment() {
                        if !l.updateCommitTxOrFail() {
                                return
                        }
                }

                // If we need to send out an Stfu, this would be the time to do
                // so.
                if l.noDanglingUpdates(lntypes.Local) {
                        err = l.quiescer.SendOwedStfu()
                        if err != nil {
                                l.stfuFailf("sendOwedStfu: %v", err.Error())
                        }
                }

                // Now that we have finished processing the incoming CommitSig
                // and sent out our RevokeAndAck, we invoke the flushHooks if
                // the channel state is clean.
                l.RWMutex.Lock()
                if l.channel.IsChannelClean() {
                        l.flushHooks.invoke()
                }
                l.RWMutex.Unlock()

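        // For orientation, a single state update initiated by our peer flows
        // through the case above and the case below roughly as follows (a
        // sketch of the BOLT #2 commitment dance, not an exhaustive trace):
        //
        //      peer -> us:   UpdateAddHTLC
        //      peer -> us:   CommitSig      (handled above)
        //      us   -> peer: RevokeAndAck
        //      us   -> peer: CommitSig      (sent when we owe a commitment)
        //      peer -> us:   RevokeAndAck   (handled below)
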
        case *lnwire.RevokeAndAck:
                // We've received a revocation from the remote chain. If it is
                // valid, this moves the remote chain forward and expands our
                // revocation window.

                // We now process the message and advance our remote commit
                // chain.
                fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg)
                if err != nil {
                        // TODO(halseth): force close?
                        l.failf(
                                LinkFailureError{
                                        code:          ErrInvalidRevocation,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "unable to accept revocation: %v", err,
                        )
                        return
                }

                // The remote party now has a new primary commitment, so we'll
                // update the contract court to be aware of this new set (the
                // prior old remote pending).
                newUpdate := &contractcourt.ContractUpdate{
                        HtlcKey: contractcourt.RemoteHtlcSet,
                        Htlcs:   remoteHTLCs,
                }
                err = l.cfg.NotifyContractUpdate(newUpdate)
                if err != nil {
                        l.log.Errorf("unable to notify contract update: %v",
                                err)
                        return
                }

                select {
                case <-l.Quit:
                        return
                default:
                }

                // If we have a tower client for this channel type, we'll
                // create a backup for the current state.
                if l.cfg.TowerClient != nil {
                        state := l.channel.State()
                        chanID := l.ChanID()

                        err = l.cfg.TowerClient.BackupState(
                                &chanID, state.RemoteCommitment.CommitHeight-1,
                        )
                        if err != nil {
                                l.failf(LinkFailureError{
                                        code: ErrInternalError,
                                }, "unable to queue breach backup: %v", err)
                                return
                        }
                }

                // If we can send updates then we can process adds in case we
                // are the exit hop and need to send back resolutions, or in
                // case there are validity issues with the packets. Otherwise
                // we defer the action until resume.
                //
                // We are free to process the settles and fails without this
                // check since processing those can't result in further updates
                // to this channel link.
                if l.quiescer.CanSendUpdates() {
                        l.processRemoteAdds(fwdPkg)
                } else {
                        l.quiescer.OnResume(func() {
                                l.processRemoteAdds(fwdPkg)
                        })
                }
                l.processRemoteSettleFails(fwdPkg)

                // If the link failed while processing the adds, we must
                // return to ensure we won't attempt to update the state
                // further.
                if l.failed {
                        return
                }

                // The revocation window opened up. If there are pending local
                // updates, try to update the commit tx. Pending updates could
                // already have been present because of a previously failed
                // update to the commit tx or freshly added in by
                // processRemoteAdds. Also in case there are no local updates,
                // but there are still remote updates that are not in the remote
                // commit tx yet, send out an update.
                if l.channel.OweCommitment() {
                        if !l.updateCommitTxOrFail() {
                                return
                        }
                }

                // Now that we have finished processing the RevokeAndAck, we
                // can invoke the flushHooks if the channel state is clean.
                l.RWMutex.Lock()
                if l.channel.IsChannelClean() {
                        l.flushHooks.invoke()
                }
                l.RWMutex.Unlock()

        case *lnwire.UpdateFee:
                // Check and see if their proposed fee-rate would make us
                // exceed the fee threshold.
                fee := chainfee.SatPerKWeight(msg.FeePerKw)

                isDust, err := l.exceedsFeeExposureLimit(fee)
                if err != nil {
                        // This shouldn't typically happen. If it does, it
                        // indicates something is wrong with our channel state.
                        l.log.Errorf("Unable to determine if fee threshold " +
                                "exceeded")
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "error calculating fee exposure: %v", err)

                        return
                }

                if isDust {
                        // The proposed fee-rate makes us exceed the fee
                        // threshold.
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "fee threshold exceeded: %v", err)
                        return
                }

                // We received a fee update from our peer. If we are the
                // initiator we will fail the channel, if not we will apply
                // the update.
                if err := l.channel.ReceiveUpdateFee(fee); err != nil {
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
                                "error receiving fee update: %v", err)
                        return
                }

                // Update the mailbox's feerate as well.
                l.mailBox.SetFeeRate(fee)

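                // Note that per BOLT #2 only the channel initiator is
                // expected to send UpdateFee, which is why ReceiveUpdateFee
                // above treats a fee update received while we are the
                // initiator as an invalid update.
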
        case *lnwire.Stfu:
                err := l.handleStfu(msg)
                if err != nil {
                        l.stfuFailf("handleStfu: %v", err.Error())
                }

        // In the case where we receive a warning message from our peer, just
        // log it and move on. We choose not to disconnect from our peer,
        // although we "MAY" do so according to the specification.
        case *lnwire.Warning:
                l.log.Warnf("received warning message from peer: %v",
                        msg.Warning())

        case *lnwire.Error:
                // Error received from remote, MUST fail the channel, but we
                // should only print the contents of the error message if all
                // characters are printable ASCII.
                l.failf(
                        LinkFailureError{
                                code: ErrRemoteError,

                                // TODO(halseth): we currently don't fail the
                                // channel permanently, as there are some sync
                                // issues with other implementations that will
                                // lead to them sending an error message, but
                                // we can recover from on next connection. See
                                // https://github.com/ElementsProject/lightning/issues/4212
                                PermanentFailure: false,
                        },
                        "ChannelPoint(%v): received error from peer: %v",
                        l.channel.ChannelPoint(), msg.Error(),
                )
        default:
                l.log.Warnf("received unknown message of type %T", msg)
        }
}

// handleStfu implements the top-level logic for handling the Stfu message from
// our peer.
func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
        if !l.noDanglingUpdates(lntypes.Remote) {
                return ErrPendingRemoteUpdates
        }
        err := l.quiescer.RecvStfu(*stfu)
        if err != nil {
                return err
        }

        // If we can immediately send an Stfu response back, we will.
        if l.noDanglingUpdates(lntypes.Local) {
                return l.quiescer.SendOwedStfu()
        }

        return nil
}

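// For intuition, a minimal successful quiescence handshake as implemented by
// handleStfu above looks like this (an illustrative trace only):
//
//      peer -> us:   Stfu   (accepted: no dangling remote updates)
//      us   -> peer: Stfu   (sent immediately if we have none either)
//
// Once both sides have sent Stfu, the channel is quiescent and neither side
// may send further updates until the link resumes.
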
// stfuFailf fails the link in the case where the requirements of the quiescence
// protocol are violated. In all cases we opt to drop the connection as only
// link state (as opposed to channel state) is affected.
func (l *channelLink) stfuFailf(format string, args ...interface{}) {
        l.failf(LinkFailureError{
                code:             ErrStfuViolation,
                FailureAction:    LinkFailureDisconnect,
                PermanentFailure: false,
                Warning:          true,
        }, format, args...)
}

// noDanglingUpdates returns true when there are no updates, originally issued
// by the given party (whose), still pending on either the Local or Remote
// commitment transaction.
func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool {
        pendingOnLocal := l.channel.NumPendingUpdates(
                whose, lntypes.Local,
        )
        pendingOnRemote := l.channel.NumPendingUpdates(
                whose, lntypes.Remote,
        )

        return pendingOnLocal == 0 && pendingOnRemote == 0
}

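// As a sketch, noDanglingUpdates(lntypes.Remote) asks: does the remote party
// have any updates it originally issued that are still pending on either
// commitment transaction? It is equivalent to requiring both of these counts
// to be zero:
//
//      l.channel.NumPendingUpdates(lntypes.Remote, lntypes.Local)
//      l.channel.NumPendingUpdates(lntypes.Remote, lntypes.Remote)
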
// ackDownStreamPackets is responsible for removing htlcs from a link's
// mailbox for packets delivered from the server, and for cleaning up any
// circuits closed by signing a previous commitment txn. This method ensures
// that the circuits are removed from the circuit map before removing them
// from the link's mailbox, otherwise it could be possible for some circuit to
// be missed if this link flaps.
func (l *channelLink) ackDownStreamPackets() error {
        // First, remove the downstream Add packets that were included in the
        // previous commitment signature. This will prevent the Adds from
        // being replayed if this link disconnects.
        for _, inKey := range l.openedCircuits {
                // In order to test the sphinx replay logic of the remote
                // party, unsafe replay does not acknowledge the packets from
                // the mailbox. We can then force a replay of any Add packets
                // held in memory by disconnecting and reconnecting the link.
                if l.cfg.UnsafeReplay {
                        continue
                }

                l.log.Debugf("removing Add packet %s from mailbox", inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Now, we will delete all circuits closed by the previous commitment
        // signature, which is the result of downstream Settle/Fail packets.
        // We batch them here to ensure circuits are closed atomically and for
        // performance.
        err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
        switch err {
        case nil:
                // Successful deletion.

        default:
                l.log.Errorf("unable to delete %d circuits: %v",
                        len(l.closedCircuits), err)
                return err
        }

        // With the circuits removed from memory and disk, we now ack any
        // Settle/Fails in the mailbox to ensure they do not get redelivered
        // after startup. If forgive is enabled and we've reached this point,
        // the circuits must have been removed at some point, so it is now
        // safe to un-queue the corresponding Settle/Fails.
        for _, inKey := range l.closedCircuits {
                l.log.Debugf("removing Fail/Settle packet %s from mailbox",
                        inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Lastly, reset our buffers to be empty while keeping any acquired
        // growth in the backing array.
        l.openedCircuits = l.openedCircuits[:0]
        l.closedCircuits = l.closedCircuits[:0]

        return nil
}

2734
// the link.
2735
func (l *channelLink) updateCommitTxOrFail() bool {
1,234✔
2736
        err := l.updateCommitTx()
1,234✔
2737
        switch err {
1,234✔
2738
        // No error encountered, success.
2739
        case nil:
1,224✔
2740

2741
        // A duplicate keystone error should be resolved and is not fatal, so
2742
        // we won't send an Error message to the peer.
2743
        case ErrDuplicateKeystone:
×
2744
                l.failf(LinkFailureError{code: ErrCircuitError},
×
2745
                        "temporary circuit error: %v", err)
×
2746
                return false
×
2747

2748
        // Any other error is treated results in an Error message being sent to
2749
        // the peer.
2750
        default:
10✔
2751
                l.failf(LinkFailureError{code: ErrInternalError},
10✔
2752
                        "unable to update commitment: %v", err)
10✔
2753
                return false
10✔
2754
        }
2755

2756
        return true
1,224✔
2757
}
2758

2759
// updateCommitTx signs, then sends an update to the remote peer adding a new
2760
// commitment to their commitment chain which includes all the latest updates
2761
// we've received+processed up to this point.
2762
func (l *channelLink) updateCommitTx() error {
1,292✔
2763
        // Preemptively write all pending keystones to disk, just in case the
1,292✔
2764
        // HTLCs we have in memory are included in the subsequent attempt to
1,292✔
2765
        // sign a commitment state.
1,292✔
2766
        err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
1,292✔
2767
        if err != nil {
1,292✔
2768
                // If ErrDuplicateKeystone is returned, the caller will catch
×
2769
                // it.
×
2770
                return err
×
2771
        }
×
2772

2773
        // Reset the batch, but keep the backing buffer to avoid reallocating.
2774
        l.keystoneBatch = l.keystoneBatch[:0]
1,292✔
2775

1,292✔
2776
        // If hodl.Commit mode is active, we will refrain from attempting to
1,292✔
2777
        // commit any in-memory modifications to the channel state. Exiting here
1,292✔
2778
        // permits testing of either the switch or link's ability to trim
1,292✔
2779
        // circuits that have been opened, but unsuccessfully committed.
1,292✔
2780
        if l.cfg.HodlMask.Active(hodl.Commit) {
1,297✔
2781
                l.log.Warnf(hodl.Commit.Warning())
5✔
2782
                return nil
5✔
2783
        }
5✔
2784

2785
        ctx, done := l.WithCtxQuitNoTimeout()
1,288✔
2786
        defer done()
1,288✔
2787

1,288✔
2788
        newCommit, err := l.channel.SignNextCommitment(ctx)
1,288✔
2789
        if err == lnwallet.ErrNoWindow {
1,376✔
2790
                l.cfg.PendingCommitTicker.Resume()
88✔
2791
                l.log.Trace("PendingCommitTicker resumed")
88✔
2792

88✔
2793
                n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
88✔
2794
                l.log.Tracef("revocation window exhausted, unable to send: "+
88✔
2795
                        "%v, pend_updates=%v, dangling_closes%v", n,
88✔
2796
                        lnutils.SpewLogClosure(l.openedCircuits),
88✔
2797
                        lnutils.SpewLogClosure(l.closedCircuits))
88✔
2798

88✔
2799
                return nil
88✔
2800
        } else if err != nil {
1,289✔
2801
                return err
×
2802
        }
×
2803

2804
        if err := l.ackDownStreamPackets(); err != nil {
1,201✔
2805
                return err
×
2806
        }
×
2807

2808
        l.cfg.PendingCommitTicker.Pause()
1,201✔
2809
        l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")
1,201✔
2810

1,201✔
2811
        // The remote party now has a new pending commitment, so we'll update
1,201✔
2812
        // the contract court to be aware of this new set (the prior old remote
1,201✔
2813
        // pending).
1,201✔
2814
        newUpdate := &contractcourt.ContractUpdate{
1,201✔
2815
                HtlcKey: contractcourt.RemotePendingHtlcSet,
1,201✔
2816
                Htlcs:   newCommit.PendingHTLCs,
1,201✔
2817
        }
1,201✔
2818
        err = l.cfg.NotifyContractUpdate(newUpdate)
1,201✔
2819
        if err != nil {
1,201✔
2820
                l.log.Errorf("unable to notify contract update: %v", err)
×
2821
                return err
×
2822
        }
×
2823

2824
        select {
1,201✔
2825
        case <-l.Quit:
11✔
2826
                return ErrLinkShuttingDown
11✔
2827
        default:
1,190✔
2828
        }
2829

2830
        auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob)
1,190✔
2831
        if err != nil {
1,190✔
2832
                return fmt.Errorf("error parsing aux sigs: %w", err)
×
2833
        }
×
2834

2835
        commitSig := &lnwire.CommitSig{
1,190✔
2836
                ChanID:        l.ChanID(),
1,190✔
2837
                CommitSig:     newCommit.CommitSig,
1,190✔
2838
                HtlcSigs:      newCommit.HtlcSigs,
1,190✔
2839
                PartialSig:    newCommit.PartialSig,
1,190✔
2840
                CustomRecords: auxBlobRecords,
1,190✔
2841
        }
1,190✔
2842
        l.cfg.Peer.SendMessage(false, commitSig)
1,190✔
2843

1,190✔
2844
        // Now that we have sent out a new CommitSig, we invoke the outgoing set
1,190✔
2845
        // of commit hooks.
1,190✔
2846
        l.RWMutex.Lock()
1,190✔
2847
        l.outgoingCommitHooks.invoke()
1,190✔
2848
        l.RWMutex.Unlock()
1,190✔
2849

1,190✔
2850
        return nil
1,190✔
2851
}
2852

2853
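// In short, the happy path of updateCommitTx above is a fixed pipeline (a
// summary of the steps it performs, not additional behavior):
//
//      OpenCircuits(keystoneBatch...)    // durably record opened circuits
//      SignNextCommitment(ctx)           // sign the remote party's new state
//      ackDownStreamPackets()            // now safe to ack mailbox packets
//      NotifyContractUpdate(...)         // inform the contract court
//      SendMessage(false, commitSig)     // deliver the CommitSig to the peer
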
// PeerPubKey returns the public key of the remote peer with which we have the
// channel link opened.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) PeerPubKey() [33]byte {
        return l.cfg.Peer.PubKey()
}

// ChannelPoint returns the channel outpoint for the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChannelPoint() wire.OutPoint {
        return l.channel.ChannelPoint()
}

// ShortChanID returns the short channel ID for the channel link. The short
// channel ID encodes the exact location in the main chain that the original
// funding output can be found.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
        l.RLock()
        defer l.RUnlock()

        return l.channel.ShortChanID()
}

// UpdateShortChanID updates the short channel ID for a link. This may be
// required in the event that a link is created before the short chan ID for it
// is known, or a re-org occurs, and the funding transaction changes location
// within the chain.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
        chanID := l.ChanID()

        // Refresh the channel state's short channel ID by loading it from disk.
        // This ensures that the channel state accurately reflects the updated
        // short channel ID.
        err := l.channel.State().Refresh()
        if err != nil {
                l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
                        "%v", chanID, err)
                return hop.Source, err
        }

        return hop.Source, nil
}

// ChanID returns the channel ID for the channel link. The channel ID is a more
// compact representation of a channel's full outpoint.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChanID() lnwire.ChannelID {
        return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
}

// Bandwidth returns the total amount that can flow through the channel link at
// this given instance. The value returned is expressed in millisatoshi and can
// be used by callers when making forwarding decisions to determine if a link
// can accept an HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
        // Get the balance available on the channel for new HTLCs. This takes
        // the channel reserve into account so HTLCs up to this value won't
        // violate it.
        return l.channel.AvailableBalance()
}

// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
// amount provided to the link. This check does not reserve a space, since
// forwards or other payments may use the available slot, so it should be
// considered best-effort.
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
        return l.channel.MayAddOutgoingHtlc(amt)
}

// getDustSum is a wrapper method that calls the underlying channel's dust sum
// method.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
        dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {

        return l.channel.GetDustSum(whoseCommit, dryRunFee)
}

// getFeeRate is a wrapper method that retrieves the underlying channel's
// feerate.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
        return l.channel.CommitFeeRate()
}

// getDustClosure returns a closure that can be used by the switch or mailbox
// to evaluate whether a given HTLC is dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustClosure() dustClosure {
        localDustLimit := l.channel.State().LocalChanCfg.DustLimit
        remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
        chanType := l.channel.State().ChanType

        return dustHelper(chanType, localDustLimit, remoteDustLimit)
}

// getCommitFee returns either the local or remote CommitFee in satoshis. This
// is used so that the Switch can have access to the commitment fee without
// needing to have a *LightningChannel. This doesn't include dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
        if remote {
                return l.channel.State().RemoteCommitment.CommitFee
        }

        return l.channel.State().LocalCommitment.CommitFee
}

// exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
// increases the total dust and fees within the channel past the configured
// fee threshold. It first calculates the dust sum over every update in the
// update log with the proposed fee-rate, taking into account both the local
// and remote dust limits. It uses every update in the update log instead of
// what is actually on the local and remote commitments because it is assumed
// that in a worst-case scenario, every update in the update log could
// theoretically be on either commitment transaction and this needs to be
// accounted for with this fee-rate. It then calculates the local and remote
// commitment fees given the proposed fee-rate. Finally, it tallies the results
// and determines if the fee threshold has been exceeded.
func (l *channelLink) exceedsFeeExposureLimit(
        feePerKw chainfee.SatPerKWeight) (bool, error) {

        dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)

        // Get the sum of dust for both the local and remote commitments using
        // this "dry-run" fee.
        localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
        remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)

        // Calculate the local and remote commitment fees using this dry-run
        // fee.
        localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
        if err != nil {
                return false, err
        }

        // Finally, check whether the max fee exposure was exceeded on either
        // future commitment transaction with the fee-rate.
        totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
        if totalLocalDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "local dust: %v, local fee: %v", l.ShortChanID(),
                        totalLocalDust, localFee)

                return true, nil
        }

        totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
                remoteFee,
        )

        if totalRemoteDust > l.cfg.MaxFeeExposure {
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
                        "remote dust: %v, remote fee: %v", l.ShortChanID(),
                        totalRemoteDust, remoteFee)

                return true, nil
        }

        return false, nil
}

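// For intuition, a worked example with illustrative numbers: suppose the
// dry-run dust sum on the local commitment is 400,000 msat, the projected
// local commit fee is 200 sat (200,000 msat), and MaxFeeExposure is
// 500,000 msat. The total local exposure is then 400,000 + 200,000 =
// 600,000 msat, which exceeds the threshold, so the proposed fee-rate would
// be rejected.
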
// isOverexposedWithHtlc calculates whether the proposed HTLC will make the
// channel exceed the fee threshold. It first fetches the largest fee-rate that
// may be on any unrevoked commitment transaction. Then, using this fee-rate,
// it determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds
// to the overall dust sum. If it is not dust, it contributes to weight, which
// also adds to the overall dust sum by an increase in fees. If the dust sum on
// either commitment exceeds the configured fee threshold, this function
// returns true.
func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
        incoming bool) bool {

        dustClosure := l.getDustClosure()

        feeRate := l.channel.WorstCaseFeeRate()

        amount := htlc.Amount.ToSatoshis()

        // See if this HTLC is dust on both the local and remote commitments.
        isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
        isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)

        // Calculate the dust sum for the local and remote commitments.
        localDustSum := l.getDustSum(
                lntypes.Local, fn.None[chainfee.SatPerKWeight](),
        )
        remoteDustSum := l.getDustSum(
                lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
        )

        // Grab the larger of the local and remote commitment fees w/o dust.
        commitFee := l.getCommitFee(false)

        if l.getCommitFee(true) > commitFee {
                commitFee = l.getCommitFee(true)
        }

        commitFeeMSat := lnwire.NewMSatFromSatoshis(commitFee)

        localDustSum += commitFeeMSat
        remoteDustSum += commitFeeMSat

        // Calculate the additional fee increase if this is a non-dust HTLC.
        weight := lntypes.WeightUnit(input.HTLCWeight)
        additional := lnwire.NewMSatFromSatoshis(
                feeRate.FeeForWeight(weight),
        )

        if isLocalDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                localDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase in
                // weight.
                localDustSum += additional
        }

        if localDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total local dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, localDustSum,
                        commitFeeMSat)

                return true
        }

        if isRemoteDust {
                // If this is dust, it doesn't contribute to weight but does
                // contribute to the overall dust sum.
                remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
        } else {
                // Account for the fee increase that comes with an increase in
                // weight.
                remoteDustSum += additional
        }

        if remoteDustSum > l.cfg.MaxFeeExposure {
                // The max fee exposure was exceeded.
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
                        "overexposed, total remote dust: %v (current commit "+
                        "fee: %v)", l.ShortChanID(), htlc, remoteDustSum,
                        commitFeeMSat)

                return true
        }

        return false
}

// dustClosure is a function that evaluates whether an HTLC is dust. It returns
// true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
// the HTLC is incoming (i.e. one that the remote sent), a ChannelParty
// denoting whether to evaluate on the local or remote commit, and finally an
// HTLC amount to test.
type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
        whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool

// dustHelper is used to construct the dustClosure.
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
        remoteDustLimit btcutil.Amount) dustClosure {

        isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
                whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {

                var dustLimit btcutil.Amount
                if whoseCommit.IsLocal() {
                        dustLimit = localDustLimit
                } else {
                        dustLimit = remoteDustLimit
                }

                return lnwallet.HtlcIsDust(
                        chantype, incoming, whoseCommit, feerate, amt,
                        dustLimit,
                )
        }

        return isDust
}

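// Illustrative usage of the closure returned by dustHelper (variable names
// and values here are hypothetical): the channel type and both dust limits
// are captured once, after which the closure can be queried cheaply for any
// candidate HTLC:
//
//      isDust := dustHelper(chanType, localDustLimit, remoteDustLimit)
//      onLocal := isDust(feeRate, true, lntypes.Local, amtSat)
//      onRemote := isDust(feeRate, true, lntypes.Remote, amtSat)
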
// zeroConfConfirmed returns whether or not the zero-conf channel has
// confirmed on-chain.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) zeroConfConfirmed() bool {
        return l.channel.State().ZeroConfConfirmed()
}

// confirmedScid returns the confirmed SCID for a zero-conf channel. This
// should not be called for non-zero-conf channels.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
        return l.channel.State().ZeroConfRealScid()
}

// isZeroConf returns whether or not the underlying channel is a zero-conf
// channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) isZeroConf() bool {
        return l.channel.State().IsZeroConf()
}

// negotiatedAliasFeature returns whether or not the underlying channel has
// negotiated the option-scid-alias feature bit. This will be true for both
// option-scid-alias and zero-conf channel-types. It will also be true for
// channels with the feature bit but without the above channel-types.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) negotiatedAliasFeature() bool {
        return l.channel.State().NegotiatedAliasFeature()
}

// getAliases returns the set of aliases for the underlying channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
        return l.cfg.GetAliases(l.ShortChanID())
}

// attachFailAliasUpdate sets the link's FailAliasUpdate function.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) attachFailAliasUpdate(closure func(
        sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) {

        l.Lock()
        l.cfg.FailAliasUpdate = closure
        l.Unlock()
}

// AttachMailBox updates the current mailbox used by this link, and hooks up
// the mailbox's message and packet outboxes to the link's upstream and
// downstream chans, respectively.
func (l *channelLink) AttachMailBox(mailbox MailBox) {
        l.Lock()
        l.mailBox = mailbox
        l.upstream = mailbox.MessageOutBox()
        l.downstream = mailbox.PacketOutBox()
        l.Unlock()

        // Set the mailbox's fee rate. This may be refreshing a feerate that was
        // never committed.
        l.mailBox.SetFeeRate(l.getFeeRate())

        // Also set the mailbox's dust closure so that it can query whether
        // HTLCs are dust given the current feerate.
        l.mailBox.SetDustClosure(l.getDustClosure())
}

// UpdateForwardingPolicy updates the forwarding policy for the target
// ChannelLink. Once updated, the link will use the new forwarding policy to
// govern whether an incoming HTLC should be forwarded or not. We assume that
// fields that are zero are intentionally set to zero, so we'll use newPolicy to
// update all of the link's FwrdingPolicy's values.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateForwardingPolicy(
        newPolicy models.ForwardingPolicy) {

        l.Lock()
        defer l.Unlock()

        l.cfg.FwrdingPolicy = newPolicy
}

// CheckHtlcForward should return a nil error if the passed HTLC details
// satisfy the current forwarding policy of the target link. Otherwise,
// a LinkError with a valid protocol failure message should be returned
// in order to signal the policy consistency issue to the source of the
// HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) CheckHtlcForward(payHash [32]byte, incomingHtlcAmt,
        amtToForward lnwire.MilliSatoshi, incomingTimeout,
        outgoingTimeout uint32, inboundFee models.InboundFee,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // Using the outgoing HTLC amount, we'll calculate the outgoing
        // fee this incoming HTLC must carry in order to satisfy the constraints
        // of the outgoing link.
        outFee := ExpectedFee(policy, amtToForward)

        // Then calculate the inbound fee that we charge based on the sum of
        // outgoing HTLC amount and outgoing fee.
        inFee := inboundFee.CalcFee(amtToForward + outFee)

        // Add up both fee components. It is important to calculate both fees
        // separately. An alternative way of calculating is to first determine
        // an aggregate fee and apply that to the outgoing HTLC amount. However,
        // rounding may cause the result to be slightly higher than in the case
        // of separately rounded fee components. This potentially causes failed
        // forwards for senders and is something to be avoided.
        expectedFee := inFee + int64(outFee)

        // If the actual fee is less than our expected fee, then we'll reject
        // this HTLC as it didn't provide a sufficient amount of fees, or the
        // values have been tampered with, or the send used incorrect/outdated
        // information to construct the forwarding information for this hop. In
        // any case, we'll cancel this HTLC.
        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
        if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
                l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
                        "expected %v, got %v: incoming=%v, outgoing=%v, "+
                        "inboundFee=%v",
                        payHash[:], expectedFee, actualFee,
                        incomingHtlcAmt, amtToForward, inboundFee,
                )

                // As part of the returned error, we'll send our latest routing
                // policy so the sending node obtains the most up to date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewFeeInsufficient(amtToForward, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check whether the outgoing htlc satisfies the channel policy.
        err := l.canSendHtlc(
                policy, payHash, amtToForward, outgoingTimeout, heightNow,
                originalScid, customRecords,
        )
        if err != nil {
                return err
        }

        // Finally, we'll ensure that the time-lock on the outgoing HTLC meets
        // the following constraint: the incoming time-lock minus our time-lock
        // delta should be at least the outgoing time lock. Otherwise, either
        // the sender messed up, or an intermediate node tampered with the
        // HTLC.
        timeDelta := policy.TimeLockDelta
        if incomingTimeout < outgoingTimeout+timeDelta {
                l.log.Warnf("incoming htlc(%x) has incorrect time-lock value: "+
                        "expected at least %v block delta, got %v block delta",
                        payHash[:], timeDelta, incomingTimeout-outgoingTimeout)

                // Grab the latest routing policy so the sending node is up to
                // date with our current policy.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewIncorrectCltvExpiry(
                                incomingTimeout, *upd,
                        )
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        return nil
}

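// A worked example for the fee check in CheckHtlcForward, using illustrative
// policy numbers: with an outgoing base fee of 1,000 msat and a proportional
// fee of 100 ppm, forwarding amtToForward = 1,000,000 msat yields
// outFee = 1,000 + 100 = 1,100 msat. With a zero inbound fee, the expected
// fee is 1,100 msat, so an incoming HTLC of 1,001,200 msat (an actual fee of
// 1,200 msat) passes, while 1,001,000 msat (1,000 msat) would be rejected
// with FeeInsufficient.
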
// CheckHtlcTransit should return a nil error if the passed HTLC details
// satisfy the current channel policy. Otherwise, a LinkError with a
// valid protocol failure message should be returned in order to signal
// the violation. This call is intended to be used for locally initiated
// payments for which there is no corresponding incoming htlc.
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
        amt lnwire.MilliSatoshi, timeout uint32, heightNow uint32,
        customRecords lnwire.CustomRecords) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // We pass in hop.Source here as this is only used in the Switch when
        // trying to send over a local link. This causes the fallback mechanism
        // to occur.
        return l.canSendHtlc(
                policy, payHash, amt, timeout, heightNow, hop.Source,
                customRecords,
        )
}


// canSendHtlc checks whether the given htlc parameters satisfy
// the channel's amount and time lock constraints.
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
        payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
        heightNow uint32, originalScid lnwire.ShortChannelID,
        customRecords lnwire.CustomRecords) *LinkError {

        // As our first sanity check, we'll ensure that the passed HTLC isn't
        // too small for the next hop. If so, then we'll cancel the HTLC
        // directly.
        if amt < policy.MinHTLCOut {
                l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
                        "htlc_value=%v", payHash[:], policy.MinHTLCOut,
                        amt)

                // As part of the returned error, we'll send our latest routing
                // policy so the sending node obtains the most up-to-date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewAmountBelowMinimum(amt, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Next, ensure that the passed HTLC isn't too large. If so, we'll
        // cancel the HTLC directly.
        if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
                l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
                        "htlc_value=%v", payHash[:], policy.MaxHTLC, amt)

                // As part of the returned error, we'll send our latest routing
                // policy so the sending node obtains the most up-to-date data.
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewDetailedLinkError(failure, OutgoingFailureHTLCExceedsMax)
        }

        // We want to avoid offering an HTLC which will expire in the near
        // future, so we'll reject an HTLC if the outgoing expiration time is
        // too close to the current height.
        if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
                l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
                        "outgoing_expiry=%v, best_height=%v", payHash[:],
                        timeout, heightNow)

                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewExpiryTooSoon(*upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check the absolute max delta.
        if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
                l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
                        "the future: got %v, but maximum is %v", payHash[:],
                        timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)

                return NewLinkError(&lnwire.FailExpiryTooFar{})
        }

        // We now check the available bandwidth to see if this HTLC can be
        // forwarded.
        availableBandwidth := l.Bandwidth()
        auxBandwidth, err := fn.MapOptionZ(
                l.cfg.AuxTrafficShaper,
                func(ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
                        var htlcBlob fn.Option[tlv.Blob]
                        blob, err := customRecords.Serialize()
                        if err != nil {
                                return fn.Err[OptionalBandwidth](
                                        fmt.Errorf("unable to serialize "+
                                                "custom records: %w", err))
                        }

                        if len(blob) > 0 {
                                htlcBlob = fn.Some(blob)
                        }

                        return l.AuxBandwidth(amt, originalScid, htlcBlob, ts)
                },
        ).Unpack()
        if err != nil {
                l.log.Errorf("Unable to determine aux bandwidth: %v", err)
                return NewLinkError(&lnwire.FailTemporaryNodeFailure{})
        }

        auxBandwidth.WhenSome(func(bandwidth lnwire.MilliSatoshi) {
                availableBandwidth = bandwidth
        })

        // Check to see if there is enough balance in this channel.
        if amt > availableBandwidth {
                l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
                        "larger than %v", amt, l.Bandwidth())
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewDetailedLinkError(
                        failure, OutgoingFailureInsufficientBalance,
                )
        }

        return nil
}
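
// Illustrative note on the combinators used above (an assumption based on
// the fn package semantics, not something this file states): fn.MapOptionZ
// returns the zero value of the mapped type when the option is None. With no
// AuxTrafficShaper configured, the expression therefore yields the zero
// fn.Result[OptionalBandwidth], which unpacks to (None, nil) and leaves
// availableBandwidth at the regular BTC value. A rough desugaring, with
// htlcBlob assumed to be built as above:
//
//	var auxBandwidth OptionalBandwidth
//	var auxErr error
//	l.cfg.AuxTrafficShaper.WhenSome(func(ts AuxTrafficShaper) {
//	        auxBandwidth, auxErr = l.AuxBandwidth(
//	                amt, originalScid, htlcBlob, ts,
//	        ).Unpack()
//	})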

// AuxBandwidth returns the bandwidth that can be used for a channel,
// expressed in milli-satoshi. This might be different from the regular BTC
// bandwidth for custom channels. This will always return fn.None() for a
// regular (non-custom) channel.
func (l *channelLink) AuxBandwidth(amount lnwire.MilliSatoshi,
        cid lnwire.ShortChannelID, htlcBlob fn.Option[tlv.Blob],
        ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {

        unknownBandwidth := fn.None[lnwire.MilliSatoshi]()

        fundingBlob := l.FundingCustomBlob()
        shouldHandle, err := ts.ShouldHandleTraffic(cid, fundingBlob)
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("traffic shaper "+
                        "failed to decide whether to handle traffic: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper is handling "+
                "traffic: %v", cid, shouldHandle)

        // If this channel isn't handled by the aux traffic shaper, we'll
        // return early.
        if !shouldHandle {
                return fn.Ok(unknownBandwidth)
        }

        // Ask for a specific bandwidth to be used for the channel.
        commitmentBlob := l.CommitmentCustomBlob()
        auxBandwidth, err := ts.PaymentBandwidth(
                htlcBlob, commitmentBlob, l.Bandwidth(), amount,
        )
        if err != nil {
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to get "+
                        "bandwidth from external traffic shaper: %w", err))
        }

        log.Debugf("ShortChannelID=%v: aux traffic shaper reported available "+
                "bandwidth: %v", cid, auxBandwidth)

        return fn.Ok(fn.Some(auxBandwidth))
}
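
// Illustrative sketch (not part of the original file): a pass-through
// traffic shaper exercising the two methods called above. The signatures are
// inferred from the call sites in this file; the real AuxTrafficShaper
// interface may carry additional methods.
//
//	type passthroughShaper struct{}
//
//	func (passthroughShaper) ShouldHandleTraffic(cid lnwire.ShortChannelID,
//	        fundingBlob fn.Option[tlv.Blob]) (bool, error) {
//
//	        // Only handle channels that carry a custom funding blob.
//	        return fundingBlob.IsSome(), nil
//	}
//
//	func (passthroughShaper) PaymentBandwidth(htlcBlob,
//	        commitmentBlob fn.Option[tlv.Blob], linkBandwidth,
//	        amt lnwire.MilliSatoshi) (lnwire.MilliSatoshi, error) {
//
//	        // Defer to the regular BTC bandwidth.
//	        return linkBandwidth, nil
//	}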

// Stats returns the statistics of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
        snapshot := l.channel.StateSnapshot()

        return snapshot.ChannelCommitment.CommitHeight,
                snapshot.TotalMSatSent,
                snapshot.TotalMSatReceived
}

// String returns the string representation of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) String() string {
        return l.channel.ChannelPoint().String()
}

// handleSwitchPacket handles the switch packets. These packets might be
// forwarded to us from another channel link in case the htlc update came
// from another peer, or they might have been created locally by the user.
//
// NOTE: Part of the packetHandler interface.
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
        l.log.Tracef("received switch packet inkey=%v, outkey=%v",
                pkt.inKey(), pkt.outKey())

        return l.mailBox.AddPacket(pkt)
}

// HandleChannelUpdate handles the htlc requests (settle/add/fail) which are
// sent to us from the remote peer we have a channel with.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
        select {
        case <-l.Quit:
                // Return early if the link is already in the process of
                // quitting. It doesn't make sense to hand the message to the
                // mailbox here.
                return
        default:
        }

        err := l.mailBox.AddMessage(message)
        if err != nil {
                l.log.Errorf("failed to add Message to mailbox: %v", err)
        }
}
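
// Illustrative note: the select at the top of HandleChannelUpdate is the
// standard non-blocking quit check in Go. A minimal sketch of the idiom,
// assuming a quit channel in scope:
//
//	select {
//	case <-quit:
//	        return // Shutting down, drop the work item.
//	default:
//	        // Not quitting, fall through and enqueue the work.
//	}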

// updateChannelFee updates the commitment fee-per-kw on this channel by
// committing to an update_fee message.
func (l *channelLink) updateChannelFee(feePerKw chainfee.SatPerKWeight) error {
        l.log.Infof("updating commit fee to %v", feePerKw)

        // We skip sending the UpdateFee message if the channel is not
        // currently eligible to forward messages.
        if !l.eligibleToUpdate() {
                l.log.Debugf("skipping fee update for inactive channel")
                return nil
        }

        // Check and see if our proposed fee-rate would make us exceed the fee
        // threshold.
        thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
        if err != nil {
                // This shouldn't typically happen. If it does, it indicates
                // something is wrong with our channel state.
                return err
        }

        if thresholdExceeded {
                return fmt.Errorf("link fee threshold exceeded")
        }

        // First, we'll update the local fee on our commitment.
        if err := l.channel.UpdateFee(feePerKw); err != nil {
                return err
        }

        // The fee passed the channel's validation checks, so we update the
        // mailbox feerate.
        l.mailBox.SetFeeRate(feePerKw)

        // We'll then attempt to send a new UpdateFee message, and also lock it
        // in immediately by triggering a commitment update.
        msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                return err
        }

        return l.updateCommitTx()
}
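
// Illustrative note: the update sequence above is ordered deliberately:
// (1) check eligibility, (2) validate against the fee exposure threshold,
// (3) apply the fee to our local commitment state machine, (4) update the
// mailbox fee rate, (5) send update_fee to the peer, and (6) lock the change
// in with a new commitment via updateCommitTx.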

// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
// after receiving a revocation from the remote party, and reprocesses them in
// the context of the provided forwarding package. Any settles or fails that
// have already been acknowledged in the forwarding package will not be sent to
// the switch.
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
        if len(fwdPkg.SettleFails) == 0 {
                return
        }

        l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)

        var switchPackets []*htlcPacket
        for i, update := range fwdPkg.SettleFails {
                destRef := fwdPkg.DestRef(uint16(i))

                // Skip any settles or fails that have already been
                // acknowledged by the incoming link that originated the
                // forwarded Add.
                if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
                        continue
                }

                // TODO(roasbeef): rework log entries to a shared
                // interface.

                switch msg := update.UpdateMsg.(type) {
                // A settle for an HTLC we previously forwarded has been
                // received. So we'll forward the HTLC to the switch which will
                // handle propagating the settle to the prior hop.
                case *lnwire.UpdateFulfillHTLC:
                        // If hodl.SettleIncoming is requested, we will not
                        // forward the SETTLE to the switch and will not signal
                        // a free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
                                l.log.Warnf(hodl.SettleIncoming.Warning())
                                continue
                        }

                        settlePacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has been
                        // freed up within the commitment state.
                        switchPackets = append(switchPackets, settlePacket)

                // A failureCode message for a previously forwarded HTLC has
                // been received. As a result a new slot will be freed up in
                // our commitment state, so we'll forward this to the switch so
                // the backwards undo can continue.
                case *lnwire.UpdateFailHTLC:
                        // If hodl.FailIncoming is requested, we will not
                        // forward the FAIL to the switch and will not signal a
                        // free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.FailIncoming) {
                                l.log.Warnf(hodl.FailIncoming.Warning())
                                continue
                        }

                        // Fetch the reason the HTLC was canceled so we can
                        // continue to propagate it. This failure originated
                        // from another node, so the linkFailure field is not
                        // set on the packet.
                        failPacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID)

                        // If the failure message lacks an HMAC (but includes
                        // the 4 bytes for encoding the message and padding
                        // lengths), then this means that we received it as an
                        // UpdateFailMalformedHTLC. As a result, we'll signal
                        // that we need to convert this error within the switch
                        // to an actual error, by encrypting it as if we were
                        // the originating hop.
                        convertedErrorSize := lnwire.FailureMessageLength + 4
                        if len(msg.Reason) == convertedErrorSize {
                                failPacket.convertedError = true
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has been
                        // freed up within the commitment state.
                        switchPackets = append(switchPackets, failPacket)
                }
        }

        // Only spawn the task to forward packets if we have a non-zero number
        // of them.
        if len(switchPackets) > 0 {
                go l.forwardBatch(false, switchPackets...)
        }
}
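
// Illustrative note on convertedErrorSize above (an assumption about the
// wire encoding, stated here for clarity): lnwire.FailureMessageLength is
// 256 bytes, and an encoded failure reason prefixes the message with a
// 2-byte message length and appends a 2-byte pad length, hence the "+ 4". A
// reason of exactly 260 bytes therefore indicates a failure that was
// received as an UpdateFailMalformedHTLC and re-encoded locally, rather
// than an onion-encrypted reason carrying an HMAC.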

// processRemoteAdds serially processes each of the Add payment descriptors
// which have been "locked-in" by receiving a revocation from the remote party.
// The forwarding package provided instructs how to process this batch,
// indicating whether this is the first time these Adds are being processed, or
// whether we are reprocessing as a result of a failure or restart. Adds that
// have already been acknowledged in the forwarding package will be ignored.
//
//nolint:funlen
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
        l.log.Tracef("processing %d remote adds for height %d",
                len(fwdPkg.Adds), fwdPkg.Height)

        decodeReqs := make(
                []hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds),
        )
        for _, update := range fwdPkg.Adds {
                if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
                        // Before adding the new htlc to the state machine,
                        // parse the onion object in order to obtain the
                        // routing information with the DecodeHopIterator
                        // function which processes the Sphinx packet.
                        onionReader := bytes.NewReader(msg.OnionBlob[:])

                        req := hop.DecodeHopIteratorRequest{
                                OnionReader:    onionReader,
                                RHash:          msg.PaymentHash[:],
                                IncomingCltv:   msg.Expiry,
                                IncomingAmount: msg.Amount,
                                BlindingPoint:  msg.BlindingPoint,
                        }

                        decodeReqs = append(decodeReqs, req)
                }
        }

        // Atomically decode the incoming htlcs, simultaneously checking for
        // replay attempts. A particular index in the returned, spare list of
        // channel iterators should only be used if the failure code at the
        // same index is lnwire.FailCodeNone.
        decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
                fwdPkg.ID(), decodeReqs,
        )
        if sphinxErr != nil {
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to decode hop iterators: %v", sphinxErr)
                return
        }

        var switchPackets []*htlcPacket

        for i, update := range fwdPkg.Adds {
                idx := uint16(i)

                //nolint:forcetypeassert
                add := *update.UpdateMsg.(*lnwire.UpdateAddHTLC)
                sourceRef := fwdPkg.SourceRef(idx)

                if fwdPkg.State == channeldb.FwdStateProcessed &&
                        fwdPkg.AckFilter.Contains(idx) {

                        // If this index is already found in the ack filter,
                        // the response to this forwarding decision has already
                        // been committed by one of our commitment txns. ADDs
                        // in this state are waiting for the rest of the fwding
                        // package to get acked before being garbage collected.
                        continue
                }

                // An incoming HTLC add has been fully locked-in. As a result
                // we can now examine the forwarding details of the HTLC, and
                // the HTLC itself to decide if: we should forward it, cancel
                // it, or are able to settle it (and it adheres to our fee
                // related constraints).

                // Before adding the new htlc to the state machine, parse the
                // onion object in order to obtain the routing information with
                // the DecodeHopIterator function which processes the Sphinx
                // packet.
                chanIterator, failureCode := decodeResps[i].Result()
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to the payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob, &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion hop "+
                                "iterator: %v", failureCode)
                        continue
                }

                heightNow := l.cfg.BestHeight()

                pld, routeRole, pldErr := chanIterator.HopPayload()
                if pldErr != nil {
                        // If we're unable to process the onion payload, or we
                        // received an invalid onion payload failure, then we
                        // should send an error back to the caller so the HTLC
                        // can be canceled.
                        var failedType uint64

                        // We need to get the underlying error value, so we
                        // can't use errors.As as suggested by the linter.
                        //nolint:errorlint
                        if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
                                failedType = uint64(e.Type)
                        }

                        // If we couldn't parse the payload, make our best
                        // effort at creating an error encrypter that knows
                        // what blinding type we were, but if we couldn't
                        // parse the payload we have no way of knowing whether
                        // we were the introduction node or not.
                        //
                        //nolint:ll
                        obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
                                l.cfg.ExtractErrorEncrypter,
                                // We need our route role here because we
                                // couldn't parse or validate the payload.
                                routeRole == hop.RouteRoleIntroduction,
                        )
                        if failCode != lnwire.CodeNone {
                                l.log.Errorf("could not extract error "+
                                        "encrypter: %v", pldErr)

                                // We can't process this htlc, send back
                                // malformed.
                                l.sendMalformedHTLCError(
                                        add.ID, failureCode, add.OnionBlob,
                                        &sourceRef,
                                )

                                continue
                        }

                        // TODO: currently none of the test unit infrastructure
                        // is setup to handle TLV payloads, so testing this
                        // would require implementing a separate mock iterator
                        // for TLV payloads that also supports injecting invalid
                        // payloads. Deferring this non-trivial effort till a
                        // later date.
                        failure := lnwire.NewInvalidOnionPayload(failedType, 0)

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Errorf("unable to decode forwarding "+
                                "instructions: %v", pldErr)

                        continue
                }

                // Retrieve the onion obfuscator from the onion blob in order
                // to produce the initial obfuscation of the onion failureCode.
                obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
                        l.cfg.ExtractErrorEncrypter,
                        routeRole == hop.RouteRoleIntroduction,
                )
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to the payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob,
                                &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion "+
                                "obfuscator: %v", failureCode)

                        continue
                }

                fwdInfo := pld.ForwardingInfo()

                // Check whether the payload we've just processed uses our
                // node as the introduction point (gave us a blinding key in
                // the payload itself) and fail it back if we don't support
                // route blinding.
                if fwdInfo.NextBlinding.IsSome() &&
                        l.cfg.DisallowRouteBlinding {

                        failure := lnwire.NewInvalidBlinding(
                                fn.Some(add.OnionBlob),
                        )

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Error("rejected htlc that uses us as an " +
                                "introduction point when we do not support " +
                                "route blinding")

                        continue
                }

                switch fwdInfo.NextHop {
                case hop.Exit:
                        err := l.processExitHop(
                                add, sourceRef, obfuscator, fwdInfo,
                                heightNow, pld,
                        )
                        if err != nil {
                                l.failf(LinkFailureError{
                                        code: ErrInternalError,
                                }, err.Error()) //nolint

                                return
                        }

                // There are additional channels left within this route. So
                // we'll simply do some forwarding package book-keeping.
                default:
                        // If hodl.AddIncoming is requested, we will not
                        // validate the forwarded ADD, nor will we send the
                        // packet to the htlc switch.
                        if l.cfg.HodlMask.Active(hodl.AddIncoming) {
                                l.log.Warnf(hodl.AddIncoming.Warning())
                                continue
                        }

                        endorseValue := l.experimentalEndorsement(
                                record.CustomSet(add.CustomRecords),
                        )
                        endorseType := uint64(
                                lnwire.ExperimentalEndorsementType,
                        )

                        switch fwdPkg.State {
                        case channeldb.FwdStateProcessed:
                                // This add was not forwarded on the previous
                                // processing phase, run it through our
                                // validation pipeline to reproduce an error.
                                // This may trigger a different error due to
                                // expiring timelocks, but we expect that an
                                // error will be reproduced.
                                if !fwdPkg.FwdFilter.Contains(idx) {
                                        break
                                }

                                // Otherwise, it was already processed, so we
                                // can collect it and continue.
                                outgoingAdd := &lnwire.UpdateAddHTLC{
                                        Expiry:        fwdInfo.OutgoingCTLV,
                                        Amount:        fwdInfo.AmountToForward,
                                        PaymentHash:   add.PaymentHash,
                                        BlindingPoint: fwdInfo.NextBlinding,
                                }

                                endorseValue.WhenSome(func(e byte) {
                                        custRecords := map[uint64][]byte{
                                                endorseType: {e},
                                        }

                                        outgoingAdd.CustomRecords = custRecords
                                })

                                // Finally, we'll encode the onion packet for
                                // the _next_ hop using the hop iterator
                                // decoded for the current hop.
                                buf := bytes.NewBuffer(
                                        outgoingAdd.OnionBlob[0:0],
                                )

                                // We know this cannot fail, as this ADD
                                // was marked forwarded in a previous
                                // round of processing.
                                chanIterator.EncodeNextHop(buf)

                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               outgoingAdd.Amount,
                                        htlc:                 outgoingAdd,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }
                                switchPackets = append(
                                        switchPackets, updatePacket,
                                )

                                continue
                        }

                        // TODO(roasbeef): ensure don't accept outrageous
                        // timeout for htlc

                        // With all our forwarding constraints met, we'll
                        // create the outgoing HTLC using the parameters as
                        // specified in the forwarding info.
                        addMsg := &lnwire.UpdateAddHTLC{
                                Expiry:        fwdInfo.OutgoingCTLV,
                                Amount:        fwdInfo.AmountToForward,
                                PaymentHash:   add.PaymentHash,
                                BlindingPoint: fwdInfo.NextBlinding,
                        }

                        endorseValue.WhenSome(func(e byte) {
                                addMsg.CustomRecords = map[uint64][]byte{
                                        endorseType: {e},
                                }
                        })

                        // Finally, we'll encode the onion packet for the
                        // _next_ hop using the hop iterator decoded for the
                        // current hop.
                        buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
                        err := chanIterator.EncodeNextHop(buf)
                        if err != nil {
                                l.log.Errorf("unable to encode the "+
                                        "remaining route %v", err)

                                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:ll
                                        return lnwire.NewTemporaryChannelFailure(upd)
                                }

                                failure := l.createFailureWithUpdate(
                                        true, hop.Source, cb,
                                )

                                l.sendHTLCError(
                                        add, sourceRef, NewLinkError(failure),
                                        obfuscator, false,
                                )
                                continue
                        }

                        // Now that this add has been reprocessed, only append
                        // it to our list of packets to forward to the switch
                        // if this is the first time processing the add. If the
                        // fwd pkg has already been processed, then we entered
                        // the above section to recreate a previous error. If
                        // the packet had previously been forwarded, it would
                        // have been added to switchPackets at the top of this
                        // section.
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:ll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               addMsg.Amount,
                                        htlc:                 addMsg,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }

                                fwdPkg.FwdFilter.Set(idx)
                                switchPackets = append(switchPackets,
                                        updatePacket)
                        }
                }
        }

        // Commit the htlcs we are intending to forward if this package has not
        // been fully processed.
        if fwdPkg.State == channeldb.FwdStateLockedIn {
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
                if err != nil {
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "unable to set fwd filter: %v", err)
                        return
                }
        }

        if len(switchPackets) == 0 {
                return
        }

        replay := fwdPkg.State != channeldb.FwdStateLockedIn

        l.log.Debugf("forwarding %d packets to switch: replay=%v",
                len(switchPackets), replay)

        // NOTE: This call is made synchronous so that we ensure all circuits
        // are committed in the exact order that they are processed in the
        // link. Failing to do this could cause reorderings/gaps in the range
        // of opened circuits, which violates assumptions made by the circuit
        // trimming.
        l.forwardBatch(replay, switchPackets...)
}
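
// Illustrative note: processRemoteAdds distinguishes two forwarding-package
// states. FwdStateLockedIn means the Adds are being processed for the first
// time: forwarding decisions are recorded in fwdPkg.FwdFilter and persisted
// via SetFwdFilter before the batch is handed to the switch synchronously.
// FwdStateProcessed means we are replaying after a restart: Adds that were
// previously forwarded are re-created verbatim from the filter, while the
// rest are re-validated purely to reproduce their failure.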

// experimentalEndorsement returns the value to set for our outgoing
// experimental endorsement field, or fn.None if the field should not be
// populated on the outgoing htlc.
func (l *channelLink) experimentalEndorsement(
        customUpdateAdd record.CustomSet) fn.Option[byte] {

        // Only relay the experimental signal if we are within the experiment
        // period.
        if !l.cfg.ShouldFwdExpEndorsement() {
                return fn.None[byte]()
        }

        // If we don't have any custom records or the experimental field is
        // not set, just forward a zero value.
        if len(customUpdateAdd) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        t := uint64(lnwire.ExperimentalEndorsementType)
        value, set := customUpdateAdd[t]
        if !set {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // We expect at least one byte for this field, consider it invalid if
        // it has no data and just forward a zero value.
        if len(value) == 0 {
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
        }

        // Only forward endorsed if the incoming link is endorsed.
        if value[0] == lnwire.ExperimentalEndorsed {
                return fn.Some[byte](lnwire.ExperimentalEndorsed)
        }

        // Forward as unendorsed otherwise, including cases where we've
        // received an invalid value that uses more than 3 bits of information.
        return fn.Some[byte](lnwire.ExperimentalUnendorsed)
}

// processExitHop handles an htlc for which this link is the exit hop. It
// returns an error if the htlc could not be processed, in which case the
// caller is expected to fail the link.
func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter,
        fwdInfo hop.ForwardingInfo, heightNow uint32,
        payload invoices.Payload) error {

        // If hodl.ExitSettle is requested, we will not validate the final hop's
        // ADD, nor will we settle the corresponding invoice or respond with the
        // preimage.
        if l.cfg.HodlMask.Active(hodl.ExitSettle) {
                l.log.Warnf("%s for htlc(rhash=%x,htlcIndex=%v)",
                        hodl.ExitSettle.Warning(), add.PaymentHash, add.ID)

                return nil
        }

        // As we're the exit hop, we'll double check the hop-payload included in
        // the HTLC to ensure that it was crafted correctly by the sender and
        // is compatible with the HTLC we were extended.
        //
        // For a special case, if the fwdInfo doesn't have any blinded path
        // information, and the incoming HTLC had special extra data, then
        // we'll skip this amount check. The invoice acceptor will make sure we
        // reject the HTLC if it's not containing the correct amount after
        // examining the custom data.
        hasBlindedPath := fwdInfo.NextBlinding.IsSome()
        customHTLC := len(add.CustomRecords) > 0 && !hasBlindedPath
        log.Tracef("Exit hop has_blinded_path=%v custom_htlc_bypass=%v",
                hasBlindedPath, customHTLC)

        if !customHTLC && add.Amount < fwdInfo.AmountToForward {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible value: expected <=%v, got %v",
                        add.PaymentHash, add.Amount, fwdInfo.AmountToForward)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectHtlcAmount(add.Amount),
                )
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

                return nil
        }

        // We'll also ensure that our time-lock value has been computed
        // correctly.
        if add.Expiry < fwdInfo.OutgoingCTLV {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible time-lock: expected <=%v, got %v",
                        add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectCltvExpiry(add.Expiry),
                )

                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

                return nil
        }

        // Notify the invoiceRegistry of the exit hop htlc. If we crash right
        // after this, this code will be re-executed after restart. We will
        // receive back a resolution event.
        invoiceHash := lntypes.Hash(add.PaymentHash)

        circuitKey := models.CircuitKey{
                ChanID: l.ShortChanID(),
                HtlcID: add.ID,
        }

        event, err := l.cfg.Registry.NotifyExitHopHtlc(
                invoiceHash, add.Amount, add.Expiry, int32(heightNow),
                circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload,
        )
        if err != nil {
                return err
        }

        // Create a hodlHtlc struct and decide either resolved now or later.
        htlc := hodlHtlc{
                add:        add,
                sourceRef:  sourceRef,
                obfuscator: obfuscator,
        }

        // If the event is nil, the invoice is being held, so we save the
        // payment descriptor for future reference.
        if event == nil {
                l.hodlMap[circuitKey] = htlc
                return nil
        }

        // Process the received resolution.
        return l.processHtlcResolution(event, htlc)
}

// settleHTLC settles the HTLC on the channel.
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
        htlcIndex uint64, sourceRef channeldb.AddRef) error {

        hash := preimage.Hash()

        l.log.Infof("settling htlc %v as exit hop", hash)

        err := l.channel.SettleHTLC(
                preimage, htlcIndex, &sourceRef, nil, nil,
        )
        if err != nil {
                return fmt.Errorf("unable to settle htlc: %w", err)
        }

        // If the link is in hodl.BogusSettle mode, replace the preimage with a
        // fake one before sending it to the peer.
        if l.cfg.HodlMask.Active(hodl.BogusSettle) {
                l.log.Warnf(hodl.BogusSettle.Warning())
                preimage = [32]byte{}
                copy(preimage[:], bytes.Repeat([]byte{2}, 32))
        }

        // The HTLC was successfully settled locally, so send a notification
        // about it to the remote peer.
        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
                ChanID:          l.ChanID(),
                ID:              htlcIndex,
                PaymentPreimage: preimage,
        })

        // Once we have successfully settled the htlc, notify a settle event.
        l.cfg.HtlcNotifier.NotifySettleEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: htlcIndex,
                        },
                },
                preimage,
                HtlcEventTypeReceive,
        )

        return nil
}

// forwardBatch forwards the given htlcPackets to the switch, and waits on the
// err chan for the individual responses. This method is intended to be spawned
// as a goroutine so the responses can be handled in the background.
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
        // Don't forward packets for which we already have a response in our
        // mailbox. This could happen if a packet fails and is buffered in the
        // mailbox, and the incoming link flaps.
        var filteredPkts = make([]*htlcPacket, 0, len(packets))
        for _, pkt := range packets {
                if l.mailBox.HasPacket(pkt.inKey()) {
                        continue
                }

                filteredPkts = append(filteredPkts, pkt)
        }

        err := l.cfg.ForwardPackets(l.Quit, replay, filteredPkts...)
        if err != nil {
                log.Errorf("Unhandled error while reforwarding htlc "+
                        "settle/fail over htlcswitch: %v", err)
        }
}

// sendHTLCError cancels the HTLC and sends a cancel message back to the peer
// from which the HTLC was received.
func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, failure *LinkError,
        e hop.ErrorEncrypter, isReceive bool) {

        reason, err := e.EncryptFirstHop(failure.WireMessage())
        if err != nil {
                l.log.Errorf("unable to obfuscate error: %v", err)
                return
        }

        err = l.channel.FailHTLC(add.ID, reason, &sourceRef, nil, nil)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        // Send the appropriate failure message depending on whether we're
        // in a blinded route or not.
        if err := l.sendIncomingHTLCFailureMsg(
                add.ID, e, reason,
        ); err != nil {
                l.log.Errorf("unable to send HTLC failure: %v", err)
                return
        }

        // Notify a link failure on our incoming link. Outgoing htlc information
        // is not available at this point, because we have not decrypted the
        // onion, so it is excluded.
        var eventType HtlcEventType
        if isReceive {
                eventType = HtlcEventTypeReceive
        } else {
                eventType = HtlcEventTypeForward
        }

        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: add.ID,
                        },
                },
                HtlcInfo{
                        IncomingTimeLock: add.Expiry,
                        IncomingAmt:      add.Amount,
                },
                eventType,
                failure,
                true,
        )
}

// sendIncomingHTLCFailureMsg sends an HTLC failure message back to the peer
// from which the HTLC was received. This function is primarily used to
// handle the special requirements of route blinding, specifically:
// - Forwarding nodes must switch out any errors with MalformedFailHTLC
// - Introduction nodes should return regular HTLC failure messages.
//
// It accepts the original opaque failure, which will be used in the case
// that we're not part of a blinded route and an error encrypter that'll be
// used if we are the introduction node and need to present an error as if
// we're the failing party.
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
        e hop.ErrorEncrypter,
        originalFailure lnwire.OpaqueReason) error {

        var msg lnwire.Message
        switch {
        // Our circuit's error encrypter will be nil if this was a locally
        // initiated payment. We can only hit a blinded error for a locally
        // initiated payment if we allow ourselves to be picked as the
        // introduction node for our own payments and in that case we
        // shouldn't reach this code. To prevent the HTLC getting stuck,
        // we fail it back and log an error.
        case e == nil:
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

                l.log.Errorf("Unexpected blinded failure when "+
                        "we are the sending node, incoming htlc: %v(%v)",
                        l.ShortChanID(), htlcIndex)

        // For cleartext hops (ie, non-blinded/normal) we don't need any
        // transformation on the error message and can just send the original.
        case !e.Type().IsBlinded():
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

        // When we're the introduction node, we need to convert the error to
        // a UpdateFailHTLC.
        case e.Type() == hop.EncrypterTypeIntroduction:
                l.log.Debugf("Introduction blinded node switching out failure "+
                        "error: %v", htlcIndex)

                // The specification does not require that we set the onion
                // blob.
                failureMsg := lnwire.NewInvalidBlinding(
                        fn.None[[lnwire.OnionPacketSize]byte](),
                )
                reason, err := e.EncryptFirstHop(failureMsg)
                if err != nil {
                        return err
                }

                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: reason,
                }

        // If we are a relaying node, we need to switch out any error that
        // we've received to a malformed HTLC error.
        case e.Type() == hop.EncrypterTypeRelaying:
                l.log.Debugf("Relaying blinded node switching out malformed "+
                        "error: %v", htlcIndex)

                msg = &lnwire.UpdateFailMalformedHTLC{
                        ChanID:      l.ChanID(),
                        ID:          htlcIndex,
                        FailureCode: lnwire.CodeInvalidBlinding,
                }

        default:
                return fmt.Errorf("unexpected encrypter: %d", e)
        }

        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                l.log.Warnf("Send update fail failed: %v", err)
        }

        return nil
}
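
// Illustrative summary of the failure-message selection above:
//
//	nil encrypter (local payment)     -> UpdateFailHTLC, original reason
//	cleartext (non-blinded) hop       -> UpdateFailHTLC, original reason
//	introduction node                 -> UpdateFailHTLC, invalid_blinding
//	relaying node in a blinded route  -> UpdateFailMalformedHTLC,
//	                                     invalid_blinding code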

// sendMalformedHTLCError is a helper function which sends the malformed HTLC
// update to the payment sender.
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
        code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte,
        sourceRef *channeldb.AddRef) {

        shaOnionBlob := sha256.Sum256(onionBlob[:])
        err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
                ChanID:       l.ChanID(),
                ID:           htlcIndex,
                ShaOnionBlob: shaOnionBlob,
                FailureCode:  code,
        })
}

// failf is a function which is used to encapsulate the action necessary for
// properly failing the link. It takes a LinkFailureError, which will be passed
// to the OnChannelFailure closure, in order for it to determine if we should
// force close the channel, and if we should send an error message to the
// remote peer.
func (l *channelLink) failf(linkErr LinkFailureError, format string,
        a ...interface{}) {

        reason := fmt.Errorf(format, a...)

        // Return if we have already notified about a failure.
        if l.failed {
                l.log.Warnf("ignoring link failure (%v), as link already "+
                        "failed", reason)
                return
        }

        l.log.Errorf("failing link: %s with error: %v", reason, linkErr)

        // Set failed, such that we won't process any more updates, and notify
        // the peer about the failure.
        l.failed = true
        l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
}

// FundingCustomBlob returns the custom funding blob of the channel that this
// link is associated with. The funding blob represents static information about
// the channel that was created at channel funding time.
func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        if l.channel.State() == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.State().CustomBlob
}

// CommitmentCustomBlob returns the custom blob of the current local commitment
// of the channel that this link is associated with.
func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.LocalCommitmentBlob()
}