• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12231552240

09 Dec 2024 08:17AM UTC coverage: 58.955% (+0.02%) from 58.933%
12231552240

Pull #9242

github

aakselrod
go.mod: update btcwallet to latest to eliminate waddrmgr deadlock
Pull Request #9242: Reapply #8644

24 of 40 new or added lines in 3 files covered. (60.0%)

89 existing lines in 18 files now uncovered.

133525 of 226485 relevant lines covered (58.96%)

19398.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.49
/htlcswitch/link.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        crand "crypto/rand"
6
        "crypto/sha256"
7
        "errors"
8
        "fmt"
9
        prand "math/rand"
10
        "sync"
11
        "sync/atomic"
12
        "time"
13

14
        "github.com/btcsuite/btcd/btcutil"
15
        "github.com/btcsuite/btcd/wire"
16
        "github.com/btcsuite/btclog/v2"
17
        "github.com/lightningnetwork/lnd/channeldb"
18
        "github.com/lightningnetwork/lnd/contractcourt"
19
        "github.com/lightningnetwork/lnd/fn"
20
        "github.com/lightningnetwork/lnd/graph/db/models"
21
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
22
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
23
        "github.com/lightningnetwork/lnd/input"
24
        "github.com/lightningnetwork/lnd/invoices"
25
        "github.com/lightningnetwork/lnd/lnpeer"
26
        "github.com/lightningnetwork/lnd/lntypes"
27
        "github.com/lightningnetwork/lnd/lnutils"
28
        "github.com/lightningnetwork/lnd/lnwallet"
29
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
30
        "github.com/lightningnetwork/lnd/lnwire"
31
        "github.com/lightningnetwork/lnd/queue"
32
        "github.com/lightningnetwork/lnd/record"
33
        "github.com/lightningnetwork/lnd/ticker"
34
        "github.com/lightningnetwork/lnd/tlv"
35
)
36

37
// init seeds the package-level math/rand source with the current wall-clock
// time, expressed in nanoseconds.
//
// NOTE(review): math/rand.Seed is deprecated as of Go 1.20, where the global
// source is seeded randomly by default. Confirm the module's minimum Go
// version before dropping this call.
func init() {
	seed := time.Now().UnixNano()
	prand.Seed(seed)
}
15✔
40

41
const (
42
        // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
43
        // the node accepts for forwarded payments. The value is relative to the
44
        // current block height. The reason to have a maximum is to prevent
45
        // funds getting locked up unreasonably long. Otherwise, an attacker
46
        // willing to lock its own funds too, could force the funds of this node
47
        // to be locked up for an indefinite (max int32) number of blocks.
48
        //
49
        // The value 2016 corresponds to on average two weeks worth of blocks
50
        // and is based on the maximum number of hops (20), the default CLTV
51
        // delta (40), and some extra margin to account for the other lightning
52
        // implementations and past lnd versions which used to have a default
53
        // CLTV delta of 144.
54
        DefaultMaxOutgoingCltvExpiry = 2016
55

56
        // DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
57
        // which a link should propose to update its commitment fee rate.
58
        DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute
59

60
        // DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
61
        // which a link should propose to update its commitment fee rate.
62
        DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute
63

64
        // DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
65
        // a channel's commitment fee to be of its balance. This only applies to
66
        // the initiator of the channel.
67
        DefaultMaxLinkFeeAllocation float64 = 0.5
68
)
69

70
// ExpectedFee computes the expected fee for a given htlc amount. The value
71
// returned from this function is to be used as a sanity check when forwarding
72
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
73
// forwarding policy.
74
//
75
// TODO(roasbeef): also add in current available channel bandwidth, inverse
76
// func
77
func ExpectedFee(f models.ForwardingPolicy,
78
        htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {
82✔
79

82✔
80
        return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
82✔
81
}
82✔
82

83
// ChannelLinkConfig defines the configuration for the channel link. ALL
84
// elements within the configuration MUST be non-nil for channel link to carry
85
// out its duties.
86
type ChannelLinkConfig struct {
87
        // FwrdingPolicy is the initial forwarding policy to be used when
88
        // deciding whether to forward incoming HTLC's or not. This value
89
        // can be updated with subsequent calls to UpdateForwardingPolicy
90
        // targeted at a given ChannelLink concrete interface implementation.
91
        FwrdingPolicy models.ForwardingPolicy
92

93
        // Circuits provides restricted access to the switch's circuit map,
94
        // allowing the link to open and close circuits.
95
        Circuits CircuitModifier
96

97
        // BestHeight returns the best known height.
98
        BestHeight func() uint32
99

100
        // ForwardPackets attempts to forward the batch of htlcs through the
101
        // switch. The function returns an error in case it fails to send one or
102
        // more packets. The link's quit signal should be provided to allow
103
        // cancellation of forwarding during link shutdown.
104
        ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error
105

106
        // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
107
        // blobs, which are then used to inform how to forward an HTLC.
108
        //
109
        // NOTE: This function assumes the same set of readers and preimages
110
        // are always presented for the same identifier.
111
        DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest) (
112
                []hop.DecodeHopIteratorResponse, error)
113

114
        // ExtractErrorEncrypter function is responsible for decoding HTLC
115
        // Sphinx onion blob, and creating onion failure obfuscator.
116
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter
117

118
        // FetchLastChannelUpdate retrieves the latest routing policy for a
119
        // target channel. This channel will typically be the outgoing channel
120
        // specified when we receive an incoming HTLC.  This will be used to
121
        // provide payment senders our latest policy when sending encrypted
122
        // error messages.
123
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
124
                *lnwire.ChannelUpdate1, error)
125

126
        // Peer is a lightning network node with which we have the channel link
127
        // opened.
128
        Peer lnpeer.Peer
129

130
        // Registry is a sub-system which is responsible for managing the invoices
131
        // in a thread-safe manner.
132
        Registry InvoiceDatabase
133

134
        // PreimageCache is a global witness beacon that houses any new
135
        // preimages discovered by other links. We'll use this to add new
136
        // witnesses that we discover which will notify any sub-systems
137
        // subscribed to new events.
138
        PreimageCache contractcourt.WitnessBeacon
139

140
        // OnChannelFailure is a function closure that we'll call if the
141
        // channel failed for some reason. Depending on the severity of the
142
        // error, the closure potentially must force close this channel and
143
        // disconnect the peer.
144
        //
145
        // NOTE: The method must return in order for the ChannelLink to be able
146
        // to shut down properly.
147
        OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
148
                LinkFailureError)
149

150
        // UpdateContractSignals is a function closure that we'll use to update
151
        // outside sub-systems with this channel's latest ShortChannelID.
152
        UpdateContractSignals func(*contractcourt.ContractSignals) error
153

154
        // NotifyContractUpdate is a function closure that we'll use to update
155
        // the contractcourt and more specifically the ChannelArbitrator of the
156
        // latest channel state.
157
        NotifyContractUpdate func(*contractcourt.ContractUpdate) error
158

159
        // ChainEvents is an active subscription to the chain watcher for this
160
        // channel to be notified of any on-chain activity related to this
161
        // channel.
162
        ChainEvents *contractcourt.ChainEventSubscription
163

164
        // FeeEstimator is an instance of a live fee estimator which will be
165
        // used to dynamically regulate the current fee of the commitment
166
        // transaction to ensure timely confirmation.
167
        FeeEstimator chainfee.Estimator
168

169
        // HodlMask is a bitvector composed of hodl.Flags, specifying breakpoints
170
        // for HTLC forwarding internal to the switch.
171
        //
172
        // NOTE: This should only be used for testing.
173
        HodlMask hodl.Mask
174

175
        // SyncStates is used to indicate that we need to send the channel
176
        // reestablishment message to the remote peer. It should be done if our
177
        // clients have been restarted, or the remote peer has reconnected.
178
        SyncStates bool
179

180
        // BatchTicker is the ticker that determines the interval that we'll
181
        // use to check the batch to see if there're any updates we should
182
        // flush out. By batching updates into a single commit, we attempt to
183
        // increase throughput by maximizing the number of updates coalesced
184
        // into a single commit.
185
        BatchTicker ticker.Ticker
186

187
        // FwdPkgGCTicker is the ticker determining the frequency at which
188
        // garbage collection of forwarding packages occurs. We use a
189
        // time-based approach, as opposed to block epochs, as to not hinder
190
        // syncing.
191
        FwdPkgGCTicker ticker.Ticker
192

193
        // PendingCommitTicker is a ticker that allows the link to determine if
194
        // a locally initiated commitment dance gets stuck waiting for the
195
        // remote party to revoke.
196
        PendingCommitTicker ticker.Ticker
197

198
        // BatchSize is the max size of a batch of updates done to the link
199
        // before we do a state update.
200
        BatchSize uint32
201

202
        // UnsafeReplay will cause a link to replay the adds in its latest
203
        // commitment txn after the link is restarted. This should only be used
204
        // in testing, it is here to ensure the sphinx replay detection on the
205
        // receiving node is persistent.
206
        UnsafeReplay bool
207

208
        // MinUpdateTimeout represents the minimum interval in which a link
209
        // will propose to update its commitment fee rate. A random timeout will
210
        // be selected between this and MaxUpdateTimeout.
211
        MinUpdateTimeout time.Duration
212

213
        // MaxUpdateTimeout represents the maximum interval in which a link
214
        // will propose to update its commitment fee rate. A random timeout will
215
        // be selected between this and MinUpdateTimeout.
216
        MaxUpdateTimeout time.Duration
217

218
        // OutgoingCltvRejectDelta defines the number of blocks before expiry of
219
        // an htlc where we don't offer an htlc anymore. This should be at least
220
        // the outgoing broadcast delta, because in any case we don't want to
221
        // risk offering an htlc that triggers channel closure.
222
        OutgoingCltvRejectDelta uint32
223

224
        // TowerClient is an optional engine that manages the signing,
225
        // encrypting, and uploading of justice transactions to the daemon's
226
        // configured set of watchtowers for legacy channels.
227
        TowerClient TowerClient
228

229
        // MaxOutgoingCltvExpiry is the maximum outgoing timelock that the link
230
        // should accept for a forwarded HTLC. The value is relative to the
231
        // current block height.
232
        MaxOutgoingCltvExpiry uint32
233

234
        // MaxFeeAllocation is the highest allocation we'll allow a channel's
235
        // commitment fee to be of its balance. This only applies to the
236
        // initiator of the channel.
237
        MaxFeeAllocation float64
238

239
        // MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
240
        // the initiator for channels of the anchor type.
241
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight
242

243
        // NotifyActiveLink allows the link to tell the ChannelNotifier when a
244
        // link is first started.
245
        NotifyActiveLink func(wire.OutPoint)
246

247
        // NotifyActiveChannel allows the link to tell the ChannelNotifier when
248
        // channels become active.
249
        NotifyActiveChannel func(wire.OutPoint)
250

251
        // NotifyInactiveChannel allows the switch to tell the ChannelNotifier
252
        // when channels become inactive.
253
        NotifyInactiveChannel func(wire.OutPoint)
254

255
        // NotifyInactiveLinkEvent allows the switch to tell the
256
        // ChannelNotifier when a channel link becomes inactive.
257
        NotifyInactiveLinkEvent func(wire.OutPoint)
258

259
        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
260
        // events through.
261
        HtlcNotifier htlcNotifier
262

263
        // FailAliasUpdate is a function used to fail an HTLC for an
264
        // option_scid_alias channel.
265
        FailAliasUpdate func(sid lnwire.ShortChannelID,
266
                incoming bool) *lnwire.ChannelUpdate1
267

268
        // GetAliases is used by the link and switch to fetch the set of
269
        // aliases for a given link.
270
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID
271

272
        // PreviouslySentShutdown is an optional value that is set if, at the
273
        // time of the link being started, persisted shutdown info was found for
274
        // the channel. This value being set means that we previously sent a
275
        // Shutdown message to our peer, and so we should do so again on
276
        // re-establish and should not allow anymore HTLC adds on the outgoing
277
        // direction of the link.
278
        PreviouslySentShutdown fn.Option[lnwire.Shutdown]
279

280
        // DisallowRouteBlinding disables forwarding payments in blinded routes
281
        // by failing back any blinding-related payloads as if they were
282
        // invalid.
283
        DisallowRouteBlinding bool
284

285
        // DisallowQuiescence is a flag that can be used to disable the
286
        // quiescence protocol.
287
        DisallowQuiescence bool
288

289
        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
290
        // restrict the flow of HTLCs and fee updates.
291
        MaxFeeExposure lnwire.MilliSatoshi
292

293
        // ShouldFwdExpEndorsement is a closure that indicates whether the link
294
        // should forward experimental endorsement signals.
295
        ShouldFwdExpEndorsement func() bool
296

297
        // AuxTrafficShaper is an optional auxiliary traffic shaper that can be
298
        // used to manage the bandwidth of the link.
299
        AuxTrafficShaper fn.Option[AuxTrafficShaper]
300
}
301

302
// channelLink is the service which drives a channel's commitment update
303
// state-machine. In the event that an HTLC needs to be propagated to another
304
// link, the forward handler from config is used which sends HTLC to the
305
// switch. Additionally, the link encapsulates the logic of commitment protocol
306
// message ordering and updates.
307
type channelLink struct {
308
        // The following fields are only meant to be used *atomically*
309
        started       int32
310
        reestablished int32
311
        shutdown      int32
312

313
        // failed should be set to true in case a link error happens, making
314
        // sure we don't process any more updates.
315
        failed bool
316

317
        // keystoneBatch represents a volatile list of keystones that must be
318
        // written before attempting to sign the next commitment txn. These
319
        // represent all the HTLC's forwarded to the link from the switch. Once
320
        // we lock them into our outgoing commitment, then the circuit has a
321
        // keystone, and is fully opened.
322
        keystoneBatch []Keystone
323

324
        // openedCircuits is the set of all payment circuits that will be open
325
        // once we make our next commitment. After making the commitment we'll
326
        // ACK all these from our mailbox to ensure that they don't get
327
        // re-delivered if we reconnect.
328
        openedCircuits []CircuitKey
329

330
        // closedCircuits is the set of all payment circuits that will be
331
        // closed once we make our next commitment. After taking the commitment
332
        // we'll ACK all these to ensure that they don't get re-delivered if we
333
        // reconnect.
334
        closedCircuits []CircuitKey
335

336
        // channel is a lightning network channel to which we apply htlc
337
        // updates.
338
        channel *lnwallet.LightningChannel
339

340
        // cfg is a structure which carries all dependable fields/handlers
341
        // which may affect behaviour of the service.
342
        cfg ChannelLinkConfig
343

344
        // mailBox is the main interface between the outside world and the
345
        // link. All incoming messages will be sent over this mailBox. Messages
346
        // include new updates from our connected peer, and new packets to be
347
        // forwarded sent by the switch.
348
        mailBox MailBox
349

350
        // upstream is a channel that new messages sent from the remote peer to
351
        // the local peer will be sent across.
352
        upstream chan lnwire.Message
353

354
        // downstream is a channel in which new multi-hop HTLC's to be
355
        // forwarded will be sent across. Messages from this channel are sent
356
        // by the HTLC switch.
357
        downstream chan *htlcPacket
358

359
        // updateFeeTimer is the timer responsible for updating the link's
360
        // commitment fee every time it fires.
361
        updateFeeTimer *time.Timer
362

363
        // uncommittedPreimages stores a list of all preimages that have been
364
        // learned since receiving the last CommitSig from the remote peer. The
365
        // batch will be flushed just before accepting the subsequent CommitSig
366
        // or on shutdown to avoid doing a write for each preimage received.
367
        uncommittedPreimages []lntypes.Preimage
368

369
        sync.RWMutex
370

371
        // hodlQueue is used to receive exit hop htlc resolutions from invoice
372
        // registry.
373
        hodlQueue *queue.ConcurrentQueue
374

375
        // hodlMap stores related htlc data for a circuit key. It allows
376
        // resolving those htlcs when we receive a message on hodlQueue.
377
        hodlMap map[models.CircuitKey]hodlHtlc
378

379
        // log is a link-specific logging instance.
380
        log btclog.Logger
381

382
        // isOutgoingAddBlocked tracks whether the channelLink can send an
383
        // UpdateAddHTLC.
384
        isOutgoingAddBlocked atomic.Bool
385

386
        // isIncomingAddBlocked tracks whether the channelLink can receive an
387
        // UpdateAddHTLC.
388
        isIncomingAddBlocked atomic.Bool
389

390
        // flushHooks is a hookMap that is triggered when we reach a channel
391
        // state with no live HTLCs.
392
        flushHooks hookMap
393

394
        // outgoingCommitHooks is a hookMap that is triggered after we send our
395
        // next CommitSig.
396
        outgoingCommitHooks hookMap
397

398
        // incomingCommitHooks is a hookMap that is triggered after we receive
399
        // our next CommitSig.
400
        incomingCommitHooks hookMap
401

402
        // quiescer is the state machine that tracks where this channel is with
403
        // respect to the quiescence protocol.
404
        quiescer Quiescer
405

406
        // quiescenceReqs is a queue of requests to quiesce this link. The
407
        // members of the queue are send-only channels we should call back with
408
        // the result.
409
        quiescenceReqs chan StfuReq
410

411
        // ContextGuard is a helper that encapsulates a wait group and quit
412
        // channel and allows contexts that either block or cancel on those
413
        // depending on the use case.
414
        *fn.ContextGuard
415
}
416

417
// hookMap is a data structure that is used to track the hooks that need to be
// called in various parts of the channelLink's lifecycle. Every hook is
// one-shot: it is run by the next call to invoke and then forgotten.
//
// WARNING: NOT thread-safe. Only the id counter is atomic; the transient map
// is read and written without any locking.
type hookMap struct {
	// allocIdx holds the id handed out by the most recent call to alloc.
	// Ids start at 1.
	allocIdx atomic.Uint64

	// transient is a map of hooks that are only called the next time invoke
	// is called. These hooks are deleted during invoke.
	transient map[uint64]func()

	// newTransients is a channel that we use to accept new hooks into the
	// hookMap. Nothing in this block sends on or receives from it.
	newTransients chan func()
}

// newHookMap initializes a new empty hookMap. The id counter is left at its
// zero value, so the first allocated hook receives id 1.
func newHookMap() hookMap {
	return hookMap{
		transient:     make(map[uint64]func()),
		newTransients: make(chan func()),
	}
}

// alloc registers the supplied hook as a transient hook, to be run by the next
// call to invoke, and returns the id assigned to it. It panics if the id
// counter wraps around.
func (m *hookMap) alloc(hook func()) uint64 {
	// Add returns the post-increment value, so a result of zero can only
	// mean that the uint64 counter overflowed.
	hookID := m.allocIdx.Add(1)
	if hookID == 0 {
		panic("hookMap allocIdx overflow")
	}

	// Tolerate a zero-value hookMap rather than panicking on a write to a
	// nil map.
	if m.transient == nil {
		m.transient = make(map[uint64]func())
	}
	m.transient[hookID] = hook

	return hookID
}

// invoke is used on a hook map to call all the registered hooks and then clear
// out the transient hooks so they are not called again.
//
// The pending set is detached before any hook runs. A hook that registers a
// further hook while invoke is in progress therefore has that new hook
// deferred to the next invoke, instead of it being either run in the same pass
// or silently discarded depending on map iteration order.
func (m *hookMap) invoke() {
	pending := m.transient
	m.transient = make(map[uint64]func())

	for _, hook := range pending {
		hook()
	}
}
466

467
// hodlHtlc contains htlc data that is required for resolution.
468
type hodlHtlc struct {
469
        add        lnwire.UpdateAddHTLC
470
        sourceRef  channeldb.AddRef
471
        obfuscator hop.ErrorEncrypter
472
}
473

474
// NewChannelLink creates a new instance of a ChannelLink given a configuration
475
// and active channel that will be used to verify/apply updates to.
476
func NewChannelLink(cfg ChannelLinkConfig,
477
        channel *lnwallet.LightningChannel) ChannelLink {
219✔
478

219✔
479
        logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())
219✔
480

219✔
481
        // If the max fee exposure isn't set, use the default.
219✔
482
        if cfg.MaxFeeExposure == 0 {
434✔
483
                cfg.MaxFeeExposure = DefaultMaxFeeExposure
215✔
484
        }
215✔
485

486
        var qsm Quiescer
219✔
487
        if !cfg.DisallowQuiescence {
438✔
488
                qsm = NewQuiescer(QuiescerCfg{
219✔
489
                        chanID: lnwire.NewChanIDFromOutPoint(
219✔
490
                                channel.ChannelPoint(),
219✔
491
                        ),
219✔
492
                        channelInitiator: channel.Initiator(),
219✔
493
                        sendMsg: func(s lnwire.Stfu) error {
225✔
494
                                return cfg.Peer.SendMessage(false, &s)
6✔
495
                        },
6✔
496
                        timeoutDuration: defaultQuiescenceTimeout,
497
                        onTimeout: func() {
2✔
498
                                cfg.Peer.Disconnect(ErrQuiescenceTimeout)
2✔
499
                        },
2✔
500
                })
501
        } else {
×
502
                qsm = &quiescerNoop{}
×
503
        }
×
504

505
        quiescenceReqs := make(
219✔
506
                chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1,
219✔
507
        )
219✔
508

219✔
509
        return &channelLink{
219✔
510
                cfg:                 cfg,
219✔
511
                channel:             channel,
219✔
512
                hodlMap:             make(map[models.CircuitKey]hodlHtlc),
219✔
513
                hodlQueue:           queue.NewConcurrentQueue(10),
219✔
514
                log:                 log.WithPrefix(logPrefix),
219✔
515
                flushHooks:          newHookMap(),
219✔
516
                outgoingCommitHooks: newHookMap(),
219✔
517
                incomingCommitHooks: newHookMap(),
219✔
518
                quiescer:            qsm,
219✔
519
                quiescenceReqs:      quiescenceReqs,
219✔
520
                ContextGuard:        fn.NewContextGuard(),
219✔
521
        }
219✔
522
}
523

524
// A compile time check to ensure channelLink implements the ChannelLink
525
// interface.
526
var _ ChannelLink = (*channelLink)(nil)
527

528
// Start starts all helper goroutines required for the operation of the channel
529
// link.
530
//
531
// NOTE: Part of the ChannelLink interface.
532
func (l *channelLink) Start() error {
217✔
533
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
217✔
534
                err := fmt.Errorf("channel link(%v): already started", l)
×
535
                l.log.Warn("already started")
×
536
                return err
×
537
        }
×
538

539
        l.log.Info("starting")
217✔
540

217✔
541
        // If the config supplied watchtower client, ensure the channel is
217✔
542
        // registered before trying to use it during operation.
217✔
543
        if l.cfg.TowerClient != nil {
221✔
544
                err := l.cfg.TowerClient.RegisterChannel(
4✔
545
                        l.ChanID(), l.channel.State().ChanType,
4✔
546
                )
4✔
547
                if err != nil {
4✔
548
                        return err
×
549
                }
×
550
        }
551

552
        l.mailBox.ResetMessages()
217✔
553
        l.hodlQueue.Start()
217✔
554

217✔
555
        // Before launching the htlcManager messages, revert any circuits that
217✔
556
        // were marked open in the switch's circuit map, but did not make it
217✔
557
        // into a commitment txn. We use the next local htlc index as the cut
217✔
558
        // off point, since all indexes below that are committed. This action
217✔
559
        // is only performed if the link's final short channel ID has been
217✔
560
        // assigned, otherwise we would try to trim the htlcs belonging to the
217✔
561
        // all-zero, hop.Source ID.
217✔
562
        if l.ShortChanID() != hop.Source {
434✔
563
                localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
217✔
564
                if err != nil {
217✔
565
                        return fmt.Errorf("unable to retrieve next local "+
×
566
                                "htlc index: %v", err)
×
567
                }
×
568

569
                // NOTE: This is automatically done by the switch when it
570
                // starts up, but is necessary to prevent inconsistencies in
571
                // the case that the link flaps. This is a result of a link's
572
                // life-cycle being shorter than that of the switch.
573
                chanID := l.ShortChanID()
217✔
574
                err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
217✔
575
                if err != nil {
217✔
576
                        return fmt.Errorf("unable to trim circuits above "+
×
577
                                "local htlc index %d: %v", localHtlcIndex, err)
×
578
                }
×
579

580
                // Since the link is live, before we start the link we'll update
581
                // the ChainArbitrator with the set of new channel signals for
582
                // this channel.
583
                //
584
                // TODO(roasbeef): split goroutines within channel arb to avoid
585
                go func() {
434✔
586
                        signals := &contractcourt.ContractSignals{
217✔
587
                                ShortChanID: l.channel.ShortChanID(),
217✔
588
                        }
217✔
589

217✔
590
                        err := l.cfg.UpdateContractSignals(signals)
217✔
591
                        if err != nil {
217✔
592
                                l.log.Errorf("unable to update signals")
×
593
                        }
×
594
                }()
595
        }
596

597
        l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())
217✔
598

217✔
599
        l.Wg.Add(1)
