
lightningnetwork / lnd / build 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%

Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*
Commit by ProofOfKeags: lnwire: remove kickoff feerate from propose/commit

571 of 879 new or added lines in 16 files covered (64.96%).
23253 existing lines in 251 files are now uncovered.
99022 of 171268 relevant lines covered (57.82%).
38420.67 hits per line.

Source File: /htlcswitch/link.go (69.83% covered)

package htlcswitch

import (
        "bytes"
        crand "crypto/rand"
        "crypto/sha256"
        "errors"
        "fmt"
        prand "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/wire"
        "github.com/btcsuite/btclog"
        "github.com/lightningnetwork/lnd/build"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/channeldb/models"
        "github.com/lightningnetwork/lnd/contractcourt"
        "github.com/lightningnetwork/lnd/fn"
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
        "github.com/lightningnetwork/lnd/input"
        "github.com/lightningnetwork/lnd/invoices"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lntypes"
        "github.com/lightningnetwork/lnd/lnutils"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/queue"
        "github.com/lightningnetwork/lnd/ticker"
        "github.com/lightningnetwork/lnd/tlv"
)

func init() {
        prand.Seed(time.Now().UnixNano())
}

const (
        // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
        // the node accepts for forwarded payments. The value is relative to
        // the current block height. The reason to have a maximum is to
        // prevent funds getting locked up unreasonably long. Otherwise, an
        // attacker willing to lock up its own funds too could force the funds
        // of this node to be locked up for an indefinite (max int32) number
        // of blocks.
        //
        // The value 2016 corresponds to on average two weeks worth of blocks
        // and is based on the maximum number of hops (20), the default CLTV
        // delta (40), and some extra margin to account for the other
        // lightning implementations and past lnd versions which used to have
        // a default CLTV delta of 144.
        DefaultMaxOutgoingCltvExpiry = 2016

        // DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute

        // DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute

        // DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
        // a channel's commitment fee to be of its balance. This only applies
        // to the initiator of the channel.
        DefaultMaxLinkFeeAllocation float64 = 0.5
)
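
// Illustrative sketch (hypothetical helper, not part of link.go): the link
// jitters its fee update interval between the configured minimum and maximum
// (see MinUpdateTimeout/MaxUpdateTimeout below), which is presumably what the
// unexported randomFeeUpdateTimeout used in Start does. A uniform draw,
// assuming max > min, could look like:
//
//	func randomTimeoutBetween(min, max time.Duration) time.Duration {
//	        // prand is the math/rand alias imported above; the spread is
//	        // max-min, so the result always lies in [min, max).
//	        return min + time.Duration(prand.Int63n(int64(max-min)))
//	}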

// ExpectedFee computes the expected fee for a given htlc amount. The value
// returned from this function is to be used as a sanity check when forwarding
// HTLCs to ensure that an incoming HTLC properly adheres to our propagated
// forwarding policy.
//
// TODO(roasbeef): also add in current available channel bandwidth, inverse
// func
func ExpectedFee(f models.ForwardingPolicy,
        htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {

        return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
}
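
// Worked example: with a policy of BaseFee = 1,000 msat and FeeRate = 500
// (parts per million), an incoming 1,000,000 msat HTLC is expected to pay
//
//	ExpectedFee(f, 1_000_000) = 1000 + (1_000_000*500)/1_000_000
//	                          = 1,500 msat
//
// so the incoming amount must exceed the outgoing amount by at least that
// much for the HTLC to satisfy our advertised policy. (Figures here are
// illustrative, not defaults.)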

// ChannelLinkConfig defines the configuration for the channel link. ALL
// elements within the configuration MUST be non-nil for channel link to carry
// out its duties.
type ChannelLinkConfig struct {
        // FwrdingPolicy is the initial forwarding policy to be used when
        // deciding whether to forward incoming HTLCs or not. This value
        // can be updated with subsequent calls to UpdateForwardingPolicy
        // targeted at a given ChannelLink concrete interface implementation.
        FwrdingPolicy models.ForwardingPolicy

        // Circuits provides restricted access to the switch's circuit map,
        // allowing the link to open and close circuits.
        Circuits CircuitModifier

        // BestHeight returns the best known height.
        BestHeight func() uint32

        // ForwardPackets attempts to forward the batch of htlcs through the
        // switch. The function returns an error in case it fails to send one
        // or more packets. The link's quit signal should be provided to allow
        // cancellation of forwarding during link shutdown.
        ForwardPackets func(chan struct{}, bool, ...*htlcPacket) error

        // DecodeHopIterators facilitates batched decoding of HTLC Sphinx
        // onion blobs, which are then used to inform how to forward an HTLC.
        //
        // NOTE: This function assumes the same set of readers and preimages
        // are always presented for the same identifier.
        DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest) (
                []hop.DecodeHopIteratorResponse, error)

        // ExtractErrorEncrypter function is responsible for decoding HTLC
        // Sphinx onion blob, and creating onion failure obfuscator.
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter

        // FetchLastChannelUpdate retrieves the latest routing policy for a
        // target channel. This channel will typically be the outgoing channel
        // specified when we receive an incoming HTLC. This will be used to
        // provide payment senders our latest policy when sending encrypted
        // error messages.
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
                *lnwire.ChannelUpdate1, error)

        // Peer is a lightning network node with which we have the channel
        // link opened.
        Peer lnpeer.Peer

        // Registry is a sub-system which is responsible for managing the
        // invoices in a thread-safe manner.
        Registry InvoiceDatabase

        // PreimageCache is a global witness beacon that houses any new
        // preimages discovered by other links. We'll use this to add new
        // witnesses that we discover which will notify any sub-systems
        // subscribed to new events.
        PreimageCache contractcourt.WitnessBeacon

        // OnChannelFailure is a function closure that we'll call if the
        // channel failed for some reason. Depending on the severity of the
        // error, the closure potentially must force close this channel and
        // disconnect the peer.
        //
        // NOTE: The method must return in order for the ChannelLink to be
        // able to shut down properly.
        OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
                LinkFailureError)

        // UpdateContractSignals is a function closure that we'll use to
        // update outside sub-systems with this channel's latest
        // ShortChannelID.
        UpdateContractSignals func(*contractcourt.ContractSignals) error

        // NotifyContractUpdate is a function closure that we'll use to update
        // the contractcourt and more specifically the ChannelArbitrator of
        // the latest channel state.
        NotifyContractUpdate func(*contractcourt.ContractUpdate) error

        // ChainEvents is an active subscription to the chain watcher for this
        // channel to be notified of any on-chain activity related to this
        // channel.
        ChainEvents *contractcourt.ChainEventSubscription

        // FeeEstimator is an instance of a live fee estimator which will be
        // used to dynamically regulate the current fee of the commitment
        // transaction to ensure timely confirmation.
        FeeEstimator chainfee.Estimator

        // HodlMask is a bitvector composed of hodl.Flags, specifying
        // breakpoints for HTLC forwarding internal to the switch.
        //
        // NOTE: This should only be used for testing.
        HodlMask hodl.Mask

        // SyncStates is used to indicate that we need to send the channel
        // reestablishment message to the remote peer. It should be done if
        // our clients have been restarted, or the remote peer has been
        // reconnected.
        SyncStates bool

        // BatchTicker is the ticker that determines the interval that we'll
        // use to check the batch to see if there are any updates we should
        // flush out. By batching updates into a single commit, we attempt to
        // increase throughput by maximizing the number of updates coalesced
        // into a single commit.
        BatchTicker ticker.Ticker

        // FwdPkgGCTicker is the ticker determining the frequency at which
        // garbage collection of forwarding packages occurs. We use a
        // time-based approach, as opposed to block epochs, as to not hinder
        // syncing.
        FwdPkgGCTicker ticker.Ticker

        // PendingCommitTicker is a ticker that allows the link to determine
        // if a locally initiated commitment dance gets stuck waiting for the
        // remote party to revoke.
        PendingCommitTicker ticker.Ticker

        // BatchSize is the max size of a batch of updates done to the link
        // before we do a state update.
        BatchSize uint32

        // UnsafeReplay will cause a link to replay the adds in its latest
        // commitment txn after the link is restarted. This should only be
        // used in testing, it is here to ensure the sphinx replay detection
        // on the receiving node is persistent.
        UnsafeReplay bool

        // MinUpdateTimeout represents the minimum interval in which a link
        // will propose to update its commitment fee rate. A random timeout
        // will be selected between this and MaxUpdateTimeout.
        MinUpdateTimeout time.Duration

        // MaxUpdateTimeout represents the maximum interval in which a link
        // will propose to update its commitment fee rate. A random timeout
        // will be selected between this and MinUpdateTimeout.
        MaxUpdateTimeout time.Duration

        // OutgoingCltvRejectDelta defines the number of blocks before expiry
        // of an htlc where we no longer offer it onwards. This should be at
        // least the outgoing broadcast delta, because in any case we don't
        // want to risk offering an htlc that triggers channel closure.
        OutgoingCltvRejectDelta uint32

        // TowerClient is an optional engine that manages the signing,
        // encrypting, and uploading of justice transactions to the daemon's
        // configured set of watchtowers for legacy channels.
        TowerClient TowerClient

        // MaxOutgoingCltvExpiry is the maximum outgoing timelock that the
        // link should accept for a forwarded HTLC. The value is relative to
        // the current block height.
        MaxOutgoingCltvExpiry uint32

        // MaxFeeAllocation is the highest allocation we'll allow a channel's
        // commitment fee to be of its balance. This only applies to the
        // initiator of the channel.
        MaxFeeAllocation float64

        // MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
        // the initiator for channels of the anchor type.
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight

        // NotifyActiveLink allows the link to tell the ChannelNotifier when a
        // link is first started.
        NotifyActiveLink func(wire.OutPoint)

        // NotifyActiveChannel allows the link to tell the ChannelNotifier
        // when a channel becomes active.
        NotifyActiveChannel func(wire.OutPoint)

        // NotifyInactiveChannel allows the switch to tell the ChannelNotifier
        // when a channel becomes inactive.
        NotifyInactiveChannel func(wire.OutPoint)

        // NotifyInactiveLinkEvent allows the switch to tell the
        // ChannelNotifier when a channel link becomes inactive.
        NotifyInactiveLinkEvent func(wire.OutPoint)

        // HtlcNotifier is an instance of a htlcNotifier which we will pipe
        // htlc events through.
        HtlcNotifier htlcNotifier

        // FailAliasUpdate is a function used to fail an HTLC for an
        // option_scid_alias channel.
        FailAliasUpdate func(sid lnwire.ShortChannelID,
                incoming bool) *lnwire.ChannelUpdate1

        // GetAliases is used by the link and switch to fetch the set of
        // aliases for a given link.
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID

        // PreviouslySentShutdown is an optional value that is set if, at the
        // time of the link being started, persisted shutdown info was found
        // for the channel. This value being set means that we previously sent
        // a Shutdown message to our peer, and so we should do so again on
        // re-establish and should not allow any more HTLC adds on the
        // outgoing direction of the link.
        PreviouslySentShutdown fn.Option[lnwire.Shutdown]

        // DisallowRouteBlinding adds the option to disable forwarding
        // payments in blinded routes by failing back any blinding-related
        // payloads as if they were invalid.
        DisallowRouteBlinding bool

        // DisallowQuiescence is a flag that can be used to disable the
        // quiescence protocol.
        DisallowQuiescence bool

        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
        // restrict the flow of HTLCs and fee updates.
        MaxFeeExposure lnwire.MilliSatoshi
}

// channelLink is the service which drives a channel's commitment update
// state-machine. In the event that an HTLC needs to be propagated to another
// link, the forward handler from config is used which sends the HTLC to the
// switch. Additionally, the link encapsulates the logic of commitment
// protocol message ordering and updates.
type channelLink struct {
        // The following fields are only meant to be used *atomically*.
        started       int32
        reestablished int32
        shutdown      int32

        // failed should be set to true in case a link error happens, making
        // sure we don't process any more updates.
        failed bool

        // keystoneBatch represents a volatile list of keystones that must be
        // written before attempting to sign the next commitment txn. These
        // represent all the HTLCs forwarded to the link from the switch. Once
        // we lock them into our outgoing commitment, then the circuit has a
        // keystone, and is fully opened.
        keystoneBatch []Keystone

        // openedCircuits is the set of all payment circuits that will be open
        // once we make our next commitment. After making the commitment we'll
        // ACK all these from our mailbox to ensure that they don't get
        // re-delivered if we reconnect.
        openedCircuits []CircuitKey

        // closedCircuits is the set of all payment circuits that will be
        // closed once we make our next commitment. After taking the
        // commitment we'll ACK all these to ensure that they don't get
        // re-delivered if we reconnect.
        closedCircuits []CircuitKey

        // channel is a lightning network channel to which we apply htlc
        // updates.
        channel *lnwallet.LightningChannel

        // cfg is a structure which carries all dependent fields/handlers
        // which may affect behaviour of the service.
        cfg ChannelLinkConfig

        // mailBox is the main interface between the outside world and the
        // link. All incoming messages will be sent over this mailBox.
        // Messages include new updates from our connected peer, and new
        // packets to be forwarded sent by the switch.
        mailBox MailBox

        // upstream is a channel that new messages sent from the remote peer
        // to the local peer will be sent across.
        upstream chan lnwire.Message

        // downstream is a channel in which new multi-hop HTLCs to be
        // forwarded will be sent across. Messages from this channel are sent
        // by the HTLC switch.
        downstream chan *htlcPacket

        // updateFeeTimer is the timer responsible for updating the link's
        // commitment fee every time it fires.
        updateFeeTimer *time.Timer

        // uncommittedPreimages stores a list of all preimages that have been
        // learned since receiving the last CommitSig from the remote peer.
        // The batch will be flushed just before accepting the subsequent
        // CommitSig or on shutdown to avoid doing a write for each preimage
        // received.
        uncommittedPreimages []lntypes.Preimage

        sync.RWMutex

        // hodlQueue is used to receive exit hop htlc resolutions from invoice
        // registry.
        hodlQueue *queue.ConcurrentQueue

        // hodlMap stores related htlc data for a circuit key. It allows
        // resolving those htlcs when we receive a message on hodlQueue.
        hodlMap map[models.CircuitKey]hodlHtlc

        // log is a link-specific logging instance.
        log btclog.Logger

        // isOutgoingAddBlocked tracks whether the channelLink can send an
        // UpdateAddHTLC.
        isOutgoingAddBlocked atomic.Bool

        // isIncomingAddBlocked tracks whether the channelLink can receive an
        // UpdateAddHTLC.
        isIncomingAddBlocked atomic.Bool

        // flushHooks is a hookMap that is triggered when we reach a channel
        // state with no live HTLCs.
        flushHooks hookMap

        // outgoingCommitHooks is a hookMap that is triggered after we send
        // our next CommitSig.
        outgoingCommitHooks hookMap

        // incomingCommitHooks is a hookMap that is triggered after we receive
        // our next CommitSig.
        incomingCommitHooks hookMap

        // quiescer is the state machine that tracks where this channel is
        // with respect to the quiescence protocol.
        quiescer quiescer

        // quiescenceReqs is a queue of requests to quiesce this link. The
        // members of the queue are send-only channels we should call back
        // with the result.
        quiescenceReqs chan StfuReq

        // stateQueries is a channel that is used to query the current state
        // of the channelLink in a thread-safe manner by delegating the state
        // reads to the main event loop.
        //
        // WARNING: closures passed over this channel must be safe to run in
        // the main event loop as that is the context in which they will be
        // executed. The most obvious problem that can arise from using this
        // is if the closure itself attempts to send over this chan. This will
        // create deadlock opportunities as we cannot ensure that there is
        // available buffer space on this channel in the presence of other
        // threads calling APIs that depend on this component.
        stateQueries chan func()

        wg   sync.WaitGroup
        quit chan struct{}
}

// hookMap is a data structure that is used to track the hooks that need to be
// called in various parts of the channelLink's lifecycle.
//
// WARNING: NOT thread-safe.
type hookMap struct {
        // allocIdx keeps track of the next id we haven't yet allocated.
        allocIdx atomic.Uint64

        // transient is a map of hooks that are only called the next time
        // invoke is called. These hooks are deleted during invoke.
        transient map[uint64]func()

        // newTransients is a channel that we use to accept new hooks into the
        // hookMap.
        newTransients chan func()
}

// newHookMap initializes a new empty hookMap.
func newHookMap() hookMap {
        return hookMap{
                allocIdx:      atomic.Uint64{},
                transient:     make(map[uint64]func()),
                newTransients: make(chan func()),
        }
}

// alloc allocates space in the hook map for the supplied hook and returns the
// ID it was registered under.
func (m *hookMap) alloc(hook func()) uint64 {
        // We assume we never overflow a uint64. Seems OK.
        hookID := m.allocIdx.Add(1)
        if hookID == 0 {
                panic("hookMap allocIdx overflow")
        }
        m.transient[hookID] = hook

        return hookID
}

// invoke is used on a hook map to call all the registered hooks and then
// clear out the transient hooks so they are not called again.
func (m *hookMap) invoke() {
        for _, hook := range m.transient {
                hook()
        }

        m.transient = make(map[uint64]func())
}
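
// Illustrative semantics (hypothetical snippet, not part of link.go): a hook
// registered via alloc fires exactly once, on the next invoke, and is then
// dropped:
//
//	m := newHookMap()
//	m.alloc(func() { fmt.Println("channel flushed") })
//	m.invoke() // prints "channel flushed"
//	m.invoke() // prints nothing; transient hooks were cleared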

// hodlHtlc contains htlc data that is required for resolution.
type hodlHtlc struct {
        add        lnwire.UpdateAddHTLC
        sourceRef  channeldb.AddRef
        obfuscator hop.ErrorEncrypter
}

// NewChannelLink creates a new instance of a ChannelLink given a
// configuration and active channel that will be used to verify/apply updates
// to.
func NewChannelLink(cfg ChannelLinkConfig,
        channel *lnwallet.LightningChannel) ChannelLink {

        logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())

        // If the max fee exposure isn't set, use the default.
        if cfg.MaxFeeExposure == 0 {
                cfg.MaxFeeExposure = DefaultMaxFeeExposure
        }

        var qsm quiescer
        if !cfg.DisallowQuiescence {
                qsm = newQuiescer(quiescerCfg{
                        chanID: lnwire.NewChanIDFromOutPoint(
                                channel.ChannelPoint(),
                        ),
                        channelInitiator:  channel.Initiator(),
                        numPendingUpdates: channel.NumPendingUpdates,
                        sendMsg: func(s lnwire.Stfu) error {
                                return cfg.Peer.SendMessage(false, &s)
                        },
                })
        } else {
                qsm = &quiescerNoop{}
        }

        quiescenceReqs := make(
                chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1,
        )

        return &channelLink{
                cfg:                 cfg,
                channel:             channel,
                hodlMap:             make(map[models.CircuitKey]hodlHtlc),
                hodlQueue:           queue.NewConcurrentQueue(10),
                log:                 build.NewPrefixLog(logPrefix, log),
                flushHooks:          newHookMap(),
                outgoingCommitHooks: newHookMap(),
                incomingCommitHooks: newHookMap(),
                quiescer:            qsm,
                quiescenceReqs:      quiescenceReqs,
                stateQueries:        make(chan func(), 1),
                quit:                make(chan struct{}),
        }
}

// A compile time check to ensure channelLink implements the ChannelLink
// interface.
var _ ChannelLink = (*channelLink)(nil)
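
// The blank-identifier assignment above is the standard Go idiom for a
// compile-time interface check: the build fails if *channelLink ever stops
// satisfying ChannelLink, at zero runtime cost. The same pattern with stdlib
// types, for comparison:
//
//	var _ io.Reader = (*bytes.Buffer)(nil)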

// Start starts all helper goroutines required for the operation of the
// channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Start() error {
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
                err := fmt.Errorf("channel link(%v): already started", l)
                l.log.Warn("already started")
                return err
        }

        l.log.Info("starting")

        // If the config supplied a watchtower client, ensure the channel is
        // registered before trying to use it during operation.
        if l.cfg.TowerClient != nil {
                err := l.cfg.TowerClient.RegisterChannel(
                        l.ChanID(), l.channel.State().ChanType,
                )
                if err != nil {
                        return err
                }
        }

        l.mailBox.ResetMessages()
        l.hodlQueue.Start()

        // Before launching the htlcManager goroutine, revert any circuits
        // that were marked open in the switch's circuit map, but did not make
        // it into a commitment txn. We use the next local htlc index as the
        // cut off point, since all indexes below that are committed. This
        // action is only performed if the link's final short channel ID has
        // been assigned, otherwise we would try to trim the htlcs belonging
        // to the all-zero, hop.Source ID.
        if l.ShortChanID() != hop.Source {
                localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
                if err != nil {
                        return fmt.Errorf("unable to retrieve next local "+
                                "htlc index: %v", err)
                }

                // NOTE: This is automatically done by the switch when it
                // starts up, but is necessary to prevent inconsistencies in
                // the case that the link flaps. This is a result of a link's
                // life-cycle being shorter than that of the switch.
                chanID := l.ShortChanID()
                err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
                if err != nil {
                        return fmt.Errorf("unable to trim circuits above "+
                                "local htlc index %d: %v", localHtlcIndex, err)
                }

                // Since the link is live, before we start the link we'll
                // update the ChainArbitrator with the set of new channel
                // signals for this channel.
                //
                // TODO(roasbeef): split goroutines within channel arb to avoid
                go func() {
                        signals := &contractcourt.ContractSignals{
                                ShortChanID: l.channel.ShortChanID(),
                        }

                        err := l.cfg.UpdateContractSignals(signals)
                        if err != nil {
                                l.log.Errorf("unable to update signals")
                        }
                }()
        }

        l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())

        l.wg.Add(1)
        go l.htlcManager()

        return nil
}

// Stop gracefully stops all active helper goroutines, then waits until
// they've exited.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stop() {
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
                l.log.Warn("already stopped")
                return
        }

        l.log.Info("stopping")

        // As the link is stopping, we are no longer interested in htlc
        // resolutions coming from the invoice registry.
        l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())

        if l.cfg.ChainEvents.Cancel != nil {
                l.cfg.ChainEvents.Cancel()
        }

        // Ensure the channel for the timer is drained.
        if l.updateFeeTimer != nil {
                if !l.updateFeeTimer.Stop() {
                        select {
                        case <-l.updateFeeTimer.C:
                        default:
                        }
                }
        }

        if l.hodlQueue != nil {
                l.hodlQueue.Stop()
        }

        close(l.quit)
        l.wg.Wait()

        // Now that the htlcManager has completely exited, reset the packet
        // courier. This allows the mailbox to re-evaluate any lingering Adds
        // that were delivered but didn't make it onto a commitment to be
        // failed back if the link is offline for an extended period of time.
        // The error is ignored since it can only fail when the daemon is
        // exiting.
        _ = l.mailBox.ResetPackets()

        // As a final precaution, we will attempt to flush any uncommitted
        // preimages to the preimage cache. The preimages should be
        // re-delivered after channel reestablishment, however this adds an
        // extra layer of protection in case the peer never returns. Without
        // this, we will be unable to settle any contracts depending on the
        // preimages even though we had learned them at some point.
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
        if err != nil {
                l.log.Errorf("unable to add preimages=%v to cache: %v",
                        l.uncommittedPreimages, err)
        }
}

// WaitForShutdown blocks until the link finishes shutting down, which
// includes termination of all dependent goroutines.
func (l *channelLink) WaitForShutdown() {
        l.wg.Wait()
}

// EligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLCs. We're able to forward HTLCs if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST NOT be called from the main event loop.
func (l *channelLink) EligibleToForward() bool {
        return runInEventLoop(l, fn.Unit{}, func(_ fn.Unit) bool {
                return l.eligibleToForward()
        }).UnwrapLeftOr(false)
}
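
// Note on the pattern above: runInEventLoop (defined elsewhere in this
// package) ships the closure over the link's stateQueries channel so that it
// executes inside the main event loop, serializing the read with all other
// state transitions. The result presumably comes back as an Either whose left
// side carries the answer, hence UnwrapLeftOr(false): if the link is shutting
// down and the query cannot run, we conservatively report false.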

// eligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLCs. We're able to forward HTLCs if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToForward() bool {
        return l.eligibleToUpdate() && !l.IsFlushing(Outgoing)
}

// EligibleToUpdate returns a bool indicating if the channel is able to update
// channel state. We're able to update channel state if we know the remote
// party's next revocation point. Otherwise, we can't initiate new channel
// state. We also require that the short channel ID not be the all-zero source
// ID, meaning that the channel has had its ID finalized.
//
// NOTE: MUST NOT be called from the main event loop.
func (l *channelLink) EligibleToUpdate() bool {
        return runInEventLoop(l, fn.Unit{}, func(_ fn.Unit) bool {
                return l.eligibleToUpdate()
        }).UnwrapLeftOr(false)
}

// eligibleToUpdate returns a bool indicating if the channel is able to update
// channel state. We're able to update channel state if we know the remote
// party's next revocation point. Otherwise, we can't initiate new channel
// state. We also require that the short channel ID not be the all-zero source
// ID, meaning that the channel has had its ID finalized.
//
// NOTE: MUST be called from the main event loop.
func (l *channelLink) eligibleToUpdate() bool {
        return l.channel.RemoteNextRevocation() != nil &&
                l.channel.ShortChanID() != hop.Source &&
                l.isReestablished() &&
                l.quiescer.canSendUpdates()
}

// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns true if the state was changed and false
// if the desired state was already set before the method was called.
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Swap(false)
        }

        return l.isIncomingAddBlocked.Swap(false)
}

// DisableAdds sets the ChannelUpdateHandler state to disallow UpdateAddHtlc's
// in the specified direction. It returns true if the state was changed and
// false if the desired state was already set before the method was called.
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return !l.isOutgoingAddBlocked.Swap(true)
        }

        return !l.isIncomingAddBlocked.Swap(true)
}

// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction
// of the argument.
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Load()
        }

        return l.isIncomingAddBlocked.Load()
}

// OnFlushedOnce adds a hook that will be called the next time the channel
// state reaches zero htlcs. This hook will only ever be called once. If the
// channel state already has zero htlcs, then this will be called immediately.
func (l *channelLink) OnFlushedOnce(hook func()) {
        select {
        case l.flushHooks.newTransients <- hook:
        case <-l.quit:
        }
}
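
// Illustrative drain sequence (hypothetical caller code, not part of
// link.go): to flush the outgoing half of a channel, stop offering new HTLCs
// and then wait for the in-flight set to hit zero.
//
//	link.DisableAdds(Outgoing) // block new outgoing adds
//	link.OnFlushedOnce(func() {
//	        // Runs exactly once, when no live HTLCs remain on the
//	        // channel state (immediately if it is already empty).
//	})
//	_ = link.IsFlushing(Outgoing) // true until EnableAdds(Outgoing)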

// OnCommitOnce adds a hook that will be called the next time a CommitSig
// message is sent in the argument's LinkDirection. This hook will only ever
// be called once. If no CommitSig is owed in the argument's LinkDirection,
// then this hook will be run immediately.
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
        var queue chan func()

        if direction == Outgoing {
                queue = l.outgoingCommitHooks.newTransients
        } else {
                queue = l.incomingCommitHooks.newTransients
        }

        select {
        case queue <- hook:
        case <-l.quit:
        }
}

// InitStfu allows us to initiate quiescence on this link. It returns a
// receive only channel that will block until quiescence has been achieved, or
// definitively fails.
//
// This operation has been added to allow channels to be quiesced via RPC. It
// may be removed or reworked in the future as RPC initiated quiescence is a
// holdover until we have downstream protocols that use it.
func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
        req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]](
                fn.Unit{},
        )

        select {
        case l.quiescenceReqs <- req:
        case <-l.quit:
                req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
        }

        return out
}
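
// Illustrative usage (hypothetical caller code): quiescing a channel and
// blocking until the handshake settles one way or the other.
//
//	res := <-link.InitStfu()
//	// res is an fn.Result[lntypes.ChannelParty]: on success it names the
//	// party that initiated quiescence, otherwise it carries the error
//	// (e.g. ErrLinkShuttingDown if the link quit first).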

// isReestablished returns true if the link has successfully completed the
// channel reestablishment dance.
func (l *channelLink) isReestablished() bool {
        return atomic.LoadInt32(&l.reestablished) == 1
}

// markReestablished signals that we have successfully exchanged channel
// reestablish messages with the remote peer and that the channel is ready to
// process subsequent messages.
func (l *channelLink) markReestablished() {
        atomic.StoreInt32(&l.reestablished, 1)
}

// IsUnadvertised returns true if the underlying channel is unadvertised.
func (l *channelLink) IsUnadvertised() bool {
        state := l.channel.State()
        return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
}

// sampleNetworkFee samples the current fee rate on the network to get into
// the chain in a timely manner. The returned value is expressed in
// fee-per-kw, as this is the native rate used when computing the fee for
// commitment transactions, and the second-level HTLC transactions.
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
        // We'll first query for the sat/kw recommended to be confirmed within
        // 3 blocks.
        feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
        if err != nil {
                return 0, err
        }

        l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
                int64(feePerKw))

        return feePerKw, nil
}
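
// Unit note: sat/kw is satoshis per 1,000 weight units. Since a virtual byte
// is 4 weight units, 1 sat/vbyte == 250 sat/kw; a 10 sat/vbyte estimate from
// the backend therefore surfaces here as 2,500 sat/kw.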

// shouldAdjustCommitFee returns true if we should update our commitment fee
// to match that of the network fee. We'll only update our commitment fee if
// the network fee is +/- 10% to our commitment fee or if our current
// commitment fee is below the minimum relay fee.
func shouldAdjustCommitFee(netFee, chanFee,
        minRelayFee chainfee.SatPerKWeight) bool {

        switch {
        // If the network fee is greater than our current commitment fee and
        // our current commitment fee is below the minimum relay fee then
        // we should switch to it no matter if it is less than a 10% increase.
        case netFee > chanFee && chanFee < minRelayFee:
                return true

        // If the network fee is greater than the commitment fee, then we'll
        // switch to it if it's at least 10% greater than the commit fee.
        case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
                return true

        // If the network fee is less than our commitment fee, then we'll
        // switch to it if it's at least 10% less than the commitment fee.
        case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
                return true

        // Otherwise, we won't modify our fee.
        default:
                return false
        }
}
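
// Worked example: with chanFee = 1,000 sat/kw and minRelayFee = 253 sat/kw,
//
//	shouldAdjustCommitFee(1100, 1000, 253) == true  // >= +10%
//	shouldAdjustCommitFee(1099, 1000, 253) == false // within band
//	shouldAdjustCommitFee(900, 1000, 253)  == true  // <= -10%
//	shouldAdjustCommitFee(901, 1000, 253)  == false // within band
//	shouldAdjustCommitFee(260, 250, 253)   == true  // below min relay fee
//
// (Figures chosen for illustration; 253 sat/kw is the common relay fee
// floor.)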

// failCb is used to cut down on the argument verbosity.
type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage

// createFailureWithUpdate creates a ChannelUpdate when failing an incoming or
// outgoing HTLC. It may return a FailureMessage that references a channel's
// alias. If the channel does not have an alias, then the regular channel
// update from disk will be returned.
func (l *channelLink) createFailureWithUpdate(incoming bool,
        outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {

        // Determine which SCID to use in case we need to use aliases in the
        // ChannelUpdate.
        scid := outgoingScid
        if incoming {
                scid = l.ShortChanID()
        }

        // Try using the FailAliasUpdate function. If it returns nil, fallback
        // to the non-alias behavior.
        update := l.cfg.FailAliasUpdate(scid, incoming)
        if update == nil {
                // Fallback to the non-alias behavior.
                var err error
                update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
                if err != nil {
                        return &lnwire.FailTemporaryNodeFailure{}
                }
        }

        return cb(update)
}

// syncChanStates attempts to synchronize channel states with the remote
// party. This method is to be called upon reconnection after the initial
// funding flow. We'll compare our commitment chains with the remote party,
// and re-send either a dangling commit signature, a revocation, or both.
func (l *channelLink) syncChanStates() error {
        chanState := l.channel.State()

        l.log.Infof("Attempting to re-synchronize channel: %v", chanState)

        // First, we'll generate our ChanSync message to send to the other
        // side. Based on this message, the remote party will decide if they
        // need to retransmit any data or not.
        localChanSyncMsg, err := chanState.ChanSyncMsg()
        if err != nil {
                return fmt.Errorf("unable to generate chan sync message for "+
                        "ChannelPoint(%v)", l.channel.ChannelPoint())
        }
        if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
                return fmt.Errorf("unable to send chan sync message for "+
                        "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
        }

        var msgsToReSend []lnwire.Message

        // Next, we'll wait indefinitely to receive the ChanSync message. The
        // first message sent MUST be the ChanSync message.
        select {
        case msg := <-l.upstream:
                l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
                        l.cfg.Peer.PubKey())

                remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
                if !ok {
                        return fmt.Errorf("first message sent to sync "+
                                "should be ChannelReestablish, instead "+
                                "received: %T", msg)
                }

                // If the remote party indicates that they think we haven't
                // done any state updates yet, then we'll retransmit the
                // channel_ready message first. We do this, as at this point
                // we can't be sure if they've really received the
                // ChannelReady message.
                if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
                        localChanSyncMsg.NextLocalCommitHeight == 1 &&
                        !l.channel.IsPending() {

                        l.log.Infof("resending ChannelReady message to peer")

                        nextRevocation, err := l.channel.NextRevocationKey()
                        if err != nil {
                                return fmt.Errorf("unable to create next "+
                                        "revocation: %v", err)
                        }

                        channelReadyMsg := lnwire.NewChannelReady(
                                l.ChanID(), nextRevocation,
                        )

                        // If this is a taproot channel, then we'll send the
                        // very same nonce that we sent above, as they should
                        // take the latest verification nonce we send.
                        if chanState.ChanType.IsTaproot() {
                                //nolint:lll
                                channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
                        }

                        // For channels that negotiated the option-scid-alias
                        // feature bit, ensure that we send over the alias in
                        // the channel_ready message. We'll send the first
                        // alias we find for the channel since it does not
                        // matter which alias we send. We'll error out if no
                        // aliases are found.
                        if l.negotiatedAliasFeature() {
                                aliases := l.getAliases()
                                if len(aliases) == 0 {
                                        // This shouldn't happen since we
                                        // always add at least one alias before
                                        // the channel reaches the link.
                                        return fmt.Errorf("no aliases found")
                                }

                                // getAliases returns a copy of the alias slice
                                // so it is ok to use a pointer to the first
                                // entry.
                                channelReadyMsg.AliasScid = &aliases[0]
                        }

                        err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
                        if err != nil {
                                return fmt.Errorf("unable to re-send "+
                                        "ChannelReady: %v", err)
                        }
                }

                // In any case, we'll then process their ChanSync message.
                l.log.Info("received re-establishment message from remote side")

                var (
                        openedCircuits []CircuitKey
                        closedCircuits []CircuitKey
                )

                // We've just received a ChanSync message from the remote
                // party, so we'll process the message in order to determine
                // if we need to re-transmit any messages to the remote party.
                msgsToReSend, openedCircuits, closedCircuits, err =
                        l.channel.ProcessChanSyncMsg(remoteChanSyncMsg)
                if err != nil {
                        return err
                }

                // Repopulate any identifiers for circuits that may have been
                // opened or unclosed. This may happen if we needed to
                // retransmit a commitment signature message.
                l.openedCircuits = openedCircuits
                l.closedCircuits = closedCircuits

                // Ensure that all packets have been removed from the link's
                // mailbox.
                if err := l.ackDownStreamPackets(); err != nil {
                        return err
                }

                if len(msgsToReSend) > 0 {
                        l.log.Infof("sending %v updates to synchronize the "+
                                "state", len(msgsToReSend))
                }

                // If we have any messages to retransmit, we'll do so
                // immediately so we return to a synchronized state as soon as
                // possible.
                for _, msg := range msgsToReSend {
                        l.cfg.Peer.SendMessage(false, msg)
                }

        case <-l.quit:
                return ErrLinkShuttingDown
        }

        return nil
}
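
// Protocol note: the channel_ready retransmission above follows BOLT #2,
// which says that if next_commitment_number is 1 in both the sent and
// received channel_reestablish (i.e. neither side has ever signed a state
// update), the node must retransmit channel_ready before proceeding.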

1042
// resolveFwdPkgs loads any forwarding packages for this link from disk, and
1043
// reprocesses them in order. The primary goal is to make sure that any HTLCs
1044
// we previously received are reinstated in memory, and forwarded to the switch
1045
// if necessary. After a restart, this will also delete any previously
1046
// completed packages.
1047
func (l *channelLink) resolveFwdPkgs() error {
213✔
1048
        fwdPkgs, err := l.channel.LoadFwdPkgs()
213✔
1049
        if err != nil {
214✔
1050
                return err
1✔
1051
        }
1✔
1052

1053
        l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))
212✔
1054

212✔
1055
        for _, fwdPkg := range fwdPkgs {
218✔
1056
                if err := l.resolveFwdPkg(fwdPkg); err != nil {
6✔
1057
                        return err
×
1058
                }
×
1059
        }
1060

1061
        // If any of our reprocessing steps require an update to the commitment
1062
        // txn, we initiate a state transition to capture all relevant changes.
1063
        if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
212✔
UNCOV
1064
                return l.updateCommitTx()
×
UNCOV
1065
        }
×
1066

1067
        return nil
212✔
1068
}
1069

1070
// resolveFwdPkg interprets the FwdState of the provided package, either
1071
// reprocesses any outstanding htlcs in the package, or performs garbage
1072
// collection on the package.
1073
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
6✔
1074
        // Remove any completed packages to clear up space.
6✔
1075
        if fwdPkg.State == channeldb.FwdStateCompleted {
7✔
1076
                l.log.Debugf("removing completed fwd pkg for height=%d",
1✔
1077
                        fwdPkg.Height)
1✔
1078

1✔
1079
                err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
1✔
1080
                if err != nil {
1✔
1081
                        l.log.Errorf("unable to remove fwd pkg for height=%d: "+
×
1082
                                "%v", fwdPkg.Height, err)
×
1083
                        return err
×
1084
                }
×
1085
        }
1086

1087
        // Otherwise this is either a new package or one has gone through
1088
        // processing, but contains htlcs that need to be restored in memory.
1089
        // We replay this forwarding package to make sure our local mem state
1090
        // is resurrected, we mimic any original responses back to the remote
1091
        // party, and re-forward the relevant HTLCs to the switch.
1092

1093
        // If the package is fully acked but not completed, it must still have
1094
        // settles and fails to propagate.
1095
        if !fwdPkg.SettleFailFilter.IsFull() {
6✔
UNCOV
1096
                l.processRemoteSettleFails(fwdPkg)
×
UNCOV
1097
        }
×
1098

1099
        // Finally, replay *ALL ADDS* in this forwarding package. The
1100
        // downstream logic is able to filter out any duplicates, but we must
1101
        // shove the entire, original set of adds down the pipeline so that the
1102
        // batch of adds presented to the sphinx router does not ever change.
1103
        if !fwdPkg.AckFilter.IsFull() {
9✔
1104
                l.processRemoteAdds(fwdPkg)
3✔
1105

3✔
1106
                // If the link failed during processing the adds, we must
3✔
1107
                // return to ensure we won't attempted to update the state
3✔
1108
                // further.
3✔
1109
                if l.failed {
3✔
1110
                        return fmt.Errorf("link failed while " +
×
1111
                                "processing remote adds")
×
1112
                }
×
1113
        }
1114

1115
        return nil
6✔
1116
}
1117

1118
// fwdPkgGarbager periodically reads all forwarding packages from disk and
1119
// removes those that can be discarded. It is safe to do this entirely in the
1120
// background, since all state is coordinated on disk. This also ensures the
1121
// link can continue to process messages and interleave database accesses.
1122
//
1123
// NOTE: This MUST be run as a goroutine.
1124
func (l *channelLink) fwdPkgGarbager() {
212✔
1125
        defer l.wg.Done()
212✔
1126

212✔
1127
        l.cfg.FwdPkgGCTicker.Resume()
212✔
1128
        defer l.cfg.FwdPkgGCTicker.Stop()
212✔
1129

212✔
1130
        if err := l.loadAndRemove(); err != nil {
212✔
1131
                l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
×
1132
        }
×
1133

1134
        for {
494✔
1135
                select {
282✔
1136
                case <-l.cfg.FwdPkgGCTicker.Ticks():
70✔
1137
                        if err := l.loadAndRemove(); err != nil {
128✔
1138
                                l.log.Warnf("unable to remove fwd pkgs: %v",
58✔
1139
                                        err)
58✔
1140
                                continue
58✔
1141
                        }
1142
                case <-l.quit:
202✔
1143
                        return
202✔
1144
                }
1145
        }
1146
}
1147

1148
// loadAndRemove loads all the channels forwarding packages and determines if
1149
// they can be removed. It is called once before the FwdPkgGCTicker ticks so that
1150
// a longer tick interval can be used.
1151
func (l *channelLink) loadAndRemove() error {
282✔
1152
        fwdPkgs, err := l.channel.LoadFwdPkgs()
282✔
1153
        if err != nil {
340✔
1154
                return err
58✔
1155
        }
58✔
1156

1157
        var removeHeights []uint64
224✔
1158
        for _, fwdPkg := range fwdPkgs {
1,226✔
1159
                if fwdPkg.State != channeldb.FwdStateCompleted {
1,122✔
1160
                        continue
120✔
1161
                }
1162

1163
                removeHeights = append(removeHeights, fwdPkg.Height)
882✔
1164
        }
1165

1166
        // If removeHeights is empty, return early so we don't use a db
1167
        // transaction.
1168
        if len(removeHeights) == 0 {
436✔
1169
                return nil
212✔
1170
        }
212✔
1171

1172
        return l.channel.RemoveFwdPkgs(removeHeights...)
12✔
1173
}
1174

// handleChanSyncErr performs the error handling logic in the case where we
// could not successfully syncChanStates with our channel peer.
func (l *channelLink) handleChanSyncErr(err error) {
        l.log.Warnf("error when syncing channel states: %v", err)

        var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss

        switch {
        case errors.Is(err, ErrLinkShuttingDown):
                l.log.Debugf("unable to sync channel states, link is " +
                        "shutting down")
                return

        // We failed syncing the commit chains, probably because the remote has
        // lost state. We should force close the channel.
        case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
                fallthrough

        // The remote sent us an invalid last commit secret, we should force
        // close the channel.
        // TODO(halseth): and permanently ban the peer?
        case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
                fallthrough

        // The remote sent us a commit point different from what they sent us
        // before.
        // TODO(halseth): ban peer?
        case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
                // We'll fail the link and tell the peer to force close the
                // channel. Note that the database state is not updated here;
                // it will be updated once the close transaction is ready, so
                // that we don't go down before storing the transaction in the
                // db.
                l.failf(
                        LinkFailureError{
                                code:          ErrSyncError,
                                FailureAction: LinkFailureForceClose,
                        },
                        "unable to synchronize channel states: %v", err,
                )

        // We have lost state and cannot safely force close the channel. Fail
        // the channel and wait for the remote to hopefully force close it. The
        // remote has sent us its latest unrevoked commitment point, and we'll
        // store it in the database, such that we can attempt to recover the
        // funds if the remote force closes the channel.
        case errors.As(err, &errDataLoss):
                err := l.channel.MarkDataLoss(
                        errDataLoss.CommitPoint,
                )
                if err != nil {
                        l.log.Errorf("unable to mark channel data loss: %v",
                                err)
                }

        // We determined the commit chains were not possible to sync. We
        // cautiously fail the channel, but don't force close.
        // TODO(halseth): can we safely force close in any cases where this
        // error is returned?
        case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
                if err := l.channel.MarkBorked(); err != nil {
                        l.log.Errorf("unable to mark channel borked: %v", err)
                }

        // Other, unspecified error.
        default:
        }

        l.failf(
                LinkFailureError{
                        code:          ErrRecoveryError,
                        FailureAction: LinkFailureForceNone,
                },
                "unable to synchronize channel states: %v", err,
        )
}
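
The switch above mixes errors.Is for sentinel errors with errors.As for typed errors that carry data, which is what lets the data-loss branch pull the remote commit point out of the error. A stand-alone illustration of the two matching modes; the sentinel and the dataLossErr type are invented for the example:

package main

import (
        "errors"
        "fmt"
)

var errSentinel = errors.New("sentinel")

// dataLossErr mimics an error type carrying recovery data, the way
// lnwallet.ErrCommitSyncLocalDataLoss carries the remote commit point.
type dataLossErr struct{ CommitPoint string }

func (e *dataLossErr) Error() string { return "local data loss" }

func classify(err error) {
        var dl *dataLossErr
        switch {
        // errors.Is matches identity through any %w wrapping.
        case errors.Is(err, errSentinel):
                fmt.Println("matched sentinel")

        // errors.As both matches and binds, so the payload is usable here.
        case errors.As(err, &dl):
                fmt.Println("data loss, commit point:", dl.CommitPoint)

        default:
                fmt.Println("unspecified error")
        }
}

func main() {
        classify(fmt.Errorf("sync failed: %w", errSentinel))
        classify(&dataLossErr{CommitPoint: "02ab..."})
}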

// htlcManager is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels.
// This goroutine reads messages from the upstream (remote) peer, and also from
// the downstream channel managed by the channel link. In the event that an
// htlc needs to be forwarded, then a send-only forward handler is used which
// sends htlc packets to the switch. Additionally, this goroutine handles
// acting upon all timeouts for any active HTLCs, manages the channel's
// revocation window, and also the htlc trickle queue+timer for this active
// channel.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) htlcManager() {
        defer func() {
                l.cfg.BatchTicker.Stop()
                l.wg.Done()
                l.log.Infof("exited")
        }()

        l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())

        // Notify any clients that the link is now in the switch via an
        // ActiveLinkEvent. We'll also defer an inactive link notification for
        // when the link exits to ensure that every active notification is
        // matched by an inactive one.
        l.cfg.NotifyActiveLink(l.ChannelPoint())
        defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())

        // TODO(roasbeef): need to call wipe chan whenever D/C?

        // If this isn't the first time that this channel link has been
        // created, then we'll need to check to see if we need to
        // re-synchronize state with the remote peer. settledHtlcs is a map of
        // HTLC's that we re-settled as part of the channel state sync.
        if l.cfg.SyncStates {
                err := l.syncChanStates()
                if err != nil {
                        l.handleChanSyncErr(err)
                        return
                }
        }

        // If a shutdown message has previously been sent on this link, then we
        // need to make sure that we have disabled any HTLC adds on the outgoing
        // direction of the link and that we re-send the same shutdown message
        // that we previously sent.
        l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
                // Immediately disallow any new outgoing HTLCs.
                if !l.DisableAdds(Outgoing) {
                        l.log.Warnf("Outgoing link adds already disabled")
                }

                // Re-send the shutdown message to the peer. Since
                // syncChanStates would have sent any outstanding CommitSig, it
                // is fine for us to immediately queue the shutdown message
                // now.
                err := l.cfg.Peer.SendMessage(false, &shutdown)
                if err != nil {
                        l.log.Warnf("Error sending shutdown message: %v", err)
                }
        })

        // We've successfully reestablished the channel, mark it as such to
        // allow the switch to forward HTLCs in the outbound direction.
        l.markReestablished()

        // Now that we've received both channel_ready and channel reestablish,
        // we can go ahead and send the active channel notification. We'll also
        // defer the inactive notification for when the link exits to ensure
        // that every active notification is matched by an inactive one.
        l.cfg.NotifyActiveChannel(l.ChannelPoint())
        defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())

        // With the channel states synced, we now reset the mailbox to ensure
        // we start processing all unacked packets in order. This is done here
        // to ensure that all acknowledgments that occur during channel
        // resynchronization have taken effect, causing us only to pull unacked
        // packets after starting to read from the downstream mailbox.
        l.mailBox.ResetPackets()

        // After cleaning up any memory pertaining to incoming packets, we now
        // replay our forwarding packages to handle any htlcs that can be
        // processed locally, or need to be forwarded out to the switch. We will
        // only attempt to resolve packages if our short chan id indicates that
        // the channel is not pending, otherwise we should have no htlcs to
        // reforward.
        if l.ShortChanID() != hop.Source {
                err := l.resolveFwdPkgs()
                switch err {
                // No error was encountered, success.
                case nil:

                // If the duplicate keystone error was encountered, we'll fail
                // without sending an Error message to the peer.
                case ErrDuplicateKeystone:
                        l.failf(LinkFailureError{code: ErrCircuitError},
                                "temporary circuit error: %v", err)
                        return

                // A non-nil error was encountered, send an Error message to
                // the peer.
                default:
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "unable to resolve fwd pkgs: %v", err)
                        return
                }

                // With our link's in-memory state fully reconstructed, spawn a
                // goroutine to manage the reclamation of disk space occupied by
                // completed forwarding packages.
                l.wg.Add(1)
                go l.fwdPkgGarbager()
        }

        for {
                // We must always check if we failed at some point processing
                // the last update before processing the next.
                if l.failed {
                        l.log.Errorf("link failed, exiting htlcManager")
                        return
                }

                // If the previous event resulted in a non-empty batch, resume
                // the batch ticker so that it can be cleared. Otherwise pause
                // the ticker to prevent waking up the htlcManager while the
                // batch is empty.
                numUpdates := l.channel.NumPendingUpdates(
                        lntypes.Local, lntypes.Remote,
                )
                if numUpdates > 0 {
                        l.cfg.BatchTicker.Resume()
                        l.log.Tracef("BatchTicker resumed, "+
                                "NumPendingUpdates(Local, Remote)=%d",
                                numUpdates,
                        )
                } else {
                        l.cfg.BatchTicker.Pause()
                        l.log.Trace("BatchTicker paused due to zero " +
                                "NumPendingUpdates(Local, Remote)")
                }

                select {
                // We have a new hook that needs to be run when we reach a clean
                // channel state.
                case hook := <-l.flushHooks.newTransients:
                        if l.channel.IsChannelClean() {
                                hook()
                        } else {
                                l.flushHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when we have
                // committed all of our updates.
                case hook := <-l.outgoingCommitHooks.newTransients:
                        if !l.channel.OweCommitment() {
                                hook()
                        } else {
                                l.outgoingCommitHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when our peer has
                // committed all of their updates.
                case hook := <-l.incomingCommitHooks.newTransients:
                        if !l.channel.NeedCommitment() {
                                hook()
                        } else {
                                l.incomingCommitHooks.alloc(hook)
                        }

                // Our update fee timer has fired, so we'll check the network
                // fee to see if we should adjust our commitment fee.
                case <-l.updateFeeTimer.C:
                        l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())

                        // If we're not the initiator of the channel, we don't
                        // control the fees, so we can ignore this.
                        if !l.channel.IsInitiator() {
                                continue
                        }

                        // If we are the initiator, then we'll sample the
                        // current fee rate to get into the chain within 3
                        // blocks.
                        netFee, err := l.sampleNetworkFee()
                        if err != nil {
                                l.log.Errorf("unable to sample network fee: %v",
                                        err)
                                continue
                        }

                        minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()

                        newCommitFee := l.channel.IdealCommitFeeRate(
                                netFee, minRelayFee,
                                l.cfg.MaxAnchorsCommitFeeRate,
                                l.cfg.MaxFeeAllocation,
                        )

                        // We determine if we should adjust the commitment fee
                        // based on the current commitment fee, the suggested
                        // new commitment fee and the current minimum relay fee
                        // rate.
                        commitFee := l.channel.CommitFeeRate()
                        if !shouldAdjustCommitFee(
                                newCommitFee, commitFee, minRelayFee,
                        ) {

                                continue
                        }

                        // If we do, then we'll send a new UpdateFee message to
                        // the remote party, to be locked in with a new update.
                        if err := l.updateChannelFee(newCommitFee); err != nil {
                                l.log.Errorf("unable to update fee rate: %v",
                                        err)
                                continue
                        }

                // The underlying channel has notified us of a unilateral close
                // carried out by the remote peer. In the case of such an
                // event, we'll wipe the channel state from the peer, and mark
                // the contract as fully settled. Afterwards we can exit.
                //
                // TODO(roasbeef): add force closure? also breach?
                case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
                        l.log.Warnf("remote peer has closed on-chain")

                        // TODO(roasbeef): remove all together
                        go func() {
                                chanPoint := l.channel.ChannelPoint()
                                l.cfg.Peer.WipeChannel(&chanPoint)
                        }()

                        return

                case <-l.cfg.BatchTicker.Ticks():
                        // Attempt to extend the remote commitment chain
                        // including all the currently pending entries. If the
                        // send was unsuccessful, then abandon the update,
                        // waiting for the revocation window to open up.
                        if !l.updateCommitTxOrFail() {
                                return
                        }

                case <-l.cfg.PendingCommitTicker.Ticks():
                        l.failf(
                                LinkFailureError{
                                        code:          ErrRemoteUnresponsive,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "unable to complete dance",
                        )
                        return

                // A message from the switch was just received. This indicates
                // that the link is an intermediate hop in a multi-hop HTLC
                // circuit.
                case pkt := <-l.downstream:
                        l.handleDownstreamPkt(pkt)

                // A message from the connected peer was just received. This
                // indicates that we have a new incoming HTLC, either directly
                // for us, or part of a multi-hop HTLC circuit.
                case msg := <-l.upstream:
                        l.handleUpstreamMsg(msg)

                // An htlc resolution is received. This means that we now have
                // a resolution for a previously accepted htlc.
                case hodlItem := <-l.hodlQueue.ChanOut():
                        htlcResolution := hodlItem.(invoices.HtlcResolution)
                        err := l.processHodlQueue(htlcResolution)
                        switch err {
                        // No error, success.
                        case nil:

                        // If the duplicate keystone error was encountered,
                        // fail back gracefully.
                        case ErrDuplicateKeystone:
                                l.failf(LinkFailureError{
                                        code: ErrCircuitError,
                                }, "process hodl queue: "+
                                        "temporary circuit error: %v",
                                        err,
                                )

                        // Send an Error message to the peer.
                        default:
                                l.failf(LinkFailureError{
                                        code: ErrInternalError,
                                }, "process hodl queue: unable to update "+
                                        "commitment: %v", err,
                                )
                        }

                case qReq := <-l.quiescenceReqs:
                        l.quiescer.initStfu(qReq)

                        if err := l.quiescer.drive(); err != nil {
                                l.stfuFailf("%s", err.Error())
                        }

                case runQuery := <-l.stateQueries:
                        runQuery()

                case <-l.quit:
                        return
                }
        }
}
1559
// processHodlQueue processes a received htlc resolution and continues reading
1560
// from the hodl queue until no more resolutions remain. When this function
1561
// returns without an error, the commit tx should be updated.
1562
func (l *channelLink) processHodlQueue(
1563
        firstResolution invoices.HtlcResolution) error {
488✔
1564

488✔
1565
        // Try to read all waiting resolution messages, so that they can all be
488✔
1566
        // processed in a single commitment tx update.
488✔
1567
        htlcResolution := firstResolution
488✔
1568
loop:
488✔
1569
        for {
976✔
1570
                // Lookup all hodl htlcs that can be failed or settled with this event.
488✔
1571
                // The hodl htlc must be present in the map.
488✔
1572
                circuitKey := htlcResolution.CircuitKey()
488✔
1573
                hodlHtlc, ok := l.hodlMap[circuitKey]
488✔
1574
                if !ok {
488✔
1575
                        return fmt.Errorf("hodl htlc not found: %v", circuitKey)
×
1576
                }
×
1577

1578
                if err := l.processHtlcResolution(htlcResolution, hodlHtlc); err != nil {
488✔
1579
                        return err
×
1580
                }
×
1581

1582
                // Clean up hodl map.
1583
                delete(l.hodlMap, circuitKey)
488✔
1584

488✔
1585
                select {
488✔
UNCOV
1586
                case item := <-l.hodlQueue.ChanOut():
×
UNCOV
1587
                        htlcResolution = item.(invoices.HtlcResolution)
×
1588
                default:
488✔
1589
                        break loop
488✔
1590
                }
1591
        }
1592

1593
        // Update the commitment tx.
1594
        if err := l.updateCommitTx(); err != nil {
489✔
1595
                return err
1✔
1596
        }
1✔
1597

1598
        return nil
487✔
1599
}
1600
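
The labeled break in processHodlQueue is the standard Go pattern for draining a channel without blocking: a select with a default case consumes whatever is already buffered, so every queued resolution lands in one commitment update. A self-contained sketch of the same pattern:

package main

import "fmt"

// drain consumes the first item plus anything already buffered on ch,
// returning the whole batch without ever blocking for new items.
func drain(first int, ch chan int) []int {
        batch := []int{first}
loop:
        for {
                select {
                case item := <-ch:
                        batch = append(batch, item)
                default:
                        break loop // nothing buffered, stop immediately
                }
        }
        return batch
}

func main() {
        ch := make(chan int, 4)
        ch <- 2
        ch <- 3
        fmt.Println(drain(1, ch)) // [1 2 3]
}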

// processHtlcResolution applies a received htlc resolution to the provided
// htlc. When this function returns without an error, the commit tx should be
// updated.
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
        htlc hodlHtlc) error {

        circuitKey := resolution.CircuitKey()

        // Determine required action for the resolution based on the type of
        // resolution we have received.
        switch res := resolution.(type) {
        // Settle htlcs that returned a settle resolution using the preimage
        // in the resolution.
        case *invoices.HtlcSettleResolution:
                l.log.Debugf("received settle resolution for %v "+
                        "with outcome: %v", circuitKey, res.Outcome)

                return l.settleHTLC(
                        res.Preimage, htlc.add.ID, htlc.sourceRef,
                )

        // For htlc failures, we get the relevant failure message based
        // on the failure resolution and then fail the htlc.
        case *invoices.HtlcFailResolution:
                l.log.Debugf("received cancel resolution for "+
                        "%v with outcome: %v", circuitKey, res.Outcome)

                // Get the lnwire failure message based on the resolution
                // result.
                failure := getResolutionFailure(res, htlc.add.Amount)

                l.sendHTLCError(
                        htlc.add, htlc.sourceRef, failure, htlc.obfuscator,
                        true,
                )
                return nil

        // Fail if we do not get a settle or fail resolution, since we
        // are only expecting to handle settles and fails.
        default:
                return fmt.Errorf("unknown htlc resolution type: %T",
                        resolution)
        }
}
1646
// getResolutionFailure returns the wire message that a htlc resolution should
1647
// be failed with.
1648
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
1649
        amount lnwire.MilliSatoshi) *LinkError {
4✔
1650

4✔
1651
        // If the resolution has been resolved as part of a MPP timeout,
4✔
1652
        // we need to fail the htlc with lnwire.FailMppTimeout.
4✔
1653
        if resolution.Outcome == invoices.ResultMppTimeout {
4✔
1654
                return NewDetailedLinkError(
×
1655
                        &lnwire.FailMPPTimeout{}, resolution.Outcome,
×
1656
                )
×
1657
        }
×
1658

1659
        // If the htlc is not a MPP timeout, we fail it with
1660
        // FailIncorrectDetails. This error is sent for invoice payment
1661
        // failures such as underpayment/ expiry too soon and hodl invoices
1662
        // (which return FailIncorrectDetails to avoid leaking information).
1663
        incorrectDetails := lnwire.NewFailIncorrectDetails(
4✔
1664
                amount, uint32(resolution.AcceptHeight),
4✔
1665
        )
4✔
1666

4✔
1667
        return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
4✔
1668
}
1669

// randomFeeUpdateTimeout returns a random timeout between the bounds defined
// within the link's configuration that will be used to determine when the link
// should propose an update to its commitment fee rate.
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
        lower := int64(l.cfg.MinUpdateTimeout)
        upper := int64(l.cfg.MaxUpdateTimeout)
        return time.Duration(prand.Int63n(upper-lower) + lower)
}
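
Since prand.Int63n(upper-lower) yields a value in [0, upper-lower), the timeout above is uniform over [MinUpdateTimeout, MaxUpdateTimeout): the lower bound is inclusive, the upper bound is never hit, and the jitter keeps peers' fee-update timers from synchronizing. A quick bounds check using the same math/rand call:

package main

import (
        "fmt"
        "math/rand"
        "time"
)

// randomTimeout reproduces the sampling in randomFeeUpdateTimeout.
func randomTimeout(lo, hi time.Duration) time.Duration {
        lower, upper := int64(lo), int64(hi)
        return time.Duration(rand.Int63n(upper-lower) + lower)
}

func main() {
        lo, hi := 10*time.Minute, 60*time.Minute
        for i := 0; i < 3; i++ {
                d := randomTimeout(lo, hi)
                // Always prints true: d is in [lo, hi).
                fmt.Println(d, d >= lo && d < hi)
        }
}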

// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
// downstream HTLC Switch.
func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error {
        htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
        if !ok {
                return errors.New("not an UpdateAddHTLC packet")
        }

        // If we are flushing the link in the outgoing direction or we have
        // already sent Stfu, then we can't add new htlcs to the link and we
        // need to bounce it.
        if l.IsFlushing(Outgoing) || !l.quiescer.canSendUpdates() {
                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        &lnwire.FailTemporaryChannelFailure{},
                        OutgoingFailureLinkNotEligible,
                )
        }

        // If hodl.AddOutgoing mode is active, we exit early to simulate
        // arbitrary delays between the switch adding an ADD to the
        // mailbox, and the HTLC being added to the commitment state.
        if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
                l.log.Warnf(hodl.AddOutgoing.Warning())
                l.mailBox.AckPacket(pkt.inKey())
                return nil
        }

        // Check if we can add the HTLC here without exceeding the max fee
        // exposure threshold.
        if l.isOverexposedWithHtlc(htlc, false) {
                l.log.Debugf("Unable to handle downstream HTLC - max fee " +
                        "exposure exceeded")

                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        lnwire.NewTemporaryChannelFailure(nil),
                        OutgoingFailureDownstreamHtlcAdd,
                )
        }

        // A new payment has been initiated via the downstream channel,
        // so we add the new HTLC to our local log, then update the
        // commitment chains.
        htlc.ChanID = l.ChanID()
        openCircuitRef := pkt.inKey()

        // We enforce the fee buffer for the commitment transaction because
        // we are in control of adding this htlc. Nothing has locked-in yet so
        // we can securely enforce the fee buffer which is only relevant if we
        // are the initiator of the channel.
        index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
        if err != nil {
                // The HTLC was unable to be added to the state machine,
                // as a result, we'll signal the switch to cancel the
                // pending payment.
                l.log.Warnf("Unable to handle downstream add HTLC: %v",
                        err)

                // Remove this packet from the link's mailbox, this
                // prevents it from being reprocessed if the link
                // restarts and resets its mailbox. If this response
                // doesn't make it back to the originating link, it will
                // be rejected upon attempting to reforward the Add to
                // the switch, since the circuit was never fully opened,
                // and the forwarding package shows it as
                // unacknowledged.
                l.mailBox.FailAdd(pkt)

                return NewDetailedLinkError(
                        lnwire.NewTemporaryChannelFailure(nil),
                        OutgoingFailureDownstreamHtlcAdd,
                )
        }

        l.log.Tracef("received downstream htlc: payment_hash=%x, "+
                "local_log_index=%v, pend_updates=%v",
                htlc.PaymentHash[:], index,
                l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))

        pkt.outgoingChanID = l.ShortChanID()
        pkt.outgoingHTLCID = index
        htlc.ID = index

        l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
                pkt.inKey(), pkt.outKey())

        l.openedCircuits = append(l.openedCircuits, pkt.inKey())
        l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())

        _ = l.cfg.Peer.SendMessage(false, htlc)

        // Send a forward event notification to htlcNotifier.
        l.cfg.HtlcNotifier.NotifyForwardingEvent(
                newHtlcKey(pkt),
                HtlcInfo{
                        IncomingTimeLock: pkt.incomingTimeout,
                        IncomingAmt:      pkt.incomingAmount,
                        OutgoingTimeLock: htlc.Expiry,
                        OutgoingAmt:      htlc.Amount,
                },
                getEventType(pkt),
        )

        l.tryBatchUpdateCommitTx()

        return nil
}

// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
// cleared HTLCs with the upstream peer.
//
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
        if pkt.htlc.MsgType().IsChannelUpdate() &&
                !l.quiescer.canRecvUpdates() {

                l.log.Warnf("unable to process channel update. "+
                        "ChannelID=%v is quiescent.", l.ChanID())

                return
        }

        switch htlc := pkt.htlc.(type) {
        case *lnwire.UpdateAddHTLC:
                // Handle add message. The returned error can be ignored,
                // because it is also sent through the mailbox.
                _ = l.handleDownstreamUpdateAdd(pkt)

        case *lnwire.UpdateFulfillHTLC:
                // If hodl.SettleOutgoing mode is active, we exit early to
                // simulate arbitrary delays between the switch adding the
                // SETTLE to the mailbox, and the HTLC being added to the
                // commitment state.
                if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
                        l.log.Warnf(hodl.SettleOutgoing.Warning())
                        l.mailBox.AckPacket(pkt.inKey())
                        return
                }

                // An HTLC we forward to the switch has just settled somewhere
                // upstream. Therefore we settle the HTLC within our local
                // state machine.
                inKey := pkt.inKey()
                err := l.channel.SettleHTLC(
                        htlc.PaymentPreimage,
                        pkt.incomingHTLCID,
                        pkt.sourceRef,
                        pkt.destRef,
                        &inKey,
                )
                if err != nil {
                        l.log.Errorf("unable to settle incoming HTLC for "+
                                "circuit-key=%v: %v", inKey, err)

                        // If the HTLC index for Settle response was not known
                        // to our commitment state, it has already been
                        // cleaned up by a prior response. We'll thus try to
                        // clean up any lingering state to ensure we don't
                        // continue reforwarding.
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
                                l.cleanupSpuriousResponse(pkt)
                        }

                        // Remove the packet from the link's mailbox to ensure
                        // it doesn't get replayed after a reconnection.
                        l.mailBox.AckPacket(inKey)

                        return
                }

                l.log.Debugf("queueing removal of SETTLE closed circuit: "+
                        "%s->%s", pkt.inKey(), pkt.outKey())

                l.closedCircuits = append(l.closedCircuits, pkt.inKey())

                // With the HTLC settled, we'll need to populate the wire
                // message to target the specific channel and HTLC to be
                // canceled.
                htlc.ChanID = l.ChanID()
                htlc.ID = pkt.incomingHTLCID

                // Then we send the HTLC settle message to the connected peer
                // so we can continue the propagation of the settle message.
                l.cfg.Peer.SendMessage(false, htlc)

                // Send a settle event notification to htlcNotifier.
                l.cfg.HtlcNotifier.NotifySettleEvent(
                        newHtlcKey(pkt),
                        htlc.PaymentPreimage,
                        getEventType(pkt),
                )

                // Immediately update the commitment tx to minimize latency.
                l.updateCommitTxOrFail()

        case *lnwire.UpdateFailHTLC:
                // If hodl.FailOutgoing mode is active, we exit early to
                // simulate arbitrary delays between the switch adding a FAIL to
                // the mailbox, and the HTLC being added to the commitment
                // state.
                if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
                        l.log.Warnf(hodl.FailOutgoing.Warning())
                        l.mailBox.AckPacket(pkt.inKey())
                        return
                }

                // An HTLC cancellation has been triggered somewhere upstream,
                // so we'll remove the HTLC from our local state machine.
                inKey := pkt.inKey()
                err := l.channel.FailHTLC(
                        pkt.incomingHTLCID,
                        htlc.Reason,
                        pkt.sourceRef,
                        pkt.destRef,
                        &inKey,
                )
                if err != nil {
                        l.log.Errorf("unable to cancel incoming HTLC for "+
                                "circuit-key=%v: %v", inKey, err)

                        // If the HTLC index for Fail response was not known to
                        // our commitment state, it has already been cleaned up
                        // by a prior response. We'll thus try to clean up any
                        // lingering state to ensure we don't continue
                        // reforwarding.
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
                                l.cleanupSpuriousResponse(pkt)
                        }

                        // Remove the packet from the link's mailbox to ensure
                        // it doesn't get replayed after a reconnection.
                        l.mailBox.AckPacket(inKey)

                        return
                }

                l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
                        pkt.inKey(), pkt.outKey())

                l.closedCircuits = append(l.closedCircuits, pkt.inKey())

                // With the HTLC removed, we'll need to populate the wire
                // message to target the specific channel and HTLC to be
                // canceled. The "Reason" field will have already been set
                // within the switch.
                htlc.ChanID = l.ChanID()
                htlc.ID = pkt.incomingHTLCID

                // We send the HTLC message to the peer which initially created
                // the HTLC. If the incoming blinding point is non-nil, we
                // know that we are a relaying node in a blinded path.
                // Otherwise, we're either an introduction node or not part of
                // a blinded path at all.
                if err := l.sendIncomingHTLCFailureMsg(
                        htlc.ID,
                        pkt.obfuscator,
                        htlc.Reason,
                ); err != nil {
                        l.log.Errorf("unable to send HTLC failure: %v",
                                err)

                        return
                }

                // If the packet does not have a link failure set, it failed
                // further down the route so we notify a forwarding failure.
                // Otherwise, we notify a link failure because it failed at our
                // node.
                if pkt.linkFailure != nil {
                        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
                                newHtlcKey(pkt),
                                newHtlcInfo(pkt),
                                getEventType(pkt),
                                pkt.linkFailure,
                                false,
                        )
                } else {
                        l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
                                newHtlcKey(pkt), getEventType(pkt),
                        )
                }

                // Immediately update the commitment tx to minimize latency.
                l.updateCommitTxOrFail()
        }
}

// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
// full.
func (l *channelLink) tryBatchUpdateCommitTx() {
        pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
        if pending < uint64(l.cfg.BatchSize) {
                return
        }

        l.updateCommitTxOrFail()
}
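
tryBatchUpdateCommitTx is what gives the link amortized signing: downstream adds accumulate until the pending-update count reaches cfg.BatchSize, and only then is a commitment update forced, with the batch ticker sweeping up any stragglers. A stand-alone sketch of that threshold behavior (the batcher type is invented for the example):

package main

import "fmt"

// batcher is an invented stand-in for the link's pending-update counter.
type batcher struct {
        pending   int
        batchSize int
}

// tryFlush mirrors the shape of tryBatchUpdateCommitTx: a no-op below the
// threshold, a flush at or above it.
func (b *batcher) tryFlush() bool {
        if b.pending < b.batchSize {
                return false
        }
        fmt.Println("signing commitment covering", b.pending, "updates")
        b.pending = 0
        return true
}

func main() {
        b := &batcher{batchSize: 10}
        for i := 1; i <= 12; i++ {
                b.pending++
                b.tryFlush() // flushes once, at the 10th update
        }
}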

// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
// associated with this packet. If successful in doing so, it will also purge
// the open circuit from the circuit map and remove the packet from the link's
// mailbox.
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
        inKey := pkt.inKey()

        l.log.Debugf("cleaning up spurious response for incoming "+
                "circuit-key=%v", inKey)

        // If the htlc packet doesn't have a source reference, it is unsafe to
        // proceed, as skipping this ack may cause the htlc to be reforwarded.
        if pkt.sourceRef == nil {
                l.log.Errorf("unable to cleanup response for incoming "+
                        "circuit-key=%v, does not contain source reference",
                        inKey)
                return
        }

        // If the source reference is present, we will try to prevent this link
        // from resending the packet to the switch. To do so, we ack the AddRef
        // of the incoming HTLC belonging to this link.
        err := l.channel.AckAddHtlcs(*pkt.sourceRef)
        if err != nil {
                l.log.Errorf("unable to ack AddRef for incoming "+
                        "circuit-key=%v: %v", inKey, err)

                // If this operation failed, it is unsafe to attempt removal of
                // the destination reference or circuit, so we exit early. The
                // cleanup may proceed with a different packet in the future
                // that succeeds on this step.
                return
        }

        // Now that we know this link will stop retransmitting Adds to the
        // switch, we can begin to teardown the response reference and circuit
        // map.
        //
        // If the packet includes a destination reference, then a response for
        // this HTLC was locked into the outgoing channel. Attempt to remove
        // this reference, so we stop retransmitting the response internally.
        // Even if this fails, we will proceed in trying to delete the circuit.
        // When retransmitting responses, the destination references will be
        // cleaned up if an open circuit is not found in the circuit map.
        if pkt.destRef != nil {
                err := l.channel.AckSettleFails(*pkt.destRef)
                if err != nil {
                        l.log.Errorf("unable to ack SettleFailRef "+
                                "for incoming circuit-key=%v: %v",
                                inKey, err)
                }
        }

        l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)

        // With all known references acked, we can now safely delete the circuit
        // from the switch's circuit map, as the state is no longer needed.
        err = l.cfg.Circuits.DeleteCircuits(inKey)
        if err != nil {
                l.log.Errorf("unable to delete circuit for "+
                        "circuit-key=%v: %v", inKey, err)
        }
}

2046
// handleUpstreamMsg processes wire messages related to commitment state
2047
// updates from the upstream peer. The upstream peer is the peer whom we have a
2048
// direct channel with, updating our respective commitment chains.
2049
func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
7,460✔
2050
        // First check if the message is an update and we are capable of
7,460✔
2051
        // receiving updates right now.
7,460✔
2052
        if msg.MsgType().IsChannelUpdate() && !l.quiescer.canRecvUpdates() {
7,460✔
NEW
2053
                l.stfuFailf("update received after stfu: %T", msg)
×
NEW
2054
                return
×
NEW
2055
        }
×
2056

2057
        switch msg := msg.(type) {
7,460✔
2058
        case *lnwire.UpdateAddHTLC:
1,491✔
2059
                if l.IsFlushing(Incoming) {
1,491✔
2060
                        // This is forbidden by the protocol specification.
×
2061
                        // The best chance we have to deal with this is to drop
×
2062
                        // the connection. This should roll back the channel
×
2063
                        // state to the last CommitSig. If the remote has
×
2064
                        // already sent a CommitSig we haven't received yet,
×
2065
                        // channel state will be re-synchronized with a
×
2066
                        // ChannelReestablish message upon reconnection and the
×
2067
                        // protocol state that caused us to flush the link will
×
2068
                        // be rolled back. In the event that there was some
×
2069
                        // non-deterministic behavior in the remote that caused
×
2070
                        // them to violate the protocol, we have a decent shot
×
2071
                        // at correcting it this way, since reconnecting will
×
2072
                        // put us in the cleanest possible state to try again.
×
2073
                        //
×
2074
                        // In addition to the above, it is possible for us to
×
2075
                        // hit this case in situations where we improperly
×
2076
                        // handle message ordering due to concurrency choices.
×
2077
                        // An issue has been filed to address this here:
×
2078
                        // https://github.com/lightningnetwork/lnd/issues/8393
×
2079
                        l.failf(
×
2080
                                LinkFailureError{
×
2081
                                        code:             ErrInvalidUpdate,
×
2082
                                        FailureAction:    LinkFailureDisconnect,
×
2083
                                        PermanentFailure: false,
×
2084
                                        Warning:          true,
×
2085
                                },
×
2086
                                "received add while link is flushing",
×
2087
                        )
×
2088

×
2089
                        return
×
2090
                }
×
2091

2092
                // Disallow htlcs with blinding points set if we haven't
2093
                // enabled the feature. This saves us from having to process
2094
                // the onion at all, but will only catch blinded payments
2095
                // where we are a relaying node (as the blinding point will
2096
                // be in the payload when we're the introduction node).
2097
                if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
1,491✔
2098
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2099
                                "blinding point included when route blinding "+
×
2100
                                        "is disabled")
×
2101

×
2102
                        return
×
2103
                }
×
2104

2105
                // We have to check the limit here rather than later in the
2106
                // switch because the counterparty can keep sending HTLC's
2107
                // without sending a revoke. This would mean that the switch
2108
                // check would only occur later.
2109
                if l.isOverexposedWithHtlc(msg, true) {
1,491✔
2110
                        l.failf(LinkFailureError{code: ErrInternalError},
×
2111
                                "peer sent us an HTLC that exceeded our max "+
×
2112
                                        "fee exposure")
×
2113

×
2114
                        return
×
2115
                }
×
2116

2117
                // We just received an add request from an upstream peer, so we
2118
                // add it to our state machine, then add the HTLC to our
2119
                // "settle" list in the event that we know the preimage.
2120
                index, err := l.channel.ReceiveHTLC(msg)
1,491✔
2121
                if err != nil {
1,491✔
2122
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2123
                                "unable to handle upstream add HTLC: %v", err)
×
2124
                        return
×
2125
                }
×
2126

2127
                l.log.Tracef("receive upstream htlc with payment hash(%x), "+
1,491✔
2128
                        "assigning index: %v", msg.PaymentHash[:], index)
1,491✔
2129

2130
        case *lnwire.UpdateFulfillHTLC:
660✔
2131
                pre := msg.PaymentPreimage
660✔
2132
                idx := msg.ID
660✔
2133

660✔
2134
                // Before we pipeline the settle, we'll check the set of active
660✔
2135
                // htlc's to see if the related UpdateAddHTLC has been fully
660✔
2136
                // locked-in.
660✔
2137
                var lockedin bool
660✔
2138
                htlcs := l.channel.ActiveHtlcs()
660✔
2139
                for _, add := range htlcs {
22,196✔
2140
                        // The HTLC will be outgoing and match idx.
21,536✔
2141
                        if !add.Incoming && add.HtlcIndex == idx {
22,194✔
2142
                                lockedin = true
658✔
2143
                                break
658✔
2144
                        }
2145
                }
2146

2147
                if !lockedin {
662✔
2148
                        l.failf(
2✔
2149
                                LinkFailureError{code: ErrInvalidUpdate},
2✔
2150
                                "unable to handle upstream settle",
2✔
2151
                        )
2✔
2152
                        return
2✔
2153
                }
2✔
2154

2155
                if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
658✔
UNCOV
2156
                        l.failf(
×
UNCOV
2157
                                LinkFailureError{
×
UNCOV
2158
                                        code:          ErrInvalidUpdate,
×
UNCOV
2159
                                        FailureAction: LinkFailureForceClose,
×
UNCOV
2160
                                },
×
UNCOV
2161
                                "unable to handle upstream settle HTLC: %v", err,
×
UNCOV
2162
                        )
×
UNCOV
2163
                        return
×
UNCOV
2164
                }
×
2165

2166
                settlePacket := &htlcPacket{
658✔
2167
                        outgoingChanID: l.ShortChanID(),
658✔
2168
                        outgoingHTLCID: idx,
658✔
2169
                        htlc: &lnwire.UpdateFulfillHTLC{
658✔
2170
                                PaymentPreimage: pre,
658✔
2171
                        },
658✔
2172
                }
658✔
2173

658✔
2174
                // Add the newly discovered preimage to our growing list of
658✔
2175
                // uncommitted preimage. These will be written to the witness
658✔
2176
                // cache just before accepting the next commitment signature
658✔
2177
                // from the remote peer.
658✔
2178
                l.uncommittedPreimages = append(l.uncommittedPreimages, pre)
658✔
2179

658✔
2180
                // Pipeline this settle, send it to the switch.
658✔
2181
                go l.forwardBatch(false, settlePacket)
658✔
2182

2183
	case *lnwire.UpdateFailMalformedHTLC:
		// Convert the failure type encoded within the HTLC fail
		// message to the proper generic lnwire error code.
		var failure lnwire.FailureMessage
		switch msg.FailureCode {
		case lnwire.CodeInvalidOnionVersion:
			failure = &lnwire.FailInvalidOnionVersion{
				OnionSHA256: msg.ShaOnionBlob,
			}
		case lnwire.CodeInvalidOnionHmac:
			failure = &lnwire.FailInvalidOnionHmac{
				OnionSHA256: msg.ShaOnionBlob,
			}

		case lnwire.CodeInvalidOnionKey:
			failure = &lnwire.FailInvalidOnionKey{
				OnionSHA256: msg.ShaOnionBlob,
			}

		// Handle malformed errors that are part of a blinded route.
		// This case is slightly different, because we expect every
		// relaying node in the blinded portion of the route to send
		// malformed errors. If we're also a relaying node, we're
		// likely going to switch this error out anyway for our own
		// malformed error, but we handle the case here for
		// completeness.
		case lnwire.CodeInvalidBlinding:
			failure = &lnwire.FailInvalidBlinding{
				OnionSHA256: msg.ShaOnionBlob,
			}

		default:
			l.log.Warnf("unexpected failure code received in "+
				"UpdateFailMalformedHTLC: %v", msg.FailureCode)

			// We don't just pass back the error we received from
			// our successor. Otherwise we might report a failure
			// that penalizes us more than needed. If the onion
			// that we forwarded was correct, the node should have
			// been able to send back its own failure. The node
			// did not send back its own failure, so we assume
			// there was a problem with the onion and report that
			// back. We reuse the invalid onion key failure because
			// there is no specific error for this case.
			failure = &lnwire.FailInvalidOnionKey{
				OnionSHA256: msg.ShaOnionBlob,
			}
		}

		// With the error parsed, we'll convert it into its opaque
		// form.
		var b bytes.Buffer
		if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
			l.log.Errorf("unable to encode malformed error: %v", err)
			return
		}

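		// A minimal sketch of the conversion above, assuming only the
		// lnwire package (shaBlob is a placeholder [32]byte, and the
		// trailing 0 is the wire protocol version):
		//
		//	failure := &lnwire.FailInvalidOnionKey{
		//		OnionSHA256: shaBlob,
		//	}
		//	var buf bytes.Buffer
		//	if err := lnwire.EncodeFailure(&buf, failure, 0); err != nil {
		//		return
		//	}
		//	opaqueReason := buf.Bytes()
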
		// If the remote side was unable to parse the onion blob we
		// sent to it, then we should transform the malformed HTLC
		// message into the usual HTLC fail message.
		err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
		if err != nil {
			l.failf(LinkFailureError{code: ErrInvalidUpdate},
				"unable to handle upstream fail HTLC: %v", err)
			return
		}

	case *lnwire.UpdateFailHTLC:
		// Verify that the failure reason is at least 256 bytes plus
		// overhead.
		const minimumFailReasonLength = lnwire.FailureMessageLength +
			2 + 2 + 32

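		// For reference, assuming the standard layout of an encrypted
		// failure reason, the overhead is a 2 byte message length
		// prefix, a 2 byte pad length prefix, and a 32 byte MAC on top
		// of the 256 byte failure message:
		//
		//	256 + 2 + 2 + 32 = 292 bytes
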
		if len(msg.Reason) < minimumFailReasonLength {
			// We've received a reason with a non-compliant length.
			// Older nodes happily relay back these failures that
			// may originate from a node further downstream.
			// Therefore we can't just fail the channel.
			//
			// We want to be compliant ourselves, so we also can't
			// pass back the reason unmodified. And we must make
			// sure that we don't hit the magic length check of 260
			// bytes in processRemoteSettleFails either.
			//
			// Because the reason is unreadable for the payer
			// anyway, we just replace it by a compliant-length
			// series of random bytes.
			msg.Reason = make([]byte, minimumFailReasonLength)
			_, err := crand.Read(msg.Reason[:])
			if err != nil {
				l.log.Errorf("Random generation error: %v", err)

				return
			}
		}

		// Add fail to the update log.
		idx := msg.ID
		err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
		if err != nil {
			l.failf(LinkFailureError{code: ErrInvalidUpdate},
				"unable to handle upstream fail HTLC: %v", err)
			return
		}

	case *lnwire.CommitSig:
		// Since we may have learned new preimages for the first time,
		// we'll add them to our preimage cache. By doing this, we
		// ensure any contested contracts watched by any on-chain
		// arbitrators can now sweep this HTLC on-chain. We delay
		// committing the preimages until just before accepting the new
		// remote commitment, as afterwards the peer won't resend the
		// Settle messages on the next channel reestablishment. Doing so
		// allows us to more effectively batch this operation, instead
		// of doing a single write per preimage.
		err := l.cfg.PreimageCache.AddPreimages(
			l.uncommittedPreimages...,
		)
		if err != nil {
			l.failf(
				LinkFailureError{code: ErrInternalError},
				"unable to add preimages=%v to cache: %v",
				l.uncommittedPreimages, err,
			)
			return
		}

		// Instead of truncating the slice to conserve memory
		// allocations, we simply set the uncommitted preimage slice to
		// nil so that a new one will be initialized if any more
		// witnesses are discovered. We do this because the maximum size
		// that the slice can occupy is 15KB, and we want to ensure we
		// release that memory back to the runtime.
		l.uncommittedPreimages = nil

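		// As a sketch of the batching above: the cache takes the whole
		// batch in one variadic call instead of one write per preimage
		// (p1 and p2 being placeholder preimages):
		//
		//	batch := []lntypes.Preimage{p1, p2}
		//	err := l.cfg.PreimageCache.AddPreimages(batch...)
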
		// We just received new updates to our local commitment chain.
		// Validate this new commitment, closing the link if it is
		// invalid.
		auxSigBlob, err := msg.CustomRecords.Serialize()
		if err != nil {
			l.failf(
				LinkFailureError{code: ErrInvalidCommitment},
				"unable to serialize custom records: %v", err,
			)

			return
		}
		err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
			CommitSig:  msg.CommitSig,
			HtlcSigs:   msg.HtlcSigs,
			PartialSig: msg.PartialSig,
			AuxSigBlob: auxSigBlob,
		})
		if err != nil {
			// If we were unable to reconstruct their proposed
			// commitment, then we'll examine the type of error. If
			// it's an InvalidCommitSigError, then we'll send a
			// direct error.
			var sendData []byte
			switch err.(type) {
			case *lnwallet.InvalidCommitSigError:
				sendData = []byte(err.Error())
			case *lnwallet.InvalidHtlcSigError:
				sendData = []byte(err.Error())
			}
			l.failf(
				LinkFailureError{
					code:          ErrInvalidCommitment,
					FailureAction: LinkFailureForceClose,
					SendData:      sendData,
				},
				"ChannelPoint(%v): unable to accept new "+
					"commitment: %v",
				l.channel.ChannelPoint(), err,
			)
			return
		}

		// As we've just accepted a new state, we'll now immediately
		// send the remote peer a revocation for our prior state.
		nextRevocation, currentHtlcs, finalHTLCs, err :=
			l.channel.RevokeCurrentCommitment()
		if err != nil {
			l.log.Errorf("unable to revoke commitment: %v", err)

			// We need to fail the channel in case revoking our
			// local commitment does not succeed. We might have
			// already advanced our channel state which would lead
			// us to proceed with an unclean state.
			//
			// NOTE: We do not trigger a force close because this
			// could resolve itself in case our db was just busy
			// not accepting new transactions.
			l.failf(
				LinkFailureError{
					code:          ErrInternalError,
					Warning:       true,
					FailureAction: LinkFailureDisconnect,
				},
				"ChannelPoint(%v): unable to accept new "+
					"commitment: %v",
				l.channel.ChannelPoint(), err,
			)
			return
		}

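		// For orientation, this case implements our half of the usual
		// commitment dance. For a remote-initiated update the message
		// flow, from this node's point of view, is roughly:
		//
		//	remote -> CommitSig      (validated above)
		//	local  -> RevokeAndAck   (sent below)
		//	local  -> CommitSig      (if we still owe a commitment)
		//	remote -> RevokeAndAck   (handled in the case below)
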
		// As soon as we are ready to send our next revocation, we can
		// invoke the incoming commit hooks.
		l.RWMutex.Lock()
		l.incomingCommitHooks.invoke()
		l.RWMutex.Unlock()

		l.cfg.Peer.SendMessage(false, nextRevocation)

		// Notify the incoming HTLCs for which the resolutions were
		// locked in.
		for id, settled := range finalHTLCs {
			l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
				models.CircuitKey{
					ChanID: l.ShortChanID(),
					HtlcID: id,
				},
				channeldb.FinalHtlcInfo{
					Settled:  settled,
					Offchain: true,
				},
			)
		}

		// Since we just revoked our commitment, we may have a new set
		// of HTLCs on our commitment, so we'll send them using our
		// function closure NotifyContractUpdate.
		newUpdate := &contractcourt.ContractUpdate{
			HtlcKey: contractcourt.LocalHtlcSet,
			Htlcs:   currentHtlcs,
		}
		err = l.cfg.NotifyContractUpdate(newUpdate)
		if err != nil {
			l.log.Errorf("unable to notify contract update: %v",
				err)
			return
		}

		select {
		case <-l.quit:
			return
		default:
		}

		// If the remote party initiated the state transition, we'll
		// reply with a signature to provide them with their version of
		// the latest commitment. Otherwise, both commitment chains are
		// fully synced from our PoV and we don't need to reply with a
		// signature, as both sides already have a commitment with the
		// latest state accepted.
		if l.channel.OweCommitment() {
			if !l.updateCommitTxOrFail() {
				return
			}
		}

		// If we need to send out an Stfu, this would be the time to do
		// so.
		err = l.quiescer.drive()
		if err != nil {
			l.stfuFailf("%s", err.Error())
		}

		// Now that we have finished processing the incoming CommitSig
		// and sent out our RevokeAndAck, we invoke the flushHooks if
		// the channel state is clean.
		l.RWMutex.Lock()
		if l.channel.IsChannelClean() {
			l.flushHooks.invoke()
		}
		l.RWMutex.Unlock()

	case *lnwire.RevokeAndAck:
		// We've received a revocation from the remote chain. If valid,
		// this moves the remote chain forward and expands our
		// revocation window.

		// We now process the message and advance our remote commit
		// chain.
		fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg)
		if err != nil {
			// TODO(halseth): force close?
			l.failf(
				LinkFailureError{
					code:          ErrInvalidRevocation,
					FailureAction: LinkFailureDisconnect,
				},
				"unable to accept revocation: %v", err,
			)
			return
		}

		// The remote party now has a new primary commitment, so we'll
		// update the contract court to be aware of this new set (the
		// prior old remote pending).
		newUpdate := &contractcourt.ContractUpdate{
			HtlcKey: contractcourt.RemoteHtlcSet,
			Htlcs:   remoteHTLCs,
		}
		err = l.cfg.NotifyContractUpdate(newUpdate)
		if err != nil {
			l.log.Errorf("unable to notify contract update: %v",
				err)
			return
		}

		select {
		case <-l.quit:
			return
		default:
		}

		// If we have a tower client for this channel type, we'll
		// create a backup for the current state.
		if l.cfg.TowerClient != nil {
			state := l.channel.State()
			chanID := l.ChanID()

			err = l.cfg.TowerClient.BackupState(
				&chanID, state.RemoteCommitment.CommitHeight-1,
			)
			if err != nil {
				l.failf(LinkFailureError{
					code: ErrInternalError,
				}, "unable to queue breach backup: %v", err)
				return
			}
		}

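		// Note on the backup above: ReceiveRevocation has already
		// advanced the remote commitment chain, so CommitHeight-1
		// refers to the state that was just revoked, which is the
		// state a tower needs in order to punish a breach.
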
		// If we can send updates then we can process adds in case we
		// are the exit hop and need to send back resolutions, or in
		// case there are validity issues with the packets. Otherwise
		// we defer the action until resume.
		//
		// We are free to process the settles and fails without this
		// check since processing those can't result in further updates
		// to this channel link.
		if l.quiescer.canSendUpdates() {
			l.processRemoteAdds(fwdPkg)
		} else {
			l.quiescer.onResume(func() {
				l.processRemoteAdds(fwdPkg)
			})
		}
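		// Processing adds can itself generate new updates (e.g. an
		// immediate settle or fail when we're the exit hop), which is
		// why it must be deferred while quiescent. The settles and
		// fails below only consume existing updates, so they are
		// always safe to process.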
		l.processRemoteSettleFails(fwdPkg)

		// If the link failed during processing the adds, we must
		// return to ensure we won't attempt to update the state
		// further.
		if l.failed {
			return
		}

		// The revocation window opened up. If there are pending local
		// updates, try to update the commit tx. Pending updates could
		// already have been present because of a previously failed
		// update to the commit tx or freshly added in by
		// processRemoteAdds. Also in case there are no local updates,
		// but there are still remote updates that are not in the remote
		// commit tx yet, send out an update.
		if l.channel.OweCommitment() {
			if !l.updateCommitTxOrFail() {
				return
			}
		}

		// Now that we have finished processing the RevokeAndAck, we
		// can invoke the flushHooks if the channel state is clean.
		l.RWMutex.Lock()
		if l.channel.IsChannelClean() {
			l.flushHooks.invoke()
		}
		l.RWMutex.Unlock()

	case *lnwire.UpdateFee:
		// Check and see if their proposed fee-rate would make us
		// exceed the fee threshold.
		fee := chainfee.SatPerKWeight(msg.FeePerKw)

		exceedsThreshold, err := l.exceedsFeeExposureLimit(fee)
		if err != nil {
			// This shouldn't typically happen. If it does, it
			// indicates something is wrong with our channel state.
			l.log.Errorf("Unable to determine if fee threshold " +
				"exceeded")
			l.failf(LinkFailureError{code: ErrInternalError},
				"error calculating fee exposure: %v", err)

			return
		}

		if exceedsThreshold {
			// The proposed fee-rate makes us exceed the fee
			// threshold.
			l.failf(LinkFailureError{code: ErrInternalError},
				"fee threshold exceeded")
			return
		}

		// We received a fee update from the peer. If we are the
		// initiator we will fail the channel, if not we will apply
		// the update.
		if err := l.channel.ReceiveUpdateFee(fee); err != nil {
			l.failf(LinkFailureError{code: ErrInvalidUpdate},
				"error receiving fee update: %v", err)
			return
		}

		// Update the mailbox's feerate as well.
		l.mailBox.SetFeeRate(fee)

	case *lnwire.Stfu:
		err := l.handleStfu(msg)
		if err != nil {
			l.stfuFailf("%s", err.Error())
		}

	// In the case where we receive a warning message from our peer, just
	// log it and move on. We choose not to disconnect from our peer,
	// although we "MAY" do so according to the specification.
	case *lnwire.Warning:
		l.log.Warnf("received warning message from peer: %v",
			msg.Warning())

	case *lnwire.Error:
		// Error received from remote, MUST fail channel, but should
		// only print the contents of the error message if all
		// characters are printable ASCII.
		l.failf(
			LinkFailureError{
				code: ErrRemoteError,

				// TODO(halseth): we currently don't fail the
				// channel permanently, as there are some sync
				// issues with other implementations that will
				// lead to them sending an error message, but
				// we can recover from on next connection. See
				// https://github.com/ElementsProject/lightning/issues/4212
				PermanentFailure: false,
			},
			"ChannelPoint(%v): received error from peer: %v",
			l.channel.ChannelPoint(), msg.Error(),
		)
	default:
		l.log.Warnf("received unknown message of type %T", msg)
	}
}

// handleStfu implements the top-level logic for handling the Stfu message from
// our peer.
func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
	err := l.quiescer.recvStfu(*stfu)
	if err != nil {
		return err
	}

	// If we can immediately send an Stfu response back, we will.
	return l.quiescer.drive()
}

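// As a rough sketch, the quiescence handshake driven by handleStfu looks as
// follows, assuming the quiescer tracks both directions:
//
//	peer  -> Stfu    recvStfu records the remote party's intent
//	local -> Stfu    drive() replies once no local updates are pending
//
// Once both Stfu messages have been exchanged, the channel is quiescent and
// update messages are held back until the channel resumes.
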
// stfuFailf fails the link in the case where the requirements of the quiescence
// protocol are violated. In all cases we opt to drop the connection as only
// link state (as opposed to channel state) is affected.
func (l *channelLink) stfuFailf(format string, args ...interface{}) {
	l.failf(LinkFailureError{
		code:             ErrStfuViolation,
		FailureAction:    LinkFailureDisconnect,
		PermanentFailure: false,
		Warning:          true,
	}, format, args...)
}

// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
// for packets delivered from the switch, and for cleaning up any circuits
// closed by signing a previous commitment txn. This method ensures that the
// circuits are removed from the circuit map before removing them from the
// link's mailbox, otherwise it could be possible for some circuit to be missed
// if this link flaps.
func (l *channelLink) ackDownStreamPackets() error {
	// First, remove the downstream Add packets that were included in the
	// previous commitment signature. This will prevent the Adds from being
	// replayed if this link disconnects.
	for _, inKey := range l.openedCircuits {
		// In order to test the sphinx replay logic of the remote
		// party, unsafe replay does not acknowledge the packets from
		// the mailbox. We can then force a replay of any Add packets
		// held in memory by disconnecting and reconnecting the link.
		if l.cfg.UnsafeReplay {
			continue
		}

		l.log.Debugf("removing Add packet %s from mailbox", inKey)
		l.mailBox.AckPacket(inKey)
	}

	// Now, we will delete all circuits closed by the previous commitment
	// signature, which is the result of downstream Settle/Fail packets. We
	// batch them here to ensure circuits are closed atomically and for
	// performance.
	err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
	switch err {
	case nil:
		// Successful deletion.

	default:
		l.log.Errorf("unable to delete %d circuits: %v",
			len(l.closedCircuits), err)
		return err
	}

	// With the circuits removed from memory and disk, we now ack any
	// Settle/Fails in the mailbox to ensure they do not get redelivered
	// after startup. If forgive is enabled and we've reached this point,
	// the circuits must have been removed at some point, so it is now safe
	// to un-queue the corresponding Settle/Fails.
	for _, inKey := range l.closedCircuits {
		l.log.Debugf("removing Fail/Settle packet %s from mailbox",
			inKey)
		l.mailBox.AckPacket(inKey)
	}

	// Lastly, reset our buffers to be empty while keeping any acquired
	// growth in the backing array.
	l.openedCircuits = l.openedCircuits[:0]
	l.closedCircuits = l.closedCircuits[:0]

	return nil
}

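// The s = s[:0] reset used above is the usual Go idiom for reusing a slice's
// backing array across batches, e.g.:
//
//	buf := make([]CircuitKey, 0, 8)
//	buf = append(buf, k1, k2) // len 2, cap 8
//	buf = buf[:0]             // len 0, cap still 8; future appends
//	                          // reuse the same backing array
//
// (k1 and k2 are placeholder keys.)
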
// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
// the link.
func (l *channelLink) updateCommitTxOrFail() bool {
	err := l.updateCommitTx()
	switch err {
	// No error encountered, success.
	case nil:

	// A duplicate keystone error should be resolved and is not fatal, so
	// we won't send an Error message to the peer.
	case ErrDuplicateKeystone:
		l.failf(LinkFailureError{code: ErrCircuitError},
			"temporary circuit error: %v", err)
		return false

	// Any other error results in an Error message being sent to the peer.
	default:
		l.failf(LinkFailureError{code: ErrInternalError},
			"unable to update commitment: %v", err)
		return false
	}

	return true
}

// updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point.
func (l *channelLink) updateCommitTx() error {
	// Preemptively write all pending keystones to disk, just in case the
	// HTLCs we have in memory are included in the subsequent attempt to
	// sign a commitment state.
	err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
	if err != nil {
		// If ErrDuplicateKeystone is returned, the caller will catch
		// it.
		return err
	}

	// Reset the batch, but keep the backing buffer to avoid reallocating.
	l.keystoneBatch = l.keystoneBatch[:0]

	// If hodl.Commit mode is active, we will refrain from attempting to
	// commit any in-memory modifications to the channel state. Exiting here
	// permits testing of either the switch or link's ability to trim
	// circuits that have been opened, but unsuccessfully committed.
	if l.cfg.HodlMask.Active(hodl.Commit) {
		l.log.Warnf(hodl.Commit.Warning())
		return nil
	}

	newCommit, err := l.channel.SignNextCommitment()
	if err == lnwallet.ErrNoWindow {
		l.cfg.PendingCommitTicker.Resume()
		l.log.Trace("PendingCommitTicker resumed")

		n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
		l.log.Tracef("revocation window exhausted, unable to send: "+
			"%v, pend_updates=%v, dangling_closes=%v", n,
			lnutils.SpewLogClosure(l.openedCircuits),
			lnutils.SpewLogClosure(l.closedCircuits))

		return nil
	} else if err != nil {
		return err
	}

	if err := l.ackDownStreamPackets(); err != nil {
		return err
	}

	l.cfg.PendingCommitTicker.Pause()
	l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")

	// The remote party now has a new pending commitment, so we'll update
	// the contract court to be aware of this new set (the prior old remote
	// pending).
	newUpdate := &contractcourt.ContractUpdate{
		HtlcKey: contractcourt.RemotePendingHtlcSet,
		Htlcs:   newCommit.PendingHTLCs,
	}
	err = l.cfg.NotifyContractUpdate(newUpdate)
	if err != nil {
		l.log.Errorf("unable to notify contract update: %v", err)
		return err
	}

	select {
	case <-l.quit:
		return ErrLinkShuttingDown
	default:
	}

	auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob)
	if err != nil {
		return fmt.Errorf("error parsing aux sigs: %w", err)
	}

	commitSig := &lnwire.CommitSig{
		ChanID:        l.ChanID(),
		CommitSig:     newCommit.CommitSig,
		HtlcSigs:      newCommit.HtlcSigs,
		PartialSig:    newCommit.PartialSig,
		CustomRecords: auxBlobRecords,
	}
	l.cfg.Peer.SendMessage(false, commitSig)

	// Now that we have sent out a new CommitSig, we invoke the outgoing set
	// of commit hooks.
	l.RWMutex.Lock()
	l.outgoingCommitHooks.invoke()
	l.RWMutex.Unlock()

	return nil
}

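// Taken together with the CommitSig case in handleUpstreamMsg, updateCommitTx
// is the sending half of the commitment dance: persist the keystones the new
// state depends on, sign the next commitment, send the CommitSig, and then
// wait for the peer's RevokeAndAck to arrive upstream.
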
// PeerPubKey returns the serialized public key of the remote peer with which
// we have the channel link opened.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) PeerPubKey() [33]byte {
	return l.cfg.Peer.PubKey()
}

// ChannelPoint returns the channel outpoint for the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChannelPoint() wire.OutPoint {
	return l.channel.ChannelPoint()
}

// ShortChanID returns the short channel ID for the channel link. The short
// channel ID encodes the exact location in the main chain that the original
// funding output can be found.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
	l.RLock()
	defer l.RUnlock()

	return l.channel.ShortChanID()
}

// UpdateShortChanID updates the short channel ID for a link. This may be
// required in the event that a link is created before the short chan ID for it
// is known, or a re-org occurs, and the funding transaction changes location
// within the chain.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
	chanID := l.ChanID()

	// Refresh the channel state's short channel ID by loading it from disk.
	// This ensures that the channel state accurately reflects the updated
	// short channel ID.
	err := l.channel.State().Refresh()
	if err != nil {
		l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
			"%v", chanID, err)
		return hop.Source, err
	}

	return hop.Source, nil
}

// ChanID returns the channel ID for the channel link. The channel ID is a more
// compact representation of a channel's full outpoint.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChanID() lnwire.ChannelID {
	return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
}

// Bandwidth returns the total amount that can flow through the channel link at
// this given instance. The value returned is expressed in millisatoshi and can
// be used by callers when making forwarding decisions to determine if a link
// can accept an HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
	// Get the balance available on the channel for new HTLCs. This takes
	// the channel reserve into account so HTLCs up to this value won't
	// violate it.
	return l.channel.AvailableBalance()
}

// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
// amount provided to the link. This check does not reserve a space, since
// forwards or other payments may use the available slot, so it should be
// considered best-effort.
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
	return l.channel.MayAddOutgoingHtlc(amt)
}

// getDustSum is a wrapper method that calls the underlying channel's dust sum
// method.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
	dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {

	return l.channel.GetDustSum(whoseCommit, dryRunFee)
}

// getFeeRate is a wrapper method that retrieves the underlying channel's
// feerate.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
	return l.channel.CommitFeeRate()
}

// getDustClosure returns a closure that can be used by the switch or mailbox
// to evaluate whether a given HTLC is dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustClosure() dustClosure {
	localDustLimit := l.channel.State().LocalChanCfg.DustLimit
	remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
	chanType := l.channel.State().ChanType

	return dustHelper(chanType, localDustLimit, remoteDustLimit)
}

// getCommitFee returns either the local or remote CommitFee in satoshis. This
// is used so that the Switch can have access to the commitment fee without
// needing to have a *LightningChannel. This doesn't include dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
	if remote {
		return l.channel.State().RemoteCommitment.CommitFee
	}

	return l.channel.State().LocalCommitment.CommitFee
}

// exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
// increases the total dust and fees within the channel past the configured
// fee threshold. It first calculates the dust sum over every update in the
// update log with the proposed fee-rate and taking into account both the local
// and remote dust limits. It uses every update in the update log instead of
// what is actually on the local and remote commitments because it is assumed
// that in a worst-case scenario, every update in the update log could
// theoretically be on either commitment transaction and this needs to be
// accounted for with this fee-rate. It then calculates the local and remote
// commitment fees given the proposed fee-rate. Finally, it tallies the results
// and determines if the fee threshold has been exceeded.
func (l *channelLink) exceedsFeeExposureLimit(
	feePerKw chainfee.SatPerKWeight) (bool, error) {

	dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)

	// Get the sum of dust for both the local and remote commitments using
	// this "dry-run" fee.
	localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
	remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)

	// Calculate the local and remote commitment fees using this dry-run
	// fee.
	localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
	if err != nil {
		return false, err
	}

	// Finally, check whether the max fee exposure was exceeded on either
	// future commitment transaction with the fee-rate.
	totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
	if totalLocalDust > l.cfg.MaxFeeExposure {
		return true, nil
	}

	totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
		remoteFee,
	)

	return totalRemoteDust > l.cfg.MaxFeeExposure, nil
}

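// A worked example with illustrative numbers: if the dry-run dust sum on the
// local commitment is 40,000,000 msat and CommitFeeTotalAt reports a local
// fee of 5,000 sat (5,000,000 msat), then totalLocalDust is 45,000,000 msat.
// With MaxFeeExposure set to 50,000,000 msat the local check passes, and the
// same comparison is then repeated for the remote commitment.
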
// isOverexposedWithHtlc calculates whether the proposed HTLC will make the
// channel exceed the fee threshold. It first fetches the largest fee-rate that
// may be on any unrevoked commitment transaction. Then, using this fee-rate,
// determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds to
// the overall dust sum. If it is not dust, it contributes to weight, which
// also adds to the overall dust sum by an increase in fees. If the dust sum on
// either commitment exceeds the configured fee threshold, this function
// returns true.
func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
	incoming bool) bool {

	dustClosure := l.getDustClosure()

	feeRate := l.channel.WorstCaseFeeRate()

	amount := htlc.Amount.ToSatoshis()

	// See if this HTLC is dust on both the local and remote commitments.
	isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
	isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)

	// Calculate the dust sum for the local and remote commitments.
	localDustSum := l.getDustSum(
		lntypes.Local, fn.None[chainfee.SatPerKWeight](),
	)
	remoteDustSum := l.getDustSum(
		lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
	)

	// Grab the larger of the local and remote commitment fees w/o dust.
	commitFee := l.getCommitFee(false)

	if l.getCommitFee(true) > commitFee {
		commitFee = l.getCommitFee(true)
	}

	localDustSum += lnwire.NewMSatFromSatoshis(commitFee)
	remoteDustSum += lnwire.NewMSatFromSatoshis(commitFee)

	// Calculate the additional fee increase if this is a non-dust HTLC.
	weight := lntypes.WeightUnit(input.HTLCWeight)
	additional := lnwire.NewMSatFromSatoshis(
		feeRate.FeeForWeight(weight),
	)

	if isLocalDust {
		// If this is dust, it doesn't contribute to weight but does
		// contribute to the overall dust sum.
		localDustSum += lnwire.NewMSatFromSatoshis(amount)
	} else {
		// Account for the fee increase that comes with an increase in
		// weight.
		localDustSum += additional
	}

	if localDustSum > l.cfg.MaxFeeExposure {
		// The max fee exposure was exceeded.
		return true
	}

	if isRemoteDust {
		// If this is dust, it doesn't contribute to weight but does
		// contribute to the overall dust sum.
		remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
	} else {
		// Account for the fee increase that comes with an increase in
		// weight.
		remoteDustSum += additional
	}

	return remoteDustSum > l.cfg.MaxFeeExposure
}

// dustClosure is a function that evaluates whether an HTLC is dust. It returns
// true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
// the HTLC is incoming (i.e. one that the remote sent), a ChannelParty
// denoting whether to evaluate on the local or remote commit, and finally an
// HTLC amount to test.
type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
	whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool

// dustHelper is used to construct the dustClosure.
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
	remoteDustLimit btcutil.Amount) dustClosure {

	isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
		whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {

		var dustLimit btcutil.Amount
		if whoseCommit.IsLocal() {
			dustLimit = localDustLimit
		} else {
			dustLimit = remoteDustLimit
		}

		return lnwallet.HtlcIsDust(
			chantype, incoming, whoseCommit, feerate, amt,
			dustLimit,
		)
	}

	return isDust
}

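// Example use of the returned closure, with illustrative values (353 sat
// dust limits, the 253 sat/kw fee rate floor, a 500 sat HTLC, and chanType
// as above):
//
//	isDust := dustHelper(chanType, 353, 353)
//	onLocal := isDust(chainfee.SatPerKWeight(253), true, lntypes.Local, 500)
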
// zeroConfConfirmed returns whether or not the zero-conf channel has
// confirmed on-chain.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) zeroConfConfirmed() bool {
	return l.channel.State().ZeroConfConfirmed()
}

// confirmedScid returns the confirmed SCID for a zero-conf channel. This
// should not be called for non-zero-conf channels.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
	return l.channel.State().ZeroConfRealScid()
}

// isZeroConf returns whether or not the underlying channel is a zero-conf
// channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) isZeroConf() bool {
	return l.channel.State().IsZeroConf()
}

// negotiatedAliasFeature returns whether or not the underlying channel has
// negotiated the option-scid-alias feature bit. This will be true for both
// option-scid-alias and zero-conf channel-types. It will also be true for
// channels with the feature bit but without the above channel-types.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) negotiatedAliasFeature() bool {
	return l.channel.State().NegotiatedAliasFeature()
}

// getAliases returns the set of aliases for the underlying channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
	return l.cfg.GetAliases(l.ShortChanID())
}

// attachFailAliasUpdate sets the link's FailAliasUpdate function.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) attachFailAliasUpdate(closure func(
	sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) {

	l.Lock()
	l.cfg.FailAliasUpdate = closure
	l.Unlock()
}

// AttachMailBox updates the current mailbox used by this link, and hooks up
// the mailbox's message and packet outboxes to the link's upstream and
// downstream chans, respectively.
func (l *channelLink) AttachMailBox(mailbox MailBox) {
	l.Lock()
	l.mailBox = mailbox
	l.upstream = mailbox.MessageOutBox()
	l.downstream = mailbox.PacketOutBox()
	l.Unlock()

	// Set the mailbox's fee rate. This may be refreshing a feerate that was
	// never committed.
	l.mailBox.SetFeeRate(l.getFeeRate())

	// Also set the mailbox's dust closure so that it can query whether
	// HTLCs are dust given the current feerate.
	l.mailBox.SetDustClosure(l.getDustClosure())
}

// UpdateForwardingPolicy updates the forwarding policy for the target
// ChannelLink. Once updated, the link will use the new forwarding policy to
// govern whether an incoming HTLC should be forwarded or not. We assume that
// fields that are zero are intentionally set to zero, so we'll use newPolicy to
// update all of the link's FwrdingPolicy's values.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateForwardingPolicy(
	newPolicy models.ForwardingPolicy) {

	l.Lock()
	defer l.Unlock()

	l.cfg.FwrdingPolicy = newPolicy
}

// CheckHtlcForward should return a nil error if the passed HTLC details
// satisfy the current forwarding policy of the target link. Otherwise,
// a LinkError with a valid protocol failure message should be returned
// in order to signal the policy consistency issue to the source of the
// HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) CheckHtlcForward(payHash [32]byte,
	incomingHtlcAmt, amtToForward lnwire.MilliSatoshi,
	incomingTimeout, outgoingTimeout uint32,
	inboundFee models.InboundFee,
	heightNow uint32, originalScid lnwire.ShortChannelID) *LinkError {

	l.RLock()
	policy := l.cfg.FwrdingPolicy
	l.RUnlock()

	// Using the outgoing HTLC amount, we'll calculate the outgoing
	// fee this incoming HTLC must carry in order to satisfy the constraints
	// of the outgoing link.
	outFee := ExpectedFee(policy, amtToForward)

	// Then calculate the inbound fee that we charge based on the sum of
	// outgoing HTLC amount and outgoing fee.
	inFee := inboundFee.CalcFee(amtToForward + outFee)

	// Add up both fee components. It is important to calculate both fees
	// separately. An alternative way of calculating is to first determine
	// an aggregate fee and apply that to the outgoing HTLC amount. However,
	// rounding may cause the result to be slightly higher than in the case
	// of separately rounded fee components. This potentially causes failed
	// forwards for senders and is something to be avoided.
	expectedFee := inFee + int64(outFee)

	// If the actual fee is less than our expected fee, then we'll reject
	// this HTLC as it didn't provide a sufficient amount of fees, or the
	// values have been tampered with, or the send used incorrect/dated
	// information to construct the forwarding information for this hop. In
	// any case, we'll cancel this HTLC.
	actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
	if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
		l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
			"expected %v, got %v: incoming=%v, outgoing=%v, "+
			"inboundFee=%v",
			payHash[:], expectedFee, actualFee,
			incomingHtlcAmt, amtToForward, inboundFee,
		)

		// As part of the returned error, we'll send our latest routing
		// policy so the sending node obtains the most up to date data.
		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
			return lnwire.NewFeeInsufficient(amtToForward, *upd)
		}
		failure := l.createFailureWithUpdate(false, originalScid, cb)
		return NewLinkError(failure)
	}

	// Check whether the outgoing htlc satisfies the channel policy.
	err := l.canSendHtlc(
		policy, payHash, amtToForward, outgoingTimeout, heightNow,
		originalScid,
	)
	if err != nil {
		return err
	}

	// Finally, we'll ensure that the time-lock on the outgoing HTLC meets
	// the following constraint: the incoming time-lock minus our time-lock
	// delta should equal the outgoing time lock. Otherwise, either the
	// sender messed up, or an intermediate node tampered with the HTLC.
	timeDelta := policy.TimeLockDelta
	if incomingTimeout < outgoingTimeout+timeDelta {
		l.log.Warnf("incoming htlc(%x) has incorrect time-lock value: "+
			"expected at least %v block delta, got %v block delta",
			payHash[:], timeDelta, incomingTimeout-outgoingTimeout)

		// Grab the latest routing policy so the sending node is up to
		// date with our current policy.
		cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
			return lnwire.NewIncorrectCltvExpiry(
				incomingTimeout, *upd,
			)
		}
		failure := l.createFailureWithUpdate(false, originalScid, cb)
		return NewLinkError(failure)
	}

	return nil
}

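// A worked example of the fee check above, with illustrative policy numbers:
// an outbound fee of 1,000 msat base plus 100 ppm on amtToForward =
// 1,000,000 msat gives outFee = 1,000 + 100 = 1,100 msat. An inbound fee of
// -500 msat then yields expectedFee = -500 + 1,100 = 600 msat, so the HTLC
// is only accepted if incomingHtlcAmt is at least 1,000,600 msat.
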
// CheckHtlcTransit should return a nil error if the passed HTLC details
// satisfy the current channel policy. Otherwise, a LinkError with a
// valid protocol failure message should be returned in order to signal
// the violation. This call is intended to be used for locally initiated
// payments for which there is no corresponding incoming htlc.
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
	amt lnwire.MilliSatoshi, timeout uint32,
	heightNow uint32) *LinkError {

	l.RLock()
	policy := l.cfg.FwrdingPolicy
	l.RUnlock()

	// We pass in hop.Source here as this is only used in the Switch when
	// trying to send over a local link. This causes the fallback mechanism
	// to occur.
	return l.canSendHtlc(
		policy, payHash, amt, timeout, heightNow, hop.Source,
	)
}

// canSendHtlc checks whether the given htlc parameters satisfy
3300
// the channel's amount and time lock constraints.
3301
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
3302
        payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
3303
        heightNow uint32, originalScid lnwire.ShortChannelID) *LinkError {
1,490✔
3304

1,490✔
3305
        // As our first sanity check, we'll ensure that the passed HTLC isn't
1,490✔
3306
        // too small for the next hop. If so, then we'll cancel the HTLC
1,490✔
3307
        // directly.
1,490✔
3308
        if amt < policy.MinHTLCOut {
1,498✔
3309
                l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
8✔
3310
                        "htlc_value=%v", payHash[:], policy.MinHTLCOut,
8✔
3311
                        amt)
8✔
3312

8✔
3313
                // As part of the returned error, we'll send our latest routing
8✔
3314
                // policy so the sending node obtains the most up to date data.
8✔
3315
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
16✔
3316
                        return lnwire.NewAmountBelowMinimum(amt, *upd)
8✔
3317
                }
8✔
3318
                failure := l.createFailureWithUpdate(false, originalScid, cb)
8✔
3319
                return NewLinkError(failure)
8✔
3320
        }
3321

3322
        // Next, ensure that the passed HTLC isn't too large. If so, we'll
3323
        // cancel the HTLC directly.
3324
        if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
1,485✔
3325
                l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
3✔
3326
                        "htlc_value=%v", payHash[:], policy.MaxHTLC, amt)
3✔
3327

3✔
3328
                // As part of the returned error, we'll send our latest routing
3✔
3329
                // policy so the sending node obtains the most up-to-date data.
3✔
3330
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
6✔
3331
                        return lnwire.NewTemporaryChannelFailure(upd)
3✔
3332
                }
3✔
3333
                failure := l.createFailureWithUpdate(false, originalScid, cb)
3✔
3334
                return NewDetailedLinkError(failure, OutgoingFailureHTLCExceedsMax)
3✔
3335
        }
3336

3337
        // We want to avoid offering an HTLC which will expire in the near
3338
        // future, so we'll reject an HTLC if the outgoing expiration time is
3339
        // too close to the current height.
3340
        if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
1,481✔
3341
                l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
2✔
3342
                        "outgoing_expiry=%v, best_height=%v", payHash[:],
2✔
3343
                        timeout, heightNow)
2✔
3344

2✔
3345
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
4✔
3346
                        return lnwire.NewExpiryTooSoon(*upd)
2✔
3347
                }
2✔
3348
                failure := l.createFailureWithUpdate(false, originalScid, cb)
2✔
3349
                return NewLinkError(failure)
2✔
3350
        }
3351

3352
        // Check absolute max delta.
3353
        if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
1,478✔
3354
                l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
1✔
3355
                        "the future: got %v, but maximum is %v", payHash[:],
1✔
3356
                        timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)
1✔
3357

1✔
3358
                return NewLinkError(&lnwire.FailExpiryTooFar{})
1✔
3359
        }
1✔
3360

3361
        // Check to see if there is enough balance in this channel.
3362
        if amt > l.Bandwidth() {
1,477✔
3363
                l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
1✔
3364
                        "larger than %v", amt, l.Bandwidth())
1✔
3365
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
2✔
3366
                        return lnwire.NewTemporaryChannelFailure(upd)
1✔
3367
                }
1✔
3368
                failure := l.createFailureWithUpdate(false, originalScid, cb)
1✔
3369
                return NewDetailedLinkError(
1✔
3370
                        failure, OutgoingFailureInsufficientBalance,
1✔
3371
                )
1✔
3372
        }
3373

3374
        return nil
1,475✔
3375
}
3376

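// To recap canSendHtlc above, the checks run in this order and map to these
// wire failures (an informal summary of the code, not a separate spec):
//
//        amt < policy.MinHTLCOut                        -> amount_below_minimum
//        policy.MaxHTLC != 0 && amt > policy.MaxHTLC    -> temporary_channel_failure
//        timeout <= heightNow + OutgoingCltvRejectDelta -> expiry_too_soon
//        timeout > heightNow + MaxOutgoingCltvExpiry    -> expiry_too_far
//        amt > l.Bandwidth()                            -> temporary_channel_failure
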
// Stats returns the statistics of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
        snapshot := l.channel.StateSnapshot()

        return snapshot.ChannelCommitment.CommitHeight,
                snapshot.TotalMSatSent,
                snapshot.TotalMSatReceived
}

// String returns the string representation of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) String() string {
        return l.channel.ChannelPoint().String()
}

// handleSwitchPacket handles switch packets. These packets might be forwarded
// to us from another channel link in case the htlc update came from another
// peer, or if the update was created by the user initiating a payment.
//
// NOTE: Part of the packetHandler interface.
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
        l.log.Tracef("received switch packet inkey=%v, outkey=%v",
                pkt.inKey(), pkt.outKey())

        return l.mailBox.AddPacket(pkt)
}

// HandleChannelUpdate handles htlc requests (settle/add/fail) sent to us from
// the remote peer we have a channel with.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
        select {
        case <-l.quit:
                // Return early if the link is already in the process of
                // quitting. It doesn't make sense to hand the message to the
                // mailbox here.
                return
        default:
        }

        err := l.mailBox.AddMessage(message)
        if err != nil {
                l.log.Errorf("failed to add Message to mailbox: %v", err)
        }
}

// updateChannelFee updates the commitment fee-per-kw on this channel by
// committing to an update_fee message.
func (l *channelLink) updateChannelFee(feePerKw chainfee.SatPerKWeight) error {
        l.log.Infof("updating commit fee to %v", feePerKw)

        // We skip sending the UpdateFee message if the channel is not
        // currently eligible to forward messages.
        if !l.eligibleToUpdate() {
                l.log.Debugf("skipping fee update for inactive channel")
                return nil
        }

        // Check and see if our proposed fee-rate would make us exceed the fee
        // threshold.
        thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
        if err != nil {
                // This shouldn't typically happen. If it does, it indicates
                // something is wrong with our channel state.
                return err
        }

        if thresholdExceeded {
                return fmt.Errorf("link fee threshold exceeded")
        }

        // First, we'll update the local fee on our commitment.
        if err := l.channel.UpdateFee(feePerKw); err != nil {
                return err
        }

        // The fee passed the channel's validation checks, so we update the
        // mailbox feerate.
        l.mailBox.SetFeeRate(feePerKw)

        // We'll then attempt to send a new UpdateFee message, and also lock it
        // in immediately by triggering a commitment update.
        msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                return err
        }

        return l.updateCommitTx()
}

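// A minimal caller sketch (hypothetical, for illustration only): a fee-update
// timer in the link's main loop could sample the configured
// chainfee.Estimator and propose the sampled rate, relying on
// updateChannelFee to skip ineligible channels and enforce the fee-exposure
// threshold. The netFeeRate variable is an assumption here.
//
//        if err := l.updateChannelFee(netFeeRate); err != nil {
//                l.log.Errorf("unable to update commitment fee: %v", err)
//        }
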
// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
// after receiving a revocation from the remote party, and reprocesses them in
// the context of the provided forwarding package. Any settles or fails that
// have already been acknowledged in the forwarding package will not be sent to
// the switch.
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
        if len(fwdPkg.SettleFails) == 0 {
                return
        }

        l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)

        var switchPackets []*htlcPacket
        for i, update := range fwdPkg.SettleFails {
                destRef := fwdPkg.DestRef(uint16(i))

                // Skip any settles or fails that have already been
                // acknowledged by the incoming link that originated the
                // forwarded Add.
                if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
                        continue
                }

                // TODO(roasbeef): rework log entries to a shared
                // interface.

                switch msg := update.UpdateMsg.(type) {
                // A settle for an HTLC we previously forwarded has been
                // received. So we'll forward the HTLC to the switch which will
                // handle propagating the settle to the prior hop.
                case *lnwire.UpdateFulfillHTLC:
                        // If hodl.SettleIncoming is requested, we will not
                        // forward the SETTLE to the switch and will not signal
                        // a free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
                                l.log.Warnf(hodl.SettleIncoming.Warning())
                                continue
                        }

                        settlePacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has been
                        // freed up within the commitment state.
                        switchPackets = append(switchPackets, settlePacket)

                // A failure message for a previously forwarded HTLC has been
                // received. As a result a new slot will be freed up in our
                // commitment state, so we'll forward this to the switch so the
                // backwards undo can continue.
                case *lnwire.UpdateFailHTLC:
                        // If hodl.FailIncoming is requested, we will not
                        // forward the FAIL to the switch and will not signal a
                        // free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.FailIncoming) {
                                l.log.Warnf(hodl.FailIncoming.Warning())
                                continue
                        }

                        // Fetch the reason the HTLC was canceled so we can
                        // continue to propagate it. This failure originated
                        // from another node, so the linkFailure field is not
                        // set on the packet.
                        failPacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: msg.ID,
                                destRef:        &destRef,
                                htlc:           msg,
                        }

                        l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID)

                        // If the failure message lacks an HMAC (but includes
                        // the 4 bytes for encoding the message and padding
                        // lengths), then this means that we received it as an
                        // UpdateFailMalformedHTLC. As a result, we'll signal
                        // that we need to convert this error within the switch
                        // to an actual error, by encrypting it as if we were
                        // the originating hop.
                        convertedErrorSize := lnwire.FailureMessageLength + 4
                        if len(msg.Reason) == convertedErrorSize {
                                failPacket.convertedError = true
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has been
                        // freed up within the commitment state.
                        switchPackets = append(switchPackets, failPacket)
                }
        }

        // Only spawn the task to forward packets if we have a non-zero number.
        if len(switchPackets) > 0 {
                go l.forwardBatch(false, switchPackets...)
        }
}

// processRemoteAdds serially processes each of the Add payment descriptors
// which have been "locked-in" by receiving a revocation from the remote party.
// The forwarding package provided instructs how to process this batch,
// indicating whether this is the first time these Adds are being processed, or
// whether we are reprocessing as a result of a failure or restart. Adds that
// have already been acknowledged in the forwarding package will be ignored.
//
//nolint:funlen
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
        l.log.Tracef("processing %d remote adds for height %d",
                len(fwdPkg.Adds), fwdPkg.Height)

        decodeReqs := make(
                []hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds),
        )
        for _, update := range fwdPkg.Adds {
                if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
                        // Before adding the new htlc to the state machine,
                        // parse the onion object in order to obtain the
                        // routing information with the DecodeHopIterator
                        // function which processes the Sphinx packet.
                        onionReader := bytes.NewReader(msg.OnionBlob[:])

                        req := hop.DecodeHopIteratorRequest{
                                OnionReader:    onionReader,
                                RHash:          msg.PaymentHash[:],
                                IncomingCltv:   msg.Expiry,
                                IncomingAmount: msg.Amount,
                                BlindingPoint:  msg.BlindingPoint,
                        }

                        decodeReqs = append(decodeReqs, req)
                }
        }

        // Atomically decode the incoming htlcs, simultaneously checking for
        // replay attempts. A particular index in the returned, spare list of
        // channel iterators should only be used if the failure code at the
        // same index is lnwire.FailCodeNone.
        decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
                fwdPkg.ID(), decodeReqs,
        )
        if sphinxErr != nil {
                l.failf(LinkFailureError{code: ErrInternalError},
                        "unable to decode hop iterators: %v", sphinxErr)
                return
        }

        var switchPackets []*htlcPacket

        for i, update := range fwdPkg.Adds {
                idx := uint16(i)

                //nolint:forcetypeassert
                add := *update.UpdateMsg.(*lnwire.UpdateAddHTLC)
                sourceRef := fwdPkg.SourceRef(idx)

                if fwdPkg.State == channeldb.FwdStateProcessed &&
                        fwdPkg.AckFilter.Contains(idx) {

                        // If this index is already found in the ack filter,
                        // the response to this forwarding decision has already
                        // been committed by one of our commitment txns. ADDs
                        // in this state are waiting for the rest of the fwding
                        // package to get acked before being garbage collected.
                        continue
                }

                // An incoming HTLC add has been fully locked in. As a result
                // we can now examine the forwarding details of the HTLC, and
                // the HTLC itself to decide if: we should forward it, cancel
                // it, or are able to settle it (and it adheres to our fee
                // related constraints).

                // Before adding the new htlc to the state machine, parse the
                // onion object in order to obtain the routing information with
                // the DecodeHopIterator function which processes the Sphinx
                // packet.
                chanIterator, failureCode := decodeResps[i].Result()
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob, &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion hop "+
                                "iterator: %v", failureCode)
                        continue
                }

                heightNow := l.cfg.BestHeight()

                pld, routeRole, pldErr := chanIterator.HopPayload()
                if pldErr != nil {
                        // If we're unable to process the onion payload, or we
                        // received invalid onion payload failure, then we
                        // should send an error back to the caller so the HTLC
                        // can be canceled.
                        var failedType uint64

                        // We need to get the underlying error value, so we
                        // can't use errors.As as suggested by the linter.
                        //nolint:errorlint
                        if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
                                failedType = uint64(e.Type)
                        }

                        // If we couldn't parse the payload, make our best
                        // effort at creating an error encrypter that knows
                        // what blinding type we were, but if we couldn't
                        // parse the payload we have no way of knowing whether
                        // we were the introduction node or not.
                        //
                        //nolint:lll
                        obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
                                l.cfg.ExtractErrorEncrypter,
                                // We need our route role here because we
                                // couldn't parse or validate the payload.
                                routeRole == hop.RouteRoleIntroduction,
                        )
                        if failCode != lnwire.CodeNone {
                                l.log.Errorf("could not extract error "+
                                        "encrypter: %v", pldErr)

                                // We can't process this htlc, send back
                                // malformed.
                                l.sendMalformedHTLCError(
                                        add.ID, failCode, add.OnionBlob,
                                        &sourceRef,
                                )

                                continue
                        }

                        // TODO: currently none of the test unit infrastructure
                        // is set up to handle TLV payloads, so testing this
                        // would require implementing a separate mock iterator
                        // for TLV payloads that also supports injecting invalid
                        // payloads. Deferring this non-trivial effort till a
                        // later date.
                        failure := lnwire.NewInvalidOnionPayload(failedType, 0)

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Errorf("unable to decode forwarding "+
                                "instructions: %v", pldErr)

                        continue
                }

                // Retrieve onion obfuscator from onion blob in order to
                // produce initial obfuscation of the onion failureCode.
                obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
                        l.cfg.ExtractErrorEncrypter,
                        routeRole == hop.RouteRoleIntroduction,
                )
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to payment
                        // sender.
                        l.sendMalformedHTLCError(
                                add.ID, failureCode, add.OnionBlob,
                                &sourceRef,
                        )

                        l.log.Errorf("unable to decode onion "+
                                "obfuscator: %v", failureCode)

                        continue
                }

                fwdInfo := pld.ForwardingInfo()

                // Check whether the payload we've just processed uses our
                // node as the introduction point (gave us a blinding key in
                // the payload itself) and fail it back if we don't support
                // route blinding.
                if fwdInfo.NextBlinding.IsSome() &&
                        l.cfg.DisallowRouteBlinding {

                        failure := lnwire.NewInvalidBlinding(
                                fn.Some(add.OnionBlob),
                        )

                        l.sendHTLCError(
                                add, sourceRef, NewLinkError(failure),
                                obfuscator, false,
                        )

                        l.log.Error("rejected htlc that uses us as an " +
                                "introduction point when we do not support " +
                                "route blinding")

                        continue
                }

                switch fwdInfo.NextHop {
                case hop.Exit:
                        err := l.processExitHop(
                                add, sourceRef, obfuscator, fwdInfo,
                                heightNow, pld,
                        )
                        if err != nil {
                                l.failf(LinkFailureError{
                                        code: ErrInternalError,
                                }, err.Error()) //nolint

                                return
                        }

                // There are additional channels left within this route. So
                // we'll simply do some forwarding package book-keeping.
                default:
                        // If hodl.AddIncoming is requested, we will not
                        // validate the forwarded ADD, nor will we send the
                        // packet to the htlc switch.
                        if l.cfg.HodlMask.Active(hodl.AddIncoming) {
                                l.log.Warnf(hodl.AddIncoming.Warning())
                                continue
                        }

                        switch fwdPkg.State {
                        case channeldb.FwdStateProcessed:
                                // This add was not forwarded on the previous
                                // processing phase, run it through our
                                // validation pipeline to reproduce an error.
                                // This may trigger a different error due to
                                // expiring timelocks, but we expect that an
                                // error will be reproduced.
                                if !fwdPkg.FwdFilter.Contains(idx) {
                                        break
                                }

                                // Otherwise, it was already processed, so we
                                // can collect it and continue.
                                outgoingAdd := &lnwire.UpdateAddHTLC{
                                        Expiry:        fwdInfo.OutgoingCTLV,
                                        Amount:        fwdInfo.AmountToForward,
                                        PaymentHash:   add.PaymentHash,
                                        BlindingPoint: fwdInfo.NextBlinding,
                                }

                                // Finally, we'll encode the onion packet for
                                // the _next_ hop using the hop iterator
                                // decoded for the current hop.
                                buf := bytes.NewBuffer(
                                        outgoingAdd.OnionBlob[0:0],
                                )

                                // We know this cannot fail, as this ADD
                                // was marked forwarded in a previous
                                // round of processing.
                                chanIterator.EncodeNextHop(buf)

                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:lll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               outgoingAdd.Amount,
                                        htlc:                 outgoingAdd,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }
                                switchPackets = append(
                                        switchPackets, updatePacket,
                                )

                                continue
                        }

                        // TODO(roasbeef): ensure don't accept outrageous
                        // timeout for htlc

                        // With all our forwarding constraints met, we'll
                        // create the outgoing HTLC using the parameters as
                        // specified in the forwarding info.
                        addMsg := &lnwire.UpdateAddHTLC{
                                Expiry:        fwdInfo.OutgoingCTLV,
                                Amount:        fwdInfo.AmountToForward,
                                PaymentHash:   add.PaymentHash,
                                BlindingPoint: fwdInfo.NextBlinding,
                        }

                        // Finally, we'll encode the onion packet for the
                        // _next_ hop using the hop iterator decoded for the
                        // current hop.
                        buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
                        err := chanIterator.EncodeNextHop(buf)
                        if err != nil {
                                l.log.Errorf("unable to encode the "+
                                        "remaining route %v", err)

                                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:lll
                                        return lnwire.NewTemporaryChannelFailure(upd)
                                }

                                failure := l.createFailureWithUpdate(
                                        true, hop.Source, cb,
                                )

                                l.sendHTLCError(
                                        add, sourceRef, NewLinkError(failure),
                                        obfuscator, false,
                                )
                                continue
                        }

                        // Now that this add has been reprocessed, only append
                        // it to our list of packets to forward to the switch
                        // if this is the first time processing the add. If the
                        // fwd pkg has already been processed, then we entered
                        // the above section to recreate a previous error. If
                        // the packet had previously been forwarded, it would
                        // have been added to switchPackets at the top of this
                        // section.
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                //nolint:lll
                                updatePacket := &htlcPacket{
                                        incomingChanID:       l.ShortChanID(),
                                        incomingHTLCID:       add.ID,
                                        outgoingChanID:       fwdInfo.NextHop,
                                        sourceRef:            &sourceRef,
                                        incomingAmount:       add.Amount,
                                        amount:               addMsg.Amount,
                                        htlc:                 addMsg,
                                        obfuscator:           obfuscator,
                                        incomingTimeout:      add.Expiry,
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
                                        inOnionCustomRecords: pld.CustomRecords(),
                                        inboundFee:           inboundFee,
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
                                }

                                fwdPkg.FwdFilter.Set(idx)
                                switchPackets = append(switchPackets,
                                        updatePacket)
                        }
                }
        }

        // Commit the htlcs we are intending to forward if this package has not
        // been fully processed.
        if fwdPkg.State == channeldb.FwdStateLockedIn {
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
                if err != nil {
                        l.failf(LinkFailureError{code: ErrInternalError},
                                "unable to set fwd filter: %v", err)
                        return
                }
        }

        if len(switchPackets) == 0 {
                return
        }

        replay := fwdPkg.State != channeldb.FwdStateLockedIn

        l.log.Debugf("forwarding %d packets to switch: replay=%v",
                len(switchPackets), replay)

        // NOTE: This call is made synchronous so that we ensure all circuits
        // are committed in the exact order that they are processed in the link.
        // Failing to do this could cause reorderings/gaps in the range of
        // opened circuits, which violates assumptions made by the circuit
        // trimming.
        l.forwardBatch(replay, switchPackets...)
}

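// In short, the pipeline above: batch-decode every onion (with replay
// protection via DecodeHopIterators), then per Add either (a) fail back
// malformed or invalid onions, (b) hand exit-hop HTLCs to processExitHop, or
// (c) build the outgoing UpdateAddHTLC and htlcPacket for the switch,
// recording the decision in the forwarding package's FwdFilter before
// forwarding the batch.
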
// processExitHop handles an htlc for which this link is the exit hop. It
// returns an error if the htlc could not be processed.
func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter,
        fwdInfo hop.ForwardingInfo, heightNow uint32,
        payload invoices.Payload) error {

        // If hodl.ExitSettle is requested, we will not validate the final hop's
        // ADD, nor will we settle the corresponding invoice or respond with the
        // preimage.
        if l.cfg.HodlMask.Active(hodl.ExitSettle) {
                l.log.Warnf(hodl.ExitSettle.Warning())

                return nil
        }

        // As we're the exit hop, we'll double check the hop-payload included in
        // the HTLC to ensure that it was crafted correctly by the sender and
        // is compatible with the HTLC we were extended.
        //
        // For a special case, if the fwdInfo doesn't have any blinded path
        // information, and the incoming HTLC had special extra data, then
        // we'll skip this amount check. The invoice acceptor will make sure we
        // reject the HTLC if it doesn't contain the correct amount after
        // examining the custom data.
        hasBlindedPath := fwdInfo.NextBlinding.IsSome()
        customHTLC := len(add.CustomRecords) > 0 && !hasBlindedPath
        log.Tracef("Exit hop has_blinded_path=%v custom_htlc_bypass=%v",
                hasBlindedPath, customHTLC)

        if !customHTLC && add.Amount < fwdInfo.AmountToForward {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible value: expected <=%v, got %v",
                        add.PaymentHash, add.Amount, fwdInfo.AmountToForward)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectHtlcAmount(add.Amount),
                )
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

                return nil
        }

        // We'll also ensure that our time-lock value has been computed
        // correctly.
        if add.Expiry < fwdInfo.OutgoingCTLV {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible time-lock: expected <=%v, got %v",
                        add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectCltvExpiry(add.Expiry),
                )

                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)

                return nil
        }

        // Notify the invoiceRegistry of the exit hop htlc. If we crash right
        // after this, this code will be re-executed after restart. We will
        // receive back a resolution event.
        invoiceHash := lntypes.Hash(add.PaymentHash)

        circuitKey := models.CircuitKey{
                ChanID: l.ShortChanID(),
                HtlcID: add.ID,
        }

        event, err := l.cfg.Registry.NotifyExitHopHtlc(
                invoiceHash, add.Amount, add.Expiry, int32(heightNow),
                circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload,
        )
        if err != nil {
                return err
        }

        // Create a hodlHtlc struct and decide whether to resolve it now or
        // later.
        htlc := hodlHtlc{
                add:        add,
                sourceRef:  sourceRef,
                obfuscator: obfuscator,
        }

        // If the event is nil, the invoice is being held, so we save the
        // payment descriptor for future reference.
        if event == nil {
                l.hodlMap[circuitKey] = htlc
                return nil
        }

        // Process the received resolution.
        return l.processHtlcResolution(event, htlc)
}

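// Note on the hodl flow above: a nil event from NotifyExitHopHtlc means the
// invoice subsystem is holding the HTLC (e.g. a hodl invoice), so the htlc is
// parked in hodlMap keyed by its circuit and its resolution arrives later via
// hodlQueue. A non-nil event is resolved immediately through
// processHtlcResolution.
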
// settleHTLC settles the HTLC on the channel.
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
        htlcIndex uint64, sourceRef channeldb.AddRef) error {

        hash := preimage.Hash()

        l.log.Infof("settling htlc %v as exit hop", hash)

        err := l.channel.SettleHTLC(
                preimage, htlcIndex, &sourceRef, nil, nil,
        )
        if err != nil {
                return fmt.Errorf("unable to settle htlc: %w", err)
        }

        // If the link is in hodl.BogusSettle mode, replace the preimage with a
        // fake one before sending it to the peer.
        if l.cfg.HodlMask.Active(hodl.BogusSettle) {
                l.log.Warnf(hodl.BogusSettle.Warning())
                preimage = [32]byte{}
                copy(preimage[:], bytes.Repeat([]byte{2}, 32))
        }

        // The HTLC was successfully settled locally, so we send a notification
        // about it to the remote peer.
        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
                ChanID:          l.ChanID(),
                ID:              htlcIndex,
                PaymentPreimage: preimage,
        })

        // Once we have successfully settled the htlc, notify a settle event.
        l.cfg.HtlcNotifier.NotifySettleEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: htlcIndex,
                        },
                },
                preimage,
                HtlcEventTypeReceive,
        )

        return nil
}

// forwardBatch forwards the given htlcPackets to the switch, and waits on the
// err chan for the individual responses. This method is intended to be spawned
// as a goroutine so the responses can be handled in the background.
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
        // Don't forward packets for which we already have a response in our
        // mailbox. This could happen if a packet fails and is buffered in the
        // mailbox, and the incoming link flaps.
        var filteredPkts = make([]*htlcPacket, 0, len(packets))
        for _, pkt := range packets {
                if l.mailBox.HasPacket(pkt.inKey()) {
                        continue
                }

                filteredPkts = append(filteredPkts, pkt)
        }

        err := l.cfg.ForwardPackets(l.quit, replay, filteredPkts...)
        if err != nil {
                log.Errorf("Unhandled error while reforwarding htlc "+
                        "settle/fail over htlcswitch: %v", err)
        }
}

// sendHTLCError cancels the HTLC and sends a cancel message back to the peer
// from which the HTLC was received.
func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC,
        sourceRef channeldb.AddRef, failure *LinkError,
        e hop.ErrorEncrypter, isReceive bool) {

        reason, err := e.EncryptFirstHop(failure.WireMessage())
        if err != nil {
                l.log.Errorf("unable to obfuscate error: %v", err)
                return
        }

        err = l.channel.FailHTLC(add.ID, reason, &sourceRef, nil, nil)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        // Send the appropriate failure message depending on whether we're
        // in a blinded route or not.
        if err := l.sendIncomingHTLCFailureMsg(
                add.ID, e, reason,
        ); err != nil {
                l.log.Errorf("unable to send HTLC failure: %v", err)
                return
        }

        // Notify a link failure on our incoming link. Outgoing htlc information
        // is not available at this point, because we have not decrypted the
        // onion, so it is excluded.
        var eventType HtlcEventType
        if isReceive {
                eventType = HtlcEventTypeReceive
        } else {
                eventType = HtlcEventTypeForward
        }

        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: add.ID,
                        },
                },
                HtlcInfo{
                        IncomingTimeLock: add.Expiry,
                        IncomingAmt:      add.Amount,
                },
                eventType,
                failure,
                true,
        )
}

// sendIncomingHTLCFailureMsg handles sending an HTLC failure message back to
// the peer from which the HTLC was received. This function is primarily used
// to handle the special requirements of route blinding, specifically:
// - Forwarding nodes must switch out any errors with MalformedFailHTLC.
// - Introduction nodes should return regular HTLC failure messages.
//
// It accepts the original opaque failure, which will be used in the case
// that we're not part of a blinded route and an error encrypter that'll be
// used if we are the introduction node and need to present an error as if
// we're the failing party.
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
        e hop.ErrorEncrypter,
        originalFailure lnwire.OpaqueReason) error {

        var msg lnwire.Message
        switch {
        // Our circuit's error encrypter will be nil if this was a locally
        // initiated payment. We can only hit a blinded error for a locally
        // initiated payment if we allow ourselves to be picked as the
        // introduction node for our own payments and in that case we
        // shouldn't reach this code. To prevent the HTLC getting stuck,
        // we fail it back and log an error.
        case e == nil:
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

                l.log.Errorf("Unexpected blinded failure when "+
                        "we are the sending node, incoming htlc: %v(%v)",
                        l.ShortChanID(), htlcIndex)

        // For cleartext hops (ie, non-blinded/normal) we don't need any
        // transformation on the error message and can just send the original.
        case !e.Type().IsBlinded():
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

        // When we're the introduction node, we need to convert the error to
        // an UpdateFailHTLC.
        case e.Type() == hop.EncrypterTypeIntroduction:
                l.log.Debugf("Introduction blinded node switching out failure "+
                        "error: %v", htlcIndex)

                // The specification does not require that we set the onion
                // blob.
                failureMsg := lnwire.NewInvalidBlinding(
                        fn.None[[lnwire.OnionPacketSize]byte](),
                )
                reason, err := e.EncryptFirstHop(failureMsg)
                if err != nil {
                        return err
                }

                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: reason,
                }

        // If we are a relaying node, we need to switch out any error that
        // we've received to a malformed HTLC error.
        case e.Type() == hop.EncrypterTypeRelaying:
                l.log.Debugf("Relaying blinded node switching out malformed "+
                        "error: %v", htlcIndex)

                msg = &lnwire.UpdateFailMalformedHTLC{
                        ChanID:      l.ChanID(),
                        ID:          htlcIndex,
                        FailureCode: lnwire.CodeInvalidBlinding,
                }

        default:
                return fmt.Errorf("unexpected encrypter: %d", e)
        }

        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                l.log.Warnf("Send update fail failed: %v", err)
        }

        return nil
}

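// Informal recap of the dispatch above (mirrors the switch statement):
//
//        e == nil (locally initiated)    -> UpdateFailHTLC, original reason
//        cleartext (non-blinded) hop     -> UpdateFailHTLC, original reason
//        hop.EncrypterTypeIntroduction   -> UpdateFailHTLC, re-encrypted invalid_blinding
//        hop.EncrypterTypeRelaying       -> UpdateFailMalformedHTLC, CodeInvalidBlinding
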
// sendMalformedHTLCError is a helper function which sends the malformed HTLC
// update to the payment sender.
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
        code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte,
        sourceRef *channeldb.AddRef) {

        shaOnionBlob := sha256.Sum256(onionBlob[:])
        err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
                ChanID:       l.ChanID(),
                ID:           htlcIndex,
                ShaOnionBlob: shaOnionBlob,
                FailureCode:  code,
        })
}

// failf is a function which is used to encapsulate the action necessary for
// properly failing the link. It takes a LinkFailureError, which will be passed
// to the OnChannelFailure closure, in order for it to determine if we should
// force close the channel, and if we should send an error message to the
// remote peer.
func (l *channelLink) failf(linkErr LinkFailureError, format string,
        a ...interface{}) {

        reason := fmt.Errorf(format, a...)

        // Return if we have already notified about a failure.
        if l.failed {
                l.log.Warnf("ignoring link failure (%v), as link already "+
                        "failed", reason)
                return
        }

        l.log.Errorf("failing link: %s with error: %v", reason, linkErr)

        // Set failed, such that we won't process any more updates, and notify
        // the peer about the failure.
        l.failed = true
        l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
}

// FundingCustomBlob returns the custom funding blob of the channel that this
// link is associated with. The funding blob represents static information about
// the channel that was created at channel funding time.
func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        if l.channel.State() == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.State().CustomBlob
}

// CommitmentCustomBlob returns the custom blob of the current local commitment
// of the channel that this link is associated with.
func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] {
        if l.channel == nil {
                return fn.None[tlv.Blob]()
        }

        return l.channel.LocalCommitmentBlob()
}

// runInEventLoop is a helper function that runs the given function on the
// provided argument inside the main event loop of the channel link.
func runInEventLoop[A, B any](l *channelLink, a A, f func(A) B) fn.Result[B] {
        req, res := fn.NewReq[A, B](a)

        // Build a closure that reads the state we care about. This is done so
        // we can get a fully consistent read of state.
        query := func() {
                req.Resolve(f(a))
        }

        // Chuck that closure into the event loop to ensure it is executed by
        // the thread that writes this state.
        if !fn.SendOrQuit(l.stateQueries, query, l.quit) {
                return fn.Errf[B]("channel link is shutting down")
        }

        return fn.NewResult(fn.RecvResp(res, nil, l.quit))
}
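
// A minimal usage sketch (hypothetical, not part of this file): any accessor
// that must observe link state consistently can wrap its read in
// runInEventLoop so the closure executes on the event-loop goroutine. The
// zero-argument struct and the AvailableBalance/Unpack calls are assumptions
// for illustration.
//
//        res := runInEventLoop(l, struct{}{}, func(_ struct{}) lnwire.MilliSatoshi {
//                return l.channel.AvailableBalance()
//        })
//        balance, err := res.Unpack()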