217✔
600
        go l.htlcManager()
217✔
601

217✔
602
        return nil
217✔
603
}
604

605
// Stop gracefully stops all active helper goroutines, then waits until they've
606
// exited.
607
//
608
// NOTE: Part of the ChannelLink interface.
609
func (l *channelLink) Stop() {
218✔
610
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
230✔
611
                l.log.Warn("already stopped")
12✔
612
                return
12✔
613
        }
12✔
614

615
        l.log.Info("stopping")
206✔
616

206✔
617
        // As the link is stopping, we are no longer interested in htlc
206✔
618
        // resolutions coming from the invoice registry.
206✔
619
        l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())
206✔
620

206✔
621
        if l.cfg.ChainEvents.Cancel != nil {
210✔
622
                l.cfg.ChainEvents.Cancel()
4✔
623
        }
4✔
624

625
        // Ensure the channel for the timer is drained.
626
        if l.updateFeeTimer != nil {
412✔
627
                if !l.updateFeeTimer.Stop() {
206✔
628
                        select {
×
629
                        case <-l.updateFeeTimer.C:
×
630
                        default:
×
631
                        }
632
                }
633
        }
634

635
        if l.hodlQueue != nil {
412✔
636
                l.hodlQueue.Stop()
206✔
637
        }
206✔
638

639
        close(l.Quit)
206✔
640
        l.Wg.Wait()
206✔
641

206✔
642
        // Now that the htlcManager has completely exited, reset the packet
206✔
643
        // courier. This allows the mailbox to revaluate any lingering Adds that
206✔
644
        // were delivered but didn't make it on a commitment to be failed back
206✔
645
        // if the link is offline for an extended period of time. The error is
206✔
646
        // ignored since it can only fail when the daemon is exiting.
206✔
647
        _ = l.mailBox.ResetPackets()
206✔
648

206✔
649
        // As a final precaution, we will attempt to flush any uncommitted
206✔
650
        // preimages to the preimage cache. The preimages should be re-delivered
206✔
651
        // after channel reestablishment, however this adds an extra layer of
206✔
652
        // protection in case the peer never returns. Without this, we will be
206✔
653
        // unable to settle any contracts depending on the preimages even though
206✔
654
        // we had learned them at some point.
206✔
655
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
206✔
656
        if err != nil {
206✔
657
                l.log.Errorf("unable to add preimages=%v to cache: %v",
×
658
                        l.uncommittedPreimages, err)
×
659
        }
×
660
}
661

662
// WaitForShutdown blocks until the link finishes shutting down, which includes
663
// termination of all dependent goroutines.
664
func (l *channelLink) WaitForShutdown() {
×
665
        l.Wg.Wait()
×
666
}
×
667

668
// EligibleToForward returns a bool indicating if the channel is able to
669
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
670
// we are eligible to update AND the channel isn't currently flushing the
671
// outgoing half of the channel.
672
//
673
// NOTE: MUST NOT be called from the main event loop.
674
func (l *channelLink) EligibleToForward() bool {
617✔
675
        l.RLock()
617✔
676
        defer l.RUnlock()
617✔
677

617✔
678
        return l.eligibleToForward()
617✔
679
}
617✔
680

681
// eligibleToForward returns a bool indicating if the channel is able to
682
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
683
// we are eligible to update AND the channel isn't currently flushing the
684
// outgoing half of the channel.
685
//
686
// NOTE: MUST be called from the main event loop.
687
func (l *channelLink) eligibleToForward() bool {
617✔
688
        return l.eligibleToUpdate() && !l.IsFlushing(Outgoing)
617✔
689
}
617✔
690

691
// eligibleToUpdate returns a bool indicating if the channel is able to update
692
// channel state. We're able to update channel state if we know the remote
693
// party's next revocation point. Otherwise, we can't initiate new channel
694
// state. We also require that the short channel ID not be the all-zero source
695
// ID, meaning that the channel has had its ID finalized.
696
//
697
// NOTE: MUST be called from the main event loop.
698
func (l *channelLink) eligibleToUpdate() bool {
620✔
699
        return l.channel.RemoteNextRevocation() != nil &&
620✔
700
                l.channel.ShortChanID() != hop.Source &&
620✔
701
                l.isReestablished() &&
620✔
702
                l.quiescer.CanSendUpdates()
620✔
703
}
620✔
704

705
// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
706
// the specified direction. It returns true if the state was changed and false
707
// if the desired state was already set before the method was called.
708
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
16✔
709
        if linkDirection == Outgoing {
22✔
710
                return l.isOutgoingAddBlocked.Swap(false)
6✔
711
        }
6✔
712

713
        return l.isIncomingAddBlocked.Swap(false)
10✔
714
}
715

716
// DisableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
717
// the specified direction. It returns true if the state was changed and false
718
// if the desired state was already set before the method was called.
719
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
17✔
720
        if linkDirection == Outgoing {
30✔
721
                return !l.isOutgoingAddBlocked.Swap(true)
13✔
722
        }
13✔
723

724
        return !l.isIncomingAddBlocked.Swap(true)
8✔
725
}
726

727
// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction of
728
// the argument.
729
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
1,595✔
730
        if linkDirection == Outgoing {
2,716✔
731
                return l.isOutgoingAddBlocked.Load()
1,121✔
732
        }
1,121✔
733

734
        return l.isIncomingAddBlocked.Load()
478✔
735
}
736

737
// OnFlushedOnce adds a hook that will be called the next time the channel
738
// state reaches zero htlcs. This hook will only ever be called once. If the
739
// channel state already has zero htlcs, then this will be called immediately.
740
func (l *channelLink) OnFlushedOnce(hook func()) {
5✔
741
        select {
5✔
742
        case l.flushHooks.newTransients <- hook:
5✔
743
        case <-l.Quit:
×
744
        }
745
}
746

747
// OnCommitOnce adds a hook that will be called the next time a CommitSig
748
// message is sent in the argument's LinkDirection. This hook will only ever be
749
// called once. If no CommitSig is owed in the argument's LinkDirection, then
750
// we will call this hook be run immediately.
751
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
5✔
752
        var queue chan func()
5✔
753

5✔
754
        if direction == Outgoing {
10✔
755
                queue = l.outgoingCommitHooks.newTransients
5✔
756
        } else {
5✔
757
                queue = l.incomingCommitHooks.newTransients
×
758
        }
×
759

760
        select {
5✔
761
        case queue <- hook:
5✔
762
        case <-l.Quit:
×
763
        }
764
}
765

766
// InitStfu allows us to initiate quiescence on this link. It returns a receive
767
// only channel that will block until quiescence has been achieved, or
768
// definitively fails.
769
//
770
// This operation has been added to allow channels to be quiesced via RPC. It
771
// may be removed or reworked in the future as RPC initiated quiescence is a
772
// holdover until we have downstream protocols that use it.
773
func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] {
5✔
774
        req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]](
5✔
775
                fn.Unit{},
5✔
776
        )
5✔
777

5✔
778
        select {
5✔
779
        case l.quiescenceReqs <- req:
5✔
780
        case <-l.Quit:
×
781
                req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown))
×
782
        }
783

784
        return out
5✔
785
}
786

787
// isReestablished returns true if the link has successfully completed the
788
// channel reestablishment dance.
789
func (l *channelLink) isReestablished() bool {
620✔
790
        return atomic.LoadInt32(&l.reestablished) == 1
620✔
791
}
620✔
792

793
// markReestablished signals that the remote peer has successfully exchanged
794
// channel reestablish messages and that the channel is ready to process
795
// subsequent messages.
796
func (l *channelLink) markReestablished() {
217✔
797
        atomic.StoreInt32(&l.reestablished, 1)
217✔
798
}
217✔
799

800
// IsUnadvertised returns true if the underlying channel is unadvertised.
801
func (l *channelLink) IsUnadvertised() bool {
6✔
802
        state := l.channel.State()
6✔
803
        return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
6✔
804
}
6✔
805

806
// sampleNetworkFee samples the current fee rate on the network to get into the
807
// chain in a timely manner. The returned value is expressed in fee-per-kw, as
808
// this is the native rate used when computing the fee for commitment
809
// transactions, and the second-level HTLC transactions.
810
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
4✔
811
        // We'll first query for the sat/kw recommended to be confirmed within 3
4✔
812
        // blocks.
4✔
813
        feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
4✔
814
        if err != nil {
4✔
815
                return 0, err
×
816
        }
×
817

818
        l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
4✔
819
                int64(feePerKw))
4✔
820

4✔
821
        return feePerKw, nil
4✔
822
}
823

824
// shouldAdjustCommitFee returns true if we should update our commitment fee to
825
// match that of the network fee. We'll only update our commitment fee if the
826
// network fee is +/- 10% to our commitment fee or if our current commitment
827
// fee is below the minimum relay fee.
828
func shouldAdjustCommitFee(netFee, chanFee,
829
        minRelayFee chainfee.SatPerKWeight) bool {
14✔
830

14✔
831
        switch {
14✔
832
        // If the network fee is greater than our current commitment fee and
833
        // our current commitment fee is below the minimum relay fee then
834
        // we should switch to it no matter if it is less than a 10% increase.
835
        case netFee > chanFee && chanFee < minRelayFee:
1✔
836
                return true
1✔
837

838
        // If the network fee is greater than the commitment fee, then we'll
839
        // switch to it if it's at least 10% greater than the commit fee.
840
        case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
4✔
841
                return true
4✔
842

843
        // If the network fee is less than our commitment fee, then we'll
844
        // switch to it if it's at least 10% less than the commitment fee.
845
        case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
2✔
846
                return true
2✔
847

848
        // Otherwise, we won't modify our fee.
849
        default:
7✔
850
                return false
7✔
851
        }
852
}
853

854
// failCb builds the FailureMessage for a given channel update. The named type
// exists to cut down on argument verbosity.
855
type failCb func(update *lnwire.ChannelUpdate1) lnwire.FailureMessage
856

857
// createFailureWithUpdate creates a ChannelUpdate when failing an incoming or
858
// outgoing HTLC. It may return a FailureMessage that references a channel's
859
// alias. If the channel does not have an alias, then the regular channel
860
// update from disk will be returned.
861
func (l *channelLink) createFailureWithUpdate(incoming bool,
862
        outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {
26✔
863

26✔
864
        // Determine which SCID to use in case we need to use aliases in the
26✔
865
        // ChannelUpdate.
26✔
866
        scid := outgoingScid
26✔
867
        if incoming {
26✔
868
                scid = l.ShortChanID()
×
869
        }
×
870

871
        // Try using the FailAliasUpdate function. If it returns nil, fallback
872
        // to the non-alias behavior.
873
        update := l.cfg.FailAliasUpdate(scid, incoming)
26✔
874
        if update == nil {
46✔
875
                // Fallback to the non-alias behavior.
20✔
876
                var err error
20✔
877
                update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
20✔
878
                if err != nil {
20✔
879
                        return &lnwire.FailTemporaryNodeFailure{}
×
880
                }
×
881
        }
882

883
        return cb(update)
26✔
884
}
885

886
// syncChanState attempts to synchronize channel states with the remote party.
887
// This method is to be called upon reconnection after the initial funding
888
// flow. We'll compare out commitment chains with the remote party, and re-send
889
// either a danging commit signature, a revocation, or both.
890
func (l *channelLink) syncChanStates() error {
174✔
891
        chanState := l.channel.State()
174✔
892

174✔
893
        l.log.Infof("Attempting to re-synchronize channel: %v", chanState)
174✔
894

174✔
895
        // First, we'll generate our ChanSync message to send to the other
174✔
896
        // side. Based on this message, the remote party will decide if they
174✔
897
        // need to retransmit any data or not.
174✔
898
        localChanSyncMsg, err := chanState.ChanSyncMsg()
174✔
899
        if err != nil {
174✔
900
                return fmt.Errorf("unable to generate chan sync message for "+
×
901
                        "ChannelPoint(%v)", l.channel.ChannelPoint())
×
902
        }
×
903
        if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
174✔
904
                return fmt.Errorf("unable to send chan sync message for "+
×
905
                        "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
×
906
        }
×
907

908
        var msgsToReSend []lnwire.Message
174✔
909

174✔
910
        // Next, we'll wait indefinitely to receive the ChanSync message. The
174✔
911
        // first message sent MUST be the ChanSync message.
174✔
912
        select {
174✔
913
        case msg := <-l.upstream:
174✔
914
                l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
174✔
915
                        l.cfg.Peer.PubKey())
174✔
916

174✔
917
                remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
174✔
918
                if !ok {
174✔
919
                        return fmt.Errorf("first message sent to sync "+
×
920
                                "should be ChannelReestablish, instead "+
×
921
                                "received: %T", msg)
×
922
                }
×
923

924
                // If the remote party indicates that they think we haven't
925
                // done any state updates yet, then we'll retransmit the
926
                // channel_ready message first. We do this, as at this point
927
                // we can't be sure if they've really received the
928
                // ChannelReady message.
929
                if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
174✔
930
                        localChanSyncMsg.NextLocalCommitHeight == 1 &&
174✔
931
                        !l.channel.IsPending() {
342✔
932

168✔
933
                        l.log.Infof("resending ChannelReady message to peer")
168✔
934

168✔
935
                        nextRevocation, err := l.channel.NextRevocationKey()
168✔
936
                        if err != nil {
168✔
937
                                return fmt.Errorf("unable to create next "+
×
938
                                        "revocation: %v", err)
×
939
                        }
×
940

941
                        channelReadyMsg := lnwire.NewChannelReady(
168✔
942
                                l.ChanID(), nextRevocation,
168✔
943
                        )
168✔
944

168✔
945
                        // If this is a taproot channel, then we'll send the
168✔
946
                        // very same nonce that we sent above, as they should
168✔
947
                        // take the latest verification nonce we send.
168✔
948
                        if chanState.ChanType.IsTaproot() {
172✔
949
                                //nolint:ll
4✔
950
                                channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
4✔
951
                        }
4✔
952

953
                        // For channels that negotiated the option-scid-alias
954
                        // feature bit, ensure that we send over the alias in
955
                        // the channel_ready message. We'll send the first
956
                        // alias we find for the channel since it does not
957
                        // matter which alias we send. We'll error out if no
958
                        // aliases are found.
959
                        if l.negotiatedAliasFeature() {
172✔
960
                                aliases := l.getAliases()
4✔
961
                                if len(aliases) == 0 {
4✔
962
                                        // This shouldn't happen since we
×
963
                                        // always add at least one alias before
×
964
                                        // the channel reaches the link.
×
965
                                        return fmt.Errorf("no aliases found")
×
966
                                }
×
967

968
                                // getAliases returns a copy of the alias slice
969
                                // so it is ok to use a pointer to the first
970
                                // entry.
971
                                channelReadyMsg.AliasScid = &aliases[0]
4✔
972
                        }
973

974
                        err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
168✔
975
                        if err != nil {
168✔
976
                                return fmt.Errorf("unable to re-send "+
×
977
                                        "ChannelReady: %v", err)
×
978
                        }
×
979
                }
980

981
                // In any case, we'll then process their ChanSync message.
982
                l.log.Info("received re-establishment message from remote side")
174✔
983

174✔
984
                var (
174✔
985
                        openedCircuits []CircuitKey
174✔
986
                        closedCircuits []CircuitKey
174✔
987
                )
174✔
988

174✔
989
                // We've just received a ChanSync message from the remote
174✔
990
                // party, so we'll process the message  in order to determine
174✔
991
                // if we need to re-transmit any messages to the remote party.
174✔
992
                ctx, cancel := l.WithCtxQuitNoTimeout()
174✔
993
                defer cancel()
174✔
994
                msgsToReSend, openedCircuits, closedCircuits, err =
174✔
995
                        l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
174✔
996
                if err != nil {
178✔
997
                        return err
4✔
998
                }
4✔
999

1000
                // Repopulate any identifiers for circuits that may have been
1001
                // opened or unclosed. This may happen if we needed to
1002
                // retransmit a commitment signature message.
1003
                l.openedCircuits = openedCircuits
174✔
1004
                l.closedCircuits = closedCircuits
174✔
1005

174✔
1006
                // Ensure that all packets have been have been removed from the
174✔
1007
                // link's mailbox.
174✔
1008
                if err := l.ackDownStreamPackets(); err != nil {
174✔
1009
                        return err
×
1010
                }
×
1011

1012
                if len(msgsToReSend) > 0 {
179✔
1013
                        l.log.Infof("sending %v updates to synchronize the "+
5✔
1014
                                "state", len(msgsToReSend))
5✔
1015
                }
5✔
1016

1017
                // If we have any messages to retransmit, we'll do so
1018
                // immediately so we return to a synchronized state as soon as
1019
                // possible.
1020
                for _, msg := range msgsToReSend {
185✔
1021
                        l.cfg.Peer.SendMessage(false, msg)
11✔
1022
                }
11✔
1023

1024
        case <-l.Quit:
4✔
1025
                return ErrLinkShuttingDown
4✔
1026
        }
1027

1028
        return nil
174✔
1029
}
1030

1031
// resolveFwdPkgs loads any forwarding packages for this link from disk, and
1032
// reprocesses them in order. The primary goal is to make sure that any HTLCs
1033
// we previously received are reinstated in memory, and forwarded to the switch
1034
// if necessary. After a restart, this will also delete any previously
1035
// completed packages.
1036
func (l *channelLink) resolveFwdPkgs() error {
217✔
1037
        fwdPkgs, err := l.channel.LoadFwdPkgs()
217✔
1038
        if err != nil {
218✔
1039
                return err
1✔
1040
        }
1✔
1041

1042
        l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))
216✔
1043

216✔
1044
        for _, fwdPkg := range fwdPkgs {
226✔
1045
                if err := l.resolveFwdPkg(fwdPkg); err != nil {
10✔
UNCOV
1046
                        return err
×
UNCOV
1047
                }
×
1048
        }
1049

1050
        // If any of our reprocessing steps require an update to the commitment
1051
        // txn, we initiate a state transition to capture all relevant changes.
1052
        if l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote) > 0 {
220✔
1053
                return l.updateCommitTx()
4✔
1054
        }
4✔
1055

1056
        return nil
216✔
1057
}
1058

1059
// resolveFwdPkg interprets the FwdState of the provided package, either
1060
// reprocesses any outstanding htlcs in the package, or performs garbage
1061
// collection on the package.
1062
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
10✔
1063
        // Remove any completed packages to clear up space.
10✔
1064
        if fwdPkg.State == channeldb.FwdStateCompleted {
15✔
1065
                l.log.Debugf("removing completed fwd pkg for height=%d",
5✔
1066
                        fwdPkg.Height)
5✔
1067

5✔
1068
                err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
5✔
1069
                if err != nil {
5✔
UNCOV
1070
                        l.log.Errorf("unable to remove fwd pkg for height=%d: "+
×
UNCOV
1071
                                "%v", fwdPkg.Height, err)
×
UNCOV
1072
                        return err
×
UNCOV
1073
                }
×
1074
        }
1075

1076
        // Otherwise this is either a new package or one has gone through
1077
        // processing, but contains htlcs that need to be restored in memory.
1078
        // We replay this forwarding package to make sure our local mem state
1079
        // is resurrected, we mimic any original responses back to the remote
1080
        // party, and re-forward the relevant HTLCs to the switch.
1081

1082
        // If the package is fully acked but not completed, it must still have
1083
        // settles and fails to propagate.
1084
        if !fwdPkg.SettleFailFilter.IsFull() {
14✔
1085
                l.processRemoteSettleFails(fwdPkg)
4✔
1086
        }
4✔
1087

1088
        // Finally, replay *ALL ADDS* in this forwarding package. The
1089
        // downstream logic is able to filter out any duplicates, but we must
1090
        // shove the entire, original set of adds down the pipeline so that the
1091
        // batch of adds presented to the sphinx router does not ever change.
1092
        if !fwdPkg.AckFilter.IsFull() {
17✔
1093
                l.processRemoteAdds(fwdPkg)
7✔
1094

7✔
1095
                // If the link failed during processing the adds, we must
7✔
1096
                // return to ensure we won't attempted to update the state
7✔
1097
                // further.
7✔
1098
                if l.failed {
7✔
1099
                        return fmt.Errorf("link failed while " +
×
1100
                                "processing remote adds")
×
1101
                }
×
1102
        }
1103

1104
        return nil
10✔
1105
}
1106

1107
// fwdPkgGarbager periodically reads all forwarding packages from disk and
1108
// removes those that can be discarded. It is safe to do this entirely in the
1109
// background, since all state is coordinated on disk. This also ensures the
1110
// link can continue to process messages and interleave database accesses.
1111
//
1112
// NOTE: This MUST be run as a goroutine.
1113
func (l *channelLink) fwdPkgGarbager() {
216✔
1114
        defer l.Wg.Done()
216✔
1115

216✔
1116
        l.cfg.FwdPkgGCTicker.Resume()
216✔
1117
        defer l.cfg.FwdPkgGCTicker.Stop()
216✔
1118

216✔
1119
        if err := l.loadAndRemove(); err != nil {
216✔
1120
                l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
×
1121
        }
×
1122

1123
        for {
448✔
1124
                select {
232✔
1125
                case <-l.cfg.FwdPkgGCTicker.Ticks():
16✔
1126
                        if err := l.loadAndRemove(); err != nil {
32✔
1127
                                l.log.Warnf("unable to remove fwd pkgs: %v",
16✔
1128
                                        err)
16✔
1129
                                continue
16✔
1130
                        }
1131
                case <-l.Quit:
206✔
1132
                        return
206✔
1133
                }
1134
        }
1135
}
1136

1137
// loadAndRemove loads all the channels forwarding packages and determines if
1138
// they can be removed. It is called once before the FwdPkgGCTicker ticks so that
1139
// a longer tick interval can be used.
1140
func (l *channelLink) loadAndRemove() error {
232✔
1141
        fwdPkgs, err := l.channel.LoadFwdPkgs()
232✔
1142
        if err != nil {
248✔
1143
                return err
16✔
1144
        }
16✔
1145

1146
        var removeHeights []uint64
216✔
1147
        for _, fwdPkg := range fwdPkgs {
225✔
1148
                if fwdPkg.State != channeldb.FwdStateCompleted {
18✔
1149
                        continue
9✔
1150
                }
1151

1152
                removeHeights = append(removeHeights, fwdPkg.Height)
4✔
1153
        }
1154

1155
        // If removeHeights is empty, return early so we don't use a db
1156
        // transaction.
1157
        if len(removeHeights) == 0 {
432✔
1158
                return nil
216✔
1159
        }
216✔
1160

1161
        return l.channel.RemoveFwdPkgs(removeHeights...)
4✔
1162
}
1163

1164
// handleChanSyncErr performs the error handling logic in the case where we
1165
// could not successfully syncChanStates with our channel peer.
1166
func (l *channelLink) handleChanSyncErr(err error) {
4✔
1167
        l.log.Warnf("error when syncing channel states: %v", err)
4✔
1168

4✔
1169
        var errDataLoss *lnwallet.ErrCommitSyncLocalDataLoss
4✔
1170

4✔
1171
        switch {
4✔
1172
        case errors.Is(err, ErrLinkShuttingDown):
4✔
1173
                l.log.Debugf("unable to sync channel states, link is " +
4✔
1174
                        "shutting down")
4✔
1175
                return
4✔
1176

1177
        // We failed syncing the commit chains, probably because the remote has
1178
        // lost state. We should force close the channel.
1179
        case errors.Is(err, lnwallet.ErrCommitSyncRemoteDataLoss):
4✔
1180
                fallthrough
4✔
1181

1182
        // The remote sent us an invalid last commit secret, we should force
1183
        // close the channel.
1184
        // TODO(halseth): and permanently ban the peer?
1185
        case errors.Is(err, lnwallet.ErrInvalidLastCommitSecret):
4✔
1186
                fallthrough
4✔
1187

1188
        // The remote sent us a commit point different from what they sent us
1189
        // before.
1190
        // TODO(halseth): ban peer?
1191
        case errors.Is(err, lnwallet.ErrInvalidLocalUnrevokedCommitPoint):
4✔
1192
                // We'll fail the link and tell the peer to force close the
4✔
1193
                // channel. Note that the database state is not updated here,
4✔
1194
                // but will be updated when the close transaction is ready to
4✔
1195
                // avoid that we go down before storing the transaction in the
4✔
1196
                // db.
4✔
1197
                l.failf(
4✔
1198
                        LinkFailureError{
4✔
1199
                                code:          ErrSyncError,
4✔
1200
                                FailureAction: LinkFailureForceClose,
4✔
1201
                        },
4✔
1202
                        "unable to synchronize channel states: %v", err,
4✔
1203
                )
4✔
1204

1205
        // We have lost state and cannot safely force close the channel. Fail
1206
        // the channel and wait for the remote to hopefully force close it. The
1207
        // remote has sent us its latest unrevoked commitment point, and we'll
1208
        // store it in the database, such that we can attempt to recover the
1209
        // funds if the remote force closes the channel.
1210
        case errors.As(err, &errDataLoss):
4✔
1211
                err := l.channel.MarkDataLoss(
4✔
1212
                        errDataLoss.CommitPoint,
4✔
1213
                )
4✔
1214
                if err != nil {
4✔
1215
                        l.log.Errorf("unable to mark channel data loss: %v",
×
1216
                                err)
×
1217
                }
×
1218

1219
        // We determined the commit chains were not possible to sync. We
1220
        // cautiously fail the channel, but don't force close.
1221
        // TODO(halseth): can we safely force close in any cases where this
1222
        // error is returned?
1223
        case errors.Is(err, lnwallet.ErrCannotSyncCommitChains):
×
1224
                if err := l.channel.MarkBorked(); err != nil {
×
1225
                        l.log.Errorf("unable to mark channel borked: %v", err)
×
1226
                }
×
1227

1228
        // Other, unspecified error.
1229
        default:
×
1230
        }
1231

1232
        l.failf(
4✔
1233
                LinkFailureError{
4✔
1234
                        code:          ErrRecoveryError,
4✔
1235
                        FailureAction: LinkFailureForceNone,
4✔
1236
                },
4✔
1237
                "unable to synchronize channel states: %v", err,
4✔
1238
        )
4✔
1239
}
1240

1241
// htlcManager is the primary goroutine which drives a channel's commitment
1242
// update state-machine in response to messages received via several channels.
1243
// This goroutine reads messages from the upstream (remote) peer, and also from
1244
// downstream channel managed by the channel link. In the event that an htlc
1245
// needs to be forwarded, then send-only forward handler is used which sends
1246
// htlc packets to the switch. Additionally, this goroutine handles acting upon
1247
// all timeouts for any active HTLCs, manages the channel's revocation window,
1248
// and also the htlc trickle queue+timer for this active channel.
1249
//
1250
// NOTE: This MUST be run as a goroutine.
1251
func (l *channelLink) htlcManager() {
217✔
1252
        defer func() {
425✔
1253
                l.cfg.BatchTicker.Stop()
208✔
1254
                l.Wg.Done()
208✔
1255
                l.log.Infof("exited")
208✔
1256
        }()
208✔
1257

1258
        l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())
217✔
1259

217✔
1260
        // Notify any clients that the link is now in the switch via an
217✔
1261
        // ActiveLinkEvent. We'll also defer an inactive link notification for
217✔
1262
        // when the link exits to ensure that every active notification is
217✔
1263
        // matched by an inactive one.
217✔
1264
        l.cfg.NotifyActiveLink(l.ChannelPoint())
217✔
1265
        defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())
217✔
1266

217✔
1267
        // TODO(roasbeef): need to call wipe chan whenever D/C?
217✔
1268

217✔
1269
        // If this isn't the first time that this channel link has been
217✔
1270
        // created, then we'll need to check to see if we need to
217✔
1271
        // re-synchronize state with the remote peer. settledHtlcs is a map of
217✔
1272
        // HTLC's that we re-settled as part of the channel state sync.
217✔
1273
        if l.cfg.SyncStates {
391✔
1274
                err := l.syncChanStates()
174✔
1275
                if err != nil {
178✔
1276
                        l.handleChanSyncErr(err)
4✔
1277
                        return
4✔
1278
                }
4✔
1279
        }
1280

1281
        // If a shutdown message has previously been sent on this link, then we
1282
        // need to make sure that we have disabled any HTLC adds on the outgoing
1283
        // direction of the link and that we re-resend the same shutdown message
1284
        // that we previously sent.
1285
        l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
221✔
1286
                // Immediately disallow any new outgoing HTLCs.
4✔
1287
                if !l.DisableAdds(Outgoing) {
4✔
1288
                        l.log.Warnf("Outgoing link adds already disabled")
×
1289
                }
×
1290

1291
                // Re-send the shutdown message the peer. Since syncChanStates
1292
                // would have sent any outstanding CommitSig, it is fine for us
1293
                // to immediately queue the shutdown message now.
1294
                err := l.cfg.Peer.SendMessage(false, &shutdown)
4✔
1295
                if err != nil {
4✔
1296
                        l.log.Warnf("Error sending shutdown message: %v", err)
×
1297
                }
×
1298
        })
1299

1300
        // We've successfully reestablished the channel, mark it as such to
1301
        // allow the switch to forward HTLCs in the outbound direction.
1302
        l.markReestablished()
217✔
1303

217✔
1304
        // Now that we've received both channel_ready and channel reestablish,
217✔
1305
        // we can go ahead and send the active channel notification. We'll also
217✔
1306
        // defer the inactive notification for when the link exits to ensure
217✔
1307
        // that every active notification is matched by an inactive one.
217✔
1308
        l.cfg.NotifyActiveChannel(l.ChannelPoint())
217✔
1309
        defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())
217✔
1310

217✔
1311
        // With the channel states synced, we now reset the mailbox to ensure
217✔
1312
        // we start processing all unacked packets in order. This is done here
217✔
1313
        // to ensure that all acknowledgments that occur during channel
217✔
1314
        // resynchronization have taken affect, causing us only to pull unacked
217✔
1315
        // packets after starting to read from the downstream mailbox.
217✔
1316
        l.mailBox.ResetPackets()
217✔
1317

217✔
1318
        // After cleaning up any memory pertaining to incoming packets, we now
217✔
1319
        // replay our forwarding packages to handle any htlcs that can be
217✔
1320
        // processed locally, or need to be forwarded out to the switch. We will
217✔
1321
        // only attempt to resolve packages if our short chan id indicates that
217✔
1322
        // the channel is not pending, otherwise we should have no htlcs to
217✔
1323
        // reforward.
217✔
1324
        if l.ShortChanID() != hop.Source {
434✔
1325
                err := l.resolveFwdPkgs()
217✔
1326
                switch err {
217✔
1327
                // No error was encountered, success.
1328
                case nil:
216✔
1329

1330
                // If the duplicate keystone error was encountered, we'll fail
1331
                // without sending an Error message to the peer.
1332
                case ErrDuplicateKeystone:
×
1333
                        l.failf(LinkFailureError{code: ErrCircuitError},
×
1334
                                "temporary circuit error: %v", err)
×
1335
                        return
×
1336

1337
                // A non-nil error was encountered, send an Error message to
1338
                // the peer.
1339
                default:
2✔
1340
                        l.failf(LinkFailureError{code: ErrInternalError},
2✔
1341
                                "unable to resolve fwd pkgs: %v", err)
2✔
1342
                        return
2✔
1343
                }
1344

1345
                // With our link's in-memory state fully reconstructed, spawn a
1346
                // goroutine to manage the reclamation of disk space occupied by
1347
                // completed forwarding packages.
1348
                l.Wg.Add(1)
216✔
1349
                go l.fwdPkgGarbager()
216✔
1350
        }
1351

1352
        for {
4,372✔
1353
                // We must always check if we failed at some point processing
4,156✔
1354
                // the last update before processing the next.
4,156✔
1355
                if l.failed {
4,173✔
1356
                        l.log.Errorf("link failed, exiting htlcManager")
17✔
1357
                        return
17✔
1358
                }
17✔
1359

1360
                // If the previous event resulted in a non-empty batch, resume
1361
                // the batch ticker so that it can be cleared. Otherwise pause
1362
                // the ticker to prevent waking up the htlcManager while the
1363
                // batch is empty.
1364
                numUpdates := l.channel.NumPendingUpdates(
4,143✔
1365
                        lntypes.Local, lntypes.Remote,
4,143✔
1366
                )
4,143✔
1367
                if numUpdates > 0 {
4,649✔
1368
                        l.cfg.BatchTicker.Resume()
506✔
1369
                        l.log.Tracef("BatchTicker resumed, "+
506✔
1370
                                "NumPendingUpdates(Local, Remote)=%d",
506✔
1371
                                numUpdates,
506✔
1372
                        )
506✔
1373
                } else {
4,147✔
1374
                        l.cfg.BatchTicker.Pause()
3,641✔
1375
                        l.log.Trace("BatchTicker paused due to zero " +
3,641✔
1376
                                "NumPendingUpdates(Local, Remote)")
3,641✔
1377
                }
3,641✔
1378

1379
                select {
4,143✔
1380
                // We have a new hook that needs to be run when we reach a clean
1381
                // channel state.
1382
                case hook := <-l.flushHooks.newTransients:
5✔
1383
                        if l.channel.IsChannelClean() {
9✔
1384
                                hook()
4✔
1385
                        } else {
9✔
1386
                                l.flushHooks.alloc(hook)
5✔
1387
                        }
5✔
1388

1389
                // We have a new hook that needs to be run when we have
1390
                // committed all of our updates.
1391
                case hook := <-l.outgoingCommitHooks.newTransients:
5✔
1392
                        if !l.channel.OweCommitment() {
9✔
1393
                                hook()
4✔
1394
                        } else {
5✔
1395
                                l.outgoingCommitHooks.alloc(hook)
1✔
1396
                        }
1✔
1397

1398
                // We have a new hook that needs to be run when our peer has
1399
                // committed all of their updates.
1400
                case hook := <-l.incomingCommitHooks.newTransients:
×
1401
                        if !l.channel.NeedCommitment() {
×
1402
                                hook()
×
1403
                        } else {
×
1404
                                l.incomingCommitHooks.alloc(hook)
×
1405
                        }
×
1406

1407
                // Our update fee timer has fired, so we'll check the network
1408
                // fee to see if we should adjust our commitment fee.
1409
                case <-l.updateFeeTimer.C:
4✔
1410
                        l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
4✔
1411

4✔
1412
                        // If we're not the initiator of the channel, don't we
4✔
1413
                        // don't control the fees, so we can ignore this.
4✔
1414
                        if !l.channel.IsInitiator() {
4✔
1415
                                continue
×
1416
                        }
1417

1418
                        // If we are the initiator, then we'll sample the
1419
                        // current fee rate to get into the chain within 3
1420
                        // blocks.
1421
                        netFee, err := l.sampleNetworkFee()
4✔
1422
                        if err != nil {
4✔
1423
                                l.log.Errorf("unable to sample network fee: %v",
×
1424
                                        err)
×
1425
                                continue
×
1426
                        }
1427

1428
                        minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()
4✔
1429

4✔
1430
                        newCommitFee := l.channel.IdealCommitFeeRate(
4✔
1431
                                netFee, minRelayFee,
4✔
1432
                                l.cfg.MaxAnchorsCommitFeeRate,
4✔
1433
                                l.cfg.MaxFeeAllocation,
4✔
1434
                        )
4✔
1435

4✔
1436
                        // We determine if we should adjust the commitment fee
4✔
1437
                        // based on the current commitment fee, the suggested
4✔
1438
                        // new commitment fee and the current minimum relay fee
4✔
1439
                        // rate.
4✔
1440
                        commitFee := l.channel.CommitFeeRate()
4✔
1441
                        if !shouldAdjustCommitFee(
4✔
1442
                                newCommitFee, commitFee, minRelayFee,
4✔
1443
                        ) {
5✔
1444

1✔
1445
                                continue
1✔
1446
                        }
1447

1448
                        // If we do, then we'll send a new UpdateFee message to
1449
                        // the remote party, to be locked in with a new update.
1450
                        if err := l.updateChannelFee(newCommitFee); err != nil {
3✔
1451
                                l.log.Errorf("unable to update fee rate: %v",
×
1452
                                        err)
×
1453
                                continue
×
1454
                        }
1455

1456
                // The underlying channel has notified us of a unilateral close
1457
                // carried out by the remote peer. In the case of such an
1458
                // event, we'll wipe the channel state from the peer, and mark
1459
                // the contract as fully settled. Afterwards we can exit.
1460
                //
1461
                // TODO(roasbeef): add force closure? also breach?
1462
                case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
4✔
1463
                        l.log.Warnf("remote peer has closed on-chain")
4✔
1464

4✔
1465
                        // TODO(roasbeef): remove all together
4✔
1466
                        go func() {
8✔
1467
                                chanPoint := l.channel.ChannelPoint()
4✔
1468
                                l.cfg.Peer.WipeChannel(&chanPoint)
4✔
1469
                        }()
4✔
1470

1471
                        return
4✔
1472

1473
                case <-l.cfg.BatchTicker.Ticks():
200✔
1474
                        // Attempt to extend the remote commitment chain
200✔
1475
                        // including all the currently pending entries. If the
200✔
1476
                        // send was unsuccessful, then abandon the update,
200✔
1477
                        // waiting for the revocation window to open up.
200✔
1478
                        if !l.updateCommitTxOrFail() {
200✔
1479
                                return
×
1480
                        }
×
1481

1482
                case <-l.cfg.PendingCommitTicker.Ticks():
1✔
1483
                        l.failf(
1✔
1484
                                LinkFailureError{
1✔
1485
                                        code:          ErrRemoteUnresponsive,
1✔
1486
                                        FailureAction: LinkFailureDisconnect,
1✔
1487
                                },
1✔
1488
                                "unable to complete dance",
1✔
1489
                        )
1✔
1490
                        return
1✔
1491

1492
                // A message from the switch was just received. This indicates
1493
                // that the link is an intermediate hop in a multi-hop HTLC
1494
                // circuit.
1495
                case pkt := <-l.downstream:
525✔
1496
                        l.handleDownstreamPkt(pkt)
525✔
1497

1498
                // A message from the connected peer was just received. This
1499
                // indicates that we have a new incoming HTLC, either directly
1500
                // for us, or part of a multi-hop HTLC circuit.
1501
                case msg := <-l.upstream:
3,165✔
1502
                        l.handleUpstreamMsg(msg)
3,165✔
1503

1504
                // A htlc resolution is received. This means that we now have a
1505
                // resolution for a previously accepted htlc.
1506
                case hodlItem := <-l.hodlQueue.ChanOut():
59✔
1507
                        htlcResolution := hodlItem.(invoices.HtlcResolution)
59✔
1508
                        err := l.processHodlQueue(htlcResolution)
59✔
1509
                        switch err {
59✔
1510
                        // No error, success.
1511
                        case nil:
58✔
1512

1513
                        // If the duplicate keystone error was encountered,
1514
                        // fail back gracefully.
1515
                        case ErrDuplicateKeystone:
×
1516
                                l.failf(LinkFailureError{
×
1517
                                        code: ErrCircuitError,
×
1518
                                }, "process hodl queue: "+
×
1519
                                        "temporary circuit error: %v",
×
1520
                                        err,
×
1521
                                )
×
1522

1523
                        // Send an Error message to the peer.
1524
                        default:
1✔
1525
                                l.failf(LinkFailureError{
1✔
1526
                                        code: ErrInternalError,
1✔
1527
                                }, "process hodl queue: unable to update "+
1✔
1528
                                        "commitment: %v", err,
1✔
1529
                                )
1✔
1530
                        }
1531

1532
                case qReq := <-l.quiescenceReqs:
5✔
1533
                        l.quiescer.InitStfu(qReq)
5✔
1534

5✔
1535
                        if l.noDanglingUpdates(lntypes.Local) {
10✔
1536
                                err := l.quiescer.SendOwedStfu()
5✔
1537
                                if err != nil {
5✔
1538
                                        l.stfuFailf(
×
1539
                                                "SendOwedStfu: %s", err.Error(),
×
1540
                                        )
×
1541
                                        res := fn.Err[lntypes.ChannelParty](err)
×
1542
                                        qReq.Resolve(res)
×
1543
                                }
×
1544
                        }
1545

1546
                case <-l.Quit:
193✔
1547
                        return
193✔
1548
                }
1549
        }
1550
}
1551

1552
// processHodlQueue processes a received htlc resolution and continues reading
1553
// from the hodl queue until no more resolutions remain. When this function
1554
// returns without an error, the commit tx should be updated.
1555
func (l *channelLink) processHodlQueue(
1556
        firstResolution invoices.HtlcResolution) error {
59✔
1557

59✔
1558
        // Try to read all waiting resolution messages, so that they can all be
59✔
1559
        // processed in a single commitment tx update.
59✔
1560
        htlcResolution := firstResolution
59✔
1561
loop:
59✔
1562
        for {
118✔
1563
                // Lookup all hodl htlcs that can be failed or settled with this event.
59✔
1564
                // The hodl htlc must be present in the map.
59✔
1565
                circuitKey := htlcResolution.CircuitKey()
59✔
1566
                hodlHtlc, ok := l.hodlMap[circuitKey]
59✔
1567
                if !ok {
59✔
1568
                        return fmt.Errorf("hodl htlc not found: %v", circuitKey)
×
1569
                }
×
1570

1571
                if err := l.processHtlcResolution(htlcResolution, hodlHtlc); err != nil {
59✔
1572
                        return err
×
1573
                }
×
1574

1575
                // Clean up hodl map.
1576
                delete(l.hodlMap, circuitKey)
59✔
1577

59✔
1578
                select {
59✔
1579
                case item := <-l.hodlQueue.ChanOut():
4✔
1580
                        htlcResolution = item.(invoices.HtlcResolution)
4✔
1581
                default:
59✔
1582
                        break loop
59✔
1583
                }
1584
        }
1585

1586
        // Update the commitment tx.
1587
        if err := l.updateCommitTx(); err != nil {
60✔
1588
                return err
1✔
1589
        }
1✔
1590

1591
        return nil
58✔
1592
}
1593

1594
// processHtlcResolution applies a received htlc resolution to the provided
1595
// htlc. When this function returns without an error, the commit tx should be
1596
// updated.
1597
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
1598
        htlc hodlHtlc) error {
205✔
1599

205✔
1600
        circuitKey := resolution.CircuitKey()
205✔
1601

205✔
1602
        // Determine required action for the resolution based on the type of
205✔
1603
        // resolution we have received.
205✔
1604
        switch res := resolution.(type) {
205✔
1605
        // Settle htlcs that returned a settle resolution using the preimage
1606
        // in the resolution.
1607
        case *invoices.HtlcSettleResolution:
201✔
1608
                l.log.Debugf("received settle resolution for %v "+
201✔
1609
                        "with outcome: %v", circuitKey, res.Outcome)
201✔
1610

201✔
1611
                return l.settleHTLC(
201✔
1612
                        res.Preimage, htlc.add.ID, htlc.sourceRef,
201✔
1613
                )
201✔
1614

1615
        // For htlc failures, we get the relevant failure message based
1616
        // on the failure resolution and then fail the htlc.
1617
        case *invoices.HtlcFailResolution:
8✔
1618
                l.log.Debugf("received cancel resolution for "+
8✔
1619
                        "%v with outcome: %v", circuitKey, res.Outcome)
8✔
1620

8✔
1621
                // Get the lnwire failure message based on the resolution
8✔
1622
                // result.
8✔
1623
                failure := getResolutionFailure(res, htlc.add.Amount)
8✔
1624

8✔
1625
                l.sendHTLCError(
8✔
1626
                        htlc.add, htlc.sourceRef, failure, htlc.obfuscator,
8✔
1627
                        true,
8✔
1628
                )
8✔
1629
                return nil
8✔
1630

1631
        // Fail if we do not get a settle of fail resolution, since we
1632
        // are only expecting to handle settles and fails.
1633
        default:
×
1634
                return fmt.Errorf("unknown htlc resolution type: %T",
×
1635
                        resolution)
×
1636
        }
1637
}
1638

1639
// getResolutionFailure returns the wire message that a htlc resolution should
1640
// be failed with.
1641
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
1642
        amount lnwire.MilliSatoshi) *LinkError {
8✔
1643

8✔
1644
        // If the resolution has been resolved as part of a MPP timeout,
8✔
1645
        // we need to fail the htlc with lnwire.FailMppTimeout.
8✔
1646
        if resolution.Outcome == invoices.ResultMppTimeout {
8✔
1647
                return NewDetailedLinkError(
×
1648
                        &lnwire.FailMPPTimeout{}, resolution.Outcome,
×
1649
                )
×
1650
        }
×
1651

1652
        // If the htlc is not a MPP timeout, we fail it with
1653
        // FailIncorrectDetails. This error is sent for invoice payment
1654
        // failures such as underpayment/ expiry too soon and hodl invoices
1655
        // (which return FailIncorrectDetails to avoid leaking information).
1656
        incorrectDetails := lnwire.NewFailIncorrectDetails(
8✔
1657
                amount, uint32(resolution.AcceptHeight),
8✔
1658
        )
8✔
1659

8✔
1660
        return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
8✔
1661
}
1662

1663
// randomFeeUpdateTimeout returns a random timeout between the bounds defined
1664
// within the link's configuration that will be used to determine when the link
1665
// should propose an update to its commitment fee rate.
1666
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
221✔
1667
        lower := int64(l.cfg.MinUpdateTimeout)
221✔
1668
        upper := int64(l.cfg.MaxUpdateTimeout)
221✔
1669
        return time.Duration(prand.Int63n(upper-lower) + lower)
221✔
1670
}
221✔
1671

1672
// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
1673
// downstream HTLC Switch.
1674
func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error {
484✔
1675
        htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
484✔
1676
        if !ok {
484✔
1677
                return errors.New("not an UpdateAddHTLC packet")
×
1678
        }
×
1679

1680
        // If we are flushing the link in the outgoing direction or we have
1681
        // already sent Stfu, then we can't add new htlcs to the link and we
1682
        // need to bounce it.
1683
        if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() {
484✔
1684
                l.mailBox.FailAdd(pkt)
×
1685

×
1686
                return NewDetailedLinkError(
×
1687
                        &lnwire.FailTemporaryChannelFailure{},
×
1688
                        OutgoingFailureLinkNotEligible,
×
1689
                )
×
1690
        }
×
1691

1692
        // If hodl.AddOutgoing mode is active, we exit early to simulate
1693
        // arbitrary delays between the switch adding an ADD to the
1694
        // mailbox, and the HTLC being added to the commitment state.
1695
        if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
484✔
1696
                l.log.Warnf(hodl.AddOutgoing.Warning())
×
1697
                l.mailBox.AckPacket(pkt.inKey())
×
1698
                return nil
×
1699
        }
×
1700

1701
        // Check if we can add the HTLC here without exceededing the max fee
1702
        // exposure threshold.
1703
        if l.isOverexposedWithHtlc(htlc, false) {
488✔
1704
                l.log.Debugf("Unable to handle downstream HTLC - max fee " +
4✔
1705
                        "exposure exceeded")
4✔
1706

4✔
1707
                l.mailBox.FailAdd(pkt)
4✔
1708

4✔
1709
                return NewDetailedLinkError(
4✔
1710
                        lnwire.NewTemporaryChannelFailure(nil),
4✔
1711
                        OutgoingFailureDownstreamHtlcAdd,
4✔
1712
                )
4✔
1713
        }
4✔
1714

1715
        // A new payment has been initiated via the downstream channel,
1716
        // so we add the new HTLC to our local log, then update the
1717
        // commitment chains.
1718
        htlc.ChanID = l.ChanID()
480✔
1719
        openCircuitRef := pkt.inKey()
480✔
1720

480✔
1721
        // We enforce the fee buffer for the commitment transaction because
480✔
1722
        // we are in control of adding this htlc. Nothing has locked-in yet so
480✔
1723
        // we can securely enforce the fee buffer which is only relevant if we
480✔
1724
        // are the initiator of the channel.
480✔
1725
        index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
480✔
1726
        if err != nil {
485✔
1727
                // The HTLC was unable to be added to the state machine,
5✔
1728
                // as a result, we'll signal the switch to cancel the
5✔
1729
                // pending payment.
5✔
1730
                l.log.Warnf("Unable to handle downstream add HTLC: %v",
5✔
1731
                        err)
5✔
1732

5✔
1733
                // Remove this packet from the link's mailbox, this
5✔
1734
                // prevents it from being reprocessed if the link
5✔
1735
                // restarts and resets it mailbox. If this response
5✔
1736
                // doesn't make it back to the originating link, it will
5✔
1737
                // be rejected upon attempting to reforward the Add to
5✔
1738
                // the switch, since the circuit was never fully opened,
5✔
1739
                // and the forwarding package shows it as
5✔
1740
                // unacknowledged.
5✔
1741
                l.mailBox.FailAdd(pkt)
5✔
1742

5✔
1743
                return NewDetailedLinkError(
5✔
1744
                        lnwire.NewTemporaryChannelFailure(nil),
5✔
1745
                        OutgoingFailureDownstreamHtlcAdd,
5✔
1746
                )
5✔
1747
        }
5✔
1748

1749
        l.log.Tracef("received downstream htlc: payment_hash=%x, "+
479✔
1750
                "local_log_index=%v, pend_updates=%v",
479✔
1751
                htlc.PaymentHash[:], index,
479✔
1752
                l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote))
479✔
1753

479✔
1754
        pkt.outgoingChanID = l.ShortChanID()
479✔
1755
        pkt.outgoingHTLCID = index
479✔
1756
        htlc.ID = index
479✔
1757

479✔
1758
        l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
479✔
1759
                pkt.inKey(), pkt.outKey())
479✔
1760

479✔
1761
        l.openedCircuits = append(l.openedCircuits, pkt.inKey())
479✔
1762
        l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
479✔
1763

479✔
1764
        _ = l.cfg.Peer.SendMessage(false, htlc)
479✔
1765

479✔
1766
        // Send a forward event notification to htlcNotifier.
479✔
1767
        l.cfg.HtlcNotifier.NotifyForwardingEvent(
479✔
1768
                newHtlcKey(pkt),
479✔
1769
                HtlcInfo{
479✔
1770
                        IncomingTimeLock: pkt.incomingTimeout,
479✔
1771
                        IncomingAmt:      pkt.incomingAmount,
479✔
1772
                        OutgoingTimeLock: htlc.Expiry,
479✔
1773
                        OutgoingAmt:      htlc.Amount,
479✔
1774
                },
479✔
1775
                getEventType(pkt),
479✔
1776
        )
479✔
1777

479✔
1778
        l.tryBatchUpdateCommitTx()
479✔
1779

479✔
1780
        return nil
479✔
1781
}
1782

1783
// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
1784
// Switch. Possible messages sent by the switch include requests to forward new
1785
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
1786
// cleared HTLCs with the upstream peer.
1787
//
1788
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
1789
func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
525✔
1790
        if pkt.htlc.MsgType().IsChannelUpdate() &&
525✔
1791
                !l.quiescer.CanSendUpdates() {
525✔
1792

×
1793
                l.log.Warnf("unable to process channel update. "+
×
1794
                        "ChannelID=%v is quiescent.", l.ChanID)
×
1795

×
1796
                return
×
1797
        }
×
1798

1799
        switch htlc := pkt.htlc.(type) {
525✔
1800
        case *lnwire.UpdateAddHTLC:
484✔
1801
                // Handle add message. The returned error can be ignored,
484✔
1802
                // because it is also sent through the mailbox.
484✔
1803
                _ = l.handleDownstreamUpdateAdd(pkt)
484✔
1804

1805
        case *lnwire.UpdateFulfillHTLC:
27✔
1806
                // If hodl.SettleOutgoing mode is active, we exit early to
27✔
1807
                // simulate arbitrary delays between the switch adding the
27✔
1808
                // SETTLE to the mailbox, and the HTLC being added to the
27✔
1809
                // commitment state.
27✔
1810
                if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
27✔
1811
                        l.log.Warnf(hodl.SettleOutgoing.Warning())
×
1812
                        l.mailBox.AckPacket(pkt.inKey())
×
1813
                        return
×
1814
                }
×
1815

1816
                // An HTLC we forward to the switch has just settled somewhere
1817
                // upstream. Therefore we settle the HTLC within the our local
1818
                // state machine.
1819
                inKey := pkt.inKey()
27✔
1820
                err := l.channel.SettleHTLC(
27✔
1821
                        htlc.PaymentPreimage,
27✔
1822
                        pkt.incomingHTLCID,
27✔
1823
                        pkt.sourceRef,
27✔
1824
                        pkt.destRef,
27✔
1825
                        &inKey,
27✔
1826
                )
27✔
1827
                if err != nil {
27✔
1828
                        l.log.Errorf("unable to settle incoming HTLC for "+
×
1829
                                "circuit-key=%v: %v", inKey, err)
×
1830

×
1831
                        // If the HTLC index for Settle response was not known
×
1832
                        // to our commitment state, it has already been
×
1833
                        // cleaned up by a prior response. We'll thus try to
×
1834
                        // clean up any lingering state to ensure we don't
×
1835
                        // continue reforwarding.
×
1836
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
×
1837
                                l.cleanupSpuriousResponse(pkt)
×
1838
                        }
×
1839

1840
                        // Remove the packet from the link's mailbox to ensure
1841
                        // it doesn't get replayed after a reconnection.
1842
                        l.mailBox.AckPacket(inKey)
×
1843

×
1844
                        return
×
1845
                }
1846

1847
                l.log.Debugf("queueing removal of SETTLE closed circuit: "+
27✔
1848
                        "%s->%s", pkt.inKey(), pkt.outKey())
27✔
1849

27✔
1850
                l.closedCircuits = append(l.closedCircuits, pkt.inKey())
27✔
1851

27✔
1852
                // With the HTLC settled, we'll need to populate the wire
27✔
1853
                // message to target the specific channel and HTLC to be
27✔
1854
                // canceled.
27✔
1855
                htlc.ChanID = l.ChanID()
27✔
1856
                htlc.ID = pkt.incomingHTLCID
27✔
1857

27✔
1858
                // Then we send the HTLC settle message to the connected peer
27✔
1859
                // so we can continue the propagation of the settle message.
27✔
1860
                l.cfg.Peer.SendMessage(false, htlc)
27✔
1861

27✔
1862
                // Send a settle event notification to htlcNotifier.
27✔
1863
                l.cfg.HtlcNotifier.NotifySettleEvent(
27✔
1864
                        newHtlcKey(pkt),
27✔
1865
                        htlc.PaymentPreimage,
27✔
1866
                        getEventType(pkt),
27✔
1867
                )
27✔
1868

27✔
1869
                // Immediately update the commitment tx to minimize latency.
27✔
1870
                l.updateCommitTxOrFail()
27✔
1871

1872
        case *lnwire.UpdateFailHTLC:
22✔
1873
                // If hodl.FailOutgoing mode is active, we exit early to
22✔
1874
                // simulate arbitrary delays between the switch adding a FAIL to
22✔
1875
                // the mailbox, and the HTLC being added to the commitment
22✔
1876
                // state.
22✔
1877
                if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
22✔
1878
                        l.log.Warnf(hodl.FailOutgoing.Warning())
×
1879
                        l.mailBox.AckPacket(pkt.inKey())
×
1880
                        return
×
1881
                }
×
1882

1883
                // An HTLC cancellation has been triggered somewhere upstream,
1884
                // we'll remove then HTLC from our local state machine.
1885
                inKey := pkt.inKey()
22✔
1886
                err := l.channel.FailHTLC(
22✔
1887
                        pkt.incomingHTLCID,
22✔
1888
                        htlc.Reason,
22✔
1889
                        pkt.sourceRef,
22✔
1890
                        pkt.destRef,
22✔
1891
                        &inKey,
22✔
1892
                )
22✔
1893
                if err != nil {
28✔
1894
                        l.log.Errorf("unable to cancel incoming HTLC for "+
6✔
1895
                                "circuit-key=%v: %v", inKey, err)
6✔
1896

6✔
1897
                        // If the HTLC index for Fail response was not known to
6✔
1898
                        // our commitment state, it has already been cleaned up
6✔
1899
                        // by a prior response. We'll thus try to clean up any
6✔
1900
                        // lingering state to ensure we don't continue
6✔
1901
                        // reforwarding.
6✔
1902
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
8✔
1903
                                l.cleanupSpuriousResponse(pkt)
2✔
1904
                        }
2✔
1905

1906
                        // Remove the packet from the link's mailbox to ensure
1907
                        // it doesn't get replayed after a reconnection.
1908
                        l.mailBox.AckPacket(inKey)
6✔
1909

6✔
1910
                        return
6✔
1911
                }
1912

1913
                l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
20✔
1914
                        pkt.inKey(), pkt.outKey())
20✔
1915

20✔
1916
                l.closedCircuits = append(l.closedCircuits, pkt.inKey())
20✔
1917

20✔
1918
                // With the HTLC removed, we'll need to populate the wire
20✔
1919
                // message to target the specific channel and HTLC to be
20✔
1920
                // canceled. The "Reason" field will have already been set
20✔
1921
                // within the switch.
20✔
1922
                htlc.ChanID = l.ChanID()
20✔
1923
                htlc.ID = pkt.incomingHTLCID
20✔
1924

20✔
1925
                // We send the HTLC message to the peer which initially created
20✔
1926
                // the HTLC. If the incoming blinding point is non-nil, we
20✔
1927
                // know that we are a relaying node in a blinded path.
20✔
1928
                // Otherwise, we're either an introduction node or not part of
20✔
1929
                // a blinded path at all.
20✔
1930
                if err := l.sendIncomingHTLCFailureMsg(
20✔
1931
                        htlc.ID,
20✔
1932
                        pkt.obfuscator,
20✔
1933
                        htlc.Reason,
20✔
1934
                ); err != nil {
20✔
1935
                        l.log.Errorf("unable to send HTLC failure: %v",
×
1936
                                err)
×
1937

×
1938
                        return
×
1939
                }
×
1940

1941
                // If the packet does not have a link failure set, it failed
1942
                // further down the route so we notify a forwarding failure.
1943
                // Otherwise, we notify a link failure because it failed at our
1944
                // node.
1945
                if pkt.linkFailure != nil {
34✔
1946
                        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
14✔
1947
                                newHtlcKey(pkt),
14✔
1948
                                newHtlcInfo(pkt),
14✔
1949
                                getEventType(pkt),
14✔
1950
                                pkt.linkFailure,
14✔
1951
                                false,
14✔
1952
                        )
14✔
1953
                } else {
24✔
1954
                        l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
10✔
1955
                                newHtlcKey(pkt), getEventType(pkt),
10✔
1956
                        )
10✔
1957
                }
10✔
1958

1959
                // Immediately update the commitment tx to minimize latency.
1960
                l.updateCommitTxOrFail()
20✔
1961
        }
1962
}
1963

1964
// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
1965
// full.
1966
func (l *channelLink) tryBatchUpdateCommitTx() {
479✔
1967
        pending := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
479✔
1968
        if pending < uint64(l.cfg.BatchSize) {
943✔
1969
                return
464✔
1970
        }
464✔
1971

1972
        l.updateCommitTxOrFail()
19✔
1973
}
1974

1975
// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
1976
// associated with this packet. If successful in doing so, it will also purge
1977
// the open circuit from the circuit map and remove the packet from the link's
1978
// mailbox.
1979
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
2✔
1980
        inKey := pkt.inKey()
2✔
1981

2✔
1982
        l.log.Debugf("cleaning up spurious response for incoming "+
2✔
1983
                "circuit-key=%v", inKey)
2✔
1984

2✔
1985
        // If the htlc packet doesn't have a source reference, it is unsafe to
2✔
1986
        // proceed, as skipping this ack may cause the htlc to be reforwarded.
2✔
1987
        if pkt.sourceRef == nil {
3✔
1988
                l.log.Errorf("unable to cleanup response for incoming "+
1✔
1989
                        "circuit-key=%v, does not contain source reference",
1✔
1990
                        inKey)
1✔
1991
                return
1✔
1992
        }
1✔
1993

1994
        // If the source reference is present,  we will try to prevent this link
1995
        // from resending the packet to the switch. To do so, we ack the AddRef
1996
        // of the incoming HTLC belonging to this link.
1997
        err := l.channel.AckAddHtlcs(*pkt.sourceRef)
1✔
1998
        if err != nil {
1✔
1999
                l.log.Errorf("unable to ack AddRef for incoming "+
×
2000
                        "circuit-key=%v: %v", inKey, err)
×
2001

×
2002
                // If this operation failed, it is unsafe to attempt removal of
×
2003
                // the destination reference or circuit, so we exit early. The
×
2004
                // cleanup may proceed with a different packet in the future
×
2005
                // that succeeds on this step.
×
2006
                return
×
2007
        }
×
2008

2009
        // Now that we know this link will stop retransmitting Adds to the
2010
        // switch, we can begin to teardown the response reference and circuit
2011
        // map.
2012
        //
2013
        // If the packet includes a destination reference, then a response for
2014
        // this HTLC was locked into the outgoing channel. Attempt to remove
2015
        // this reference, so we stop retransmitting the response internally.
2016
        // Even if this fails, we will proceed in trying to delete the circuit.
2017
        // When retransmitting responses, the destination references will be
2018
        // cleaned up if an open circuit is not found in the circuit map.
2019
        if pkt.destRef != nil {
1✔
2020
                err := l.channel.AckSettleFails(*pkt.destRef)
×
2021
                if err != nil {
×
2022
                        l.log.Errorf("unable to ack SettleFailRef "+
×
2023
                                "for incoming circuit-key=%v: %v",
×
2024
                                inKey, err)
×
2025
                }
×
2026
        }
2027

2028
        l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)
1✔
2029

1✔
2030
        // With all known references acked, we can now safely delete the circuit
1✔
2031
        // from the switch's circuit map, as the state is no longer needed.
1✔
2032
        err = l.cfg.Circuits.DeleteCircuits(inKey)
1✔
2033
        if err != nil {
1✔
2034
                l.log.Errorf("unable to delete circuit for "+
×
2035
                        "circuit-key=%v: %v", inKey, err)
×
2036
        }
×
2037
}
2038

2039
// handleUpstreamMsg processes wire messages related to commitment state
2040
// updates from the upstream peer. The upstream peer is the peer whom we have a
2041
// direct channel with, updating our respective commitment chains.
2042
func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
3,165✔
2043
        // First check if the message is an update and we are capable of
3,165✔
2044
        // receiving updates right now.
3,165✔
2045
        if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() {
3,165✔
2046
                l.stfuFailf("update received after stfu: %T", msg)
×
2047
                return
×
2048
        }
×
2049

2050
        switch msg := msg.(type) {
3,165✔
2051
        case *lnwire.UpdateAddHTLC:
454✔
2052
                if l.IsFlushing(Incoming) {
454✔
2053
                        // This is forbidden by the protocol specification.
×
2054
                        // The best chance we have to deal with this is to drop
×
2055
                        // the connection. This should roll back the channel
×
2056
                        // state to the last CommitSig. If the remote has
×
2057
                        // already sent a CommitSig we haven't received yet,
×
2058
                        // channel state will be re-synchronized with a
×
2059
                        // ChannelReestablish message upon reconnection and the
×
2060
                        // protocol state that caused us to flush the link will
×
2061
                        // be rolled back. In the event that there was some
×
2062
                        // non-deterministic behavior in the remote that caused
×
2063
                        // them to violate the protocol, we have a decent shot
×
2064
                        // at correcting it this way, since reconnecting will
×
2065
                        // put us in the cleanest possible state to try again.
×
2066
                        //
×
2067
                        // In addition to the above, it is possible for us to
×
2068
                        // hit this case in situations where we improperly
×
2069
                        // handle message ordering due to concurrency choices.
×
2070
                        // An issue has been filed to address this here:
×
2071
                        // https://github.com/lightningnetwork/lnd/issues/8393
×
2072
                        l.failf(
×
2073
                                LinkFailureError{
×
2074
                                        code:             ErrInvalidUpdate,
×
2075
                                        FailureAction:    LinkFailureDisconnect,
×
2076
                                        PermanentFailure: false,
×
2077
                                        Warning:          true,
×
2078
                                },
×
2079
                                "received add while link is flushing",
×
2080
                        )
×
2081

×
2082
                        return
×
2083
                }
×
2084

2085
                // Disallow htlcs with blinding points set if we haven't
2086
                // enabled the feature. This saves us from having to process
2087
                // the onion at all, but will only catch blinded payments
2088
                // where we are a relaying node (as the blinding point will
2089
                // be in the payload when we're the introduction node).
2090
                if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
454✔
2091
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2092
                                "blinding point included when route blinding "+
×
2093
                                        "is disabled")
×
2094

×
2095
                        return
×
2096
                }
×
2097

2098
                // We have to check the limit here rather than later in the
2099
                // switch because the counterparty can keep sending HTLC's
2100
                // without sending a revoke. This would mean that the switch
2101
                // check would only occur later.
2102
                if l.isOverexposedWithHtlc(msg, true) {
454✔
2103
                        l.failf(LinkFailureError{code: ErrInternalError},
×
2104
                                "peer sent us an HTLC that exceeded our max "+
×
2105
                                        "fee exposure")
×
2106

×
2107
                        return
×
2108
                }
×
2109

2110
                // We just received an add request from an upstream peer, so we
2111
                // add it to our state machine, then add the HTLC to our
2112
                // "settle" list in the event that we know the preimage.
2113
                index, err := l.channel.ReceiveHTLC(msg)
454✔
2114
                if err != nil {
454✔
2115
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2116
                                "unable to handle upstream add HTLC: %v", err)
×
2117
                        return
×
2118
                }
×
2119

2120
                l.log.Tracef("receive upstream htlc with payment hash(%x), "+
454✔
2121
                        "assigning index: %v", msg.PaymentHash[:], index)
454✔
2122

2123
        case *lnwire.UpdateFulfillHTLC:
231✔
2124
                pre := msg.PaymentPreimage
231✔
2125
                idx := msg.ID
231✔
2126

231✔
2127
                // Before we pipeline the settle, we'll check the set of active
231✔
2128
                // htlc's to see if the related UpdateAddHTLC has been fully
231✔
2129
                // locked-in.
231✔
2130
                var lockedin bool
231✔
2131
                htlcs := l.channel.ActiveHtlcs()
231✔
2132
                for _, add := range htlcs {
1,063✔
2133
                        // The HTLC will be outgoing and match idx.
832✔
2134
                        if !add.Incoming && add.HtlcIndex == idx {
1,061✔
2135
                                lockedin = true
229✔
2136
                                break
229✔
2137
                        }
2138
                }
2139

2140
                if !lockedin {
233✔
2141
                        l.failf(
2✔
2142
                                LinkFailureError{code: ErrInvalidUpdate},
2✔
2143
                                "unable to handle upstream settle",
2✔
2144
                        )
2✔
2145
                        return
2✔
2146
                }
2✔
2147

2148
                if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
233✔
2149
                        l.failf(
4✔
2150
                                LinkFailureError{
4✔
2151
                                        code:          ErrInvalidUpdate,
4✔
2152
                                        FailureAction: LinkFailureForceClose,
4✔
2153
                                },
4✔
2154
                                "unable to handle upstream settle HTLC: %v", err,
4✔
2155
                        )
4✔
2156
                        return
4✔
2157
                }
4✔
2158

2159
                settlePacket := &htlcPacket{
229✔
2160
                        outgoingChanID: l.ShortChanID(),
229✔
2161
                        outgoingHTLCID: idx,
229✔
2162
                        htlc: &lnwire.UpdateFulfillHTLC{
229✔
2163
                                PaymentPreimage: pre,
229✔
2164
                        },
229✔
2165
                }
229✔
2166

229✔
2167
                // Add the newly discovered preimage to our growing list of
229✔
2168
                // uncommitted preimage. These will be written to the witness
229✔
2169
                // cache just before accepting the next commitment signature
229✔
2170
                // from the remote peer.
229✔
2171
                l.uncommittedPreimages = append(l.uncommittedPreimages, pre)
229✔
2172

229✔
2173
                // Pipeline this settle, send it to the switch.
229✔
2174
                go l.forwardBatch(false, settlePacket)
229✔
2175

2176
        case *lnwire.UpdateFailMalformedHTLC:
7✔
2177
                // Convert the failure type encoded within the HTLC fail
7✔
2178
                // message to the proper generic lnwire error code.
7✔
2179
                var failure lnwire.FailureMessage
7✔
2180
                switch msg.FailureCode {
7✔
2181
                case lnwire.CodeInvalidOnionVersion:
5✔
2182
                        failure = &lnwire.FailInvalidOnionVersion{
5✔
2183
                                OnionSHA256: msg.ShaOnionBlob,
5✔
2184
                        }
5✔
2185
                case lnwire.CodeInvalidOnionHmac:
×
2186
                        failure = &lnwire.FailInvalidOnionHmac{
×
2187
                                OnionSHA256: msg.ShaOnionBlob,
×
2188
                        }
×
2189

2190
                case lnwire.CodeInvalidOnionKey:
×
2191
                        failure = &lnwire.FailInvalidOnionKey{
×
2192
                                OnionSHA256: msg.ShaOnionBlob,
×
2193
                        }
×
2194

2195
                // Handle malformed errors that are part of a blinded route.
2196
                // This case is slightly different, because we expect every
2197
                // relaying node in the blinded portion of the route to send
2198
                // malformed errors. If we're also a relaying node, we're
2199
                // likely going to switch this error out anyway for our own
2200
                // malformed error, but we handle the case here for
2201
                // completeness.
2202
                case lnwire.CodeInvalidBlinding:
4✔
2203
                        failure = &lnwire.FailInvalidBlinding{
4✔
2204
                                OnionSHA256: msg.ShaOnionBlob,
4✔
2205
                        }
4✔
2206

2207
                default:
2✔
2208
                        l.log.Warnf("unexpected failure code received in "+
2✔
2209
                                "UpdateFailMailformedHTLC: %v", msg.FailureCode)
2✔
2210

2✔
2211
                        // We don't just pass back the error we received from
2✔
2212
                        // our successor. Otherwise we might report a failure
2✔
2213
                        // that penalizes us more than needed. If the onion that
2✔
2214
                        // we forwarded was correct, the node should have been
2✔
2215
                        // able to send back its own failure. The node did not
2✔
2216
                        // send back its own failure, so we assume there was a
2✔
2217
                        // problem with the onion and report that back. We reuse
2✔
2218
                        // the invalid onion key failure because there is no
2✔
2219
                        // specific error for this case.
2✔
2220
                        failure = &lnwire.FailInvalidOnionKey{
2✔
2221
                                OnionSHA256: msg.ShaOnionBlob,
2✔
2222
                        }
2✔
2223
                }
2224

2225
                // With the error parsed, we'll convert it into its opaque
2226
                // form.
2227
                var b bytes.Buffer
7✔
2228
                if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
7✔
2229
                        l.log.Errorf("unable to encode malformed error: %v", err)
×
2230
                        return
×
2231
                }
×
2232

2233
                // If the remote side has been unable to parse the onion blob we
2234
                // have sent to it, then we should transform the malformed HTLC
2235
                // message to the usual HTLC fail message.
2236
                err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
7✔
2237
                if err != nil {
7✔
2238
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2239
                                "unable to handle upstream fail HTLC: %v", err)
×
2240
                        return
×
2241
                }
×
2242

2243
        case *lnwire.UpdateFailHTLC:
124✔
2244
                // Verify that the failure reason is at least 256 bytes plus
124✔
2245
                // overhead.
124✔
2246
                const minimumFailReasonLength = lnwire.FailureMessageLength +
124✔
2247
                        2 + 2 + 32
124✔
2248

124✔
2249
                if len(msg.Reason) < minimumFailReasonLength {
125✔
2250
                        // We've received a reason with a non-compliant length.
1✔
2251
                        // Older nodes happily relay back these failures that
1✔
2252
                        // may originate from a node further downstream.
1✔
2253
                        // Therefore we can't just fail the channel.
1✔
2254
                        //
1✔
2255
                        // We want to be compliant ourselves, so we also can't
1✔
2256
                        // pass back the reason unmodified. And we must make
1✔
2257
                        // sure that we don't hit the magic length check of 260
1✔
2258
                        // bytes in processRemoteSettleFails either.
1✔
2259
                        //
1✔
2260
                        // Because the reason is unreadable for the payer
1✔
2261
                        // anyway, we just replace it by a compliant-length
1✔
2262
                        // series of random bytes.
1✔
2263
                        msg.Reason = make([]byte, minimumFailReasonLength)
1✔
2264
                        _, err := crand.Read(msg.Reason[:])
1✔
2265
                        if err != nil {
1✔
2266
                                l.log.Errorf("Random generation error: %v", err)
×
2267

×
2268
                                return
×
2269
                        }
×
2270
                }
2271

2272
                // Add fail to the update log.
2273
                idx := msg.ID
124✔
2274
                err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
124✔
2275
                if err != nil {
124✔
2276
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2277
                                "unable to handle upstream fail HTLC: %v", err)
×
2278
                        return
×
2279
                }
×
2280

2281
        case *lnwire.CommitSig:
1,187✔
2282
                // Since we may have learned new preimages for the first time,
1,187✔
2283
                // we'll add them to our preimage cache. By doing this, we
1,187✔
2284
                // ensure any contested contracts watched by any on-chain
1,187✔
2285
                // arbitrators can now sweep this HTLC on-chain. We delay
1,187✔
2286
                // committing the preimages until just before accepting the new
1,187✔
2287
                // remote commitment, as afterwards the peer won't resend the
1,187✔
2288
                // Settle messages on the next channel reestablishment. Doing so
1,187✔
2289
                // allows us to more effectively batch this operation, instead
1,187✔
2290
                // of doing a single write per preimage.
1,187✔
2291
                err := l.cfg.PreimageCache.AddPreimages(
1,187✔
2292
                        l.uncommittedPreimages...,
1,187✔
2293
                )
1,187✔
2294
                if err != nil {
1,187✔
2295
                        l.failf(
×
2296
                                LinkFailureError{code: ErrInternalError},
×
2297
                                "unable to add preimages=%v to cache: %v",
×
2298
                                l.uncommittedPreimages, err,
×
2299
                        )
×
2300
                        return
×
2301
                }
×
2302

2303
                // Instead of truncating the slice to conserve memory
2304
                // allocations, we simply set the uncommitted preimage slice to
2305
                // nil so that a new one will be initialized if any more
2306
                // witnesses are discovered. We do this because the maximum size
2307
                // that the slice can occupy is 15KB, and we want to ensure we
2308
                // release that memory back to the runtime.
2309
                l.uncommittedPreimages = nil
1,187✔
2310

1,187✔
2311
                // We just received new updates to our local commitment
1,187✔
2312
                // chain, validate this new commitment, closing the link if
1,187✔
2313
                // invalid.
1,187✔
2314
                auxSigBlob, err := msg.CustomRecords.Serialize()
1,187✔
2315
                if err != nil {
1,187✔
2316
                        l.failf(
×
2317
                                LinkFailureError{code: ErrInvalidCommitment},
×
2318
                                "unable to serialize custom records: %v", err,
×
2319
                        )
×
2320

×
2321
                        return
×
2322
                }
×
2323
                err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
1,187✔
2324
                        CommitSig:  msg.CommitSig,
1,187✔
2325
                        HtlcSigs:   msg.HtlcSigs,
1,187✔
2326
                        PartialSig: msg.PartialSig,
1,187✔
2327
                        AuxSigBlob: auxSigBlob,
1,187✔
2328
                })
1,187✔
2329
                if err != nil {
1,187✔
2330
                        // If we were unable to reconstruct their proposed
×
2331
                        // commitment, then we'll examine the type of error. If
×
2332
                        // it's an InvalidCommitSigError, then we'll send a
×
2333
                        // direct error.
×
2334
                        var sendData []byte
×
2335
                        switch err.(type) {
×
2336
                        case *lnwallet.InvalidCommitSigError:
×
2337
                                sendData = []byte(err.Error())
×
2338
                        case *lnwallet.InvalidHtlcSigError:
×
2339
                                sendData = []byte(err.Error())
×
2340
                        }
2341
                        l.failf(
×
2342
                                LinkFailureError{
×
2343
                                        code:          ErrInvalidCommitment,
×
2344
                                        FailureAction: LinkFailureForceClose,
×
2345
                                        SendData:      sendData,
×
2346
                                },
×
2347
                                "ChannelPoint(%v): unable to accept new "+
×
2348
                                        "commitment: %v",
×
2349
                                l.channel.ChannelPoint(), err,
×
2350
                        )
×
2351
                        return
×
2352
                }
2353

2354
                // As we've just accepted a new state, we'll now
2355
                // immediately send the remote peer a revocation for our prior
2356
                // state.
2357
                nextRevocation, currentHtlcs, finalHTLCs, err :=
1,187✔
2358
                        l.channel.RevokeCurrentCommitment()
1,187✔
2359
                if err != nil {
1,187✔
2360
                        l.log.Errorf("unable to revoke commitment: %v", err)
×
2361

×
2362
                        // We need to fail the channel in case revoking our
×
2363
                        // local commitment does not succeed. We might have
×
2364
                        // already advanced our channel state which would lead
×
2365
                        // us to proceed with an unclean state.
×
2366
                        //
×
2367
                        // NOTE: We do not trigger a force close because this
×
2368
                        // could resolve itself in case our db was just busy
×
2369
                        // not accepting new transactions.
×
2370
                        l.failf(
×
2371
                                LinkFailureError{
×
2372
                                        code:          ErrInternalError,
×
2373
                                        Warning:       true,
×
2374
                                        FailureAction: LinkFailureDisconnect,
×
2375
                                },
×
2376
                                "ChannelPoint(%v): unable to accept new "+
×
2377
                                        "commitment: %v",
×
2378
                                l.channel.ChannelPoint(), err,
×
2379
                        )
×
2380
                        return
×
2381
                }
×
2382

2383
                // As soon as we are ready to send our next revocation, we can
2384
                // invoke the incoming commit hooks.
2385
                l.RWMutex.Lock()
1,187✔
2386
                l.incomingCommitHooks.invoke()
1,187✔
2387
                l.RWMutex.Unlock()
1,187✔
2388

1,187✔
2389
                l.cfg.Peer.SendMessage(false, nextRevocation)
1,187✔
2390

1,187✔
2391
                // Notify the incoming htlcs of which the resolutions were
1,187✔
2392
                // locked in.
1,187✔
2393
                for id, settled := range finalHTLCs {
1,522✔
2394
                        l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
335✔
2395
                                models.CircuitKey{
335✔
2396
                                        ChanID: l.ShortChanID(),
335✔
2397
                                        HtlcID: id,
335✔
2398
                                },
335✔
2399
                                channeldb.FinalHtlcInfo{
335✔
2400
                                        Settled:  settled,
335✔
2401
                                        Offchain: true,
335✔
2402
                                },
335✔
2403
                        )
335✔
2404
                }
335✔
2405

2406
                // Since we just revoked our commitment, we may have a new set
2407
                // of HTLC's on our commitment, so we'll send them using our
2408
                // function closure NotifyContractUpdate.
2409
                newUpdate := &contractcourt.ContractUpdate{
1,187✔
2410
                        HtlcKey: contractcourt.LocalHtlcSet,
1,187✔
2411
                        Htlcs:   currentHtlcs,
1,187✔
2412
                }
1,187✔
2413
                err = l.cfg.NotifyContractUpdate(newUpdate)
1,187✔
2414
                if err != nil {
1,187✔
2415
                        l.log.Errorf("unable to notify contract update: %v",
×
2416
                                err)
×
2417
                        return
×
2418
                }
×
2419

2420
                select {
1,187✔
2421
                case <-l.Quit:
×
2422
                        return
×
2423
                default:
1,187✔
2424
                }
2425

2426
                // If the remote party initiated the state transition,
2427
                // we'll reply with a signature to provide them with their
2428
                // version of the latest commitment. Otherwise, both commitment
2429
                // chains are fully synced from our PoV, then we don't need to
2430
                // reply with a signature as both sides already have a
2431
                // commitment with the latest accepted.
2432
                if l.channel.OweCommitment() {
1,856✔
2433
                        if !l.updateCommitTxOrFail() {
669✔
2434
                                return
×
2435
                        }
×
2436
                }
2437

2438
                // If we need to send out an Stfu, this would be the time to do
2439
                // so.
2440
                if l.noDanglingUpdates(lntypes.Local) {
2,236✔
2441
                        err = l.quiescer.SendOwedStfu()
1,049✔
2442
                        if err != nil {
1,049✔
2443
                                l.stfuFailf("sendOwedStfu: %v", err.Error())
×
2444
                        }
×
2445
                }
2446

2447
                // Now that we have finished processing the incoming CommitSig
2448
                // and sent out our RevokeAndAck, we invoke the flushHooks if
2449
                // the channel state is clean.
2450
                l.RWMutex.Lock()
1,187✔
2451
                if l.channel.IsChannelClean() {
1,361✔
2452
                        l.flushHooks.invoke()
174✔
2453
                }
174✔
2454
                l.RWMutex.Unlock()
1,187✔
2455

2456
        case *lnwire.RevokeAndAck:
1,176✔
2457
                // We've received a revocation from the remote chain, if valid,
1,176✔
2458
                // this moves the remote chain forward, and expands our
1,176✔
2459
                // revocation window.
1,176✔
2460

1,176✔
2461
                // We now process the message and advance our remote commit
1,176✔
2462
                // chain.
1,176✔
2463
                fwdPkg, remoteHTLCs, err := l.channel.ReceiveRevocation(msg)
1,176✔
2464
                if err != nil {
1,176✔
2465
                        // TODO(halseth): force close?
×
2466
                        l.failf(
×
2467
                                LinkFailureError{
×
2468
                                        code:          ErrInvalidRevocation,
×
2469
                                        FailureAction: LinkFailureDisconnect,
×
2470
                                },
×
2471
                                "unable to accept revocation: %v", err,
×
2472
                        )
×
2473
                        return
×
2474
                }
×
2475

2476
                // The remote party now has a new primary commitment, so we'll
2477
                // update the contract court to be aware of this new set (the
2478
                // prior old remote pending).
2479
                newUpdate := &contractcourt.ContractUpdate{
1,176✔
2480
                        HtlcKey: contractcourt.RemoteHtlcSet,
1,176✔
2481
                        Htlcs:   remoteHTLCs,
1,176✔
2482
                }
1,176✔
2483
                err = l.cfg.NotifyContractUpdate(newUpdate)
1,176✔
2484
                if err != nil {
1,176✔
2485
                        l.log.Errorf("unable to notify contract update: %v",
×
2486
                                err)
×
2487
                        return
×
2488
                }
×
2489

2490
                select {
1,176✔
2491
                case <-l.Quit:
2✔
2492
                        return
2✔
2493
                default:
1,174✔
2494
                }
2495

2496
                // If we have a tower client for this channel type, we'll
2497
                // create a backup for the current state.
2498
                if l.cfg.TowerClient != nil {
1,178✔
2499
                        state := l.channel.State()
4✔
2500
                        chanID := l.ChanID()
4✔
2501

4✔
2502
                        err = l.cfg.TowerClient.BackupState(
4✔
2503
                                &chanID, state.RemoteCommitment.CommitHeight-1,
4✔
2504
                        )
4✔
2505
                        if err != nil {
4✔
2506
                                l.failf(LinkFailureError{
×
2507
                                        code: ErrInternalError,
×
2508
                                }, "unable to queue breach backup: %v", err)
×
2509
                                return
×
2510
                        }
×
2511
                }
2512

2513
                // If we can send updates then we can process adds in case we
2514
                // are the exit hop and need to send back resolutions, or in
2515
                // case there are validity issues with the packets. Otherwise
2516
                // we defer the action until resume.
2517
                //
2518
                // We are free to process the settles and fails without this
2519
                // check since processing those can't result in further updates
2520
                // to this channel link.
2521
                if l.quiescer.CanSendUpdates() {
2,347✔
2522
                        l.processRemoteAdds(fwdPkg)
1,173✔
2523
                } else {
1,174✔
2524
                        l.quiescer.OnResume(func() {
1✔
2525
                                l.processRemoteAdds(fwdPkg)
×
2526
                        })
×
2527
                }
2528
                l.processRemoteSettleFails(fwdPkg)
1,174✔
2529

1,174✔
2530
                // If the link failed during processing the adds, we must
1,174✔
2531
                // return to ensure we won't attempted to update the state
1,174✔
2532
                // further.
1,174✔
2533
                if l.failed {
1,178✔
2534
                        return
4✔
2535
                }
4✔
2536

2537
                // The revocation window opened up. If there are pending local
2538
                // updates, try to update the commit tx. Pending updates could
2539
                // already have been present because of a previously failed
2540
                // update to the commit tx or freshly added in by
2541
                // processRemoteAdds. Also in case there are no local updates,
2542
                // but there are still remote updates that are not in the remote
2543
                // commit tx yet, send out an update.
2544
                if l.channel.OweCommitment() {
1,478✔
2545
                        if !l.updateCommitTxOrFail() {
311✔
2546
                                return
7✔
2547
                        }
7✔
2548
                }
2549

2550
                // Now that we have finished processing the RevokeAndAck, we
2551
                // can invoke the flushHooks if the channel state is clean.
2552
                l.RWMutex.Lock()
1,167✔
2553
                if l.channel.IsChannelClean() {
1,330✔
2554
                        l.flushHooks.invoke()
163✔
2555
                }
163✔
2556
                l.RWMutex.Unlock()
1,167✔
2557

2558
        case *lnwire.UpdateFee:
3✔
2559
                // Check and see if their proposed fee-rate would make us
3✔
2560
                // exceed the fee threshold.
3✔
2561
                fee := chainfee.SatPerKWeight(msg.FeePerKw)
3✔
2562

3✔
2563
                isDust, err := l.exceedsFeeExposureLimit(fee)
3✔
2564
                if err != nil {
3✔
2565
                        // This shouldn't typically happen. If it does, it
×
2566
                        // indicates something is wrong with our channel state.
×
2567
                        l.log.Errorf("Unable to determine if fee threshold " +
×
2568
                                "exceeded")
×
2569
                        l.failf(LinkFailureError{code: ErrInternalError},
×
2570
                                "error calculating fee exposure: %v", err)
×
2571

×
2572
                        return
×
2573
                }
×
2574

2575
                if isDust {
3✔
2576
                        // The proposed fee-rate makes us exceed the fee
×
2577
                        // threshold.
×
2578
                        l.failf(LinkFailureError{code: ErrInternalError},
×
2579
                                "fee threshold exceeded: %v", err)
×
2580
                        return
×
2581
                }
×
2582

2583
                // We received fee update from peer. If we are the initiator we
2584
                // will fail the channel, if not we will apply the update.
2585
                if err := l.channel.ReceiveUpdateFee(fee); err != nil {
3✔
2586
                        l.failf(LinkFailureError{code: ErrInvalidUpdate},
×
2587
                                "error receiving fee update: %v", err)
×
2588
                        return
×
2589
                }
×
2590

2591
                // Update the mailbox's feerate as well.
2592
                l.mailBox.SetFeeRate(fee)
3✔
2593

2594
        case *lnwire.Stfu:
6✔
2595
                err := l.handleStfu(msg)
6✔
2596
                if err != nil {
6✔
2597
                        l.stfuFailf("handleStfu: %v", err.Error())
×
2598
                }
×
2599

2600
        // In the case where we receive a warning message from our peer, just
2601
        // log it and move on. We choose not to disconnect from our peer,
2602
        // although we "MAY" do so according to the specification.
2603
        case *lnwire.Warning:
1✔
2604
                l.log.Warnf("received warning message from peer: %v",
1✔
2605
                        msg.Warning())
1✔
2606

2607
        case *lnwire.Error:
4✔
2608
                // Error received from remote, MUST fail channel, but should
4✔
2609
                // only print the contents of the error message if all
4✔
2610
                // characters are printable ASCII.
4✔
2611
                l.failf(
4✔
2612
                        LinkFailureError{
4✔
2613
                                code: ErrRemoteError,
4✔
2614

4✔
2615
                                // TODO(halseth): we currently don't fail the
4✔
2616
                                // channel permanently, as there are some sync
4✔
2617
                                // issues with other implementations that will
4✔
2618
                                // lead to them sending an error message, but
4✔
2619
                                // we can recover from on next connection. See
4✔
2620
                                // https://github.com/ElementsProject/lightning/issues/4212
4✔
2621
                                PermanentFailure: false,
4✔
2622
                        },
4✔
2623
                        "ChannelPoint(%v): received error from peer: %v",
4✔
2624
                        l.channel.ChannelPoint(), msg.Error(),
4✔
2625
                )
4✔
2626
        default:
×
2627
                l.log.Warnf("received unknown message of type %T", msg)
×
2628
        }
2629

2630
}
2631

2632
// handleStfu implements the top-level logic for handling the Stfu message from
2633
// our peer.
2634
func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error {
6✔
2635
        if !l.noDanglingUpdates(lntypes.Remote) {
6✔
2636
                return ErrPendingRemoteUpdates
×
2637
        }
×
2638
        err := l.quiescer.RecvStfu(*stfu)
6✔
2639
        if err != nil {
6✔
2640
                return err
×
2641
        }
×
2642

2643
        // If we can immediately send an Stfu response back, we will.
2644
        if l.noDanglingUpdates(lntypes.Local) {
11✔
2645
                return l.quiescer.SendOwedStfu()
5✔
2646
        }
5✔
2647

2648
        return nil
1✔
2649
}
2650

2651
// stfuFailf fails the link in the case where the requirements of the quiescence
2652
// protocol are violated. In all cases we opt to drop the connection as only
2653
// link state (as opposed to channel state) is affected.
2654
func (l *channelLink) stfuFailf(format string, args ...interface{}) {
×
2655
        l.failf(LinkFailureError{
×
2656
                code:             ErrStfuViolation,
×
2657
                FailureAction:    LinkFailureDisconnect,
×
2658
                PermanentFailure: false,
×
2659
                Warning:          true,
×
2660
        }, format, args...)
×
2661
}
×
2662

2663
// noDanglingUpdates returns true when there are 0 updates that were originally
2664
// issued by whose on either the Local or Remote commitment transaction.
2665
func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool {
1,192✔
2666
        pendingOnLocal := l.channel.NumPendingUpdates(
1,192✔
2667
                whose, lntypes.Local,
1,192✔
2668
        )
1,192✔
2669
        pendingOnRemote := l.channel.NumPendingUpdates(
1,192✔
2670
                whose, lntypes.Remote,
1,192✔
2671
        )
1,192✔
2672

1,192✔
2673
        return pendingOnLocal == 0 && pendingOnRemote == 0
1,192✔
2674
}
1,192✔
2675

2676
// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
2677
// for packets delivered from server, and cleaning up any circuits closed by
2678
// signing a previous commitment txn. This method ensures that the circuits are
2679
// removed from the circuit map before removing them from the link's mailbox,
2680
// otherwise it could be possible for some circuit to be missed if this link
2681
// flaps.
2682
func (l *channelLink) ackDownStreamPackets() error {
1,366✔
2683
        // First, remove the downstream Add packets that were included in the
1,366✔
2684
        // previous commitment signature. This will prevent the Adds from being
1,366✔
2685
        // replayed if this link disconnects.
1,366✔
2686
        for _, inKey := range l.openedCircuits {
1,834✔
2687
                // In order to test the sphinx replay logic of the remote
468✔
2688
                // party, unsafe replay does not acknowledge the packets from
468✔
2689
                // the mailbox. We can then force a replay of any Add packets
468✔
2690
                // held in memory by disconnecting and reconnecting the link.
468✔
2691
                if l.cfg.UnsafeReplay {
472✔
2692
                        continue
4✔
2693
                }
2694

2695
                l.log.Debugf("removing Add packet %s from mailbox", inKey)
468✔
2696
                l.mailBox.AckPacket(inKey)
468✔
2697
        }
2698

2699
        // Now, we will delete all circuits closed by the previous commitment
2700
        // signature, which is the result of downstream Settle/Fail packets. We
2701
        // batch them here to ensure circuits are closed atomically and for
2702
        // performance.
2703
        err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
1,366✔
2704
        switch err {
1,366✔
2705
        case nil:
1,366✔
2706
                // Successful deletion.
2707

2708
        default:
×
2709
                l.log.Errorf("unable to delete %d circuits: %v",
×
2710
                        len(l.closedCircuits), err)
×
2711
                return err
×
2712
        }
2713

2714
        // With the circuits removed from memory and disk, we now ack any
2715
        // Settle/Fails in the mailbox to ensure they do not get redelivered
2716
        // after startup. If forgive is enabled and we've reached this point,
2717
        // the circuits must have been removed at some point, so it is now safe
2718
        // to un-queue the corresponding Settle/Fails.
2719
        for _, inKey := range l.closedCircuits {
1,409✔
2720
                l.log.Debugf("removing Fail/Settle packet %s from mailbox",
43✔
2721
                        inKey)
43✔
2722
                l.mailBox.AckPacket(inKey)
43✔
2723
        }
43✔
2724

2725
        // Lastly, reset our buffers to be empty while keeping any acquired
2726
        // growth in the backing array.
2727
        l.openedCircuits = l.openedCircuits[:0]
1,366✔
2728
        l.closedCircuits = l.closedCircuits[:0]
1,366✔
2729

1,366✔
2730
        return nil
1,366✔
2731
}
2732

2733
// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
2734
// the link.
2735
func (l *channelLink) updateCommitTxOrFail() bool {
1,219✔
2736
        err := l.updateCommitTx()
1,219✔
2737
        switch err {
1,219✔
2738
        // No error encountered, success.
2739
        case nil:
1,209✔
2740

2741
        // A duplicate keystone error should be resolved and is not fatal, so
2742
        // we won't send an Error message to the peer.
2743
        case ErrDuplicateKeystone:
×
2744
                l.failf(LinkFailureError{code: ErrCircuitError},
×
2745
                        "temporary circuit error: %v", err)
×
2746
                return false
×
2747

2748
        // Any other error is treated results in an Error message being sent to
2749
        // the peer.
2750
        default:
10✔
2751
                l.failf(LinkFailureError{code: ErrInternalError},
10✔
2752
                        "unable to update commitment: %v", err)
10✔
2753
                return false
10✔
2754
        }
2755

2756
        return true
1,209✔
2757
}
2758

2759
// updateCommitTx signs, then sends an update to the remote peer adding a new
2760
// commitment to their commitment chain which includes all the latest updates
2761
// we've received+processed up to this point.
2762
func (l *channelLink) updateCommitTx() error {
1,277✔
2763
        // Preemptively write all pending keystones to disk, just in case the
1,277✔
2764
        // HTLCs we have in memory are included in the subsequent attempt to
1,277✔
2765
        // sign a commitment state.
1,277✔
2766
        err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
1,277✔
2767
        if err != nil {
1,277✔
2768
                // If ErrDuplicateKeystone is returned, the caller will catch
×
2769
                // it.
×
2770
                return err
×
2771
        }
×
2772

2773
        // Reset the batch, but keep the backing buffer to avoid reallocating.
2774
        l.keystoneBatch = l.keystoneBatch[:0]
1,277✔
2775

1,277✔
2776
        // If hodl.Commit mode is active, we will refrain from attempting to
1,277✔
2777
        // commit any in-memory modifications to the channel state. Exiting here
1,277✔
2778
        // permits testing of either the switch or link's ability to trim
1,277✔
2779
        // circuits that have been opened, but unsuccessfully committed.
1,277✔
2780
        if l.cfg.HodlMask.Active(hodl.Commit) {
1,285✔
2781
                l.log.Warnf(hodl.Commit.Warning())
8✔
2782
                return nil
8✔
2783
        }
8✔
2784

2785
        ctx, done := l.WithCtxQuitNoTimeout()
1,273✔
2786
        defer done()
1,273✔
2787

1,273✔
2788
        newCommit, err := l.channel.SignNextCommitment(ctx)
1,273✔
2789
        if err == lnwallet.ErrNoWindow {
1,354✔
2790
                l.cfg.PendingCommitTicker.Resume()
81✔
2791
                l.log.Trace("PendingCommitTicker resumed")
81✔
2792

81✔
2793
                n := l.channel.NumPendingUpdates(lntypes.Local, lntypes.Remote)
81✔
2794
                l.log.Tracef("revocation window exhausted, unable to send: "+
81✔
2795
                        "%v, pend_updates=%v, dangling_closes%v", n,
81✔
2796
                        lnutils.SpewLogClosure(l.openedCircuits),
81✔
2797
                        lnutils.SpewLogClosure(l.closedCircuits))
81✔
2798

81✔
2799
                return nil
81✔
2800
        } else if err != nil {
1,278✔
2801
                return err
1✔
2802
        }
1✔
2803

2804
        if err := l.ackDownStreamPackets(); err != nil {
1,196✔
2805
                return err
×
2806
        }
×
2807

2808
        l.cfg.PendingCommitTicker.Pause()
1,196✔
2809
        l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")
1,196✔
2810

1,196✔
2811
        // The remote party now has a new pending commitment, so we'll update
1,196✔
2812
        // the contract court to be aware of this new set (the prior old remote
1,196✔
2813
        // pending).
1,196✔
2814
        newUpdate := &contractcourt.ContractUpdate{
1,196✔
2815
                HtlcKey: contractcourt.RemotePendingHtlcSet,
1,196✔
2816
                Htlcs:   newCommit.PendingHTLCs,
1,196✔
2817
        }
1,196✔
2818
        err = l.cfg.NotifyContractUpdate(newUpdate)
1,196✔
2819
        if err != nil {
1,196✔
2820
                l.log.Errorf("unable to notify contract update: %v", err)
×
2821
                return err
×
2822
        }
×
2823

2824
        select {
1,196✔
2825
        case <-l.Quit:
11✔
2826
                return ErrLinkShuttingDown
11✔
2827
        default:
1,185✔
2828
        }
2829

2830
        auxBlobRecords, err := lnwire.ParseCustomRecords(newCommit.AuxSigBlob)
1,185✔
2831
        if err != nil {
1,185✔
2832
                return fmt.Errorf("error parsing aux sigs: %w", err)
×
2833
        }
×
2834

2835
        commitSig := &lnwire.CommitSig{
1,185✔
2836
                ChanID:        l.ChanID(),
1,185✔
2837
                CommitSig:     newCommit.CommitSig,
1,185✔
2838
                HtlcSigs:      newCommit.HtlcSigs,
1,185✔
2839
                PartialSig:    newCommit.PartialSig,
1,185✔
2840
                CustomRecords: auxBlobRecords,
1,185✔
2841
        }
1,185✔
2842
        l.cfg.Peer.SendMessage(false, commitSig)
1,185✔
2843

1,185✔
2844
        // Now that we have sent out a new CommitSig, we invoke the outgoing set
1,185✔
2845
        // of commit hooks.
1,185✔
2846
        l.RWMutex.Lock()
1,185✔
2847
        l.outgoingCommitHooks.invoke()
1,185✔
2848
        l.RWMutex.Unlock()
1,185✔
2849

1,185✔
2850
        return nil
1,185✔
2851
}
2852

2853
// Peer returns the representation of remote peer with which we have the
2854
// channel link opened.
2855
//
2856
// NOTE: Part of the ChannelLink interface.
2857
func (l *channelLink) PeerPubKey() [33]byte {
445✔
2858
        return l.cfg.Peer.PubKey()
445✔
2859
}
445✔
2860

2861
// ChannelPoint returns the channel outpoint for the channel link.
2862
// NOTE: Part of the ChannelLink interface.
2863
func (l *channelLink) ChannelPoint() wire.OutPoint {
856✔
2864
        return l.channel.ChannelPoint()
856✔
2865
}
856✔
2866

2867
// ShortChanID returns the short channel ID for the channel link. The short
2868
// channel ID encodes the exact location in the main chain that the original
2869
// funding output can be found.
2870
//
2871
// NOTE: Part of the ChannelLink interface.
2872
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
4,252✔
2873
        l.RLock()
4,252✔
2874
        defer l.RUnlock()
4,252✔
2875

4,252✔
2876
        return l.channel.ShortChanID()
4,252✔
2877
}
4,252✔
2878

2879
// UpdateShortChanID updates the short channel ID for a link. This may be
2880
// required in the event that a link is created before the short chan ID for it
2881
// is known, or a re-org occurs, and the funding transaction changes location
2882
// within the chain.
2883
//
2884
// NOTE: Part of the ChannelLink interface.
2885
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
4✔
2886
        chanID := l.ChanID()
4✔
2887

4✔
2888
        // Refresh the channel state's short channel ID by loading it from disk.
4✔
2889
        // This ensures that the channel state accurately reflects the updated
4✔
2890
        // short channel ID.
4✔
2891
        err := l.channel.State().Refresh()
4✔
2892
        if err != nil {
4✔
2893
                l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
×
2894
                        "%v", chanID, err)
×
2895
                return hop.Source, err
×
2896
        }
×
2897

2898
        return hop.Source, nil
4✔
2899
}
2900

2901
// ChanID returns the channel ID for the channel link. The channel ID is a more
2902
// compact representation of a channel's full outpoint.
2903
//
2904
// NOTE: Part of the ChannelLink interface.
2905
func (l *channelLink) ChanID() lnwire.ChannelID {
3,925✔
2906
        return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
3,925✔
2907
}
3,925✔
2908

2909
// Bandwidth returns the total amount that can flow through the channel link at
2910
// this given instance. The value returned is expressed in millisatoshi and can
2911
// be used by callers when making forwarding decisions to determine if a link
2912
// can accept an HTLC.
2913
//
2914
// NOTE: Part of the ChannelLink interface.
2915
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
816✔
2916
        // Get the balance available on the channel for new HTLCs. This takes
816✔
2917
        // the channel reserve into account so HTLCs up to this value won't
816✔
2918
        // violate it.
816✔
2919
        return l.channel.AvailableBalance()
816✔
2920
}
816✔
2921

2922
// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
2923
// amount provided to the link. This check does not reserve a space, since
2924
// forwards or other payments may use the available slot, so it should be
2925
// considered best-effort.
2926
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
4✔
2927
        return l.channel.MayAddOutgoingHtlc(amt)
4✔
2928
}
4✔
2929

2930
// getDustSum is a wrapper method that calls the underlying channel's dust sum
2931
// method.
2932
//
2933
// NOTE: Part of the dustHandler interface.
2934
func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
2935
        dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {
2,527✔
2936

2,527✔
2937
        return l.channel.GetDustSum(whoseCommit, dryRunFee)
2,527✔
2938
}
2,527✔
2939

2940
// getFeeRate is a wrapper method that retrieves the underlying channel's
2941
// feerate.
2942
//
2943
// NOTE: Part of the dustHandler interface.
2944
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
673✔
2945
        return l.channel.CommitFeeRate()
673✔
2946
}
673✔
2947

2948
// getDustClosure returns a closure that can be used by the switch or mailbox
2949
// to evaluate whether a given HTLC is dust.
2950
//
2951
// NOTE: Part of the dustHandler interface.
2952
func (l *channelLink) getDustClosure() dustClosure {
1,603✔
2953
        localDustLimit := l.channel.State().LocalChanCfg.DustLimit
1,603✔
2954
        remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
1,603✔
2955
        chanType := l.channel.State().ChanType
1,603✔
2956

1,603✔
2957
        return dustHelper(chanType, localDustLimit, remoteDustLimit)
1,603✔
2958
}
1,603✔
2959

2960
// getCommitFee returns either the local or remote CommitFee in satoshis. This
2961
// is used so that the Switch can have access to the commitment fee without
2962
// needing to have a *LightningChannel. This doesn't include dust.
2963
//
2964
// NOTE: Part of the dustHandler interface.
2965
func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
1,884✔
2966
        if remote {
2,838✔
2967
                return l.channel.State().RemoteCommitment.CommitFee
954✔
2968
        }
954✔
2969

2970
        return l.channel.State().LocalCommitment.CommitFee
934✔
2971
}
2972

2973
// exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
2974
// increases the total dust and fees within the channel past the configured
2975
// fee threshold. It first calculates the dust sum over every update in the
2976
// update log with the proposed fee-rate and taking into account both the local
2977
// and remote dust limits. It uses every update in the update log instead of
2978
// what is actually on the local and remote commitments because it is assumed
2979
// that in a worst-case scenario, every update in the update log could
2980
// theoretically be on either commitment transaction and this needs to be
2981
// accounted for with this fee-rate. It then calculates the local and remote
2982
// commitment fees given the proposed fee-rate. Finally, it tallies the results
2983
// and determines if the fee threshold has been exceeded.
2984
func (l *channelLink) exceedsFeeExposureLimit(
2985
        feePerKw chainfee.SatPerKWeight) (bool, error) {
6✔
2986

6✔
2987
        dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)
6✔
2988

6✔
2989
        // Get the sum of dust for both the local and remote commitments using
6✔
2990
        // this "dry-run" fee.
6✔
2991
        localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
6✔
2992
        remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)
6✔
2993

6✔
2994
        // Calculate the local and remote commitment fees using this dry-run
6✔
2995
        // fee.
6✔
2996
        localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
6✔
2997
        if err != nil {
6✔
2998
                return false, err
×
2999
        }
×
3000

3001
        // Finally, check whether the max fee exposure was exceeded on either
3002
        // future commitment transaction with the fee-rate.
3003
        totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
6✔
3004
        if totalLocalDust > l.cfg.MaxFeeExposure {
6✔
3005
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
×
3006
                        "local dust: %v, local fee: %v", l.ShortChanID(),
×
3007
                        totalLocalDust, localFee)
×
3008

×
3009
                return true, nil
×
3010
        }
×
3011

3012
        totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
6✔
3013
                remoteFee,
6✔
3014
        )
6✔
3015

6✔
3016
        if totalRemoteDust > l.cfg.MaxFeeExposure {
6✔
3017
                l.log.Debugf("ChannelLink(%v): exceeds fee exposure limit: "+
×
3018
                        "remote dust: %v, remote fee: %v", l.ShortChanID(),
×
3019
                        totalRemoteDust, remoteFee)
×
3020

×
3021
                return true, nil
×
3022
        }
×
3023

3024
        return false, nil
6✔
3025
}
3026

3027
// isOverexposedWithHtlc calculates whether the proposed HTLC will make the
3028
// channel exceed the fee threshold. It first fetches the largest fee-rate that
3029
// may be on any unrevoked commitment transaction. Then, using this fee-rate,
3030
// determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds to
3031
// the overall dust sum. If it is not dust, it contributes to weight, which
3032
// also adds to the overall dust sum by an increase in fees. If the dust sum on
3033
// either commitment exceeds the configured fee threshold, this function
3034
// returns true.
3035
func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
3036
        incoming bool) bool {
934✔
3037

934✔
3038
        dustClosure := l.getDustClosure()
934✔
3039

934✔
3040
        feeRate := l.channel.WorstCaseFeeRate()
934✔
3041

934✔
3042
        amount := htlc.Amount.ToSatoshis()
934✔
3043

934✔
3044
        // See if this HTLC is dust on both the local and remote commitments.
934✔
3045
        isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
934✔
3046
        isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)
934✔
3047

934✔
3048
        // Calculate the dust sum for the local and remote commitments.
934✔
3049
        localDustSum := l.getDustSum(
934✔
3050
                lntypes.Local, fn.None[chainfee.SatPerKWeight](),
934✔
3051
        )
934✔
3052
        remoteDustSum := l.getDustSum(
934✔
3053
                lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
934✔
3054
        )
934✔
3055

934✔
3056
        // Grab the larger of the local and remote commitment fees w/o dust.
934✔
3057
        commitFee := l.getCommitFee(false)
934✔
3058

934✔
3059
        if l.getCommitFee(true) > commitFee {
958✔
3060
                commitFee = l.getCommitFee(true)
24✔
3061
        }
24✔
3062

3063
        commitFeeMSat := lnwire.NewMSatFromSatoshis(commitFee)
934✔
3064

934✔
3065
        localDustSum += commitFeeMSat
934✔
3066
        remoteDustSum += commitFeeMSat
934✔
3067

934✔
3068
        // Calculate the additional fee increase if this is a non-dust HTLC.
934✔
3069
        weight := lntypes.WeightUnit(input.HTLCWeight)
934✔
3070
        additional := lnwire.NewMSatFromSatoshis(
934✔
3071
                feeRate.FeeForWeight(weight),
934✔
3072
        )
934✔
3073

934✔
3074
        if isLocalDust {
1,571✔
3075
                // If this is dust, it doesn't contribute to weight but does
637✔
3076
                // contribute to the overall dust sum.
637✔
3077
                localDustSum += lnwire.NewMSatFromSatoshis(amount)
637✔
3078
        } else {
938✔
3079
                // Account for the fee increase that comes with an increase in
301✔
3080
                // weight.
301✔
3081
                localDustSum += additional
301✔
3082
        }
301✔
3083

3084
        if localDustSum > l.cfg.MaxFeeExposure {
938✔
3085
                // The max fee exposure was exceeded.
4✔
3086
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
4✔
3087
                        "overexposed, total local dust: %v (current commit "+
4✔
3088
                        "fee: %v)", l.ShortChanID(), htlc, localDustSum)
4✔
3089

4✔
3090
                return true
4✔
3091
        }
4✔
3092

3093
        if isRemoteDust {
1,564✔
3094
                // If this is dust, it doesn't contribute to weight but does
634✔
3095
                // contribute to the overall dust sum.
634✔
3096
                remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
634✔
3097
        } else {
934✔
3098
                // Account for the fee increase that comes with an increase in
300✔
3099
                // weight.
300✔
3100
                remoteDustSum += additional
300✔
3101
        }
300✔
3102

3103
        if remoteDustSum > l.cfg.MaxFeeExposure {
930✔
3104
                // The max fee exposure was exceeded.
×
3105
                l.log.Debugf("ChannelLink(%v): HTLC %v makes the channel "+
×
3106
                        "overexposed, total remote dust: %v (current commit "+
×
3107
                        "fee: %v)", l.ShortChanID(), htlc, remoteDustSum)
×
3108

×
3109
                return true
×
3110
        }
×
3111

3112
        return false
930✔
3113
}
3114

3115
// dustClosure is a function that evaluates whether an HTLC is dust. It returns
3116
// true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
3117
// the HTLC is incoming (i.e. one that the remote sent), a boolean denoting
3118
// whether to evaluate on the local or remote commit, and finally an HTLC
3119
// amount to test.
3120
type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
3121
        whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool
3122

3123
// dustHelper is used to construct the dustClosure.
3124
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
3125
        remoteDustLimit btcutil.Amount) dustClosure {
1,803✔
3126

1,803✔
3127
        isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
1,803✔
3128
                whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {
11,585✔
3129

9,782✔
3130
                var dustLimit btcutil.Amount
9,782✔
3131
                if whoseCommit.IsLocal() {
14,675✔
3132
                        dustLimit = localDustLimit
4,893✔
3133
                } else {
9,786✔
3134
                        dustLimit = remoteDustLimit
4,893✔
3135
                }
4,893✔
3136

3137
                return lnwallet.HtlcIsDust(
9,782✔
3138
                        chantype, incoming, whoseCommit, feerate, amt,
9,782✔
3139
                        dustLimit,
9,782✔
3140
                )
9,782✔
3141
        }
3142

3143
        return isDust
1,803✔
3144
}
3145

3146
// zeroConfConfirmed returns whether or not the zero-conf channel has
3147
// confirmed on-chain.
3148
//
3149
// Part of the scidAliasHandler interface.
3150
func (l *channelLink) zeroConfConfirmed() bool {
7✔
3151
        return l.channel.State().ZeroConfConfirmed()
7✔
3152
}
7✔
3153

3154
// confirmedScid returns the confirmed SCID for a zero-conf channel. This
3155
// should not be called for non-zero-conf channels.
3156
//
3157
// Part of the scidAliasHandler interface.
3158
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
7✔
3159
        return l.channel.State().ZeroConfRealScid()
7✔
3160
}
7✔
3161

3162
// isZeroConf returns whether or not the underlying channel is a zero-conf
3163
// channel.
3164
//
3165
// Part of the scidAliasHandler interface.
3166
func (l *channelLink) isZeroConf() bool {
217✔
3167
        return l.channel.State().IsZeroConf()
217✔
3168
}
217✔
3169

3170
// negotiatedAliasFeature returns whether or not the underlying channel has
3171
// negotiated the option-scid-alias feature bit. This will be true for both
3172
// option-scid-alias and zero-conf channel-types. It will also be true for
3173
// channels with the feature bit but without the above channel-types.
3174
//
3175
// Part of the scidAliasFeature interface.
3176
func (l *channelLink) negotiatedAliasFeature() bool {
378✔
3177
        return l.channel.State().NegotiatedAliasFeature()
378✔
3178
}
378✔
3179

3180
// getAliases returns the set of aliases for the underlying channel.
3181
//
3182
// Part of the scidAliasHandler interface.
3183
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
223✔
3184
        return l.cfg.GetAliases(l.ShortChanID())
223✔
3185
}
223✔
3186

3187
// attachFailAliasUpdate sets the link's FailAliasUpdate function.
3188
//
3189
// Part of the scidAliasHandler interface.
3190
func (l *channelLink) attachFailAliasUpdate(closure func(
3191
        sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate1) {
218✔
3192

218✔
3193
        l.Lock()
218✔
3194
        l.cfg.FailAliasUpdate = closure
218✔
3195
        l.Unlock()
218✔
3196
}
218✔
3197

3198
// AttachMailBox updates the current mailbox used by this link, and hooks up
3199
// the mailbox's message and packet outboxes to the link's upstream and
3200
// downstream chans, respectively.
3201
func (l *channelLink) AttachMailBox(mailbox MailBox) {
217✔
3202
        l.Lock()
217✔
3203
        l.mailBox = mailbox
217✔
3204
        l.upstream = mailbox.MessageOutBox()
217✔
3205
        l.downstream = mailbox.PacketOutBox()
217✔
3206
        l.Unlock()
217✔
3207

217✔
3208
        // Set the mailbox's fee rate. This may be refreshing a feerate that was
217✔
3209
        // never committed.
217✔
3210
        l.mailBox.SetFeeRate(l.getFeeRate())
217✔
3211

217✔
3212
        // Also set the mailbox's dust closure so that it can query whether HTLC's
217✔
3213
        // are dust given the current feerate.
217✔
3214
        l.mailBox.SetDustClosure(l.getDustClosure())
217✔
3215
}
217✔
3216

3217
// UpdateForwardingPolicy updates the forwarding policy for the target
3218
// ChannelLink. Once updated, the link will use the new forwarding policy to
3219
// govern if it an incoming HTLC should be forwarded or not. We assume that
3220
// fields that are zero are intentionally set to zero, so we'll use newPolicy to
3221
// update all of the link's FwrdingPolicy's values.
3222
//
3223
// NOTE: Part of the ChannelLink interface.
3224
func (l *channelLink) UpdateForwardingPolicy(
3225
        newPolicy models.ForwardingPolicy) {
16✔
3226

16✔
3227
        l.Lock()
16✔
3228
        defer l.Unlock()
16✔
3229

16✔
3230
        l.cfg.FwrdingPolicy = newPolicy
16✔
3231
}
16✔
3232

3233
// CheckHtlcForward should return a nil error if the passed HTLC details
3234
// satisfy the current forwarding policy fo the target link. Otherwise,
3235
// a LinkError with a valid protocol failure message should be returned
3236
// in order to signal to the source of the HTLC, the policy consistency
3237
// issue.
3238
//
3239
// NOTE: Part of the ChannelLink interface.
3240
func (l *channelLink) CheckHtlcForward(payHash [32]byte, incomingHtlcAmt,
3241
        amtToForward lnwire.MilliSatoshi, incomingTimeout,
3242
        outgoingTimeout uint32, inboundFee models.InboundFee,
3243
        heightNow uint32, originalScid lnwire.ShortChannelID,
3244
        customRecords lnwire.CustomRecords) *LinkError {
53✔
3245

53✔
3246
        l.RLock()
53✔
3247
        policy := l.cfg.FwrdingPolicy
53✔
3248
        l.RUnlock()
53✔
3249

53✔
3250
        // Using the outgoing HTLC amount, we'll calculate the outgoing
53✔
3251
        // fee this incoming HTLC must carry in order to satisfy the constraints
53✔
3252
        // of the outgoing link.
53✔
3253
        outFee := ExpectedFee(policy, amtToForward)
53✔
3254

53✔
3255
        // Then calculate the inbound fee that we charge based on the sum of
53✔
3256
        // outgoing HTLC amount and outgoing fee.
53✔
3257
        inFee := inboundFee.CalcFee(amtToForward + outFee)
53✔
3258

53✔
3259
        // Add up both fee components. It is important to calculate both fees
53✔
3260
        // separately. An alternative way of calculating is to first determine
53✔
3261
        // an aggregate fee and apply that to the outgoing HTLC amount. However,
53✔
3262
        // rounding may cause the result to be slightly higher than in the case
53✔
3263
        // of separately rounded fee components. This potentially causes failed
53✔
3264
        // forwards for senders and is something to be avoided.
53✔
3265
        expectedFee := inFee + int64(outFee)
53✔
3266

53✔
3267
        // If the actual fee is less than our expected fee, then we'll reject
53✔
3268
        // this HTLC as it didn't provide a sufficient amount of fees, or the
53✔
3269
        // values have been tampered with, or the send used incorrect/dated
53✔
3270
        // information to construct the forwarding information for this hop. In
53✔
3271
        // any case, we'll cancel this HTLC.
53✔
3272
        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
53✔
3273
        if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
63✔
3274
                l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
10✔
3275
                        "expected %v, got %v: incoming=%v, outgoing=%v, "+
10✔
3276
                        "inboundFee=%v",
10✔
3277
                        payHash[:], expectedFee, actualFee,
10✔
3278
                        incomingHtlcAmt, amtToForward, inboundFee,
10✔
3279
                )
10✔
3280

10✔
3281
                // As part of the returned error, we'll send our latest routing
10✔
3282
                // policy so the sending node obtains the most up to date data.
10✔
3283
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
20✔
3284
                        return lnwire.NewFeeInsufficient(amtToForward, *upd)
10✔
3285
                }
10✔
3286
                failure := l.createFailureWithUpdate(false, originalScid, cb)
10✔
3287
                return NewLinkError(failure)
10✔
3288
        }
3289

3290
        // Check whether the outgoing htlc satisfies the channel policy.
3291
        err := l.canSendHtlc(
47✔
3292
                policy, payHash, amtToForward, outgoingTimeout, heightNow,
47✔
3293
                originalScid, customRecords,
47✔
3294
        )
47✔
3295
        if err != nil {
64✔
3296
                return err
17✔
3297
        }
17✔
3298

3299
        // Finally, we'll ensure that the time-lock on the outgoing HTLC meets
3300
        // the following constraint: the incoming time-lock minus our time-lock
3301
        // delta should equal the outgoing time lock. Otherwise, whether the
3302
        // sender messed up, or an intermediate node tampered with the HTLC.
3303
        timeDelta := policy.TimeLockDelta
34✔
3304
        if incomingTimeout < outgoingTimeout+timeDelta {
36✔
3305
                l.log.Warnf("incoming htlc(%x) has incorrect time-lock value: "+
2✔
3306
                        "expected at least %v block delta, got %v block delta",
2✔
3307
                        payHash[:], timeDelta, incomingTimeout-outgoingTimeout)
2✔
3308

2✔
3309
                // Grab the latest routing policy so the sending node is up to
2✔
3310
                // date with our current policy.
2✔
3311
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
4✔
3312
                        return lnwire.NewIncorrectCltvExpiry(
2✔
3313
                                incomingTimeout, *upd,
2✔
3314
                        )
2✔
3315
                }
2✔
3316
                failure := l.createFailureWithUpdate(false, originalScid, cb)
2✔
3317
                return NewLinkError(failure)
2✔
3318
        }
3319

3320
        return nil
32✔
3321
}
3322

3323
// CheckHtlcTransit should return a nil error if the passed HTLC details
3324
// satisfy the current channel policy.  Otherwise, a LinkError with a
3325
// valid protocol failure message should be returned in order to signal
3326
// the violation. This call is intended to be used for locally initiated
3327
// payments for which there is no corresponding incoming htlc.
3328
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
3329
        amt lnwire.MilliSatoshi, timeout uint32, heightNow uint32,
3330
        customRecords lnwire.CustomRecords) *LinkError {
410✔
3331

410✔
3332
        l.RLock()
410✔
3333
        policy := l.cfg.FwrdingPolicy
410✔
3334
        l.RUnlock()
410✔
3335

410✔
3336
        // We pass in hop.Source here as this is only used in the Switch when
410✔
3337
        // trying to send over a local link. This causes the fallback mechanism
410✔
3338
        // to occur.
410✔
3339
        return l.canSendHtlc(
410✔
3340
                policy, payHash, amt, timeout, heightNow, hop.Source,
410✔
3341
                customRecords,
410✔
3342
        )
410✔
3343
}
410✔
3344

3345
// canSendHtlc checks whether the given htlc parameters satisfy
3346
// the channel's amount and time lock constraints.
3347
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
3348
        payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
3349
        heightNow uint32, originalScid lnwire.ShortChannelID,
3350
        customRecords lnwire.CustomRecords) *LinkError {
453✔
3351

453✔
3352
        // As our first sanity check, we'll ensure that the passed HTLC isn't
453✔
3353
        // too small for the next hop. If so, then we'll cancel the HTLC
453✔
3354
        // directly.
453✔
3355
        if amt < policy.MinHTLCOut {
465✔
3356
                l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
12✔
3357
                        "htlc_value=%v", payHash[:], policy.MinHTLCOut,
12✔
3358
                        amt)
12✔
3359

12✔
3360
                // As part of the returned error, we'll send our latest routing
12✔
3361
                // policy so the sending node obtains the most up to date data.
12✔
3362
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
24✔
3363
                        return lnwire.NewAmountBelowMinimum(amt, *upd)
12✔
3364
                }
12✔
3365
                failure := l.createFailureWithUpdate(false, originalScid, cb)
12✔
3366
                return NewLinkError(failure)
12✔
3367
        }
3368

3369
        // Next, ensure that the passed HTLC isn't too large. If so, we'll
3370
        // cancel the HTLC directly.
3371
        if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
452✔
3372
                l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
7✔
3373
                        "htlc_value=%v", payHash[:], policy.MaxHTLC, amt)
7✔
3374

7✔
3375
                // As part of the returned error, we'll send our latest routing
7✔
3376
                // policy so the sending node obtains the most up-to-date data.
7✔
3377
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
14✔
3378
                        return lnwire.NewTemporaryChannelFailure(upd)
7✔
3379
                }
7✔
3380
                failure := l.createFailureWithUpdate(false, originalScid, cb)
7✔
3381
                return NewDetailedLinkError(failure, OutgoingFailureHTLCExceedsMax)
7✔
3382
        }
3383

3384
        // We want to avoid offering an HTLC which will expire in the near
3385
        // future, so we'll reject an HTLC if the outgoing expiration time is
3386
        // too close to the current height.
3387
        if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
444✔
3388
                l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
2✔
3389
                        "outgoing_expiry=%v, best_height=%v", payHash[:],
2✔
3390
                        timeout, heightNow)
2✔
3391

2✔
3392
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
4✔
3393
                        return lnwire.NewExpiryTooSoon(*upd)
2✔
3394
                }
2✔
3395
                failure := l.createFailureWithUpdate(false, originalScid, cb)
2✔
3396
                return NewLinkError(failure)
2✔
3397
        }
3398

3399
        // Check absolute max delta.
3400
        if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
441✔
3401
                l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
1✔
3402
                        "the future: got %v, but maximum is %v", payHash[:],
1✔
3403
                        timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)
1✔
3404

1✔
3405
                return NewLinkError(&lnwire.FailExpiryTooFar{})
1✔
3406
        }
1✔
3407

3408
        // We now check the available bandwidth to see if this HTLC can be
3409
        // forwarded.
3410
        availableBandwidth := l.Bandwidth()
439✔
3411
        auxBandwidth, err := fn.MapOptionZ(
439✔
3412
                l.cfg.AuxTrafficShaper,
439✔
3413
                func(ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
439✔
3414
                        var htlcBlob fn.Option[tlv.Blob]
×
3415
                        blob, err := customRecords.Serialize()
×
3416
                        if err != nil {
×
3417
                                return fn.Err[OptionalBandwidth](
×
3418
                                        fmt.Errorf("unable to serialize "+
×
3419
                                                "custom records: %w", err))
×
3420
                        }
×
3421

3422
                        if len(blob) > 0 {
×
3423
                                htlcBlob = fn.Some(blob)
×
3424
                        }
×
3425

3426
                        return l.AuxBandwidth(amt, originalScid, htlcBlob, ts)
×
3427
                },
3428
        ).Unpack()
3429
        if err != nil {
439✔
3430
                l.log.Errorf("Unable to determine aux bandwidth: %v", err)
×
3431
                return NewLinkError(&lnwire.FailTemporaryNodeFailure{})
×
3432
        }
×
3433

3434
        auxBandwidth.WhenSome(func(bandwidth lnwire.MilliSatoshi) {
439✔
3435
                availableBandwidth = bandwidth
×
3436
        })
×
3437

3438
        // Check to see if there is enough balance in this channel.
3439
        if amt > availableBandwidth {
444✔
3440
                l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
5✔
3441
                        "larger than %v", amt, l.Bandwidth())
5✔
3442
                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage {
10✔
3443
                        return lnwire.NewTemporaryChannelFailure(upd)
5✔
3444
                }
5✔
3445
                failure := l.createFailureWithUpdate(false, originalScid, cb)
5✔
3446
                return NewDetailedLinkError(
5✔
3447
                        failure, OutgoingFailureInsufficientBalance,
5✔
3448
                )
5✔
3449
        }
3450

3451
        return nil
438✔
3452
}
3453

3454
// AuxBandwidth returns the bandwidth that can be used for a channel, expressed
3455
// in milli-satoshi. This might be different from the regular BTC bandwidth for
3456
// custom channels. This will always return fn.None() for a regular (non-custom)
3457
// channel.
3458
func (l *channelLink) AuxBandwidth(amount lnwire.MilliSatoshi,
3459
        cid lnwire.ShortChannelID, htlcBlob fn.Option[tlv.Blob],
3460
        ts AuxTrafficShaper) fn.Result[OptionalBandwidth] {
×
3461

×
3462
        unknownBandwidth := fn.None[lnwire.MilliSatoshi]()
×
3463

×
3464
        fundingBlob := l.FundingCustomBlob()
×
3465
        shouldHandle, err := ts.ShouldHandleTraffic(cid, fundingBlob)
×
3466
        if err != nil {
×
3467
                return fn.Err[OptionalBandwidth](fmt.Errorf("traffic shaper "+
×
3468
                        "failed to decide whether to handle traffic: %w", err))
×
3469
        }
×
3470

3471
        log.Debugf("ShortChannelID=%v: aux traffic shaper is handling "+
×
3472
                "traffic: %v", cid, shouldHandle)
×
3473

×
3474
        // If this channel isn't handled by the aux traffic shaper, we'll return
×
3475
        // early.
×
3476
        if !shouldHandle {
×
3477
                return fn.Ok(unknownBandwidth)
×
3478
        }
×
3479

3480
        // Ask for a specific bandwidth to be used for the channel.
3481
        commitmentBlob := l.CommitmentCustomBlob()
×
3482
        auxBandwidth, err := ts.PaymentBandwidth(
×
3483
                htlcBlob, commitmentBlob, l.Bandwidth(), amount,
×
3484
        )
×
3485
        if err != nil {
×
3486
                return fn.Err[OptionalBandwidth](fmt.Errorf("failed to get "+
×
3487
                        "bandwidth from external traffic shaper: %w", err))
×
3488
        }
×
3489

3490
        log.Debugf("ShortChannelID=%v: aux traffic shaper reported available "+
×
3491
                "bandwidth: %v", cid, auxBandwidth)
×
3492

×
3493
        return fn.Ok(fn.Some(auxBandwidth))
×
3494
}
3495

3496
// Stats returns the statistics of channel link.
3497
//
3498
// NOTE: Part of the ChannelLink interface.
3499
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
8✔
3500
        snapshot := l.channel.StateSnapshot()
8✔
3501

8✔
3502
        return snapshot.ChannelCommitment.CommitHeight,
8✔
3503
                snapshot.TotalMSatSent,
8✔
3504
                snapshot.TotalMSatReceived
8✔
3505
}
8✔
3506

3507
// String returns the string representation of channel link.
3508
//
3509
// NOTE: Part of the ChannelLink interface.
3510
func (l *channelLink) String() string {
×
3511
        return l.channel.ChannelPoint().String()
×
3512
}
×
3513

3514
// handleSwitchPacket handles the switch packets. This packets which might be
3515
// forwarded to us from another channel link in case the htlc update came from
3516
// another peer or if the update was created by user
3517
//
3518
// NOTE: Part of the packetHandler interface.
3519
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
483✔
3520
        l.log.Tracef("received switch packet inkey=%v, outkey=%v",
483✔
3521
                pkt.inKey(), pkt.outKey())
483✔
3522

483✔
3523
        return l.mailBox.AddPacket(pkt)
483✔
3524
}
483✔
3525

3526
// HandleChannelUpdate handles the htlc requests as settle/add/fail which sent
3527
// to us from remote peer we have a channel with.
3528
//
3529
// NOTE: Part of the ChannelLink interface.
3530
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
3,335✔
3531
        select {
3,335✔
3532
        case <-l.Quit:
×
3533
                // Return early if the link is already in the process of
×
3534
                // quitting. It doesn't make sense to hand the message to the
×
3535
                // mailbox here.
×
3536
                return
×
3537
        default:
3,335✔
3538
        }
3539

3540
        err := l.mailBox.AddMessage(message)
3,335✔
3541
        if err != nil {
3,335✔
3542
                l.log.Errorf("failed to add Message to mailbox: %v", err)
×
3543
        }
×
3544
}
3545

3546
// updateChannelFee updates the commitment fee-per-kw on this channel by
3547
// committing to an update_fee message.
3548
func (l *channelLink) updateChannelFee(feePerKw chainfee.SatPerKWeight) error {
3✔
3549
        l.log.Infof("updating commit fee to %v", feePerKw)
3✔
3550

3✔
3551
        // We skip sending the UpdateFee message if the channel is not
3✔
3552
        // currently eligible to forward messages.
3✔
3553
        if !l.eligibleToUpdate() {
3✔
3554
                l.log.Debugf("skipping fee update for inactive channel")
×
3555
                return nil
×
3556
        }
×
3557

3558
        // Check and see if our proposed fee-rate would make us exceed the fee
3559
        // threshold.
3560
        thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
3✔
3561
        if err != nil {
3✔
3562
                // This shouldn't typically happen. If it does, it indicates
×
3563
                // something is wrong with our channel state.
×
3564
                return err
×
3565
        }
×
3566

3567
        if thresholdExceeded {
3✔
3568
                return fmt.Errorf("link fee threshold exceeded")
×
3569
        }
×
3570

3571
        // First, we'll update the local fee on our commitment.
3572
        if err := l.channel.UpdateFee(feePerKw); err != nil {
3✔
3573
                return err
×
3574
        }
×
3575

3576
        // The fee passed the channel's validation checks, so we update the
3577
        // mailbox feerate.
3578
        l.mailBox.SetFeeRate(feePerKw)
3✔
3579

3✔
3580
        // We'll then attempt to send a new UpdateFee message, and also lock it
3✔
3581
        // in immediately by triggering a commitment update.
3✔
3582
        msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
3✔
3583
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
3✔
3584
                return err
×
3585
        }
×
3586
        return l.updateCommitTx()
3✔
3587
}
3588

3589
// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
3590
// after receiving a revocation from the remote party, and reprocesses them in
3591
// the context of the provided forwarding package. Any settles or fails that
3592
// have already been acknowledged in the forwarding package will not be sent to
3593
// the switch.
3594
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg) {
1,174✔
3595
        if len(fwdPkg.SettleFails) == 0 {
2,034✔
3596
                return
860✔
3597
        }
860✔
3598

3599
        l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)
318✔
3600

318✔
3601
        var switchPackets []*htlcPacket
318✔
3602
        for i, update := range fwdPkg.SettleFails {
636✔
3603
                destRef := fwdPkg.DestRef(uint16(i))
318✔
3604

318✔
3605
                // Skip any settles or fails that have already been
318✔
3606
                // acknowledged by the incoming link that originated the
318✔
3607
                // forwarded Add.
318✔
3608
                if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
318✔
3609
                        continue
×
3610
                }
3611

3612
                // TODO(roasbeef): rework log entries to a shared
3613
                // interface.
3614

3615
                switch msg := update.UpdateMsg.(type) {
318✔
3616
                // A settle for an HTLC we previously forwarded HTLC has been
3617
                // received. So we'll forward the HTLC to the switch which will
3618
                // handle propagating the settle to the prior hop.
3619
                case *lnwire.UpdateFulfillHTLC:
195✔
3620
                        // If hodl.SettleIncoming is requested, we will not
195✔
3621
                        // forward the SETTLE to the switch and will not signal
195✔
3622
                        // a free slot on the commitment transaction.
195✔
3623
                        if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
195✔
3624
                                l.log.Warnf(hodl.SettleIncoming.Warning())
×
3625
                                continue
×
3626
                        }
3627

3628
                        settlePacket := &htlcPacket{
195✔
3629
                                outgoingChanID: l.ShortChanID(),
195✔
3630
                                outgoingHTLCID: msg.ID,
195✔
3631
                                destRef:        &destRef,
195✔
3632
                                htlc:           msg,
195✔
3633
                        }
195✔
3634

195✔
3635
                        // Add the packet to the batch to be forwarded, and
195✔
3636
                        // notify the overflow queue that a spare spot has been
195✔
3637
                        // freed up within the commitment state.
195✔
3638
                        switchPackets = append(switchPackets, settlePacket)
195✔
3639

3640
                // A failureCode message for a previously forwarded HTLC has
3641
                // been received. As a result a new slot will be freed up in
3642
                // our commitment state, so we'll forward this to the switch so
3643
                // the backwards undo can continue.
3644
                case *lnwire.UpdateFailHTLC:
127✔
3645
                        // If hodl.SettleIncoming is requested, we will not
127✔
3646
                        // forward the FAIL to the switch and will not signal a
127✔
3647
                        // free slot on the commitment transaction.
127✔
3648
                        if l.cfg.HodlMask.Active(hodl.FailIncoming) {
127✔
3649
                                l.log.Warnf(hodl.FailIncoming.Warning())
×
3650
                                continue
×
3651
                        }
3652

3653
                        // Fetch the reason the HTLC was canceled so we can
3654
                        // continue to propagate it. This failure originated
3655
                        // from another node, so the linkFailure field is not
3656
                        // set on the packet.
3657
                        failPacket := &htlcPacket{
127✔
3658
                                outgoingChanID: l.ShortChanID(),
127✔
3659
                                outgoingHTLCID: msg.ID,
127✔
3660
                                destRef:        &destRef,
127✔
3661
                                htlc:           msg,
127✔
3662
                        }
127✔
3663

127✔
3664
                        l.log.Debugf("Failed to send HTLC with ID=%d", msg.ID)
127✔
3665

127✔
3666
                        // If the failure message lacks an HMAC (but includes
127✔
3667
                        // the 4 bytes for encoding the message and padding
127✔
3668
                        // lengths, then this means that we received it as an
127✔
3669
                        // UpdateFailMalformedHTLC. As a result, we'll signal
127✔
3670
                        // that we need to convert this error within the switch
127✔
3671
                        // to an actual error, by encrypting it as if we were
127✔
3672
                        // the originating hop.
127✔
3673
                        convertedErrorSize := lnwire.FailureMessageLength + 4
127✔
3674
                        if len(msg.Reason) == convertedErrorSize {
134✔
3675
                                failPacket.convertedError = true
7✔
3676
                        }
7✔
3677

3678
                        // Add the packet to the batch to be forwarded, and
3679
                        // notify the overflow queue that a spare spot has been
3680
                        // freed up within the commitment state.
3681
                        switchPackets = append(switchPackets, failPacket)
127✔
3682
                }
3683
        }
3684

3685
        // Only spawn the task forward packets we have a non-zero number.
3686
        if len(switchPackets) > 0 {
636✔
3687
                go l.forwardBatch(false, switchPackets...)
318✔
3688
        }
318✔
3689
}
3690

3691
// processRemoteAdds serially processes each of the Add payment descriptors
3692
// which have been "locked-in" by receiving a revocation from the remote party.
3693
// The forwarding package provided instructs how to process this batch,
3694
// indicating whether this is the first time these Adds are being processed, or
3695
// whether we are reprocessing as a result of a failure or restart. Adds that
3696
// have already been acknowledged in the forwarding package will be ignored.
3697
//
3698
//nolint:funlen
3699
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg) {
1,176✔
3700
        l.log.Tracef("processing %d remote adds for height %d",
1,176✔
3701
                len(fwdPkg.Adds), fwdPkg.Height)
1,176✔
3702

1,176✔
3703
        decodeReqs := make(
1,176✔
3704
                []hop.DecodeHopIteratorRequest, 0, len(fwdPkg.Adds),
1,176✔
3705
        )
1,176✔
3706
        for _, update := range fwdPkg.Adds {
1,629✔
3707
                if msg, ok := update.UpdateMsg.(*lnwire.UpdateAddHTLC); ok {
906✔
3708
                        // Before adding the new htlc to the state machine,
453✔
3709
                        // parse the onion object in order to obtain the
453✔
3710
                        // routing information with DecodeHopIterator function
453✔
3711
                        // which process the Sphinx packet.
453✔
3712
                        onionReader := bytes.NewReader(msg.OnionBlob[:])
453✔
3713

453✔
3714
                        req := hop.DecodeHopIteratorRequest{
453✔
3715
                                OnionReader:    onionReader,
453✔
3716
                                RHash:          msg.PaymentHash[:],
453✔
3717
                                IncomingCltv:   msg.Expiry,
453✔
3718
                                IncomingAmount: msg.Amount,
453✔
3719
                                BlindingPoint:  msg.BlindingPoint,
453✔
3720
                        }
453✔
3721

453✔
3722
                        decodeReqs = append(decodeReqs, req)
453✔
3723
                }
453✔
3724
        }
3725

3726
        // Atomically decode the incoming htlcs, simultaneously checking for
3727
        // replay attempts. A particular index in the returned, spare list of
3728
        // channel iterators should only be used if the failure code at the
3729
        // same index is lnwire.FailCodeNone.
3730
        decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
1,176✔
3731
                fwdPkg.ID(), decodeReqs,
1,176✔
3732
        )
1,176✔
3733
        if sphinxErr != nil {
1,176✔
3734
                l.failf(LinkFailureError{code: ErrInternalError},
×
3735
                        "unable to decode hop iterators: %v", sphinxErr)
×
3736
                return
×
3737
        }
×
3738

3739
        var switchPackets []*htlcPacket
1,176✔
3740

1,176✔
3741
        for i, update := range fwdPkg.Adds {
1,629✔
3742
                idx := uint16(i)
453✔
3743

453✔
3744
                //nolint:forcetypeassert
453✔
3745
                add := *update.UpdateMsg.(*lnwire.UpdateAddHTLC)
453✔
3746
                sourceRef := fwdPkg.SourceRef(idx)
453✔
3747

453✔
3748
                if fwdPkg.State == channeldb.FwdStateProcessed &&
453✔
3749
                        fwdPkg.AckFilter.Contains(idx) {
453✔
3750

×
3751
                        // If this index is already found in the ack filter,
×
3752
                        // the response to this forwarding decision has already
×
3753
                        // been committed by one of our commitment txns. ADDs
×
3754
                        // in this state are waiting for the rest of the fwding
×
3755
                        // package to get acked before being garbage collected.
×
3756
                        continue
×
3757
                }
3758

3759
                // An incoming HTLC add has been full-locked in. As a result we
3760
                // can now examine the forwarding details of the HTLC, and the
3761
                // HTLC itself to decide if: we should forward it, cancel it,
3762
                // or are able to settle it (and it adheres to our fee related
3763
                // constraints).
3764

3765
                // Before adding the new htlc to the state machine, parse the
3766
                // onion object in order to obtain the routing information with
3767
                // DecodeHopIterator function which process the Sphinx packet.
3768
                chanIterator, failureCode := decodeResps[i].Result()
453✔
3769
                if failureCode != lnwire.CodeNone {
459✔
3770
                        // If we're unable to process the onion blob then we
6✔
3771
                        // should send the malformed htlc error to payment
6✔
3772
                        // sender.
6✔
3773
                        l.sendMalformedHTLCError(
6✔
3774
                                add.ID, failureCode, add.OnionBlob, &sourceRef,
6✔
3775
                        )
6✔
3776

6✔
3777
                        l.log.Errorf("unable to decode onion hop "+
6✔
3778
                                "iterator: %v", failureCode)
6✔
3779
                        continue
6✔
3780
                }
3781

3782
                heightNow := l.cfg.BestHeight()
451✔
3783

451✔
3784
                pld, routeRole, pldErr := chanIterator.HopPayload()
451✔
3785
                if pldErr != nil {
455✔
3786
                        // If we're unable to process the onion payload, or we
4✔
3787
                        // received invalid onion payload failure, then we
4✔
3788
                        // should send an error back to the caller so the HTLC
4✔
3789
                        // can be canceled.
4✔
3790
                        var failedType uint64
4✔
3791

4✔
3792
                        // We need to get the underlying error value, so we
4✔
3793
                        // can't use errors.As as suggested by the linter.
4✔
3794
                        //nolint:errorlint
4✔
3795
                        if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
4✔
3796
                                failedType = uint64(e.Type)
×
3797
                        }
×
3798

3799
                        // If we couldn't parse the payload, make our best
3800
                        // effort at creating an error encrypter that knows
3801
                        // what blinding type we were, but if we couldn't
3802
                        // parse the payload we have no way of knowing whether
3803
                        // we were the introduction node or not.
3804
                        //
3805
                        //nolint:ll
3806
                        obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
4✔
3807
                                l.cfg.ExtractErrorEncrypter,
4✔
3808
                                // We need our route role here because we
4✔
3809
                                // couldn't parse or validate the payload.
4✔
3810
                                routeRole == hop.RouteRoleIntroduction,
4✔
3811
                        )
4✔
3812
                        if failCode != lnwire.CodeNone {
4✔
3813
                                l.log.Errorf("could not extract error "+
×
3814
                                        "encrypter: %v", pldErr)
×
3815

×
3816
                                // We can't process this htlc, send back
×
3817
                                // malformed.
×
3818
                                l.sendMalformedHTLCError(
×
3819
                                        add.ID, failureCode, add.OnionBlob,
×
3820
                                        &sourceRef,
×
3821
                                )
×
3822

×
3823
                                continue
×
3824
                        }
3825

3826
                        // TODO: currently none of the test unit infrastructure
3827
                        // is setup to handle TLV payloads, so testing this
3828
                        // would require implementing a separate mock iterator
3829
                        // for TLV payloads that also supports injecting invalid
3830
                        // payloads. Deferring this non-trival effort till a
3831
                        // later date
3832
                        failure := lnwire.NewInvalidOnionPayload(failedType, 0)
4✔
3833

4✔
3834
                        l.sendHTLCError(
4✔
3835
                                add, sourceRef, NewLinkError(failure),
4✔
3836
                                obfuscator, false,
4✔
3837
                        )
4✔
3838

4✔
3839
                        l.log.Errorf("unable to decode forwarding "+
4✔
3840
                                "instructions: %v", pldErr)
4✔
3841

4✔
3842
                        continue
4✔
3843
                }
3844

3845
                // Retrieve onion obfuscator from onion blob in order to
3846
                // produce initial obfuscation of the onion failureCode.
3847
                obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
451✔
3848
                        l.cfg.ExtractErrorEncrypter,
451✔
3849
                        routeRole == hop.RouteRoleIntroduction,
451✔
3850
                )
451✔
3851
                if failureCode != lnwire.CodeNone {
452✔
3852
                        // If we're unable to process the onion blob than we
1✔
3853
                        // should send the malformed htlc error to payment
1✔
3854
                        // sender.
1✔
3855
                        l.sendMalformedHTLCError(
1✔
3856
                                add.ID, failureCode, add.OnionBlob,
1✔
3857
                                &sourceRef,
1✔
3858
                        )
1✔
3859

1✔
3860
                        l.log.Errorf("unable to decode onion "+
1✔
3861
                                "obfuscator: %v", failureCode)
1✔
3862

1✔
3863
                        continue
1✔
3864
                }
3865

3866
                fwdInfo := pld.ForwardingInfo()
450✔
3867

450✔
3868
                // Check whether the payload we've just processed uses our
450✔
3869
                // node as the introduction point (gave us a blinding key in
450✔
3870
                // the payload itself) and fail it back if we don't support
450✔
3871
                // route blinding.
450✔
3872
                if fwdInfo.NextBlinding.IsSome() &&
450✔
3873
                        l.cfg.DisallowRouteBlinding {
454✔
3874

4✔
3875
                        failure := lnwire.NewInvalidBlinding(
4✔
3876
                                fn.Some(add.OnionBlob),
4✔
3877
                        )
4✔
3878

4✔
3879
                        l.sendHTLCError(
4✔
3880
                                add, sourceRef, NewLinkError(failure),
4✔
3881
                                obfuscator, false,
4✔
3882
                        )
4✔
3883

4✔
3884
                        l.log.Error("rejected htlc that uses use as an " +
4✔
3885
                                "introduction point when we do not support " +
4✔
3886
                                "route blinding")
4✔
3887

4✔
3888
                        continue
4✔
3889
                }
3890

3891
                switch fwdInfo.NextHop {
450✔
3892
                case hop.Exit:
414✔
3893
                        err := l.processExitHop(
414✔
3894
                                add, sourceRef, obfuscator, fwdInfo,
414✔
3895
                                heightNow, pld,
414✔
3896
                        )
414✔
3897
                        if err != nil {
418✔
3898
                                l.failf(LinkFailureError{
4✔
3899
                                        code: ErrInternalError,
4✔
3900
                                }, err.Error()) //nolint
4✔
3901

4✔
3902
                                return
4✔
3903
                        }
4✔
3904

3905
                // There are additional channels left within this route. So
3906
                // we'll simply do some forwarding package book-keeping.
3907
                default:
40✔
3908
                        // If hodl.AddIncoming is requested, we will not
40✔
3909
                        // validate the forwarded ADD, nor will we send the
40✔
3910
                        // packet to the htlc switch.
40✔
3911
                        if l.cfg.HodlMask.Active(hodl.AddIncoming) {
40✔
3912
                                l.log.Warnf(hodl.AddIncoming.Warning())
×
3913
                                continue
×
3914
                        }
3915

3916
                        endorseValue := l.experimentalEndorsement(
40✔
3917
                                record.CustomSet(add.CustomRecords),
40✔
3918
                        )
40✔
3919
                        endorseType := uint64(
40✔
3920
                                lnwire.ExperimentalEndorsementType,
40✔
3921
                        )
40✔
3922

40✔
3923
                        switch fwdPkg.State {
40✔
3924
                        case channeldb.FwdStateProcessed:
4✔
3925
                                // This add was not forwarded on the previous
4✔
3926
                                // processing phase, run it through our
4✔
3927
                                // validation pipeline to reproduce an error.
4✔
3928
                                // This may trigger a different error due to
4✔
3929
                                // expiring timelocks, but we expect that an
4✔
3930
                                // error will be reproduced.
4✔
3931
                                if !fwdPkg.FwdFilter.Contains(idx) {
4✔
3932
                                        break
×
3933
                                }
3934

3935
                                // Otherwise, it was already processed, we can
3936
                                // can collect it and continue.
3937
                                outgoingAdd := &lnwire.UpdateAddHTLC{
4✔
3938
                                        Expiry:        fwdInfo.OutgoingCTLV,
4✔
3939
                                        Amount:        fwdInfo.AmountToForward,
4✔
3940
                                        PaymentHash:   add.PaymentHash,
4✔
3941
                                        BlindingPoint: fwdInfo.NextBlinding,
4✔
3942
                                }
4✔
3943

4✔
3944
                                endorseValue.WhenSome(func(e byte) {
8✔
3945
                                        custRecords := map[uint64][]byte{
4✔
3946
                                                endorseType: {e},
4✔
3947
                                        }
4✔
3948

4✔
3949
                                        outgoingAdd.CustomRecords = custRecords
4✔
3950
                                })
4✔
3951

3952
                                // Finally, we'll encode the onion packet for
3953
                                // the _next_ hop using the hop iterator
3954
                                // decoded for the current hop.
3955
                                buf := bytes.NewBuffer(
4✔
3956
                                        outgoingAdd.OnionBlob[0:0],
4✔
3957
                                )
4✔
3958

4✔
3959
                                // We know this cannot fail, as this ADD
4✔
3960
                                // was marked forwarded in a previous
4✔
3961
                                // round of processing.
4✔
3962
                                chanIterator.EncodeNextHop(buf)
4✔
3963

4✔
3964
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee
4✔
3965

4✔
3966
                                //nolint:ll
4✔
3967
                                updatePacket := &htlcPacket{
4✔
3968
                                        incomingChanID:       l.ShortChanID(),
4✔
3969
                                        incomingHTLCID:       add.ID,
4✔
3970
                                        outgoingChanID:       fwdInfo.NextHop,
4✔
3971
                                        sourceRef:            &sourceRef,
4✔
3972
                                        incomingAmount:       add.Amount,
4✔
3973
                                        amount:               outgoingAdd.Amount,
4✔
3974
                                        htlc:                 outgoingAdd,
4✔
3975
                                        obfuscator:           obfuscator,
4✔
3976
                                        incomingTimeout:      add.Expiry,
4✔
3977
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
4✔
3978
                                        inOnionCustomRecords: pld.CustomRecords(),
4✔
3979
                                        inboundFee:           inboundFee,
4✔
3980
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
4✔
3981
                                }
4✔
3982
                                switchPackets = append(
4✔
3983
                                        switchPackets, updatePacket,
4✔
3984
                                )
4✔
3985

4✔
3986
                                continue
4✔
3987
                        }
3988

3989
                        // TODO(roasbeef): ensure don't accept outrageous
3990
                        // timeout for htlc
3991

3992
                        // With all our forwarding constraints met, we'll
3993
                        // create the outgoing HTLC using the parameters as
3994
                        // specified in the forwarding info.
3995
                        addMsg := &lnwire.UpdateAddHTLC{
40✔
3996
                                Expiry:        fwdInfo.OutgoingCTLV,
40✔
3997
                                Amount:        fwdInfo.AmountToForward,
40✔
3998
                                PaymentHash:   add.PaymentHash,
40✔
3999
                                BlindingPoint: fwdInfo.NextBlinding,
40✔
4000
                        }
40✔
4001

40✔
4002
                        endorseValue.WhenSome(func(e byte) {
80✔
4003
                                addMsg.CustomRecords = map[uint64][]byte{
40✔
4004
                                        endorseType: {e},
40✔
4005
                                }
40✔
4006
                        })
40✔
4007

4008
                        // Finally, we'll encode the onion packet for the
4009
                        // _next_ hop using the hop iterator decoded for the
4010
                        // current hop.
4011
                        buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
40✔
4012
                        err := chanIterator.EncodeNextHop(buf)
40✔
4013
                        if err != nil {
40✔
4014
                                l.log.Errorf("unable to encode the "+
×
4015
                                        "remaining route %v", err)
×
4016

×
4017
                                cb := func(upd *lnwire.ChannelUpdate1) lnwire.FailureMessage { //nolint:ll
×
4018
                                        return lnwire.NewTemporaryChannelFailure(upd)
×
4019
                                }
×
4020

4021
                                failure := l.createFailureWithUpdate(
×
4022
                                        true, hop.Source, cb,
×
4023
                                )
×
4024

×
4025
                                l.sendHTLCError(
×
4026
                                        add, sourceRef, NewLinkError(failure),
×
4027
                                        obfuscator, false,
×
4028
                                )
×
4029
                                continue
×
4030
                        }
4031

4032
                        // Now that this add has been reprocessed, only append
4033
                        // it to our list of packets to forward to the switch
4034
                        // this is the first time processing the add. If the
4035
                        // fwd pkg has already been processed, then we entered
4036
                        // the above section to recreate a previous error.  If
4037
                        // the packet had previously been forwarded, it would
4038
                        // have been added to switchPackets at the top of this
4039
                        // section.
4040
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
80✔
4041
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee
40✔
4042

40✔
4043
                                //nolint:ll
40✔
4044
                                updatePacket := &htlcPacket{
40✔
4045
                                        incomingChanID:       l.ShortChanID(),
40✔
4046
                                        incomingHTLCID:       add.ID,
40✔
4047
                                        outgoingChanID:       fwdInfo.NextHop,
40✔
4048
                                        sourceRef:            &sourceRef,
40✔
4049
                                        incomingAmount:       add.Amount,
40✔
4050
                                        amount:               addMsg.Amount,
40✔
4051
                                        htlc:                 addMsg,
40✔
4052
                                        obfuscator:           obfuscator,
40✔
4053
                                        incomingTimeout:      add.Expiry,
40✔
4054
                                        outgoingTimeout:      fwdInfo.OutgoingCTLV,
40✔
4055
                                        inOnionCustomRecords: pld.CustomRecords(),
40✔
4056
                                        inboundFee:           inboundFee,
40✔
4057
                                        inWireCustomRecords:  add.CustomRecords.Copy(),
40✔
4058
                                }
40✔
4059

40✔
4060
                                fwdPkg.FwdFilter.Set(idx)
40✔
4061
                                switchPackets = append(switchPackets,
40✔
4062
                                        updatePacket)
40✔
4063
                        }
40✔
4064
                }
4065
        }
4066

4067
        // Commit the htlcs we are intending to forward if this package has not
4068
        // been fully processed.
4069
        if fwdPkg.State == channeldb.FwdStateLockedIn {
2,349✔
4070
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
1,173✔
4071
                if err != nil {
1,173✔
4072
                        l.failf(LinkFailureError{code: ErrInternalError},
×
4073
                                "unable to set fwd filter: %v", err)
×
4074
                        return
×
4075
                }
×
4076
        }
4077

4078
        if len(switchPackets) == 0 {
2,316✔
4079
                return
1,140✔
4080
        }
1,140✔
4081

4082
        replay := fwdPkg.State != channeldb.FwdStateLockedIn
40✔
4083

40✔
4084
        l.log.Debugf("forwarding %d packets to switch: replay=%v",
40✔
4085
                len(switchPackets), replay)
40✔
4086

40✔
4087
        // NOTE: This call is made synchronous so that we ensure all circuits
40✔
4088
        // are committed in the exact order that they are processed in the link.
40✔
4089
        // Failing to do this could cause reorderings/gaps in the range of
40✔
4090
        // opened circuits, which violates assumptions made by the circuit
40✔
4091
        // trimming.
40✔
4092
        l.forwardBatch(replay, switchPackets...)
40✔
4093
}
4094

4095
// experimentalEndorsement returns the value to set for our outgoing
4096
// experimental endorsement field, and a boolean indicating whether it should
4097
// be populated on the outgoing htlc.
4098
func (l *channelLink) experimentalEndorsement(
4099
        customUpdateAdd record.CustomSet) fn.Option[byte] {
40✔
4100

40✔
4101
        // Only relay experimental signal if we are within the experiment
40✔
4102
        // period.
40✔
4103
        if !l.cfg.ShouldFwdExpEndorsement() {
44✔
4104
                return fn.None[byte]()
4✔
4105
        }
4✔
4106

4107
        // If we don't have any custom records or the experimental field is
4108
        // not set, just forward a zero value.
4109
        if len(customUpdateAdd) == 0 {
80✔
4110
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
40✔
4111
        }
40✔
4112

4113
        t := uint64(lnwire.ExperimentalEndorsementType)
4✔
4114
        value, set := customUpdateAdd[t]
4✔
4115
        if !set {
4✔
4116
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
×
4117
        }
×
4118

4119
        // We expect at least one byte for this field, consider it invalid if
4120
        // it has no data and just forward a zero value.
4121
        if len(value) == 0 {
4✔
4122
                return fn.Some[byte](lnwire.ExperimentalUnendorsed)
×
4123
        }
×
4124

4125
        // Only forward endorsed if the incoming link is endorsed.
4126
        if value[0] == lnwire.ExperimentalEndorsed {
8✔
4127
                return fn.Some[byte](lnwire.ExperimentalEndorsed)
4✔
4128
        }
4✔
4129

4130
        // Forward as unendorsed otherwise, including cases where we've
4131
        // received an invalid value that uses more than 3 bits of information.
4132
        return fn.Some[byte](lnwire.ExperimentalUnendorsed)
4✔
4133
}
4134

4135
// processExitHop handles an htlc for which this link is the exit hop. It
4136
// returns an error, which the caller treats as an internal link failure.
4137
func (l *channelLink) processExitHop(add lnwire.UpdateAddHTLC,
4138
        sourceRef channeldb.AddRef, obfuscator hop.ErrorEncrypter,
4139
        fwdInfo hop.ForwardingInfo, heightNow uint32,
4140
        payload invoices.Payload) error {
414✔
4141

414✔
4142
        // If hodl.ExitSettle is requested, we will not validate the final hop's
414✔
4143
        // ADD, nor will we settle the corresponding invoice or respond with the
414✔
4144
        // preimage.
414✔
4145
        if l.cfg.HodlMask.Active(hodl.ExitSettle) {
525✔
4146
                l.log.Warnf("%s for htlc(rhash=%x,htlcIndex=%v)",
111✔
4147
                        hodl.ExitSettle.Warning(), add.PaymentHash, add.ID)
111✔
4148

111✔
4149
                return nil
111✔
4150
        }
111✔
4151

4152
        // As we're the exit hop, we'll double check the hop-payload included in
4153
        // the HTLC to ensure that it was crafted correctly by the sender and
4154
        // is compatible with the HTLC we were extended.
4155
        //
4156
        // For a special case, if the fwdInfo doesn't have any blinded path
4157
        // information, and the incoming HTLC had special extra data, then
4158
        // we'll skip this amount check. The invoice acceptor will make sure we
4159
        // reject the HTLC if it's not containing the correct amount after
4160
        // examining the custom data.
4161
        hasBlindedPath := fwdInfo.NextBlinding.IsSome()
307✔
4162
        customHTLC := len(add.CustomRecords) > 0 && !hasBlindedPath
307✔
4163
        log.Tracef("Exit hop has_blinded_path=%v custom_htlc_bypass=%v",
307✔
4164
                hasBlindedPath, customHTLC)
307✔
4165

307✔
4166
        if !customHTLC && add.Amount < fwdInfo.AmountToForward {
407✔
4167
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
100✔
4168
                        "incompatible value: expected <=%v, got %v",
100✔
4169
                        add.PaymentHash, add.Amount, fwdInfo.AmountToForward)
100✔
4170

100✔
4171
                failure := NewLinkError(
100✔
4172
                        lnwire.NewFinalIncorrectHtlcAmount(add.Amount),
100✔
4173
                )
100✔
4174
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)
100✔
4175

100✔
4176
                return nil
100✔
4177
        }
100✔
4178

4179
        // We'll also ensure that our time-lock value has been computed
4180
        // correctly.
4181
        if add.Expiry < fwdInfo.OutgoingCTLV {
208✔
4182
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
1✔
4183
                        "incompatible time-lock: expected <=%v, got %v",
1✔
4184
                        add.PaymentHash, add.Expiry, fwdInfo.OutgoingCTLV)
1✔
4185

1✔
4186
                failure := NewLinkError(
1✔
4187
                        lnwire.NewFinalIncorrectCltvExpiry(add.Expiry),
1✔
4188
                )
1✔
4189

1✔
4190
                l.sendHTLCError(add, sourceRef, failure, obfuscator, true)
1✔
4191

1✔
4192
                return nil
1✔
4193
        }
1✔
4194

4195
        // Notify the invoiceRegistry of the exit hop htlc. If we crash right
4196
        // after this, this code will be re-executed after restart. We will
4197
        // receive back a resolution event.
4198
        invoiceHash := lntypes.Hash(add.PaymentHash)
206✔
4199

206✔
4200
        circuitKey := models.CircuitKey{
206✔
4201
                ChanID: l.ShortChanID(),
206✔
4202
                HtlcID: add.ID,
206✔
4203
        }
206✔
4204

206✔
4205
        event, err := l.cfg.Registry.NotifyExitHopHtlc(
206✔
4206
                invoiceHash, add.Amount, add.Expiry, int32(heightNow),
206✔
4207
                circuitKey, l.hodlQueue.ChanIn(), add.CustomRecords, payload,
206✔
4208
        )
206✔
4209
        if err != nil {
210✔
4210
                return err
4✔
4211
        }
4✔
4212

4213
        // Create a hodlHtlc struct and decide either resolved now or later.
4214
        htlc := hodlHtlc{
206✔
4215
                add:        add,
206✔
4216
                sourceRef:  sourceRef,
206✔
4217
                obfuscator: obfuscator,
206✔
4218
        }
206✔
4219

206✔
4220
        // If the event is nil, the invoice is being held, so we save payment
206✔
4221
        // descriptor for future reference.
206✔
4222
        if event == nil {
266✔
4223
                l.hodlMap[circuitKey] = htlc
60✔
4224
                return nil
60✔
4225
        }
60✔
4226

4227
        // Process the received resolution.
4228
        return l.processHtlcResolution(event, htlc)
150✔
4229
}
4230

4231
// settleHTLC settles the HTLC on the channel.
4232
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
4233
        htlcIndex uint64, sourceRef channeldb.AddRef) error {
201✔
4234

201✔
4235
        hash := preimage.Hash()
201✔
4236

201✔
4237
        l.log.Infof("settling htlc %v as exit hop", hash)
201✔
4238

201✔
4239
        err := l.channel.SettleHTLC(
201✔
4240
                preimage, htlcIndex, &sourceRef, nil, nil,
201✔
4241
        )
201✔
4242
        if err != nil {
201✔
4243
                return fmt.Errorf("unable to settle htlc: %w", err)
×
4244
        }
×
4245

4246
        // If the link is in hodl.BogusSettle mode, replace the preimage with a
4247
        // fake one before sending it to the peer.
4248
        if l.cfg.HodlMask.Active(hodl.BogusSettle) {
205✔
4249
                l.log.Warnf(hodl.BogusSettle.Warning())
4✔
4250
                preimage = [32]byte{}
4✔
4251
                copy(preimage[:], bytes.Repeat([]byte{2}, 32))
4✔
4252
        }
4✔
4253

4254
        // HTLC was successfully settled locally send notification about it
4255
        // remote peer.
4256
        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
201✔
4257
                ChanID:          l.ChanID(),
201✔
4258
                ID:              htlcIndex,
201✔
4259
                PaymentPreimage: preimage,
201✔
4260
        })
201✔
4261

201✔
4262
        // Once we have successfully settled the htlc, notify a settle event.
201✔
4263
        l.cfg.HtlcNotifier.NotifySettleEvent(
201✔
4264
                HtlcKey{
201✔
4265
                        IncomingCircuit: models.CircuitKey{
201✔
4266
                                ChanID: l.ShortChanID(),
201✔
4267
                                HtlcID: htlcIndex,
201✔
4268
                        },
201✔
4269
                },
201✔
4270
                preimage,
201✔
4271
                HtlcEventTypeReceive,
201✔
4272
        )
201✔
4273

201✔
4274
        return nil
201✔
4275
}
4276

4277
// forwardBatch forwards the given htlcPackets to the switch, and waits on the
4278
// err chan for the individual responses. This method is intended to be spawned
4279
// as a goroutine so the responses can be handled in the background.
4280
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
579✔
4281
        // Don't forward packets for which we already have a response in our
579✔
4282
        // mailbox. This could happen if a packet fails and is buffered in the
579✔
4283
        // mailbox, and the incoming link flaps.
579✔
4284
        var filteredPkts = make([]*htlcPacket, 0, len(packets))
579✔
4285
        for _, pkt := range packets {
1,158✔
4286
                if l.mailBox.HasPacket(pkt.inKey()) {
583✔
4287
                        continue
4✔
4288
                }
4289

4290
                filteredPkts = append(filteredPkts, pkt)
579✔
4291
        }
4292

4293
        err := l.cfg.ForwardPackets(l.Quit, replay, filteredPkts...)
579✔
4294
        if err != nil {
590✔
4295
                log.Errorf("Unhandled error while reforwarding htlc "+
11✔
4296
                        "settle/fail over htlcswitch: %v", err)
11✔
4297
        }
11✔
4298
}
4299

4300
// sendHTLCError functions cancels HTLC and send cancel message back to the
4301
// peer from which HTLC was received.
4302
func (l *channelLink) sendHTLCError(add lnwire.UpdateAddHTLC,
4303
        sourceRef channeldb.AddRef, failure *LinkError,
4304
        e hop.ErrorEncrypter, isReceive bool) {
109✔
4305

109✔
4306
        reason, err := e.EncryptFirstHop(failure.WireMessage())
109✔
4307
        if err != nil {
109✔
4308
                l.log.Errorf("unable to obfuscate error: %v", err)
×
4309
                return
×
4310
        }
×
4311

4312
        err = l.channel.FailHTLC(add.ID, reason, &sourceRef, nil, nil)
109✔
4313
        if err != nil {
109✔
4314
                l.log.Errorf("unable cancel htlc: %v", err)
×
4315
                return
×
4316
        }
×
4317

4318
        // Send the appropriate failure message depending on whether we're
4319
        // in a blinded route or not.
4320
        if err := l.sendIncomingHTLCFailureMsg(
109✔
4321
                add.ID, e, reason,
109✔
4322
        ); err != nil {
109✔
4323
                l.log.Errorf("unable to send HTLC failure: %v", err)
×
4324
                return
×
4325
        }
×
4326

4327
        // Notify a link failure on our incoming link. Outgoing htlc information
4328
        // is not available at this point, because we have not decrypted the
4329
        // onion, so it is excluded.
4330
        var eventType HtlcEventType
109✔
4331
        if isReceive {
218✔
4332
                eventType = HtlcEventTypeReceive
109✔
4333
        } else {
113✔
4334
                eventType = HtlcEventTypeForward
4✔
4335
        }
4✔
4336

4337
        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
109✔
4338
                HtlcKey{
109✔
4339
                        IncomingCircuit: models.CircuitKey{
109✔
4340
                                ChanID: l.ShortChanID(),
109✔
4341
                                HtlcID: add.ID,
109✔
4342
                        },
109✔
4343
                },
109✔
4344
                HtlcInfo{
109✔
4345
                        IncomingTimeLock: add.Expiry,
109✔
4346
                        IncomingAmt:      add.Amount,
109✔
4347
                },
109✔
4348
                eventType,
109✔
4349
                failure,
109✔
4350
                true,
109✔
4351
        )
109✔
4352
}
4353

4354
// sendPeerHTLCFailure handles sending a HTLC failure message back to the
4355
// peer from which the HTLC was received. This function is primarily used to
4356
// handle the special requirements of route blinding, specifically:
4357
// - Forwarding nodes must switch out any errors with MalformedFailHTLC
4358
// - Introduction nodes should return regular HTLC failure messages.
4359
//
4360
// It accepts the original opaque failure, which will be used in the case
4361
// that we're not part of a blinded route and an error encrypter that'll be
4362
// used if we are the introduction node and need to present an error as if
4363
// we're the failing party.
4364
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
4365
        e hop.ErrorEncrypter,
4366
        originalFailure lnwire.OpaqueReason) error {
125✔
4367

125✔
4368
        var msg lnwire.Message
125✔
4369
        switch {
125✔
4370
        // Our circuit's error encrypter will be nil if this was a locally
4371
        // initiated payment. We can only hit a blinded error for a locally
4372
        // initiated payment if we allow ourselves to be picked as the
4373
        // introduction node for our own payments and in that case we
4374
        // shouldn't reach this code. To prevent the HTLC getting stuck,
4375
        // we fail it back and log an error.
4376
        // code.
4377
        case e == nil:
×
4378
                msg = &lnwire.UpdateFailHTLC{
×
4379
                        ChanID: l.ChanID(),
×
4380
                        ID:     htlcIndex,
×
4381
                        Reason: originalFailure,
×
4382
                }
×
4383

×
4384
                l.log.Errorf("Unexpected blinded failure when "+
×
4385
                        "we are the sending node, incoming htlc: %v(%v)",
×
4386
                        l.ShortChanID(), htlcIndex)
×
4387

4388
        // For cleartext hops (ie, non-blinded/normal) we don't need any
4389
        // transformation on the error message and can just send the original.
4390
        case !e.Type().IsBlinded():
125✔
4391
                msg = &lnwire.UpdateFailHTLC{
125✔
4392
                        ChanID: l.ChanID(),
125✔
4393
                        ID:     htlcIndex,
125✔
4394
                        Reason: originalFailure,
125✔
4395
                }
125✔
4396

4397
        // When we're the introduction node, we need to convert the error to
4398
        // a UpdateFailHTLC.
4399
        case e.Type() == hop.EncrypterTypeIntroduction:
4✔
4400
                l.log.Debugf("Introduction blinded node switching out failure "+
4✔
4401
                        "error: %v", htlcIndex)
4✔
4402

4✔
4403
                // The specification does not require that we set the onion
4✔
4404
                // blob.
4✔
4405
                failureMsg := lnwire.NewInvalidBlinding(
4✔
4406
                        fn.None[[lnwire.OnionPacketSize]byte](),
4✔
4407
                )
4✔
4408
                reason, err := e.EncryptFirstHop(failureMsg)
4✔
4409
                if err != nil {
4✔
4410
                        return err
×
4411
                }
×
4412

4413
                msg = &lnwire.UpdateFailHTLC{
4✔
4414
                        ChanID: l.ChanID(),
4✔
4415
                        ID:     htlcIndex,
4✔
4416
                        Reason: reason,
4✔
4417
                }
4✔
4418

4419
        // If we are a relaying node, we need to switch out any error that
4420
        // we've received to a malformed HTLC error.
4421
        case e.Type() == hop.EncrypterTypeRelaying:
4✔
4422
                l.log.Debugf("Relaying blinded node switching out malformed "+
4✔
4423
                        "error: %v", htlcIndex)
4✔
4424

4✔
4425
                msg = &lnwire.UpdateFailMalformedHTLC{
4✔
4426
                        ChanID:      l.ChanID(),
4✔
4427
                        ID:          htlcIndex,
4✔
4428
                        FailureCode: lnwire.CodeInvalidBlinding,
4✔
4429
                }
4✔
4430

4431
        default:
×
4432
                return fmt.Errorf("unexpected encrypter: %d", e)
×
4433
        }
4434

4435
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
125✔
4436
                l.log.Warnf("Send update fail failed: %v", err)
×
4437
        }
×
4438

4439
        return nil
125✔
4440
}
4441

4442
// sendMalformedHTLCError helper function which sends the malformed HTLC update
4443
// to the payment sender.
4444
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
4445
        code lnwire.FailCode, onionBlob [lnwire.OnionPacketSize]byte,
4446
        sourceRef *channeldb.AddRef) {
7✔
4447

7✔
4448
        shaOnionBlob := sha256.Sum256(onionBlob[:])
7✔
4449
        err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
7✔
4450
        if err != nil {
7✔
4451
                l.log.Errorf("unable cancel htlc: %v", err)
×
4452
                return
×
4453
        }
×
4454

4455
        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
7✔
4456
                ChanID:       l.ChanID(),
7✔
4457
                ID:           htlcIndex,
7✔
4458
                ShaOnionBlob: shaOnionBlob,
7✔
4459
                FailureCode:  code,
7✔
4460
        })
7✔
4461
}
4462

4463
// failf is a function which is used to encapsulate the action necessary for
4464
// properly failing the link. It takes a LinkFailureError, which will be passed
4465
// to the OnChannelFailure closure, in order for it to determine if we should
4466
// force close the channel, and if we should send an error message to the
4467
// remote peer.
4468
func (l *channelLink) failf(linkErr LinkFailureError, format string,
4469
        a ...interface{}) {
19✔
4470

19✔
4471
        reason := fmt.Errorf(format, a...)
19✔
4472

19✔
4473
        // Return if we have already notified about a failure.
19✔
4474
        if l.failed {
23✔
4475
                l.log.Warnf("ignoring link failure (%v), as link already "+
4✔
4476
                        "failed", reason)
4✔
4477
                return
4✔
4478
        }
4✔
4479

4480
        l.log.Errorf("failing link: %s with error: %v", reason, linkErr)
19✔
4481

19✔
4482
        // Set failed, such that we won't process any more updates, and notify
19✔
4483
        // the peer about the failure.
19✔
4484
        l.failed = true
19✔
4485
        l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
19✔
4486
}
4487

4488
// FundingCustomBlob returns the custom funding blob of the channel that this
4489
// link is associated with. The funding blob represents static information about
4490
// the channel that was created at channel funding time.
4491
func (l *channelLink) FundingCustomBlob() fn.Option[tlv.Blob] {
×
4492
        if l.channel == nil {
×
4493
                return fn.None[tlv.Blob]()
×
4494
        }
×
4495

4496
        if l.channel.State() == nil {
×
4497
                return fn.None[tlv.Blob]()
×
4498
        }
×
4499

4500
        return l.channel.State().CustomBlob
×
4501
}
4502

4503
// CommitmentCustomBlob returns the custom blob of the current local commitment
4504
// of the channel that this link is associated with.
4505
func (l *channelLink) CommitmentCustomBlob() fn.Option[tlv.Blob] {
×
4506
        if l.channel == nil {
×
4507
                return fn.None[tlv.Blob]()
×
4508
        }
×
4509

4510
        return l.channel.LocalCommitmentBlob()
×
4511
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc