
lightningnetwork / lnd / 10204896993

01 Aug 2024 07:57PM UTC coverage: 58.591% (-0.08%) from 58.674%
Build 10204896993 (push, github, web-flow)

Merge pull request #8962 from ProofOfKeags/refactor/quiescence-micro-spinoffs
[NANO]: Refactor/quiescence micro spinoffs

3 of 4 new or added lines in 2 files covered (75.0%).
242 existing lines in 26 files now uncovered.
125214 of 213710 relevant lines covered (58.59%).
28092.24 hits per line.

Source File

/htlcswitch/link.go: 80.18% covered
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        crand "crypto/rand"
6
        "crypto/sha256"
7
        "errors"
8
        "fmt"
9
        prand "math/rand"
10
        "sync"
11
        "sync/atomic"
12
        "time"
13

14
        "github.com/btcsuite/btcd/btcutil"
15
        "github.com/btcsuite/btcd/wire"
16
        "github.com/btcsuite/btclog"
17
        "github.com/lightningnetwork/lnd/build"
18
        "github.com/lightningnetwork/lnd/channeldb"
19
        "github.com/lightningnetwork/lnd/channeldb/models"
20
        "github.com/lightningnetwork/lnd/contractcourt"
21
        "github.com/lightningnetwork/lnd/fn"
22
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
23
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
24
        "github.com/lightningnetwork/lnd/input"
25
        "github.com/lightningnetwork/lnd/invoices"
26
        "github.com/lightningnetwork/lnd/lnpeer"
27
        "github.com/lightningnetwork/lnd/lntypes"
28
        "github.com/lightningnetwork/lnd/lnutils"
29
        "github.com/lightningnetwork/lnd/lnwallet"
30
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
31
        "github.com/lightningnetwork/lnd/lnwire"
32
        "github.com/lightningnetwork/lnd/queue"
33
        "github.com/lightningnetwork/lnd/ticker"
34
)
35

36
func init() {
14✔
37
        prand.Seed(time.Now().UnixNano())
14✔
38
}
14✔
39

40
const (
41
        // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
42
        // the node accepts for forwarded payments. The value is relative to the
43
        // current block height. The reason to have a maximum is to prevent
44
        // funds getting locked up unreasonably long. Otherwise, an attacker
45
        // willing to lock its own funds too could force the funds of this node
46
        // to be locked up for an indefinite (max int32) number of blocks.
47
        //
48
        // The value 2016 corresponds to on average two weeks worth of blocks
49
        // and is based on the maximum number of hops (20), the default CLTV
50
        // delta (40), and some extra margin to account for the other lightning
51
        // implementations and past lnd versions which used to have a default
52
        // CLTV delta of 144.
53
        DefaultMaxOutgoingCltvExpiry = 2016
54

55
        // DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
56
        // which a link should propose to update its commitment fee rate.
57
        DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute
58

59
        // DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
60
        // which a link should propose to update its commitment fee rate.
61
        DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute
62

63
        // DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
64
        // a channel's commitment fee to be of its balance. This only applies to
65
        // the initiator of the channel.
66
        DefaultMaxLinkFeeAllocation float64 = 0.5
67
)
68

69
// ExpectedFee computes the expected fee for a given htlc amount. The value
70
// returned from this function is to be used as a sanity check when forwarding
71
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
72
// forwarding policy.
73
//
74
// TODO(roasbeef): also add in current available channel bandwidth, inverse
75
// func
76
func ExpectedFee(f models.ForwardingPolicy,
77
        htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {
81✔
78

81✔
79
        return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
81✔
80
}
81✔
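As a quick sanity check on the formula above (values chosen purely for illustration, not taken from any real policy): with f.BaseFee = 1000 msat and f.FeeRate = 100, i.e. a proportional fee of 100 millionths, forwarding an HTLC of 1,000,000 msat gives ExpectedFee = 1000 + (1,000,000 * 100) / 1,000,000 = 1,100 msat.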
81

82
// ChannelLinkConfig defines the configuration for the channel link. ALL
83
// elements within the configuration MUST be non-nil for channel link to carry
84
// out its duties.
85
type ChannelLinkConfig struct {
86
        // FwrdingPolicy is the initial forwarding policy to be used when
87
        // deciding whether to forward incoming HTLC's or not. This value
88
        // can be updated with subsequent calls to UpdateForwardingPolicy
89
        // targeted at a given ChannelLink concrete interface implementation.
90
        FwrdingPolicy models.ForwardingPolicy
91

92
        // Circuits provides restricted access to the switch's circuit map,
93
        // allowing the link to open and close circuits.
94
        Circuits CircuitModifier
95

96
        // BestHeight returns the best known height.
97
        BestHeight func() uint32
98

99
        // ForwardPackets attempts to forward the batch of htlcs through the
100
        // switch. The function returns an error in case it fails to send one or
101
        // more packets. The link's quit signal should be provided to allow
102
        // cancellation of forwarding during link shutdown.
103
        ForwardPackets func(chan struct{}, bool, ...*htlcPacket) error
104

105
        // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
106
        // blobs, which are then used to inform how to forward an HTLC.
107
        //
108
        // NOTE: This function assumes the same set of readers and preimages
109
        // are always presented for the same identifier.
110
        DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest) (
111
                []hop.DecodeHopIteratorResponse, error)
112

113
        // ExtractErrorEncrypter function is responsible for decoding HTLC
114
        // Sphinx onion blob, and creating onion failure obfuscator.
115
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter
116

117
        // FetchLastChannelUpdate retrieves the latest routing policy for a
118
        // target channel. This channel will typically be the outgoing channel
119
        // specified when we receive an incoming HTLC. This will be used to
120
        // provide payment senders our latest policy when sending encrypted
121
        // error messages.
122
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
123

124
        // Peer is a lightning network node with which we have the channel link
125
        // opened.
126
        Peer lnpeer.Peer
127

128
        // Registry is a sub-system which is responsible for managing the
129
        // invoices in a thread-safe manner.
130
        Registry InvoiceDatabase
131

132
        // PreimageCache is a global witness beacon that houses any new
133
        // preimages discovered by other links. We'll use this to add new
134
        // witnesses that we discover which will notify any sub-systems
135
        // subscribed to new events.
136
        PreimageCache contractcourt.WitnessBeacon
137

138
        // OnChannelFailure is a function closure that we'll call if the
139
        // channel failed for some reason. Depending on the severity of the
140
        // error, the closure potentially must force close this channel and
141
        // disconnect the peer.
142
        //
143
        // NOTE: The method must return in order for the ChannelLink to be able
144
        // to shut down properly.
145
        OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
146
                LinkFailureError)
147

148
        // UpdateContractSignals is a function closure that we'll use to update
149
        // outside sub-systems with this channel's latest ShortChannelID.
150
        UpdateContractSignals func(*contractcourt.ContractSignals) error
151

152
        // NotifyContractUpdate is a function closure that we'll use to update
153
        // the contractcourt and more specifically the ChannelArbitrator of the
154
        // latest channel state.
155
        NotifyContractUpdate func(*contractcourt.ContractUpdate) error
156

157
        // ChainEvents is an active subscription to the chain watcher for this
158
        // channel to be notified of any on-chain activity related to this
159
        // channel.
160
        ChainEvents *contractcourt.ChainEventSubscription
161

162
        // FeeEstimator is an instance of a live fee estimator which will be
163
        // used to dynamically regulate the current fee of the commitment
164
        // transaction to ensure timely confirmation.
165
        FeeEstimator chainfee.Estimator
166

167
        // hodl.Mask is a bitvector composed of hodl.Flags, specifying breakpoints
168
        // for HTLC forwarding internal to the switch.
169
        //
170
        // NOTE: This should only be used for testing.
171
        HodlMask hodl.Mask
172

173
        // SyncStates is used to indicate that we need to send the channel
174
        // reestablishment message to the remote peer. It should be done if our
175
        // client has been restarted, or the remote peer has reconnected.
176
        SyncStates bool
177

178
        // BatchTicker is the ticker that determines the interval that we'll
179
        // use to check the batch to see if there are any updates we should
180
        // flush out. By batching updates into a single commit, we attempt to
181
        // increase throughput by maximizing the number of updates coalesced
182
        // into a single commit.
183
        BatchTicker ticker.Ticker
184

185
        // FwdPkgGCTicker is the ticker determining the frequency at which
186
        // garbage collection of forwarding packages occurs. We use a
187
        // time-based approach, as opposed to block epochs, so as not to hinder
188
        // syncing.
189
        FwdPkgGCTicker ticker.Ticker
190

191
        // PendingCommitTicker is a ticker that allows the link to determine if
192
        // a locally initiated commitment dance gets stuck waiting for the
193
        // remote party to revoke.
194
        PendingCommitTicker ticker.Ticker
195

196
        // BatchSize is the max size of a batch of updates done to the link
197
        // before we do a state update.
198
        BatchSize uint32
199

200
        // UnsafeReplay will cause a link to replay the adds in its latest
201
        // commitment txn after the link is restarted. This should only be used
202
        // in testing; it is here to ensure the sphinx replay detection on the
203
        // receiving node is persistent.
204
        UnsafeReplay bool
205

206
        // MinUpdateTimeout represents the minimum interval in which a link
207
        // will propose to update its commitment fee rate. A random timeout will
208
        // be selected between this and MaxUpdateTimeout.
209
        MinUpdateTimeout time.Duration
210

211
        // MaxUpdateTimeout represents the maximum interval in which a link
212
        // will propose to update its commitment fee rate. A random timeout will
213
        // be selected between this and MinUpdateTimeout.
214
        MaxUpdateTimeout time.Duration
215

216
        // OutgoingCltvRejectDelta defines the number of blocks before expiry of
217
        // an htlc where we don't offer an htlc anymore. This should be at least
218
        // the outgoing broadcast delta, because in any case we don't want to
219
        // risk offering an htlc that triggers channel closure.
220
        OutgoingCltvRejectDelta uint32
221

222
        // TowerClient is an optional engine that manages the signing,
223
        // encrypting, and uploading of justice transactions to the daemon's
224
        // configured set of watchtowers for legacy channels.
225
        TowerClient TowerClient
226

227
        // MaxOutgoingCltvExpiry is the maximum outgoing timelock that the link
228
        // should accept for a forwarded HTLC. The value is relative to the
229
        // current block height.
230
        MaxOutgoingCltvExpiry uint32
231

232
        // MaxFeeAllocation is the highest allocation we'll allow a channel's
233
        // commitment fee to be of its balance. This only applies to the
234
        // initiator of the channel.
235
        MaxFeeAllocation float64
236

237
        // MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
238
        // the initiator for channels of the anchor type.
239
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight
240

241
        // NotifyActiveLink allows the link to tell the ChannelNotifier when a
242
        // link is first started.
243
        NotifyActiveLink func(wire.OutPoint)
244

245
        // NotifyActiveChannel allows the link to tell the ChannelNotifier when
246
        // a channel becomes active.
247
        NotifyActiveChannel func(wire.OutPoint)
248

249
        // NotifyInactiveChannel allows the switch to tell the ChannelNotifier
250
        // when channels become inactive.
251
        NotifyInactiveChannel func(wire.OutPoint)
252

253
        // NotifyInactiveLinkEvent allows the switch to tell the
254
        // ChannelNotifier when a channel link becomes inactive.
255
        NotifyInactiveLinkEvent func(wire.OutPoint)
256

257
        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
258
        // events through.
259
        HtlcNotifier htlcNotifier
260

261
        // FailAliasUpdate is a function used to fail an HTLC for an
262
        // option_scid_alias channel.
263
        FailAliasUpdate func(sid lnwire.ShortChannelID,
264
                incoming bool) *lnwire.ChannelUpdate
265

266
        // GetAliases is used by the link and switch to fetch the set of
267
        // aliases for a given link.
268
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID
269

270
        // PreviouslySentShutdown is an optional value that is set if, at the
271
        // time of the link being started, persisted shutdown info was found for
272
        // the channel. This value being set means that we previously sent a
273
        // Shutdown message to our peer, and so we should do so again on
274
        // re-establish and should not allow any more HTLC adds on the outgoing
275
        // direction of the link.
276
        PreviouslySentShutdown fn.Option[lnwire.Shutdown]
277

278
        // DisallowRouteBlinding disables forwarding payments in blinded routes
279
        // by failing back any blinding-related payloads as if they were
280
        // invalid.
281
        DisallowRouteBlinding bool
282

283
        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
284
        // restrict the flow of HTLCs and fee updates.
285
        MaxFeeExposure lnwire.MilliSatoshi
286
}
287

288
// channelLink is the service which drives a channel's commitment update
289
// state-machine. In the event that an HTLC needs to be propagated to another
290
// link, the forward handler from config is used which sends HTLC to the
291
// switch. Additionally, the link encapsulates the logic of commitment protocol
292
// message ordering and updates.
293
type channelLink struct {
294
        // The following fields are only meant to be used *atomically*
295
        started       int32
296
        reestablished int32
297
        shutdown      int32
298

299
        // failed should be set to true in case a link error happens, making
300
        // sure we don't process any more updates.
301
        failed bool
302

303
        // keystoneBatch represents a volatile list of keystones that must be
304
        // written before attempting to sign the next commitment txn. These
305
        // represent all the HTLC's forwarded to the link from the switch. Once
306
        // we lock them into our outgoing commitment, then the circuit has a
307
        // keystone, and is fully opened.
308
        keystoneBatch []Keystone
309

310
        // openedCircuits is the set of all payment circuits that will be open
311
        // once we make our next commitment. After making the commitment we'll
312
        // ACK all these from our mailbox to ensure that they don't get
313
        // re-delivered if we reconnect.
314
        openedCircuits []CircuitKey
315

316
        // closedCircuits is the set of all payment circuits that will be
317
        // closed once we make our next commitment. After taking the commitment
318
        // we'll ACK all these to ensure that they don't get re-delivered if we
319
        // reconnect.
320
        closedCircuits []CircuitKey
321

322
        // channel is a lightning network channel to which we apply htlc
323
        // updates.
324
        channel *lnwallet.LightningChannel
325

326
        // cfg is a structure which carries all dependable fields/handlers
327
        // which may affect behaviour of the service.
328
        cfg ChannelLinkConfig
329

330
        // mailBox is the main interface between the outside world and the
331
        // link. All incoming messages will be sent over this mailBox. Messages
332
        // include new updates from our connected peer, and new packets to be
333
        // forwarded, sent by the switch.
334
        mailBox MailBox
335

336
        // upstream is a channel that new messages sent from the remote peer to
337
        // the local peer will be sent across.
338
        upstream chan lnwire.Message
339

340
        // downstream is a channel in which new multi-hop HTLC's to be
341
        // forwarded will be sent across. Messages from this channel are sent
342
        // by the HTLC switch.
343
        downstream chan *htlcPacket
344

345
        // updateFeeTimer is the timer responsible for updating the link's
346
        // commitment fee every time it fires.
347
        updateFeeTimer *time.Timer
348

349
        // uncommittedPreimages stores a list of all preimages that have been
350
        // learned since receiving the last CommitSig from the remote peer. The
351
        // batch will be flushed just before accepting the subsequent CommitSig
352
        // or on shutdown to avoid doing a write for each preimage received.
353
        uncommittedPreimages []lntypes.Preimage
354

355
        sync.RWMutex
356

357
        // hodlQueue is used to receive exit hop htlc resolutions from invoice
358
        // registry.
359
        hodlQueue *queue.ConcurrentQueue
360

361
        // hodlMap stores related htlc data for a circuit key. It allows
362
        // resolving those htlcs when we receive a message on hodlQueue.
363
        hodlMap map[models.CircuitKey]hodlHtlc
364

365
        // log is a link-specific logging instance.
366
        log btclog.Logger
367

368
        // isOutgoingAddBlocked tracks whether the channelLink can send an
369
        // UpdateAddHTLC.
370
        isOutgoingAddBlocked atomic.Bool
371

372
        // isIncomingAddBlocked tracks whether the channelLink can receive an
373
        // UpdateAddHTLC.
374
        isIncomingAddBlocked atomic.Bool
375

376
        // flushHooks is a hookMap that is triggered when we reach a channel
377
        // state with no live HTLCs.
378
        flushHooks hookMap
379

380
        // outgoingCommitHooks is a hookMap that is triggered after we send our
381
        // next CommitSig.
382
        outgoingCommitHooks hookMap
383

384
        // incomingCommitHooks is a hookMap that is triggered after we receive
385
        // our next CommitSig.
386
        incomingCommitHooks hookMap
387

388
        wg   sync.WaitGroup
389
        quit chan struct{}
390
}
391

392
// hookMap is a data structure that is used to track the hooks that need to be
393
// called in various parts of the channelLink's lifecycle.
394
//
395
// WARNING: NOT thread-safe.
396
type hookMap struct {
397
        // allocIdx keeps track of the next id we haven't yet allocated.
398
        allocIdx atomic.Uint64
399

400
        // transient is a map of hooks that are only called the next time invoke
401
        // is called. These hooks are deleted during invoke.
402
        transient map[uint64]func()
403

404
        // newTransients is a channel that we use to accept new hooks into the
405
        // hookMap.
406
        newTransients chan func()
407
}
408

409
// newHookMap initializes a new empty hookMap.
410
func newHookMap() hookMap {
642✔
411
        return hookMap{
642✔
412
                allocIdx:      atomic.Uint64{},
642✔
413
                transient:     make(map[uint64]func()),
642✔
414
                newTransients: make(chan func()),
642✔
415
        }
642✔
416
}
642✔
417

418
// alloc allocates space in the hook map for the supplied hook and returns
419
// the identifier assigned to it. The hook is stored in the transient part
420
// of the hookMap.
421
func (m *hookMap) alloc(hook func()) uint64 {
5✔
422
        // We assume we never overflow a uint64. Seems OK.
5✔
423
        hookID := m.allocIdx.Add(1)
5✔
424
        if hookID == 0 {
5✔
425
                panic("hookMap allocIdx overflow")
×
426
        }
427
        m.transient[hookID] = hook
5✔
428

5✔
429
        return hookID
5✔
430
}
431

432
// invoke is used on a hook map to call all the registered hooks and then clear
433
// out the transient hooks so they are not called again.
434
func (m *hookMap) invoke() {
4,779✔
435
        for _, hook := range m.transient {
4,784✔
436
                hook()
5✔
437
        }
5✔
438

439
        m.transient = make(map[uint64]func())
4,779✔
440
}
441

442
// hodlHtlc contains htlc data that is required for resolution.
443
type hodlHtlc struct {
444
        pd         *lnwallet.PaymentDescriptor
445
        obfuscator hop.ErrorEncrypter
446
}
447

448
// NewChannelLink creates a new instance of a ChannelLink given a configuration
449
// and active channel that will be used to verify/apply updates to.
450
func NewChannelLink(cfg ChannelLinkConfig,
451
        channel *lnwallet.LightningChannel) ChannelLink {
216✔
452

216✔
453
        logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())
216✔
454

216✔
455
        // If the max fee exposure isn't set, use the default.
216✔
456
        if cfg.MaxFeeExposure == 0 {
429✔
457
                cfg.MaxFeeExposure = DefaultMaxFeeExposure
213✔
458
        }
213✔
459

460
        return &channelLink{
216✔
461
                cfg:                 cfg,
216✔
462
                channel:             channel,
216✔
463
                hodlMap:             make(map[models.CircuitKey]hodlHtlc),
216✔
464
                hodlQueue:           queue.NewConcurrentQueue(10),
216✔
465
                log:                 build.NewPrefixLog(logPrefix, log),
216✔
466
                flushHooks:          newHookMap(),
216✔
467
                outgoingCommitHooks: newHookMap(),
216✔
468
                incomingCommitHooks: newHookMap(),
216✔
469
                quit:                make(chan struct{}),
216✔
470
        }
216✔
471
}
472

473
// A compile time check to ensure channelLink implements the ChannelLink
474
// interface.
475
var _ ChannelLink = (*channelLink)(nil)
476

477
// Start starts all helper goroutines required for the operation of the channel
478
// link.
479
//
480
// NOTE: Part of the ChannelLink interface.
481
func (l *channelLink) Start() error {
214✔
482
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
214✔
NEW
483
                err := fmt.Errorf("channel link(%v): already started", l)
×
484
                l.log.Warn("already started")
×
485
                return err
×
486
        }
×
487

488
        l.log.Info("starting")
214✔
489

214✔
490
        // If the config supplied a watchtower client, ensure the channel is
214✔
491
        // registered before trying to use it during operation.
214✔
492
        if l.cfg.TowerClient != nil {
217✔
493
                err := l.cfg.TowerClient.RegisterChannel(
3✔
494
                        l.ChanID(), l.channel.State().ChanType,
3✔
495
                )
3✔
496
                if err != nil {
3✔
497
                        return err
×
498
                }
×
499
        }
500

501
        l.mailBox.ResetMessages()
214✔
502
        l.hodlQueue.Start()
214✔
503

214✔
504
        // Before launching the htlcManager messages, revert any circuits that
214✔
505
        // were marked open in the switch's circuit map, but did not make it
214✔
506
        // into a commitment txn. We use the next local htlc index as the cut
214✔
507
        // off point, since all indexes below that are committed. This action
214✔
508
        // is only performed if the link's final short channel ID has been
214✔
509
        // assigned, otherwise we would try to trim the htlcs belonging to the
214✔
510
        // all-zero, hop.Source ID.
214✔
511
        if l.ShortChanID() != hop.Source {
428✔
512
                localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
214✔
513
                if err != nil {
214✔
514
                        return fmt.Errorf("unable to retrieve next local "+
×
515
                                "htlc index: %v", err)
×
516
                }
×
517

518
                // NOTE: This is automatically done by the switch when it
519
                // starts up, but is necessary to prevent inconsistencies in
520
                // the case that the link flaps. This is a result of a link's
521
                // life-cycle being shorter than that of the switch.
522
                chanID := l.ShortChanID()
214✔
523
                err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
214✔
524
                if err != nil {
214✔
525
                        return fmt.Errorf("unable to trim circuits above "+
×
526
                                "local htlc index %d: %v", localHtlcIndex, err)
×
527
                }
×
528

529
                // Since the link is live, before we start the link we'll update
530
                // the ChainArbitrator with the set of new channel signals for
531
                // this channel.
532
                //
533
                // TODO(roasbeef): split goroutines within channel arb to avoid
534
                go func() {
428✔
535
                        signals := &contractcourt.ContractSignals{
214✔
536
                                ShortChanID: l.channel.ShortChanID(),
214✔
537
                        }
214✔
538

214✔
539
                        err := l.cfg.UpdateContractSignals(signals)
214✔
540
                        if err != nil {
214✔
541
                                l.log.Errorf("unable to update signals")
×
542
                        }
×
543
                }()
544
        }
545

546
        l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())
214✔
547

214✔
548
        l.wg.Add(1)
214✔
549
        go l.htlcManager()
214✔
550

214✔
551
        return nil
214✔
552
}
553

554
// Stop gracefully stops all active helper goroutines, then waits until they've
555
// exited.
556
//
557
// NOTE: Part of the ChannelLink interface.
558
func (l *channelLink) Stop() {
215✔
559
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
227✔
560
                l.log.Warn("already stopped")
12✔
561
                return
12✔
562
        }
12✔
563

564
        l.log.Info("stopping")
203✔
565

203✔
566
        // As the link is stopping, we are no longer interested in htlc
203✔
567
        // resolutions coming from the invoice registry.
203✔
568
        l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())
203✔
569

203✔
570
        if l.cfg.ChainEvents.Cancel != nil {
206✔
571
                l.cfg.ChainEvents.Cancel()
3✔
572
        }
3✔
573

574
        // Ensure the channel for the timer is drained.
575
        if !l.updateFeeTimer.Stop() {
203✔
576
                select {
×
577
                case <-l.updateFeeTimer.C:
×
578
                default:
×
579
                }
580
        }
581

582
        l.hodlQueue.Stop()
203✔
583

203✔
584
        close(l.quit)
203✔
585
        l.wg.Wait()
203✔
586

203✔
587
        // Now that the htlcManager has completely exited, reset the packet
203✔
588
        // courier. This allows the mailbox to revaluate any lingering Adds that
203✔
589
        // were delivered but didn't make it on a commitment to be failed back
203✔
590
        // if the link is offline for an extended period of time. The error is
203✔
591
        // ignored since it can only fail when the daemon is exiting.
203✔
592
        _ = l.mailBox.ResetPackets()
203✔
593

203✔
594
        // As a final precaution, we will attempt to flush any uncommitted
203✔
595
        // preimages to the preimage cache. The preimages should be re-delivered
203✔
596
        // after channel reestablishment, however this adds an extra layer of
203✔
597
        // protection in case the peer never returns. Without this, we will be
203✔
598
        // unable to settle any contracts depending on the preimages even though
203✔
599
        // we had learned them at some point.
203✔
600
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
203✔
601
        if err != nil {
203✔
602
                l.log.Errorf("unable to add preimages=%v to cache: %v",
×
603
                        l.uncommittedPreimages, err)
×
604
        }
×
605
}
606

607
// WaitForShutdown blocks until the link finishes shutting down, which includes
608
// termination of all dependent goroutines.
609
func (l *channelLink) WaitForShutdown() {
×
610
        l.wg.Wait()
×
611
}
×
612

613
// EligibleToForward returns a bool indicating if the channel is able to
614
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
615
// we are eligible to update AND the channel isn't currently flushing the
616
// outgoing half of the channel.
617
func (l *channelLink) EligibleToForward() bool {
1,655✔
618
        return l.EligibleToUpdate() &&
1,655✔
619
                !l.IsFlushing(Outgoing)
1,655✔
620
}
1,655✔
621

622
// EligibleToUpdate returns a bool indicating if the channel is able to update
623
// channel state. We're able to update channel state if we know the remote
624
// party's next revocation point. Otherwise, we can't initiate new channel
625
// state. We also require that the short channel ID not be the all-zero source
626
// ID, meaning that the channel has had its ID finalized.
627
func (l *channelLink) EligibleToUpdate() bool {
1,658✔
628
        return l.channel.RemoteNextRevocation() != nil &&
1,658✔
629
                l.ShortChanID() != hop.Source &&
1,658✔
630
                l.isReestablished()
1,658✔
631
}
1,658✔
632

633
// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
634
// the specified direction. It returns true if the state was changed and false
635
// if the desired state was already set before the method was called.
636
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
13✔
637
        if linkDirection == Outgoing {
20✔
638
                return l.isOutgoingAddBlocked.Swap(false)
7✔
639
        }
7✔
640

641
        return l.isIncomingAddBlocked.Swap(false)
6✔
642
}
643

644
// DisableAdds sets the ChannelUpdateHandler state to disallow UpdateAddHtlc's in
645
// the specified direction. It returns true if the state was changed and false
646
// if the desired state was already set before the method was called.
647
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
19✔
648
        if linkDirection == Outgoing {
30✔
649
                return !l.isOutgoingAddBlocked.Swap(true)
11✔
650
        }
11✔
651

652
        return !l.isIncomingAddBlocked.Swap(true)
11✔
653
}
654

655
// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction of
656
// the argument.
657
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
4,713✔
658
        if linkDirection == Outgoing {
7,912✔
659
                return l.isOutgoingAddBlocked.Load()
3,199✔
660
        }
3,199✔
661

662
        return l.isIncomingAddBlocked.Load()
1,517✔
663
}
664

665
// OnFlushedOnce adds a hook that will be called the next time the channel
666
// state reaches zero htlcs. This hook will only ever be called once. If the
667
// channel state already has zero htlcs, then this will be called immediately.
668
func (l *channelLink) OnFlushedOnce(hook func()) {
4✔
669
        select {
4✔
670
        case l.flushHooks.newTransients <- hook:
4✔
671
        case <-l.quit:
×
672
        }
673
}
674

675
// OnCommitOnce adds a hook that will be called the next time a CommitSig
676
// message is sent in the argument's LinkDirection. This hook will only ever be
677
// called once. If no CommitSig is owed in the argument's LinkDirection, then
678
// this hook will be run immediately.
679
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
4✔
680
        var queue chan func()
4✔
681

4✔
682
        if direction == Outgoing {
8✔
683
                queue = l.outgoingCommitHooks.newTransients
4✔
684
        } else {
4✔
685
                queue = l.incomingCommitHooks.newTransients
×
686
        }
×
687

688
        select {
4✔
689
        case queue <- hook:
4✔
690
        case <-l.quit:
×
691
        }
692
}
693

694
// isReestablished returns true if the link has successfully completed the
695
// channel reestablishment dance.
696
func (l *channelLink) isReestablished() bool {
1,658✔
697
        return atomic.LoadInt32(&l.reestablished) == 1
1,658✔
698
}
1,658✔
699

700
// markReestablished signals that the remote peer has successfully exchanged
701
// channel reestablish messages and that the channel is ready to process
702
// subsequent messages.
703
func (l *channelLink) markReestablished() {
214✔
704
        atomic.StoreInt32(&l.reestablished, 1)
214✔
705
}
214✔
706

707
// IsUnadvertised returns true if the underlying channel is unadvertised.
708
func (l *channelLink) IsUnadvertised() bool {
5✔
709
        state := l.channel.State()
5✔
710
        return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
5✔
711
}
5✔
712

713
// sampleNetworkFee samples the current fee rate on the network to get into the
714
// chain in a timely manner. The returned value is expressed in fee-per-kw, as
715
// this is the native rate used when computing the fee for commitment
716
// transactions, and the second-level HTLC transactions.
717
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
4✔
718
        // We'll first query for the sat/kw recommended to be confirmed within 3
4✔
719
        // blocks.
4✔
720
        feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
4✔
721
        if err != nil {
4✔
722
                return 0, err
×
723
        }
×
724

725
        l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
4✔
726
                int64(feePerKw))
4✔
727

4✔
728
        return feePerKw, nil
4✔
729
}
730

731
// shouldAdjustCommitFee returns true if we should update our commitment fee to
732
// match that of the network fee. We'll only update our commitment fee if the
733
// network fee is +/- 10% to our commitment fee or if our current commitment
734
// fee is below the minimum relay fee.
735
func shouldAdjustCommitFee(netFee, chanFee,
736
        minRelayFee chainfee.SatPerKWeight) bool {
14✔
737

14✔
738
        switch {
14✔
739
        // If the network fee is greater than our current commitment fee and
740
        // our current commitment fee is below the minimum relay fee then
741
        // we should switch to it no matter if it is less than a 10% increase.
742
        case netFee > chanFee && chanFee < minRelayFee:
1✔
743
                return true
1✔
744

745
        // If the network fee is greater than the commitment fee, then we'll
746
        // switch to it if it's at least 10% greater than the commit fee.
747
        case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
4✔
748
                return true
4✔
749

750
        // If the network fee is less than our commitment fee, then we'll
751
        // switch to it if it's at least 10% less than the commitment fee.
752
        case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
2✔
753
                return true
2✔
754

755
        // Otherwise, we won't modify our fee.
756
        default:
7✔
757
                return false
7✔
758
        }
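To make the +/- 10% rule above concrete (example numbers only, not taken from the source): with chanFee = 1000 sat/kw and a minimum relay fee below that, shouldAdjustCommitFee returns true only when netFee >= 1100 sat/kw or netFee <= 900 sat/kw; a sampled netFee of, say, 1050 sat/kw would leave the commitment fee untouched.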
759
}
760

761
// failCb is used to cut down on the argument verbosity.
762
type failCb func(update *lnwire.ChannelUpdate) lnwire.FailureMessage
763

764
// createFailureWithUpdate creates a ChannelUpdate when failing an incoming or
765
// outgoing HTLC. It may return a FailureMessage that references a channel's
766
// alias. If the channel does not have an alias, then the regular channel
767
// update from disk will be returned.
768
func (l *channelLink) createFailureWithUpdate(incoming bool,
769
        outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {
25✔
770

25✔
771
        // Determine which SCID to use in case we need to use aliases in the
25✔
772
        // ChannelUpdate.
25✔
773
        scid := outgoingScid
25✔
774
        if incoming {
25✔
775
                scid = l.ShortChanID()
×
776
        }
×
777

778
        // Try using the FailAliasUpdate function. If it returns nil, fallback
779
        // to the non-alias behavior.
780
        update := l.cfg.FailAliasUpdate(scid, incoming)
25✔
781
        if update == nil {
44✔
782
                // Fallback to the non-alias behavior.
19✔
783
                var err error
19✔
784
                update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
19✔
785
                if err != nil {
19✔
786
                        return &lnwire.FailTemporaryNodeFailure{}
×
787
                }
×
788
        }
789

790
        return cb(update)
25✔
791
}
792

793
// syncChanStates attempts to synchronize channel states with the remote party.
794
// This method is to be called upon reconnection after the initial funding
795
// flow. We'll compare our commitment chains with the remote party, and re-send
796
// either a dangling commit signature, a revocation, or both.
797
func (l *channelLink) syncChanStates() error {
171✔
798
        chanState := l.channel.State()
171✔
799

171✔
800
        l.log.Infof("Attempting to re-synchronize channel: %v", chanState)
171✔
801

171✔
802
        // First, we'll generate our ChanSync message to send to the other
171✔
803
        // side. Based on this message, the remote party will decide if they
171✔
804
        // need to retransmit any data or not.
171✔
805
        localChanSyncMsg, err := chanState.ChanSyncMsg()
171✔
806
        if err != nil {
171✔
807
                return fmt.Errorf("unable to generate chan sync message for "+
×
808
                        "ChannelPoint(%v)", l.channel.ChannelPoint())
×
809
        }
×
810
        if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
171✔
811
                return fmt.Errorf("unable to send chan sync message for "+
×
812
                        "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
×
813
        }
×
814

815
        var msgsToReSend []lnwire.Message
171✔
816

171✔
817
        // Next, we'll wait indefinitely to receive the ChanSync message. The
171✔
818
        // first message sent MUST be the ChanSync message.
171✔
819
        select {
171✔
820
        case msg := <-l.upstream:
171✔
821
                l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
171✔
822
                        l.cfg.Peer.PubKey())
171✔
823

171✔
824
                remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
171✔
825
                if !ok {
171✔
826
                        return fmt.Errorf("first message sent to sync "+
×
827
                                "should be ChannelReestablish, instead "+
×
828
                                "received: %T", msg)
×
829
                }
×
830

831
                // If the remote party indicates that they think we haven't
832
                // done any state updates yet, then we'll retransmit the
833
                // channel_ready message first. We do this, as at this point
834
                // we can't be sure if they've really received the
835
                // ChannelReady message.
836
                if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
171✔
837
                        localChanSyncMsg.NextLocalCommitHeight == 1 &&
171✔
838
                        !l.channel.IsPending() {
336✔
839

165✔
840
                        l.log.Infof("resending ChannelReady message to peer")
165✔
841

165✔
842
                        nextRevocation, err := l.channel.NextRevocationKey()
165✔
843
                        if err != nil {
165✔
844
                                return fmt.Errorf("unable to create next "+
×
845
                                        "revocation: %v", err)
×
846
                        }
×
847

848
                        channelReadyMsg := lnwire.NewChannelReady(
165✔
849
                                l.ChanID(), nextRevocation,
165✔
850
                        )
165✔
851

165✔
852
                        // If this is a taproot channel, then we'll send the
165✔
853
                        // very same nonce that we sent above, as they should
165✔
854
                        // take the latest verification nonce we send.
165✔
855
                        if chanState.ChanType.IsTaproot() {
168✔
856
                                //nolint:lll
3✔
857
                                channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
3✔
858
                        }
3✔
859

860
                        // For channels that negotiated the option-scid-alias
861
                        // feature bit, ensure that we send over the alias in
862
                        // the channel_ready message. We'll send the first
863
                        // alias we find for the channel since it does not
864
                        // matter which alias we send. We'll error out if no
865
                        // aliases are found.
866
                        if l.negotiatedAliasFeature() {
168✔
867
                                aliases := l.getAliases()
3✔
868
                                if len(aliases) == 0 {
3✔
869
                                        // This shouldn't happen since we
×
870
                                        // always add at least one alias before
×
871
                                        // the channel reaches the link.
×
872
                                        return fmt.Errorf("no aliases found")
×
873
                                }
×
874

875
                                // getAliases returns a copy of the alias slice
876
                                // so it is ok to use a pointer to the first
877
                                // entry.
878
                                channelReadyMsg.AliasScid = &aliases[0]
3✔
879
                        }
880

881
                        err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
165✔
882
                        if err != nil {
165✔
883
                                return fmt.Errorf("unable to re-send "+
×
884
                                        "ChannelReady: %v", err)
×
885
                        }
×
886
                }
887

888
                // In any case, we'll then process their ChanSync message.
889
                l.log.Info("received re-establishment message from remote side")
171✔
890

171✔
891
                var (
171✔
892
                        openedCircuits []CircuitKey
171✔
893
                        closedCircuits []CircuitKey
171✔
894
                )
171✔
895

171✔
896
                // We've just received a ChanSync message from the remote
171✔
897
                // party, so we'll process the message in order to determine
171✔
898
                // if we need to re-transmit any messages to the remote party.
171✔
899
                msgsToReSend, openedCircuits, closedCircuits, err =
171✔
900
                        l.channel.ProcessChanSyncMsg(remoteChanSyncMsg)
171✔
901
                if err != nil {
174✔
902
                        return err
3✔
903
                }
3✔
904

905
                // Repopulate any identifiers for circuits that may have been
906
                // opened or unclosed. This may happen if we needed to
907
                // retransmit a commitment signature message.
908
                l.openedCircuits = openedCircuits
171✔
909
                l.closedCircuits = closedCircuits
171✔
910

171✔
911
                // Ensure that all packets have been removed from the
171✔
912
                // link's mailbox.
171✔
913
                if err := l.ackDownStreamPackets(); err != nil {
171✔
914
                        return err
×
915
                }
×
916

917
                if len(msgsToReSend) > 0 {
176✔
918
                        l.log.Infof("sending %v updates to synchronize the "+
5✔
919
                                "state", len(msgsToReSend))
5✔
920
                }
5✔
921

922
                // If we have any messages to retransmit, we'll do so
923
                // immediately so we return to a synchronized state as soon as
924
                // possible.
925
                for _, msg := range msgsToReSend {
182✔
926
                        l.cfg.Peer.SendMessage(false, msg)
11✔
927
                }
11✔
928

929
        case <-l.quit:
3✔
930
                return ErrLinkShuttingDown
3✔
931
        }
932

933
        return nil
171✔
934
}
935

936
// resolveFwdPkgs loads any forwarding packages for this link from disk, and
937
// reprocesses them in order. The primary goal is to make sure that any HTLCs
938
// we previously received are reinstated in memory, and forwarded to the switch
939
// if necessary. After a restart, this will also delete any previously
940
// completed packages.
941
func (l *channelLink) resolveFwdPkgs() error {
214✔
942
        fwdPkgs, err := l.channel.LoadFwdPkgs()
214✔
943
        if err != nil {
214✔
944
                return err
×
945
        }
×
946

947
        l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))
214✔
948

214✔
949
        for _, fwdPkg := range fwdPkgs {
223✔
950
                if err := l.resolveFwdPkg(fwdPkg); err != nil {
9✔
951
                        return err
×
952
                }
×
953
        }
954

955
        // If any of our reprocessing steps require an update to the commitment
956
        // txn, we initiate a state transition to capture all relevant changes.
957
        if l.channel.PendingLocalUpdateCount() > 0 {
217✔
958
                return l.updateCommitTx()
3✔
959
        }
3✔
960

961
        return nil
214✔
962
}
963

964
// resolveFwdPkg interprets the FwdState of the provided package, and either
965
// reprocesses any outstanding htlcs in the package, or performs garbage
966
// collection on the package.
967
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
9✔
968
        // Remove any completed packages to clear up space.
9✔
969
        if fwdPkg.State == channeldb.FwdStateCompleted {
13✔
970
                l.log.Debugf("removing completed fwd pkg for height=%d",
4✔
971
                        fwdPkg.Height)
4✔
972

4✔
973
                err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
4✔
974
                if err != nil {
4✔
975
                        l.log.Errorf("unable to remove fwd pkg for height=%d: "+
×
976
                                "%v", fwdPkg.Height, err)
×
977
                        return err
×
978
                }
×
979
        }
980

981
        // Otherwise this is either a new package or one has gone through
982
        // processing, but contains htlcs that need to be restored in memory.
983
        // We replay this forwarding package to make sure our local mem state
984
        // is resurrected, we mimic any original responses back to the remote
985
        // party, and re-forward the relevant HTLCs to the switch.
986

987
        // If the package is fully acked but not completed, it must still have
988
        // settles and fails to propagate.
989
        if !fwdPkg.SettleFailFilter.IsFull() {
12✔
990
                settleFails, err := lnwallet.PayDescsFromRemoteLogUpdates(
3✔
991
                        fwdPkg.Source, fwdPkg.Height, fwdPkg.SettleFails,
3✔
992
                )
3✔
993
                if err != nil {
3✔
994
                        l.log.Errorf("unable to process remote log updates: %v",
×
995
                                err)
×
996
                        return err
×
997
                }
×
998
                l.processRemoteSettleFails(fwdPkg, settleFails)
3✔
999
        }
1000

1001
        // Finally, replay *ALL ADDS* in this forwarding package. The
1002
        // downstream logic is able to filter out any duplicates, but we must
1003
        // shove the entire, original set of adds down the pipeline so that the
1004
        // batch of adds presented to the sphinx router does not ever change.
1005
        if !fwdPkg.AckFilter.IsFull() {
15✔
1006
                adds, err := lnwallet.PayDescsFromRemoteLogUpdates(
6✔
1007
                        fwdPkg.Source, fwdPkg.Height, fwdPkg.Adds,
6✔
1008
                )
6✔
1009
                if err != nil {
6✔
1010
                        l.log.Errorf("unable to process remote log updates: %v",
×
1011
                                err)
×
1012
                        return err
×
1013
                }
×
1014
                l.processRemoteAdds(fwdPkg, adds)
6✔
1015

6✔
1016
                // If the link failed during processing the adds, we must
6✔
1017
                // return to ensure we won't attempt to update the state
6✔
1018
                // further.
6✔
1019
                if l.failed {
6✔
1020
                        return fmt.Errorf("link failed while " +
×
1021
                                "processing remote adds")
×
1022
                }
×
1023
        }
1024

1025
        return nil
9✔
1026
}
1027

1028
// fwdPkgGarbager periodically reads all forwarding packages from disk and
1029
// removes those that can be discarded. It is safe to do this entirely in the
1030
// background, since all state is coordinated on disk. This also ensures the
1031
// link can continue to process messages and interleave database accesses.
1032
//
1033
// NOTE: This MUST be run as a goroutine.
1034
func (l *channelLink) fwdPkgGarbager() {
214✔
1035
        defer l.wg.Done()
214✔
1036

214✔
1037
        l.cfg.FwdPkgGCTicker.Resume()
214✔
1038
        defer l.cfg.FwdPkgGCTicker.Stop()
214✔
1039

214✔
1040
        if err := l.loadAndRemove(); err != nil {
214✔
1041
                l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
×
1042
        }
×
1043

1044
        for {
498✔
1045
                select {
284✔
1046
                case <-l.cfg.FwdPkgGCTicker.Ticks():
70✔
1047
                        if err := l.loadAndRemove(); err != nil {
128✔
1048
                                l.log.Warnf("unable to remove fwd pkgs: %v",
58✔
1049
                                        err)
58✔
1050
                                continue
58✔
1051
                        }
1052
                case <-l.quit:
203✔
1053
                        return
203✔
1054
                }
1055
        }
1056
}
1057

1058
// loadAndRemove loads all the channels forwarding packages and determines if
1059
// they can be removed. It is called once before the FwdPkgGCTicker ticks so that
1060
// a longer tick interval can be used.
1061
func (l *channelLink) loadAndRemove() error {
284✔
1062
        fwdPkgs, err := l.channel.LoadFwdPkgs()
284✔
1063
        if err != nil {
342✔
1064
                return err
58✔
1065
        }
58✔
1066

1067
        var removeHeights []uint64
226✔
1068
        for _, fwdPkg := range fwdPkgs {
1,157✔
1069
                if fwdPkg.State != channeldb.FwdStateCompleted {
975✔
1070
                        continue
44✔
1071
                }
1072

1073
                removeHeights = append(removeHeights, fwdPkg.Height)
890✔
1074
        }
1075

1076
        // If removeHeights is empty, return early so we don't use a db
1077
        // transaction.
1078
        if len(removeHeights) == 0 {
440✔
1079
                return nil
214✔
1080
        }
214✔
1081

1082
        return l.channel.RemoveFwdPkgs(removeHeights...)
15✔
1083
}
1084

1085
// htlcManager is the primary goroutine which drives a channel's commitment
1086
// update state-machine in response to messages received via several channels.
1087
// This goroutine reads messages from the upstream (remote) peer, and also from
1088
// the downstream channel managed by the channel link. In the event that an htlc
1089
// needs to be forwarded, the send-only forward handler is used, which sends
1090
// htlc packets to the switch. Additionally, this goroutine handles acting upon
1091
// all timeouts for any active HTLCs, manages the channel's revocation window,
1092
// and also the htlc trickle queue+timer for this active channel.
1093
//
1094
// NOTE: This MUST be run as a goroutine.
1095
func (l *channelLink) htlcManager() {
214✔
1096
        defer func() {
419✔
1097
                l.cfg.BatchTicker.Stop()
205✔
1098
                l.wg.Done()
205✔
1099
                l.log.Infof("exited")
205✔
1100
        }()
205✔
1101

1102
        l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())
214✔
1103

214✔
1104
        // Notify any clients that the link is now in the switch via an
214✔
1105
        // ActiveLinkEvent. We'll also defer an inactive link notification for
214✔
1106
        // when the link exits to ensure that every active notification is
214✔
1107
        // matched by an inactive one.
214✔
1108
        l.cfg.NotifyActiveLink(l.ChannelPoint())
214✔
1109
        defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())
214✔
1110

214✔
1111
        // TODO(roasbeef): need to call wipe chan whenever D/C?
214✔
1112

214✔
1113
        // If this isn't the first time that this channel link has been
214✔
1114
        // created, then we'll need to check to see if we need to
214✔
1115
        // re-synchronize state with the remote peer. settledHtlcs is a map of
214✔
1116
        // HTLC's that we re-settled as part of the channel state sync.
214✔
1117
        if l.cfg.SyncStates {
385✔
1118
                err := l.syncChanStates()
171✔
1119
                if err != nil {
174✔
1120
                        l.log.Warnf("error when syncing channel states: %v", err)
3✔
1121

3✔
1122
                        errDataLoss, localDataLoss :=
3✔
1123
                                err.(*lnwallet.ErrCommitSyncLocalDataLoss)
3✔
1124

3✔
1125
                        switch {
3✔
1126
                        case err == ErrLinkShuttingDown:
3✔
1127
                                l.log.Debugf("unable to sync channel states, " +
3✔
1128
                                        "link is shutting down")
3✔
1129
                                return
3✔
1130

1131
                        // We failed syncing the commit chains, probably
1132
                        // because the remote has lost state. We should force
1133
                        // close the channel.
1134
                        case err == lnwallet.ErrCommitSyncRemoteDataLoss:
3✔
1135
                                fallthrough
3✔
1136

1137
                        // The remote sent us an invalid last commit secret, we
1138
                        // should force close the channel.
1139
                        // TODO(halseth): and permanently ban the peer?
1140
                        case err == lnwallet.ErrInvalidLastCommitSecret:
3✔
1141
                                fallthrough
3✔
1142

1143
                        // The remote sent us a commit point different from
1144
                        // what they sent us before.
1145
                        // TODO(halseth): ban peer?
1146
                        case err == lnwallet.ErrInvalidLocalUnrevokedCommitPoint:
3✔
1147
                                // We'll fail the link and tell the peer to
3✔
1148
                                // force close the channel. Note that the
3✔
1149
                                // database state is not updated here, but will
3✔
1150
                                // be updated when the close transaction is
3✔
1151
                                // ready to avoid that we go down before
3✔
1152
                                // storing the transaction in the db.
3✔
1153
                                l.fail(
3✔
1154
                                        LinkFailureError{
3✔
1155
                                                code:          ErrSyncError,
3✔
1156
                                                FailureAction: LinkFailureForceClose, //nolint:lll
3✔
1157
                                        },
3✔
1158
                                        "unable to synchronize channel "+
3✔
1159
                                                "states: %v", err,
3✔
1160
                                )
3✔
1161
                                return
3✔
1162

1163
                        // We have lost state and cannot safely force close the
1164
                        // channel. Fail the channel and wait for the remote to
1165
                        // hopefully force close it. The remote has sent us its
1166
                        // latest unrevoked commitment point, and we'll store
1167
                        // it in the database, such that we can attempt to
1168
                        // recover the funds if the remote force closes the
1169
                        // channel.
1170
                        case localDataLoss:
3✔
1171
                                err := l.channel.MarkDataLoss(
3✔
1172
                                        errDataLoss.CommitPoint,
3✔
1173
                                )
3✔
1174
                                if err != nil {
3✔
1175
                                        l.log.Errorf("unable to mark channel "+
×
1176
                                                "data loss: %v", err)
×
1177
                                }
×
1178

1179
                        // We determined the commit chains were not possible to
1180
                        // sync. We cautiously fail the channel, but don't
1181
                        // force close.
1182
                        // TODO(halseth): can we safely force close in any
1183
                        // cases where this error is returned?
1184
                        case err == lnwallet.ErrCannotSyncCommitChains:
×
1185
                                if err := l.channel.MarkBorked(); err != nil {
×
1186
                                        l.log.Errorf("unable to mark channel "+
×
1187
                                                "borked: %v", err)
×
1188
                                }
×
1189

1190
                        // Other, unspecified error.
1191
                        default:
×
1192
                        }
1193

1194
                        l.fail(
3✔
1195
                                LinkFailureError{
3✔
1196
                                        code:          ErrRecoveryError,
3✔
1197
                                        FailureAction: LinkFailureForceNone,
3✔
1198
                                },
3✔
1199
                                "unable to synchronize channel "+
3✔
1200
                                        "states: %v", err,
3✔
1201
                        )
3✔
1202
                        return
3✔
1203
                }
1204
        }
1205

1206
        // If a shutdown message has previously been sent on this link, then we
1207
        // need to make sure that we have disabled any HTLC adds on the outgoing
1208
        // direction of the link and that we re-send the same shutdown message
1209
        // that we previously sent.
1210
        l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
217✔
1211
                // Immediately disallow any new outgoing HTLCs.
3✔
1212
                if !l.DisableAdds(Outgoing) {
3✔
1213
                        l.log.Warnf("Outgoing link adds already disabled")
×
1214
                }
×
1215

1216
                // Re-send the shutdown message to the peer. Since syncChanStates
1217
                // would have sent any outstanding CommitSig, it is fine for us
1218
                // to immediately queue the shutdown message now.
1219
                err := l.cfg.Peer.SendMessage(false, &shutdown)
3✔
1220
                if err != nil {
3✔
1221
                        l.log.Warnf("Error sending shutdown message: %v", err)
×
1222
                }
×
1223
        })
1224

1225
        // We've successfully reestablished the channel, mark it as such to
1226
        // allow the switch to forward HTLCs in the outbound direction.
1227
        l.markReestablished()
214✔
1228

214✔
1229
        // Now that we've received both channel_ready and channel reestablish,
214✔
1230
        // we can go ahead and send the active channel notification. We'll also
214✔
1231
        // defer the inactive notification for when the link exits to ensure
214✔
1232
        // that every active notification is matched by an inactive one.
214✔
1233
        l.cfg.NotifyActiveChannel(l.ChannelPoint())
214✔
1234
        defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())
214✔
1235

214✔
1236
        // With the channel states synced, we now reset the mailbox to ensure
214✔
1237
        // we start processing all unacked packets in order. This is done here
214✔
1238
        // to ensure that all acknowledgments that occur during channel
214✔
1239
        // resynchronization have taken effect, causing us only to pull unacked
214✔
1240
        // packets after starting to read from the downstream mailbox.
214✔
1241
        l.mailBox.ResetPackets()
214✔
1242

214✔
1243
        // After cleaning up any memory pertaining to incoming packets, we now
214✔
1244
        // replay our forwarding packages to handle any htlcs that can be
214✔
1245
        // processed locally, or need to be forwarded out to the switch. We will
214✔
1246
        // only attempt to resolve packages if our short chan id indicates that
214✔
1247
        // the channel is not pending, otherwise we should have no htlcs to
214✔
1248
        // reforward.
214✔
1249
        if l.ShortChanID() != hop.Source {
428✔
1250
                err := l.resolveFwdPkgs()
214✔
1251
                switch err {
214✔
1252
                // No error was encountered, success.
1253
                case nil:
214✔
1254

1255
                // If the duplicate keystone error was encountered, we'll fail
1256
                // without sending an Error message to the peer.
1257
                case ErrDuplicateKeystone:
×
1258
                        l.fail(LinkFailureError{code: ErrCircuitError},
×
1259
                                "temporary circuit error: %v", err)
×
1260
                        return
×
1261

1262
                // A non-nil error was encountered, send an Error message to
1263
                // the peer.
1264
                default:
×
1265
                        l.fail(LinkFailureError{code: ErrInternalError},
×
1266
                                "unable to resolve fwd pkgs: %v", err)
×
1267
                        return
×
1268
                }
1269

1270
                // With our link's in-memory state fully reconstructed, spawn a
1271
                // goroutine to manage the reclamation of disk space occupied by
1272
                // completed forwarding packages.
1273
                l.wg.Add(1)
214✔
1274
                go l.fwdPkgGarbager()
214✔
1275
        }
1276

1277
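        // The link's main event loop. All channel state transitions
        // (downstream packets from the switch, upstream wire messages from
        // the peer, hodl resolutions, fee updates and ticker events) are
        // serialized through the select statement below on this single
        // goroutine.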
        for {
9,409✔
1278
                // We must always check if we failed at some point processing
9,195✔
1279
                // the last update before processing the next.
9,195✔
1280
                if l.failed {
9,204✔
1281
                        l.log.Errorf("link failed, exiting htlcManager")
9✔
1282
                        return
9✔
1283
                }
9✔
1284

1285
                // If the previous event resulted in a non-empty batch, resume
1286
                // the batch ticker so that it can be cleared. Otherwise pause
1287
                // the ticker to prevent waking up the htlcManager while the
1288
                // batch is empty.
1289
                if l.channel.PendingLocalUpdateCount() > 0 {
11,089✔
1290
                        l.cfg.BatchTicker.Resume()
1,900✔
1291
                        l.log.Tracef("BatchTicker resumed, "+
1,900✔
1292
                                "PendingLocalUpdateCount=%d",
1,900✔
1293
                                l.channel.PendingLocalUpdateCount())
1,900✔
1294
                } else {
9,192✔
1295
                        l.cfg.BatchTicker.Pause()
7,292✔
1296
                        l.log.Trace("BatchTicker paused due to zero " +
7,292✔
1297
                                "PendingLocalUpdateCount")
7,292✔
1298
                }
7,292✔
1299

1300
                select {
9,189✔
1301
                // We have a new hook that needs to be run when we reach a clean
1302
                // channel state.
1303
                case hook := <-l.flushHooks.newTransients:
4✔
1304
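                        // Run the hook immediately if the channel is already
                        // clean; otherwise queue it until the channel next
                        // reaches a clean state.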
                        if l.channel.IsChannelClean() {
7✔
1305
                                hook()
3✔
1306
                        } else {
7✔
1307
                                l.flushHooks.alloc(hook)
4✔
1308
                        }
4✔
1309

1310
                // We have a new hook that needs to be run when we have
1311
                // committed all of our updates.
1312
                case hook := <-l.outgoingCommitHooks.newTransients:
4✔
1313
                        if !l.channel.OweCommitment() {
7✔
1314
                                hook()
3✔
1315
                        } else {
4✔
1316
                                l.outgoingCommitHooks.alloc(hook)
1✔
1317
                        }
1✔
1318

1319
                // We have a new hook that needs to be run when our peer has
1320
                // committed all of their updates.
1321
                case hook := <-l.incomingCommitHooks.newTransients:
×
1322
                        if !l.channel.NeedCommitment() {
×
1323
                                hook()
×
1324
                        } else {
×
1325
                                l.incomingCommitHooks.alloc(hook)
×
1326
                        }
×
1327

1328
                // Our update fee timer has fired, so we'll check the network
1329
                // fee to see if we should adjust our commitment fee.
1330
                case <-l.updateFeeTimer.C:
4✔
1331
                        l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
4✔
1332

4✔
1333
                        // If we're not the initiator of the channel, we
4✔
1334
                        // don't control the fees, so we can ignore this.
4✔
1335
                        if !l.channel.IsInitiator() {
4✔
1336
                                continue
×
1337
                        }
1338

1339
                        // If we are the initiator, then we'll sample the
1340
                        // current fee rate to get into the chain within 3
1341
                        // blocks.
1342
                        netFee, err := l.sampleNetworkFee()
4✔
1343
                        if err != nil {
4✔
1344
                                l.log.Errorf("unable to sample network fee: %v",
×
1345
                                        err)
×
1346
                                continue
×
1347
                        }
1348

1349
                        minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()
4✔
1350

4✔
1351
                        newCommitFee := l.channel.IdealCommitFeeRate(
4✔
1352
                                netFee, minRelayFee,
4✔
1353
                                l.cfg.MaxAnchorsCommitFeeRate,
4✔
1354
                                l.cfg.MaxFeeAllocation,
4✔
1355
                        )
4✔
1356

4✔
1357
                        // We determine if we should adjust the commitment fee
4✔
1358
                        // based on the current commitment fee, the suggested
4✔
1359
                        // new commitment fee and the current minimum relay fee
4✔
1360
                        // rate.
4✔
1361
                        commitFee := l.channel.CommitFeeRate()
4✔
1362
                        if !shouldAdjustCommitFee(
4✔
1363
                                newCommitFee, commitFee, minRelayFee,
4✔
1364
                        ) {
5✔
1365

1✔
1366
                                continue
1✔
1367
                        }
1368

1369
                        // If we do, then we'll send a new UpdateFee message to
1370
                        // the remote party, to be locked in with a new update.
1371
                        if err := l.updateChannelFee(newCommitFee); err != nil {
3✔
1372
                                l.log.Errorf("unable to update fee rate: %v",
×
1373
                                        err)
×
1374
                                continue
×
1375
                        }
1376

1377
                // The underlying channel has notified us of a unilateral close
1378
                // carried out by the remote peer. In the case of such an
1379
                // event, we'll wipe the channel state from the peer, and mark
1380
                // the contract as fully settled. Afterwards we can exit.
1381
                //
1382
                // TODO(roasbeef): add force closure? also breach?
1383
                case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
3✔
1384
                        l.log.Warnf("remote peer has closed on-chain")
3✔
1385

3✔
1386
                        // TODO(roasbeef): remove all together
3✔
1387
                        go func() {
6✔
1388
                                chanPoint := l.channel.ChannelPoint()
3✔
1389
                                l.cfg.Peer.WipeChannel(&chanPoint)
3✔
1390
                        }()
3✔
1391

1392
                        return
3✔
1393

1394
                case <-l.cfg.BatchTicker.Ticks():
284✔
1395
                        // Attempt to extend the remote commitment chain
284✔
1396
                        // including all the currently pending entries. If the
284✔
1397
                        // send was unsuccessful, then abandon the update,
284✔
1398
                        // waiting for the revocation window to open up.
284✔
1399
                        if !l.updateCommitTxOrFail() {
284✔
1400
                                return
×
1401
                        }
×
1402

1403
                case <-l.cfg.PendingCommitTicker.Ticks():
2✔
1404
                        l.fail(
2✔
1405
                                LinkFailureError{
2✔
1406
                                        code:          ErrRemoteUnresponsive,
2✔
1407
                                        FailureAction: LinkFailureDisconnect,
2✔
1408
                                },
2✔
1409
                                "unable to complete dance",
2✔
1410
                        )
2✔
1411
                        return
2✔
1412

1413
                // A message from the switch was just received. This indicates
1414
                // that the link is an intermediate hop in a multi-hop HTLC
1415
                // circuit.
1416
                case pkt := <-l.downstream:
1,564✔
1417
                        l.handleDownstreamPkt(pkt)
1,564✔
1418

1419
                // A message from the connected peer was just received. This
1420
                // indicates that we have a new incoming HTLC, either directly
1421
                // for us, or part of a multi-hop HTLC circuit.
1422
                case msg := <-l.upstream:
6,648✔
1423
                        l.handleUpstreamMsg(msg)
6,648✔
1424

1425
                // A htlc resolution is received. This means that we now have a
1426
                // resolution for a previously accepted htlc.
1427
                case hodlItem := <-l.hodlQueue.ChanOut():
491✔
1428
                        htlcResolution := hodlItem.(invoices.HtlcResolution)
491✔
1429
                        err := l.processHodlQueue(htlcResolution)
491✔
1430
                        switch err {
491✔
1431
                        // No error, success.
1432
                        case nil:
490✔
1433

1434
                        // If the duplicate keystone error was encountered,
1435
                        // fail back gracefully.
1436
                        case ErrDuplicateKeystone:
×
1437
                                l.fail(LinkFailureError{code: ErrCircuitError},
×
1438
                                        fmt.Sprintf("process hodl queue: "+
×
1439
                                                "temporary circuit error: %v",
×
1440
                                                err,
×
1441
                                        ),
×
1442
                                )
×
1443

1444
                        // Send an Error message to the peer.
1445
                        default:
1✔
1446
                                l.fail(LinkFailureError{code: ErrInternalError},
1✔
1447
                                        fmt.Sprintf("process hodl queue: "+
1✔
1448
                                                "unable to update commitment:"+
1✔
1449
                                                " %v", err),
1✔
1450
                                )
1✔
1451
                        }
1452

1453
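                // The link has been signaled to shut down, so exit the event
                // loop.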
                case <-l.quit:
197✔
1454
                        return
197✔
1455
                }
1456
        }
1457
}
1458

1459
// processHodlQueue processes a received htlc resolution and continues reading
1460
// from the hodl queue until no more resolutions remain. When this function
1461
// returns without an error, the commit tx should be updated.
1462
func (l *channelLink) processHodlQueue(
1463
        firstResolution invoices.HtlcResolution) error {
491✔
1464

491✔
1465
        // Try to read all waiting resolution messages, so that they can all be
491✔
1466
        // processed in a single commitment tx update.
491✔
1467
        htlcResolution := firstResolution
491✔
1468
loop:
491✔
1469
        for {
982✔
1470
                // Lookup all hodl htlcs that can be failed or settled with this event.
491✔
1471
                // The hodl htlc must be present in the map.
491✔
1472
                circuitKey := htlcResolution.CircuitKey()
491✔
1473
                hodlHtlc, ok := l.hodlMap[circuitKey]
491✔
1474
                if !ok {
491✔
1475
                        return fmt.Errorf("hodl htlc not found: %v", circuitKey)
×
1476
                }
×
1477

1478
                if err := l.processHtlcResolution(htlcResolution, hodlHtlc); err != nil {
491✔
1479
                        return err
×
1480
                }
×
1481

1482
                // Clean up hodl map.
1483
                delete(l.hodlMap, circuitKey)
491✔
1484

491✔
1485
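                // Drain any additional resolutions that are already queued so
                // that they are all applied before the single commitment
                // update below.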
                select {
491✔
1486
                case item := <-l.hodlQueue.ChanOut():
3✔
1487
                        htlcResolution = item.(invoices.HtlcResolution)
3✔
1488
                default:
491✔
1489
                        break loop
491✔
1490
                }
1491
        }
1492

1493
        // Update the commitment tx.
1494
        if err := l.updateCommitTx(); err != nil {
492✔
1495
                return err
1✔
1496
        }
1✔
1497

1498
        return nil
490✔
1499
}
1500

1501
// processHtlcResolution applies a received htlc resolution to the provided
1502
// htlc. When this function returns without an error, the commit tx should be
1503
// updated.
1504
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
1505
        htlc hodlHtlc) error {
637✔
1506

637✔
1507
        circuitKey := resolution.CircuitKey()
637✔
1508

637✔
1509
        // Determine required action for the resolution based on the type of
637✔
1510
        // resolution we have received.
637✔
1511
        switch res := resolution.(type) {
637✔
1512
        // Settle htlcs that returned a settle resolution using the preimage
1513
        // in the resolution.
1514
        case *invoices.HtlcSettleResolution:
633✔
1515
                l.log.Debugf("received settle resolution for %v "+
633✔
1516
                        "with outcome: %v", circuitKey, res.Outcome)
633✔
1517

633✔
1518
                return l.settleHTLC(res.Preimage, htlc.pd)
633✔
1519

1520
        // For htlc failures, we get the relevant failure message based
1521
        // on the failure resolution and then fail the htlc.
1522
        case *invoices.HtlcFailResolution:
7✔
1523
                l.log.Debugf("received cancel resolution for "+
7✔
1524
                        "%v with outcome: %v", circuitKey, res.Outcome)
7✔
1525

7✔
1526
                // Get the lnwire failure message based on the resolution
7✔
1527
                // result.
7✔
1528
                failure := getResolutionFailure(res, htlc.pd.Amount)
7✔
1529

7✔
1530
                l.sendHTLCError(
7✔
1531
                        htlc.pd, failure, htlc.obfuscator, true,
7✔
1532
                )
7✔
1533
                return nil
7✔
1534

1535
        // Fail if we do not get a settle or fail resolution, since we
1536
        // are only expecting to handle settles and fails.
1537
        default:
×
1538
                return fmt.Errorf("unknown htlc resolution type: %T",
×
1539
                        resolution)
×
1540
        }
1541
}
1542

1543
// getResolutionFailure returns the wire message that a htlc resolution should
1544
// be failed with.
1545
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
1546
        amount lnwire.MilliSatoshi) *LinkError {
7✔
1547

7✔
1548
        // If the resolution has been resolved as part of a MPP timeout,
7✔
1549
        // we need to fail the htlc with lnwire.FailMppTimeout.
7✔
1550
        if resolution.Outcome == invoices.ResultMppTimeout {
7✔
1551
                return NewDetailedLinkError(
×
1552
                        &lnwire.FailMPPTimeout{}, resolution.Outcome,
×
1553
                )
×
1554
        }
×
1555

1556
        // If the htlc is not a MPP timeout, we fail it with
1557
        // FailIncorrectDetails. This error is sent for invoice payment
1558
        // failures such as underpayment, expiry too soon, and hodl invoices
1559
        // (which return FailIncorrectDetails to avoid leaking information).
1560
        incorrectDetails := lnwire.NewFailIncorrectDetails(
7✔
1561
                amount, uint32(resolution.AcceptHeight),
7✔
1562
        )
7✔
1563

7✔
1564
        return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
7✔
1565
}
1566

1567
// randomFeeUpdateTimeout returns a random timeout between the bounds defined
1568
// within the link's configuration that will be used to determine when the link
1569
// should propose an update to its commitment fee rate.
1570
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
218✔
1571
        lower := int64(l.cfg.MinUpdateTimeout)
218✔
1572
        upper := int64(l.cfg.MaxUpdateTimeout)
218✔
1573
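        // prand.Int63n(upper-lower) yields a value in [0, upper-lower), so
        // the resulting timeout is uniformly distributed in [lower, upper).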
        return time.Duration(prand.Int63n(upper-lower) + lower)
218✔
1574
}
218✔
1575

1576
// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
1577
// downstream HTLC Switch.
1578
func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error {
1,523✔
1579
        htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
1,523✔
1580
        if !ok {
1,523✔
1581
                return errors.New("not an UpdateAddHTLC packet")
×
1582
        }
×
1583

1584
        // If we are flushing the link in the outgoing direction, we can't add
1585
        // new htlcs to the link and need to bounce it back to the switch.
1586
        if l.IsFlushing(Outgoing) {
1,523✔
1587
                l.mailBox.FailAdd(pkt)
×
1588

×
1589
                return NewDetailedLinkError(
×
1590
                        &lnwire.FailPermanentChannelFailure{},
×
1591
                        OutgoingFailureLinkNotEligible,
×
1592
                )
×
1593
        }
×
1594

1595
        // If hodl.AddOutgoing mode is active, we exit early to simulate
1596
        // arbitrary delays between the switch adding an ADD to the
1597
        // mailbox, and the HTLC being added to the commitment state.
1598
        if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
1,523✔
1599
                l.log.Warnf(hodl.AddOutgoing.Warning())
×
1600
                l.mailBox.AckPacket(pkt.inKey())
×
1601
                return nil
×
1602
        }
×
1603

1604
        // Check if we can add the HTLC here without exceeding the max fee
1605
        // exposure threshold.
1606
        if l.isOverexposedWithHtlc(htlc, false) {
1,527✔
1607
                l.log.Debugf("Unable to handle downstream HTLC - max fee " +
4✔
1608
                        "exposure exceeded")
4✔
1609

4✔
1610
                l.mailBox.FailAdd(pkt)
4✔
1611

4✔
1612
                return NewDetailedLinkError(
4✔
1613
                        lnwire.NewTemporaryChannelFailure(nil),
4✔
1614
                        OutgoingFailureDownstreamHtlcAdd,
4✔
1615
                )
4✔
1616
        }
4✔
1617

1618
        // A new payment has been initiated via the downstream channel,
1619
        // so we add the new HTLC to our local log, then update the
1620
        // commitment chains.
1621
        htlc.ChanID = l.ChanID()
1,519✔
1622
        openCircuitRef := pkt.inKey()
1,519✔
1623

1,519✔
1624
        // We enforce the fee buffer for the commitment transaction because
1,519✔
1625
        // we are in control of adding this htlc. Nothing has locked-in yet so
1,519✔
1626
        // we can securely enforce the fee buffer which is only relevant if we
1,519✔
1627
        // are the initiator of the channel.
1,519✔
1628
        index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
1,519✔
1629
        if err != nil {
1,523✔
1630
                // The HTLC was unable to be added to the state machine,
4✔
1631
                // as a result, we'll signal the switch to cancel the
4✔
1632
                // pending payment.
4✔
1633
                l.log.Warnf("Unable to handle downstream add HTLC: %v",
4✔
1634
                        err)
4✔
1635

4✔
1636
                // Remove this packet from the link's mailbox, this
4✔
1637
                // prevents it from being reprocessed if the link
4✔
1638
                // restarts and resets its mailbox. If this response
4✔
1639
                // doesn't make it back to the originating link, it will
4✔
1640
                // be rejected upon attempting to reforward the Add to
4✔
1641
                // the switch, since the circuit was never fully opened,
4✔
1642
                // and the forwarding package shows it as
4✔
1643
                // unacknowledged.
4✔
1644
                l.mailBox.FailAdd(pkt)
4✔
1645

4✔
1646
                return NewDetailedLinkError(
4✔
1647
                        lnwire.NewTemporaryChannelFailure(nil),
4✔
1648
                        OutgoingFailureDownstreamHtlcAdd,
4✔
1649
                )
4✔
1650
        }
4✔
1651

1652
        l.log.Tracef("received downstream htlc: payment_hash=%x, "+
1,518✔
1653
                "local_log_index=%v, pend_updates=%v",
1,518✔
1654
                htlc.PaymentHash[:], index,
1,518✔
1655
                l.channel.PendingLocalUpdateCount())
1,518✔
1656

1,518✔
1657
        pkt.outgoingChanID = l.ShortChanID()
1,518✔
1658
        pkt.outgoingHTLCID = index
1,518✔
1659
        htlc.ID = index
1,518✔
1660

1,518✔
1661
        l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
1,518✔
1662
                pkt.inKey(), pkt.outKey())
1,518✔
1663

1,518✔
1664
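        // Record the inbound circuit as opened and stage its keystone (the
        // incoming/outgoing circuit key pair) so that it can be committed to
        // the circuit map along with the next commitment update.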
        l.openedCircuits = append(l.openedCircuits, pkt.inKey())
1,518✔
1665
        l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
1,518✔
1666

1,518✔
1667
        _ = l.cfg.Peer.SendMessage(false, htlc)
1,518✔
1668

1,518✔
1669
        // Send a forward event notification to htlcNotifier.
1,518✔
1670
        l.cfg.HtlcNotifier.NotifyForwardingEvent(
1,518✔
1671
                newHtlcKey(pkt),
1,518✔
1672
                HtlcInfo{
1,518✔
1673
                        IncomingTimeLock: pkt.incomingTimeout,
1,518✔
1674
                        IncomingAmt:      pkt.incomingAmount,
1,518✔
1675
                        OutgoingTimeLock: htlc.Expiry,
1,518✔
1676
                        OutgoingAmt:      htlc.Amount,
1,518✔
1677
                },
1,518✔
1678
                getEventType(pkt),
1,518✔
1679
        )
1,518✔
1680

1,518✔
1681
        l.tryBatchUpdateCommitTx()
1,518✔
1682

1,518✔
1683
        return nil
1,518✔
1684
}
1685

1686
// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
1687
// Switch. Possible messages sent by the switch include requests to forward new
1688
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
1689
// cleared HTLCs with the upstream peer.
1690
//
1691
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
1692
func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
1,564✔
1693
        switch htlc := pkt.htlc.(type) {
1,564✔
1694
        case *lnwire.UpdateAddHTLC:
1,523✔
1695
                // Handle add message. The returned error can be ignored,
1,523✔
1696
                // because it is also sent through the mailbox.
1,523✔
1697
                _ = l.handleDownstreamUpdateAdd(pkt)
1,523✔
1698

1699
        case *lnwire.UpdateFulfillHTLC:
26✔
1700
                // If hodl.SettleOutgoing mode is active, we exit early to
26✔
1701
                // simulate arbitrary delays between the switch adding the
26✔
1702
                // SETTLE to the mailbox, and the HTLC being added to the
26✔
1703
                // commitment state.
26✔
1704
                if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
26✔
1705
                        l.log.Warnf(hodl.SettleOutgoing.Warning())
×
1706
                        l.mailBox.AckPacket(pkt.inKey())
×
1707
                        return
×
1708
                }
×
1709

1710
                // An HTLC we forward to the switch has just settled somewhere
1711
                // upstream. Therefore we settle the HTLC within our local
1712
                // state machine.
1713
                inKey := pkt.inKey()
26✔
1714
                err := l.channel.SettleHTLC(
26✔
1715
                        htlc.PaymentPreimage,
26✔
1716
                        pkt.incomingHTLCID,
26✔
1717
                        pkt.sourceRef,
26✔
1718
                        pkt.destRef,
26✔
1719
                        &inKey,
26✔
1720
                )
26✔
1721
                if err != nil {
26✔
1722
                        l.log.Errorf("unable to settle incoming HTLC for "+
×
1723
                                "circuit-key=%v: %v", inKey, err)
×
1724

×
1725
                        // If the HTLC index for Settle response was not known
×
1726
                        // to our commitment state, it has already been
×
1727
                        // cleaned up by a prior response. We'll thus try to
×
1728
                        // clean up any lingering state to ensure we don't
×
1729
                        // continue reforwarding.
×
1730
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
×
1731
                                l.cleanupSpuriousResponse(pkt)
×
1732
                        }
×
1733

1734
                        // Remove the packet from the link's mailbox to ensure
1735
                        // it doesn't get replayed after a reconnection.
1736
                        l.mailBox.AckPacket(inKey)
×
1737

×
1738
                        return
×
1739
                }
1740

1741
                l.log.Debugf("queueing removal of SETTLE closed circuit: "+
26✔
1742
                        "%s->%s", pkt.inKey(), pkt.outKey())
26✔
1743

26✔
1744
                l.closedCircuits = append(l.closedCircuits, pkt.inKey())
26✔
1745

26✔
1746
                // With the HTLC settled, we'll need to populate the wire
26✔
1747
                // message to target the specific channel and HTLC to be
26✔
1748
                // canceled.
26✔
1749
                htlc.ChanID = l.ChanID()
26✔
1750
                htlc.ID = pkt.incomingHTLCID
26✔
1751

26✔
1752
                // Then we send the HTLC settle message to the connected peer
26✔
1753
                // so we can continue the propagation of the settle message.
26✔
1754
                l.cfg.Peer.SendMessage(false, htlc)
26✔
1755

26✔
1756
                // Send a settle event notification to htlcNotifier.
26✔
1757
                l.cfg.HtlcNotifier.NotifySettleEvent(
26✔
1758
                        newHtlcKey(pkt),
26✔
1759
                        htlc.PaymentPreimage,
26✔
1760
                        getEventType(pkt),
26✔
1761
                )
26✔
1762

26✔
1763
                // Immediately update the commitment tx to minimize latency.
26✔
1764
                l.updateCommitTxOrFail()
26✔
1765

1766
        case *lnwire.UpdateFailHTLC:
21✔
1767
                // If hodl.FailOutgoing mode is active, we exit early to
21✔
1768
                // simulate arbitrary delays between the switch adding a FAIL to
21✔
1769
                // the mailbox, and the HTLC being added to the commitment
21✔
1770
                // state.
21✔
1771
                if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
21✔
1772
                        l.log.Warnf(hodl.FailOutgoing.Warning())
×
1773
                        l.mailBox.AckPacket(pkt.inKey())
×
1774
                        return
×
1775
                }
×
1776

1777
                // An HTLC cancellation has been triggered somewhere upstream,
1778
                // we'll remove the HTLC from our local state machine.
1779
                inKey := pkt.inKey()
21✔
1780
                err := l.channel.FailHTLC(
21✔
1781
                        pkt.incomingHTLCID,
21✔
1782
                        htlc.Reason,
21✔
1783
                        pkt.sourceRef,
21✔
1784
                        pkt.destRef,
21✔
1785
                        &inKey,
21✔
1786
                )
21✔
1787
                if err != nil {
26✔
1788
                        l.log.Errorf("unable to cancel incoming HTLC for "+
5✔
1789
                                "circuit-key=%v: %v", inKey, err)
5✔
1790

5✔
1791
                        // If the HTLC index for Fail response was not known to
5✔
1792
                        // our commitment state, it has already been cleaned up
5✔
1793
                        // by a prior response. We'll thus try to clean up any
5✔
1794
                        // lingering state to ensure we don't continue
5✔
1795
                        // reforwarding.
5✔
1796
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
7✔
1797
                                l.cleanupSpuriousResponse(pkt)
2✔
1798
                        }
2✔
1799

1800
                        // Remove the packet from the link's mailbox to ensure
1801
                        // it doesn't get replayed after a reconnection.
1802
                        l.mailBox.AckPacket(inKey)
5✔
1803

5✔
1804
                        return
5✔
1805
                }
1806

1807
                l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
19✔
1808
                        pkt.inKey(), pkt.outKey())
19✔
1809

19✔
1810
                l.closedCircuits = append(l.closedCircuits, pkt.inKey())
19✔
1811

19✔
1812
                // With the HTLC removed, we'll need to populate the wire
19✔
1813
                // message to target the specific channel and HTLC to be
19✔
1814
                // canceled. The "Reason" field will have already been set
19✔
1815
                // within the switch.
19✔
1816
                htlc.ChanID = l.ChanID()
19✔
1817
                htlc.ID = pkt.incomingHTLCID
19✔
1818

19✔
1819
                // We send the HTLC message to the peer which initially created
19✔
1820
                // the HTLC. If the incoming blinding point is non-nil, we
19✔
1821
                // know that we are a relaying node in a blinded path.
19✔
1822
                // Otherwise, we're either an introduction node or not part of
19✔
1823
                // a blinded path at all.
19✔
1824
                if err := l.sendIncomingHTLCFailureMsg(
19✔
1825
                        htlc.ID,
19✔
1826
                        pkt.obfuscator,
19✔
1827
                        htlc.Reason,
19✔
1828
                ); err != nil {
19✔
1829
                        l.log.Errorf("unable to send HTLC failure: %v",
×
1830
                                err)
×
1831

×
1832
                        return
×
1833
                }
×
1834

1835
                // If the packet does not have a link failure set, it failed
1836
                // further down the route so we notify a forwarding failure.
1837
                // Otherwise, we notify a link failure because it failed at our
1838
                // node.
1839
                if pkt.linkFailure != nil {
32✔
1840
                        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
13✔
1841
                                newHtlcKey(pkt),
13✔
1842
                                newHtlcInfo(pkt),
13✔
1843
                                getEventType(pkt),
13✔
1844
                                pkt.linkFailure,
13✔
1845
                                false,
13✔
1846
                        )
13✔
1847
                } else {
22✔
1848
                        l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
9✔
1849
                                newHtlcKey(pkt), getEventType(pkt),
9✔
1850
                        )
9✔
1851
                }
9✔
1852

1853
                // Immediately update the commitment tx to minimize latency.
1854
                l.updateCommitTxOrFail()
19✔
1855
        }
1856
}
1857

1858
// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
1859
// full.
1860
func (l *channelLink) tryBatchUpdateCommitTx() {
1,518✔
1861
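        // Only sign a new commitment once the number of pending local updates
        // reaches the configured batch size; smaller batches are flushed by
        // the batch ticker in the main event loop instead.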
        if l.channel.PendingLocalUpdateCount() < uint64(l.cfg.BatchSize) {
2,239✔
1862
                return
721✔
1863
        }
721✔
1864

1865
        l.updateCommitTxOrFail()
800✔
1866
}
1867

1868
// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
1869
// associated with this packet. If successful in doing so, it will also purge
1870
// the open circuit from the circuit map and remove the packet from the link's
1871
// mailbox.
1872
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
2✔
1873
        inKey := pkt.inKey()
2✔
1874

2✔
1875
        l.log.Debugf("cleaning up spurious response for incoming "+
2✔
1876
                "circuit-key=%v", inKey)
2✔
1877

2✔
1878
        // If the htlc packet doesn't have a source reference, it is unsafe to
2✔
1879
        // proceed, as skipping this ack may cause the htlc to be reforwarded.
2✔
1880
        if pkt.sourceRef == nil {
3✔
1881
                l.log.Errorf("unable to cleanup response for incoming "+
1✔
1882
                        "circuit-key=%v, does not contain source reference",
1✔
1883
                        inKey)
1✔
1884
                return
1✔
1885
        }
1✔
1886

1887
        // If the source reference is present, we will try to prevent this link
1888
        // from resending the packet to the switch. To do so, we ack the AddRef
1889
        // of the incoming HTLC belonging to this link.
1890
        err := l.channel.AckAddHtlcs(*pkt.sourceRef)
1✔
1891
        if err != nil {
1✔
1892
                l.log.Errorf("unable to ack AddRef for incoming "+
×
1893
                        "circuit-key=%v: %v", inKey, err)
×
1894

×
1895
                // If this operation failed, it is unsafe to attempt removal of
×
1896
                // the destination reference or circuit, so we exit early. The
×
1897
                // cleanup may proceed with a different packet in the future
×
1898
                // that succeeds on this step.
×
1899
                return
×
1900
        }
×
1901

1902
        // Now that we know this link will stop retransmitting Adds to the
1903
        // switch, we can begin to teardown the response reference and circuit
1904
        // map.
1905
        //
1906
        // If the packet includes a destination reference, then a response for
1907
        // this HTLC was locked into the outgoing channel. Attempt to remove
1908
        // this reference, so we stop retransmitting the response internally.
1909
        // Even if this fails, we will proceed in trying to delete the circuit.
1910
        // When retransmitting responses, the destination references will be
1911
        // cleaned up if an open circuit is not found in the circuit map.
1912
        if pkt.destRef != nil {
1✔
1913
                err := l.channel.AckSettleFails(*pkt.destRef)
×
1914
                if err != nil {
×
1915
                        l.log.Errorf("unable to ack SettleFailRef "+
×
1916
                                "for incoming circuit-key=%v: %v",
×
1917
                                inKey, err)
×
1918
                }
×
1919
        }
1920

1921
        l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)
1✔
1922

1✔
1923
        // With all known references acked, we can now safely delete the circuit
1✔
1924
        // from the switch's circuit map, as the state is no longer needed.
1✔
1925
        err = l.cfg.Circuits.DeleteCircuits(inKey)
1✔
1926
        if err != nil {
1✔
1927
                l.log.Errorf("unable to delete circuit for "+
×
1928
                        "circuit-key=%v: %v", inKey, err)
×
1929
        }
×
1930
}
1931

1932
// handleUpstreamMsg processes wire messages related to commitment state
1933
// updates from the upstream peer. The upstream peer is the peer whom we have a
1934
// direct channel with, updating our respective commitment chains.
1935
func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
6,648✔
1936
        switch msg := msg.(type) {
6,648✔
1937

1938
        case *lnwire.UpdateAddHTLC:
1,493✔
1939
                if l.IsFlushing(Incoming) {
1,493✔
1940
                        // This is forbidden by the protocol specification.
×
1941
                        // The best chance we have to deal with this is to drop
×
1942
                        // the connection. This should roll back the channel
×
1943
                        // state to the last CommitSig. If the remote has
×
1944
                        // already sent a CommitSig we haven't received yet,
×
1945
                        // channel state will be re-synchronized with a
×
1946
                        // ChannelReestablish message upon reconnection and the
×
1947
                        // protocol state that caused us to flush the link will
×
1948
                        // be rolled back. In the event that there was some
×
1949
                        // non-deterministic behavior in the remote that caused
×
1950
                        // them to violate the protocol, we have a decent shot
×
1951
                        // at correcting it this way, since reconnecting will
×
1952
                        // put us in the cleanest possible state to try again.
×
1953
                        //
×
1954
                        // In addition to the above, it is possible for us to
×
1955
                        // hit this case in situations where we improperly
×
1956
                        // handle message ordering due to concurrency choices.
×
1957
                        // An issue has been filed to address this here:
×
1958
                        // https://github.com/lightningnetwork/lnd/issues/8393
×
1959
                        l.fail(
×
1960
                                LinkFailureError{
×
1961
                                        code:             ErrInvalidUpdate,
×
1962
                                        FailureAction:    LinkFailureDisconnect,
×
1963
                                        PermanentFailure: false,
×
1964
                                        Warning:          true,
×
1965
                                },
×
1966
                                "received add while link is flushing",
×
1967
                        )
×
1968

×
1969
                        return
×
1970
                }
×
1971

1972
                // Disallow htlcs with blinding points set if we haven't
1973
                // enabled the feature. This saves us from having to process
1974
                // the onion at all, but will only catch blinded payments
1975
                // where we are a relaying node (as the blinding point will
1976
                // be in the payload when we're the introduction node).
1977
                if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
1,493✔
1978
                        l.fail(LinkFailureError{code: ErrInvalidUpdate},
×
1979
                                "blinding point included when route blinding "+
×
1980
                                        "is disabled")
×
1981

×
1982
                        return
×
1983
                }
×
1984

1985
                // We have to check the limit here rather than later in the
1986
                // switch because the counterparty can keep sending HTLCs
1987
                // without sending a revoke. This would mean that the switch
1988
                // check would only occur later.
1989
                if l.isOverexposedWithHtlc(msg, true) {
1,493✔
1990
                        l.fail(LinkFailureError{code: ErrInternalError},
×
1991
                                "peer sent us an HTLC that exceeded our max "+
×
1992
                                        "fee exposure")
×
1993

×
1994
                        return
×
1995
                }
×
1996

1997
                // We just received an add request from an upstream peer, so we
1998
                // add it to our state machine, then add the HTLC to our
1999
                // "settle" list in the event that we know the preimage.
2000
                index, err := l.channel.ReceiveHTLC(msg)
1,493✔
2001
                if err != nil {
1,493✔
2002
                        l.fail(LinkFailureError{code: ErrInvalidUpdate},
×
2003
                                "unable to handle upstream add HTLC: %v", err)
×
2004
                        return
×
2005
                }
×
2006

2007
                l.log.Tracef("receive upstream htlc with payment hash(%x), "+
1,493✔
2008
                        "assigning index: %v", msg.PaymentHash[:], index)
1,493✔
2009

2010
        case *lnwire.UpdateFulfillHTLC:
663✔
2011
                pre := msg.PaymentPreimage
663✔
2012
                idx := msg.ID
663✔
2013

663✔
2014
                // Before we pipeline the settle, we'll check the set of active
663✔
2015
                // htlc's to see if the related UpdateAddHTLC has been fully
663✔
2016
                // locked-in.
663✔
2017
                var lockedin bool
663✔
2018
                htlcs := l.channel.ActiveHtlcs()
663✔
2019
                for _, add := range htlcs {
23,538✔
2020
                        // The HTLC will be outgoing and match idx.
22,875✔
2021
                        if !add.Incoming && add.HtlcIndex == idx {
23,536✔
2022
                                lockedin = true
661✔
2023
                                break
661✔
2024
                        }
2025
                }
2026

2027
                if !lockedin {
665✔
2028
                        l.fail(
2✔
2029
                                LinkFailureError{code: ErrInvalidUpdate},
2✔
2030
                                "unable to handle upstream settle",
2✔
2031
                        )
2✔
2032
                        return
2✔
2033
                }
2✔
2034

2035
                if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
664✔
2036
                        l.fail(
3✔
2037
                                LinkFailureError{
3✔
2038
                                        code:          ErrInvalidUpdate,
3✔
2039
                                        FailureAction: LinkFailureForceClose,
3✔
2040
                                },
3✔
2041
                                "unable to handle upstream settle HTLC: %v", err,
3✔
2042
                        )
3✔
2043
                        return
3✔
2044
                }
3✔
2045

2046
                settlePacket := &htlcPacket{
661✔
2047
                        outgoingChanID: l.ShortChanID(),
661✔
2048
                        outgoingHTLCID: idx,
661✔
2049
                        htlc: &lnwire.UpdateFulfillHTLC{
661✔
2050
                                PaymentPreimage: pre,
661✔
2051
                        },
661✔
2052
                }
661✔
2053

661✔
2054
                // Add the newly discovered preimage to our growing list of
661✔
2055
                // uncommitted preimage. These will be written to the witness
661✔
2056
                // cache just before accepting the next commitment signature
661✔
2057
                // from the remote peer.
661✔
2058
                l.uncommittedPreimages = append(l.uncommittedPreimages, pre)
661✔
2059

661✔
2060
                // Pipeline this settle, send it to the switch.
661✔
2061
                go l.forwardBatch(false, settlePacket)
661✔
2062

2063
        case *lnwire.UpdateFailMalformedHTLC:
6✔
2064
                // Convert the failure type encoded within the HTLC fail
6✔
2065
                // message to the proper generic lnwire error code.
6✔
2066
                var failure lnwire.FailureMessage
6✔
2067
                switch msg.FailureCode {
6✔
2068
                case lnwire.CodeInvalidOnionVersion:
4✔
2069
                        failure = &lnwire.FailInvalidOnionVersion{
4✔
2070
                                OnionSHA256: msg.ShaOnionBlob,
4✔
2071
                        }
4✔
2072
                case lnwire.CodeInvalidOnionHmac:
×
2073
                        failure = &lnwire.FailInvalidOnionHmac{
×
2074
                                OnionSHA256: msg.ShaOnionBlob,
×
2075
                        }
×
2076

2077
                case lnwire.CodeInvalidOnionKey:
×
2078
                        failure = &lnwire.FailInvalidOnionKey{
×
2079
                                OnionSHA256: msg.ShaOnionBlob,
×
2080
                        }
×
2081

2082
                // Handle malformed errors that are part of a blinded route.
2083
                // This case is slightly different, because we expect every
2084
                // relaying node in the blinded portion of the route to send
2085
                // malformed errors. If we're also a relaying node, we're
2086
                // likely going to switch this error out anyway for our own
2087
                // malformed error, but we handle the case here for
2088
                // completeness.
2089
                case lnwire.CodeInvalidBlinding:
3✔
2090
                        failure = &lnwire.FailInvalidBlinding{
3✔
2091
                                OnionSHA256: msg.ShaOnionBlob,
3✔
2092
                        }
3✔
2093

2094
                default:
2✔
2095
                        l.log.Warnf("unexpected failure code received in "+
2✔
2096
                                "UpdateFailMailformedHTLC: %v", msg.FailureCode)
2✔
2097

2✔
2098
                        // We don't just pass back the error we received from
2✔
2099
                        // our successor. Otherwise we might report a failure
2✔
2100
                        // that penalizes us more than needed. If the onion that
2✔
2101
                        // we forwarded was correct, the node should have been
2✔
2102
                        // able to send back its own failure. The node did not
2✔
2103
                        // send back its own failure, so we assume there was a
2✔
2104
                        // problem with the onion and report that back. We reuse
2✔
2105
                        // the invalid onion key failure because there is no
2✔
2106
                        // specific error for this case.
2✔
2107
                        failure = &lnwire.FailInvalidOnionKey{
2✔
2108
                                OnionSHA256: msg.ShaOnionBlob,
2✔
2109
                        }
2✔
2110
                }
2111

2112
                // With the error parsed, we'll convert it into its opaque
2113
                // form.
2114
                var b bytes.Buffer
6✔
2115
                if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
6✔
2116
                        l.log.Errorf("unable to encode malformed error: %v", err)
×
2117
                        return
×
2118
                }
×
2119

2120
                // If the remote side has been unable to parse the onion blob we
2121
                // have sent to it, then we should transform the malformed HTLC
2122
                // message to the usual HTLC fail message.
2123
                err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
6✔
2124
                if err != nil {
6✔
2125
                        l.fail(LinkFailureError{code: ErrInvalidUpdate},
×
2126
                                "unable to handle upstream fail HTLC: %v", err)
×
2127
                        return
×
2128
                }
×
2129

2130
        case *lnwire.UpdateFailHTLC:
123✔
2131
                // Verify that the failure reason is at least 256 bytes plus
123✔
2132
                // overhead.
123✔
2133
                const minimumFailReasonLength = lnwire.FailureMessageLength +
123✔
2134
                        2 + 2 + 32
123✔
2135

123✔
2136
                if len(msg.Reason) < minimumFailReasonLength {
124✔
2137
                        // We've received a reason with a non-compliant length.
1✔
2138
                        // Older nodes happily relay back these failures that
1✔
2139
                        // may originate from a node further downstream.
1✔
2140
                        // Therefore we can't just fail the channel.
1✔
2141
                        //
1✔
2142
                        // We want to be compliant ourselves, so we also can't
1✔
2143
                        // pass back the reason unmodified. And we must make
1✔
2144
                        // sure that we don't hit the magic length check of 260
1✔
2145
                        // bytes in processRemoteSettleFails either.
1✔
2146
                        //
1✔
2147
                        // Because the reason is unreadable for the payer
1✔
2148
                        // anyway, we just replace it by a compliant-length
1✔
2149
                        // series of random bytes.
1✔
2150
                        msg.Reason = make([]byte, minimumFailReasonLength)
1✔
2151
                        _, err := crand.Read(msg.Reason[:])
1✔
2152
                        if err != nil {
1✔
2153
                                l.log.Errorf("Random generation error: %v", err)
×
2154

×
2155
                                return
×
2156
                        }
×
2157
                }
2158

2159
                // Add fail to the update log.
2160
                idx := msg.ID
123✔
2161
                err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
123✔
2162
                if err != nil {
123✔
2163
                        l.fail(LinkFailureError{code: ErrInvalidUpdate},
×
2164
                                "unable to handle upstream fail HTLC: %v", err)
×
2165
                        return
×
2166
                }
×
2167

2168
        case *lnwire.CommitSig:
2,194✔
2169
                // Since we may have learned new preimages for the first time,
2,194✔
2170
                // we'll add them to our preimage cache. By doing this, we
2,194✔
2171
                // ensure any contested contracts watched by any on-chain
2,194✔
2172
                // arbitrators can now sweep this HTLC on-chain. We delay
2,194✔
2173
                // committing the preimages until just before accepting the new
2,194✔
2174
                // remote commitment, as afterwards the peer won't resend the
2,194✔
2175
                // Settle messages on the next channel reestablishment. Doing so
2,194✔
2176
                // allows us to more effectively batch this operation, instead
2,194✔
2177
                // of doing a single write per preimage.
2,194✔
2178
                err := l.cfg.PreimageCache.AddPreimages(
2,194✔
2179
                        l.uncommittedPreimages...,
2,194✔
2180
                )
2,194✔
2181
                if err != nil {
2,194✔
2182
                        l.fail(
×
2183
                                LinkFailureError{code: ErrInternalError},
×
2184
                                "unable to add preimages=%v to cache: %v",
×
2185
                                l.uncommittedPreimages, err,
×
2186
                        )
×
2187
                        return
×
2188
                }
×
2189

2190
                // Instead of truncating the slice to conserve memory
2191
                // allocations, we simply set the uncommitted preimage slice to
2192
                // nil so that a new one will be initialized if any more
2193
                // witnesses are discovered. We do this because the maximum size
2194
                // that the slice can occupy is 15KB, and we want to ensure we
2195
                // release that memory back to the runtime.
2196
                l.uncommittedPreimages = nil
2,194✔
2197

2,194✔
2198
                // We just received new updates to our local commitment
2,194✔
2199
                // chain, validate this new commitment, closing the link if
2,194✔
2200
                // invalid.
2,194✔
2201
                err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
2,194✔
2202
                        CommitSig:  msg.CommitSig,
2,194✔
2203
                        HtlcSigs:   msg.HtlcSigs,
2,194✔
2204
                        PartialSig: msg.PartialSig,
2,194✔
2205
                })
2,194✔
2206
                if err != nil {
2,194✔
2207
                        // If we were unable to reconstruct their proposed
×
2208
                        // commitment, then we'll examine the type of error. If
×
2209
                        // it's an InvalidCommitSigError, then we'll send a
×
2210
                        // direct error.
×
2211
                        var sendData []byte
×
2212
                        switch err.(type) {
×
2213
                        case *lnwallet.InvalidCommitSigError:
×
2214
                                sendData = []byte(err.Error())
×
2215
                        case *lnwallet.InvalidHtlcSigError:
×
2216
                                sendData = []byte(err.Error())
×
2217
                        }
2218
                        l.fail(
×
2219
                                LinkFailureError{
×
2220
                                        code:          ErrInvalidCommitment,
×
2221
                                        FailureAction: LinkFailureForceClose,
×
2222
                                        SendData:      sendData,
×
2223
                                },
×
2224
                                "ChannelPoint(%v): unable to accept new "+
×
2225
                                        "commitment: %v",
×
2226
                                l.channel.ChannelPoint(), err,
×
2227
                        )
×
2228
                        return
×
2229
                }
2230

2231
                // As we've just accepted a new state, we'll now
2232
                // immediately send the remote peer a revocation for our prior
2233
                // state.
2234
                nextRevocation, currentHtlcs, finalHTLCs, err :=
2,194✔
2235
                        l.channel.RevokeCurrentCommitment()
2,194✔
2236
                if err != nil {
2,194✔
2237
                        l.log.Errorf("unable to revoke commitment: %v", err)
×
2238

×
2239
                        // We need to fail the channel in case revoking our
×
2240
                        // local commitment does not succeed. We might have
×
2241
                        // already advanced our channel state which would lead
×
2242
                        // us to proceed with an unclean state.
×
2243
                        //
×
2244
                        // NOTE: We do not trigger a force close because this
×
2245
                        // could resolve itself in case our db was just busy
×
2246
                        // not accepting new transactions.
×
2247
                        l.fail(
×
2248
                                LinkFailureError{
×
2249
                                        code:          ErrInternalError,
×
2250
                                        Warning:       true,
×
2251
                                        FailureAction: LinkFailureDisconnect,
×
2252
                                },
×
2253
                                "ChannelPoint(%v): unable to accept new "+
×
2254
                                        "commitment: %v",
×
2255
                                l.channel.ChannelPoint(), err,
×
2256
                        )
×
2257
                        return
×
2258
                }
×
2259

2260
                // As soon as we are ready to send our next revocation, we can
2261
                // invoke the incoming commit hooks.
2262
                l.RWMutex.Lock()
2,194✔
2263
                l.incomingCommitHooks.invoke()
2,194✔
2264
                l.RWMutex.Unlock()
2,194✔
2265

2,194✔
2266
                l.cfg.Peer.SendMessage(false, nextRevocation)
2,194✔
2267

2,194✔
2268
                // Notify the incoming HTLCs for which the resolutions were
2,194✔
2269
                // locked in.
2,194✔
2270
                for id, settled := range finalHTLCs {
2,964✔
2271
                        l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
770✔
2272
                                models.CircuitKey{
770✔
2273
                                        ChanID: l.ShortChanID(),
770✔
2274
                                        HtlcID: id,
770✔
2275
                                },
770✔
2276
                                channeldb.FinalHtlcInfo{
770✔
2277
                                        Settled:  settled,
770✔
2278
                                        Offchain: true,
770✔
2279
                                },
770✔
2280
                        )
770✔
2281
                }
770✔
2282

2283
                // Since we just revoked our commitment, we may have a new set
2284
                // of HTLCs on our commitment, so we'll send them using our
2285
                // function closure NotifyContractUpdate.
2286
                newUpdate := &contractcourt.ContractUpdate{
2,194✔
2287
                        HtlcKey: contractcourt.LocalHtlcSet,
2,194✔
2288
                        Htlcs:   currentHtlcs,
2,194✔
2289
                }
2,194✔
2290
                err = l.cfg.NotifyContractUpdate(newUpdate)
2,194✔
2291
                if err != nil {
2,194✔
2292
                        l.log.Errorf("unable to notify contract update: %v",
×
2293
                                err)
×
2294
                        return
×
2295
                }
×
2296

2297
                select {
2,194✔
2298
                case <-l.quit:
1✔
2299
                        return
1✔
2300
                default:
2,193✔
2301
                }
2302

2303
                // If the remote party initiated the state transition,
2304
                // we'll reply with a signature to provide them with their
2305
                // version of the latest commitment. Otherwise, both commitment
2306
                // chains are fully synced from our PoV, so we don't need to
2307
                // reply with a signature as both sides already have a
2308
                // commitment with the latest accepted state.
2309
                if l.channel.OweCommitment() {
3,377✔
2310
                        if !l.updateCommitTxOrFail() {
1,184✔
UNCOV
2311
                                return
×
UNCOV
2312
                        }
×
2313
                }
2314

2315
                // Now that we have finished processing the incoming CommitSig
2316
                // and sent out our RevokeAndAck, we invoke the flushHooks if
2317
                // the channel state is clean.
2318
                l.RWMutex.Lock()
2,193✔
2319
                if l.channel.IsChannelClean() {
2,420✔
2320
                        l.flushHooks.invoke()
227✔
2321
                }
227✔
2322
                l.RWMutex.Unlock()
2,193✔
2323

2324
        case *lnwire.RevokeAndAck:
2,180✔
2325
                // We've received a revocation from the remote chain, if valid,
2,180✔
2326
                // this moves the remote chain forward, and expands our
2,180✔
2327
                // revocation window.
2,180✔
2328

2,180✔
2329
                // We now process the message and advance our remote commit
2,180✔
2330
                // chain.
2,180✔
2331
                fwdPkg, adds, settleFails, remoteHTLCs, err := l.channel.
2,180✔
2332
                        ReceiveRevocation(msg)
2,180✔
2333
                if err != nil {
2,180✔
2334
                        // TODO(halseth): force close?
×
2335
                        l.fail(
×
2336
                                LinkFailureError{
×
2337
                                        code:          ErrInvalidRevocation,
×
2338
                                        FailureAction: LinkFailureDisconnect,
×
2339
                                },
×
2340
                                "unable to accept revocation: %v", err,
×
2341
                        )
×
2342
                        return
×
2343
                }
×
2344

2345
                // The remote party now has a new primary commitment, so we'll
2346
                // update the contract court to be aware of this new set (the
2347
                // prior old remote pending).
2348
                newUpdate := &contractcourt.ContractUpdate{
2,180✔
2349
                        HtlcKey: contractcourt.RemoteHtlcSet,
2,180✔
2350
                        Htlcs:   remoteHTLCs,
2,180✔
2351
                }
2,180✔
2352
                err = l.cfg.NotifyContractUpdate(newUpdate)
2,180✔
2353
                if err != nil {
2,180✔
2354
                        l.log.Errorf("unable to notify contract update: %v",
×
2355
                                err)
×
2356
                        return
×
2357
                }
×
2358

2359
                select {
2,180✔
2360
                case <-l.quit:
4✔
2361
                        return
4✔
2362
                default:
2,176✔
2363
                }
2364

2365
                // If we have a tower client for this channel type, we'll
2366
                // create a backup for the current state.
2367
                if l.cfg.TowerClient != nil {
2,179✔
2368
                        state := l.channel.State()
3✔
2369
                        chanID := l.ChanID()
3✔
2370

3✔
2371
                        err = l.cfg.TowerClient.BackupState(
3✔
2372
                                &chanID, state.RemoteCommitment.CommitHeight-1,
3✔
2373
                        )
3✔
2374
                        if err != nil {
3✔
2375
                                l.fail(LinkFailureError{code: ErrInternalError},
×
2376
                                        "unable to queue breach backup: %v",
×
2377
                                        err)
×
2378
                                return
×
2379
                        }
×
2380
                }
2381

2382
                l.processRemoteSettleFails(fwdPkg, settleFails)
2,176✔
2383
                l.processRemoteAdds(fwdPkg, adds)
2,176✔
2384

2,176✔
2385
                // If the link failed during processing the adds, we must
2,176✔
2386
                // return to ensure we won't attempted to update the state
2,176✔
2387
                // further.
2,176✔
2388
                if l.failed {
2,176✔
2389
                        return
×
2390
                }
×
2391

2392
                // The revocation window opened up. If there are pending local
2393
                // updates, try to update the commit tx. Pending updates could
2394
                // already have been present because of a previously failed
2395
                // update to the commit tx or freshly added in by
2396
                // processRemoteAdds. Also in case there are no local updates,
2397
                // but there are still remote updates that are not in the remote
2398
                // commit tx yet, send out an update.
2399
                if l.channel.OweCommitment() {
2,659✔
2400
                        if !l.updateCommitTxOrFail() {
485✔
2401
                                return
2✔
2402
                        }
2✔
2403
                }
2404

2405
                // Now that we have finished processing the RevokeAndAck, we
2406
                // can invoke the flushHooks if the channel state is clean.
2407
                l.RWMutex.Lock()
2,174✔
2408
                if l.channel.IsChannelClean() {
2,346✔
2409
                        l.flushHooks.invoke()
172✔
2410
                }
172✔
2411
                l.RWMutex.Unlock()
2,174✔
2412

2413
        case *lnwire.UpdateFee:
3✔
2414
                // Check and see if their proposed fee-rate would make us
3✔
2415
                // exceed the fee threshold.
3✔
2416
                fee := chainfee.SatPerKWeight(msg.FeePerKw)
3✔
2417

3✔
2418
                isDust, err := l.exceedsFeeExposureLimit(fee)
3✔
2419
                if err != nil {
3✔
2420
                        // This shouldn't typically happen. If it does, it
×
2421
                        // indicates something is wrong with our channel state.
×
2422
                        l.log.Errorf("Unable to determine if fee threshold " +
×
2423
                                "exceeded")
×
2424
                        l.fail(LinkFailureError{code: ErrInternalError},
×
2425
                                "error calculating fee exposure: %v", err)
×
2426

×
2427
                        return
×
2428
                }
×
2429

2430
                if isDust {
3✔
2431
                        // The proposed fee-rate makes us exceed the fee
×
2432
                        // threshold.
×
2433
                        l.fail(LinkFailureError{code: ErrInternalError},
×
2434
                                "fee threshold exceeded: %v", err)
×
2435
                        return
×
2436
                }
×
2437

2438
                // We received a fee update from the peer. If we are the initiator we
2439
                // will fail the channel, if not we will apply the update.
2440
                if err := l.channel.ReceiveUpdateFee(fee); err != nil {
3✔
2441
                        l.fail(LinkFailureError{code: ErrInvalidUpdate},
×
2442
                                "error receiving fee update: %v", err)
×
2443
                        return
×
2444
                }
×
2445

2446
                // Update the mailbox's feerate as well.
2447
                l.mailBox.SetFeeRate(fee)
3✔
2448

2449
        // In the case where we receive a warning message from our peer, just
2450
        // log it and move on. We choose not to disconnect from our peer,
2451
        // although we "MAY" do so according to the specification.
2452
        case *lnwire.Warning:
1✔
2453
                l.log.Warnf("received warning message from peer: %v",
1✔
2454
                        msg.Warning())
1✔
2455

2456
        case *lnwire.Error:
2✔
2457
                // Error received from remote, MUST fail channel, but should
2✔
2458
                // only print the contents of the error message if all
2✔
2459
                // characters are printable ASCII.
2✔
2460
                l.fail(
2✔
2461
                        LinkFailureError{
2✔
2462
                                code: ErrRemoteError,
2✔
2463

2✔
2464
                                // TODO(halseth): we currently don't fail the
2✔
2465
                                // channel permanently, as there are some sync
2✔
2466
                                // issues with other implementations that will
2✔
2467
                                // lead to them sending an error message, but
2✔
2468
                                // we can recover from on next connection. See
2✔
2469
                                // https://github.com/ElementsProject/lightning/issues/4212
2✔
2470
                                PermanentFailure: false,
2✔
2471
                        },
2✔
2472
                        "ChannelPoint(%v): received error from peer: %v",
2✔
2473
                        l.channel.ChannelPoint(), msg.Error(),
2✔
2474
                )
2✔
2475
        default:
×
2476
                l.log.Warnf("received unknown message of type %T", msg)
×
2477
        }
2478

2479
}
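
// The CommitSig / RevokeAndAck handling above follows a small request and
// response pattern. The standalone sketch below reduces it to that pattern
// using plain strings; it only illustrates the ordering described in the
// comments above, it is not lnd code, and the helper and its inputs are
// hypothetical.
package main

import "fmt"

// respondTo lists the messages we would send back for a single upstream
// message, given whether we still owe the remote party a commitment.
func respondTo(msg string, oweCommitment bool) []string {
        var replies []string

        switch msg {
        case "CommitSig":
                // Accept the new commitment, then immediately revoke our
                // prior state.
                replies = append(replies, "RevokeAndAck")

                // If the remote party initiated the transition, reply with a
                // signature so they receive their version of the latest
                // commitment.
                if oweCommitment {
                        replies = append(replies, "CommitSig")
                }

        case "RevokeAndAck":
                // The revocation window reopened; flush any pending updates
                // with a fresh signature.
                if oweCommitment {
                        replies = append(replies, "CommitSig")
                }
        }

        return replies
}

func main() {
        fmt.Println(respondTo("CommitSig", true))     // [RevokeAndAck CommitSig]
        fmt.Println(respondTo("RevokeAndAck", false)) // []
}
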
2480

2481
// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
2482
// for packets delivered from the server, and cleaning up any circuits closed by
2483
// signing a previous commitment txn. This method ensures that the circuits are
2484
// removed from the circuit map before removing them from the link's mailbox,
2485
// otherwise it could be possible for some circuit to be missed if this link
2486
// flaps.
2487
func (l *channelLink) ackDownStreamPackets() error {
2,367✔
2488
        // First, remove the downstream Add packets that were included in the
2,367✔
2489
        // previous commitment signature. This will prevent the Adds from being
2,367✔
2490
        // replayed if this link disconnects.
2,367✔
2491
        for _, inKey := range l.openedCircuits {
3,874✔
2492
                // In order to test the sphinx replay logic of the remote
1,507✔
2493
                // party, unsafe replay does not acknowledge the packets from
1,507✔
2494
                // the mailbox. We can then force a replay of any Add packets
1,507✔
2495
                // held in memory by disconnecting and reconnecting the link.
1,507✔
2496
                if l.cfg.UnsafeReplay {
1,510✔
2497
                        continue
3✔
2498
                }
2499

2500
                l.log.Debugf("removing Add packet %s from mailbox", inKey)
1,507✔
2501
                l.mailBox.AckPacket(inKey)
1,507✔
2502
        }
2503

2504
        // Now, we will delete all circuits closed by the previous commitment
2505
        // signature, which is the result of downstream Settle/Fail packets. We
2506
        // batch them here to ensure circuits are closed atomically and for
2507
        // performance.
2508
        err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
2,367✔
2509
        switch err {
2,367✔
2510
        case nil:
2,367✔
2511
                // Successful deletion.
2512

2513
        default:
×
2514
                l.log.Errorf("unable to delete %d circuits: %v",
×
2515
                        len(l.closedCircuits), err)
×
2516
                return err
×
2517
        }
2518

2519
        // With the circuits removed from memory and disk, we now ack any
2520
        // Settle/Fails in the mailbox to ensure they do not get redelivered
2521
        // after startup. If forgive is enabled and we've reached this point,
2522
        // the circuits must have been removed at some point, so it is now safe
2523
        // to un-queue the corresponding Settle/Fails.
2524
        for _, inKey := range l.closedCircuits {
2,409✔
2525
                l.log.Debugf("removing Fail/Settle packet %s from mailbox",
42✔
2526
                        inKey)
42✔
2527
                l.mailBox.AckPacket(inKey)
42✔
2528
        }
42✔
2529

2530
        // Lastly, reset our buffers to be empty while keeping any acquired
2531
        // growth in the backing array.
2532
        l.openedCircuits = l.openedCircuits[:0]
2,367✔
2533
        l.closedCircuits = l.closedCircuits[:0]
2,367✔
2534

2,367✔
2535
        return nil
2,367✔
2536
}
2537

2538
// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
2539
// the link.
2540
func (l *channelLink) updateCommitTxOrFail() bool {
2,781✔
2541
        err := l.updateCommitTx()
2,781✔
2542
        switch err {
2,781✔
2543
        // No error encountered, success.
2544
        case nil:
2,778✔
2545

2546
        // A duplicate keystone error should be resolved and is not fatal, so
2547
        // we won't send an Error message to the peer.
2548
        case ErrDuplicateKeystone:
×
2549
                l.fail(LinkFailureError{code: ErrCircuitError},
×
2550
                        "temporary circuit error: %v", err)
×
2551
                return false
×
2552

2553
        // Any other error results in an Error message being sent to
2554
        // the peer.
2555
        default:
3✔
2556
                l.fail(LinkFailureError{code: ErrInternalError},
3✔
2557
                        "unable to update commitment: %v", err)
3✔
2558
                return false
3✔
2559
        }
2560

2561
        return true
2,778✔
2562
}
2563

2564
// updateCommitTx signs, then sends an update to the remote peer adding a new
2565
// commitment to their commitment chain which includes all the latest updates
2566
// we've received+processed up to this point.
2567
func (l *channelLink) updateCommitTx() error {
3,272✔
2568
        // Preemptively write all pending keystones to disk, just in case the
3,272✔
2569
        // HTLCs we have in memory are included in the subsequent attempt to
3,272✔
2570
        // sign a commitment state.
3,272✔
2571
        err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
3,272✔
2572
        if err != nil {
3,272✔
2573
                // If ErrDuplicateKeystone is returned, the caller will catch
×
2574
                // it.
×
2575
                return err
×
2576
        }
×
2577

2578
        // Reset the batch, but keep the backing buffer to avoid reallocating.
2579
        l.keystoneBatch = l.keystoneBatch[:0]
3,272✔
2580

3,272✔
2581
        // If hodl.Commit mode is active, we will refrain from attempting to
3,272✔
2582
        // commit any in-memory modifications to the channel state. Exiting here
3,272✔
2583
        // permits testing of either the switch or link's ability to trim
3,272✔
2584
        // circuits that have been opened, but unsuccessfully committed.
3,272✔
2585
        if l.cfg.HodlMask.Active(hodl.Commit) {
3,279✔
2586
                l.log.Warnf(hodl.Commit.Warning())
7✔
2587
                return nil
7✔
2588
        }
7✔
2589

2590
        newCommit, err := l.channel.SignNextCommitment()
3,268✔
2591
        if err == lnwallet.ErrNoWindow {
4,340✔
2592
                l.cfg.PendingCommitTicker.Resume()
1,072✔
2593
                l.log.Trace("PendingCommitTicker resumed")
1,072✔
2594

1,072✔
2595
                l.log.Tracef("revocation window exhausted, unable to send: "+
1,072✔
2596
                        "%v, pend_updates=%v, dangling_closes%v",
1,072✔
2597
                        l.channel.PendingLocalUpdateCount(),
1,072✔
2598
                        lnutils.SpewLogClosure(l.openedCircuits),
1,072✔
2599
                        lnutils.SpewLogClosure(l.closedCircuits))
1,072✔
2600

1,072✔
2601
                return nil
1,072✔
2602
        } else if err != nil {
3,271✔
2603
                return err
×
2604
        }
×
2605

2606
        if err := l.ackDownStreamPackets(); err != nil {
2,199✔
2607
                return err
×
2608
        }
×
2609

2610
        l.cfg.PendingCommitTicker.Pause()
2,199✔
2611
        l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")
2,199✔
2612

2,199✔
2613
        // The remote party now has a new pending commitment, so we'll update
2,199✔
2614
        // the contract court to be aware of this new set (the prior old remote
2,199✔
2615
        // pending).
2,199✔
2616
        newUpdate := &contractcourt.ContractUpdate{
2,199✔
2617
                HtlcKey: contractcourt.RemotePendingHtlcSet,
2,199✔
2618
                Htlcs:   newCommit.PendingHTLCs,
2,199✔
2619
        }
2,199✔
2620
        err = l.cfg.NotifyContractUpdate(newUpdate)
2,199✔
2621
        if err != nil {
2,199✔
2622
                l.log.Errorf("unable to notify contract update: %v", err)
×
2623
                return err
×
2624
        }
×
2625

2626
        select {
2,199✔
2627
        case <-l.quit:
4✔
2628
                return ErrLinkShuttingDown
4✔
2629
        default:
2,195✔
2630
        }
2631

2632
        commitSig := &lnwire.CommitSig{
2,195✔
2633
                ChanID:     l.ChanID(),
2,195✔
2634
                CommitSig:  newCommit.CommitSig,
2,195✔
2635
                HtlcSigs:   newCommit.HtlcSigs,
2,195✔
2636
                PartialSig: newCommit.PartialSig,
2,195✔
2637
        }
2,195✔
2638
        l.cfg.Peer.SendMessage(false, commitSig)
2,195✔
2639

2,195✔
2640
        // Now that we have sent out a new CommitSig, we invoke the outgoing set
2,195✔
2641
        // of commit hooks.
2,195✔
2642
        l.RWMutex.Lock()
2,195✔
2643
        l.outgoingCommitHooks.invoke()
2,195✔
2644
        l.RWMutex.Unlock()
2,195✔
2645

2,195✔
2646
        return nil
2,195✔
2647
}
2648

2649
// PeerPubKey returns the public key of the remote peer with which we have
2650
// the channel link opened.
2651
//
2652
// NOTE: Part of the ChannelLink interface.
2653
func (l *channelLink) PeerPubKey() [33]byte {
440✔
2654
        return l.cfg.Peer.PubKey()
440✔
2655
}
440✔
2656

2657
// ChannelPoint returns the channel outpoint for the channel link.
2658
// NOTE: Part of the ChannelLink interface.
2659
func (l *channelLink) ChannelPoint() wire.OutPoint {
847✔
2660
        return l.channel.ChannelPoint()
847✔
2661
}
847✔
2662

2663
// ShortChanID returns the short channel ID for the channel link. The short
2664
// channel ID encodes the exact location in the main chain that the original
2665
// funding output can be found.
2666
//
2667
// NOTE: Part of the ChannelLink interface.
2668
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
9,485✔
2669
        l.RLock()
9,485✔
2670
        defer l.RUnlock()
9,485✔
2671

9,485✔
2672
        return l.channel.ShortChanID()
9,485✔
2673
}
9,485✔
2674

2675
// UpdateShortChanID updates the short channel ID for a link. This may be
2676
// required in the event that a link is created before the short chan ID for it
2677
// is known, or a re-org occurs, and the funding transaction changes location
2678
// within the chain.
2679
//
2680
// NOTE: Part of the ChannelLink interface.
2681
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
3✔
2682
        chanID := l.ChanID()
3✔
2683

3✔
2684
        // Refresh the channel state's short channel ID by loading it from disk.
3✔
2685
        // This ensures that the channel state accurately reflects the updated
3✔
2686
        // short channel ID.
3✔
2687
        err := l.channel.State().Refresh()
3✔
2688
        if err != nil {
3✔
2689
                l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
×
2690
                        "%v", chanID, err)
×
2691
                return hop.Source, err
×
2692
        }
×
2693

2694
        return hop.Source, nil
3✔
2695
}
2696

2697
// ChanID returns the channel ID for the channel link. The channel ID is a more
2698
// compact representation of a channel's full outpoint.
2699
//
2700
// NOTE: Part of the ChannelLink interface.
2701
func (l *channelLink) ChanID() lnwire.ChannelID {
6,993✔
2702
        return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
6,993✔
2703
}
6,993✔
2704

2705
// Bandwidth returns the total amount that can flow through the channel link at
2706
// this given instance. The value returned is expressed in millisatoshi and can
2707
// be used by callers when making forwarding decisions to determine if a link
2708
// can accept an HTLC.
2709
//
2710
// NOTE: Part of the ChannelLink interface.
2711
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
1,854✔
2712
        // Get the balance available on the channel for new HTLCs. This takes
1,854✔
2713
        // the channel reserve into account so HTLCs up to this value won't
1,854✔
2714
        // violate it.
1,854✔
2715
        return l.channel.AvailableBalance()
1,854✔
2716
}
1,854✔
2717

2718
// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
2719
// amount provided to the link. This check does not reserve a space, since
2720
// forwards or other payments may use the available slot, so it should be
2721
// considered best-effort.
2722
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
3✔
2723
        return l.channel.MayAddOutgoingHtlc(amt)
3✔
2724
}
3✔
2725

2726
// getDustSum is a wrapper method that calls the underlying channel's dust sum
2727
// method.
2728
//
2729
// NOTE: Part of the dustHandler interface.
2730
func (l *channelLink) getDustSum(whoseCommit lntypes.ChannelParty,
2731
        dryRunFee fn.Option[chainfee.SatPerKWeight]) lnwire.MilliSatoshi {
7,902✔
2732

7,902✔
2733
        return l.channel.GetDustSum(whoseCommit, dryRunFee)
7,902✔
2734
}
7,902✔
2735

2736
// getFeeRate is a wrapper method that retrieves the underlying channel's
2737
// feerate.
2738
//
2739
// NOTE: Part of the dustHandler interface.
2740
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
1,711✔
2741
        return l.channel.CommitFeeRate()
1,711✔
2742
}
1,711✔
2743

2744
// getDustClosure returns a closure that can be used by the switch or mailbox
2745
// to evaluate whether a given HTLC is dust.
2746
//
2747
// NOTE: Part of the dustHandler interface.
2748
func (l *channelLink) getDustClosure() dustClosure {
4,721✔
2749
        localDustLimit := l.channel.State().LocalChanCfg.DustLimit
4,721✔
2750
        remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
4,721✔
2751
        chanType := l.channel.State().ChanType
4,721✔
2752

4,721✔
2753
        return dustHelper(chanType, localDustLimit, remoteDustLimit)
4,721✔
2754
}
4,721✔
2755

2756
// getCommitFee returns either the local or remote CommitFee in satoshis. This
2757
// is used so that the Switch can have access to the commitment fee without
2758
// needing to have a *LightningChannel. This doesn't include dust.
2759
//
2760
// NOTE: Part of the dustHandler interface.
2761
func (l *channelLink) getCommitFee(remote bool) btcutil.Amount {
6,406✔
2762
        if remote {
9,802✔
2763
                return l.channel.State().RemoteCommitment.CommitFee
3,396✔
2764
        }
3,396✔
2765

2766
        return l.channel.State().LocalCommitment.CommitFee
3,013✔
2767
}
2768

2769
// exceedsFeeExposureLimit returns whether or not the new proposed fee-rate
2770
// increases the total dust and fees within the channel past the configured
2771
// fee threshold. It first calculates the dust sum over every update in the
2772
// update log with the proposed fee-rate and taking into account both the local
2773
// and remote dust limits. It uses every update in the update log instead of
2774
// what is actually on the local and remote commitments because it is assumed
2775
// that in a worst-case scenario, every update in the update log could
2776
// theoretically be on either commitment transaction and this needs to be
2777
// accounted for with this fee-rate. It then calculates the local and remote
2778
// commitment fees given the proposed fee-rate. Finally, it tallies the results
2779
// and determines if the fee threshold has been exceeded.
2780
func (l *channelLink) exceedsFeeExposureLimit(
2781
        feePerKw chainfee.SatPerKWeight) (bool, error) {
6✔
2782

6✔
2783
        dryRunFee := fn.Some[chainfee.SatPerKWeight](feePerKw)
6✔
2784

6✔
2785
        // Get the sum of dust for both the local and remote commitments using
6✔
2786
        // this "dry-run" fee.
6✔
2787
        localDustSum := l.getDustSum(lntypes.Local, dryRunFee)
6✔
2788
        remoteDustSum := l.getDustSum(lntypes.Remote, dryRunFee)
6✔
2789

6✔
2790
        // Calculate the local and remote commitment fees using this dry-run
6✔
2791
        // fee.
6✔
2792
        localFee, remoteFee, err := l.channel.CommitFeeTotalAt(feePerKw)
6✔
2793
        if err != nil {
6✔
2794
                return false, err
×
2795
        }
×
2796

2797
        // Finally, check whether the max fee exposure was exceeded on either
2798
        // future commitment transaction with the fee-rate.
2799
        totalLocalDust := localDustSum + lnwire.NewMSatFromSatoshis(localFee)
6✔
2800
        if totalLocalDust > l.cfg.MaxFeeExposure {
6✔
2801
                return true, nil
×
2802
        }
×
2803

2804
        totalRemoteDust := remoteDustSum + lnwire.NewMSatFromSatoshis(
6✔
2805
                remoteFee,
6✔
2806
        )
6✔
2807

6✔
2808
        return totalRemoteDust > l.cfg.MaxFeeExposure, nil
6✔
2809
}
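
// The threshold comparison above reduces to simple integer arithmetic. The
// sketch below walks through it with plain uint64 millisatoshi values; the
// figures are hypothetical and stand in for the dust sums, commitment fees
// and cfg.MaxFeeExposure used by the real check.
package main

import "fmt"

func main() {
        // Hypothetical dust sums (msat) at the proposed dry-run feerate.
        localDustSumMsat := uint64(400_000_000)
        remoteDustSumMsat := uint64(420_000_000)

        // Hypothetical commitment fees (sat) at that same feerate.
        localCommitFeeSat := uint64(60_000)
        remoteCommitFeeSat := uint64(90_000)

        // Hypothetical configured limit, analogous to cfg.MaxFeeExposure.
        maxFeeExposureMsat := uint64(500_000_000)

        // Mirror totalLocalDust = localDustSum + NewMSatFromSatoshis(localFee):
        // one satoshi is 1,000 millisatoshi.
        totalLocalMsat := localDustSumMsat + localCommitFeeSat*1_000
        totalRemoteMsat := remoteDustSumMsat + remoteCommitFeeSat*1_000

        exceeded := totalLocalMsat > maxFeeExposureMsat ||
                totalRemoteMsat > maxFeeExposureMsat

        // 460,000,000 and 510,000,000 against 500,000,000: exceeded is true.
        fmt.Println("fee exposure exceeded:", exceeded)
}
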
2810

2811
// isOverexposedWithHtlc calculates whether the proposed HTLC will make the
2812
// channel exceed the fee threshold. It first fetches the largest fee-rate that
2813
// may be on any unrevoked commitment transaction. Then, using this fee-rate,
2814
// determines if the to-be-added HTLC is dust. If the HTLC is dust, it adds to
2815
// the overall dust sum. If it is not dust, it contributes to weight, which
2816
// also adds to the overall dust sum by an increase in fees. If the dust sum on
2817
// either commitment exceeds the configured fee threshold, this function
2818
// returns true.
2819
func (l *channelLink) isOverexposedWithHtlc(htlc *lnwire.UpdateAddHTLC,
2820
        incoming bool) bool {
3,013✔
2821

3,013✔
2822
        dustClosure := l.getDustClosure()
3,013✔
2823

3,013✔
2824
        feeRate := l.channel.WorstCaseFeeRate()
3,013✔
2825

3,013✔
2826
        amount := htlc.Amount.ToSatoshis()
3,013✔
2827

3,013✔
2828
        // See if this HTLC is dust on both the local and remote commitments.
3,013✔
2829
        isLocalDust := dustClosure(feeRate, incoming, lntypes.Local, amount)
3,013✔
2830
        isRemoteDust := dustClosure(feeRate, incoming, lntypes.Remote, amount)
3,013✔
2831

3,013✔
2832
        // Calculate the dust sum for the local and remote commitments.
3,013✔
2833
        localDustSum := l.getDustSum(
3,013✔
2834
                lntypes.Local, fn.None[chainfee.SatPerKWeight](),
3,013✔
2835
        )
3,013✔
2836
        remoteDustSum := l.getDustSum(
3,013✔
2837
                lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
3,013✔
2838
        )
3,013✔
2839

3,013✔
2840
        // Grab the larger of the local and remote commitment fees w/o dust.
3,013✔
2841
        commitFee := l.getCommitFee(false)
3,013✔
2842

3,013✔
2843
        if l.getCommitFee(true) > commitFee {
3,399✔
2844
                commitFee = l.getCommitFee(true)
386✔
2845
        }
386✔
2846

2847
        localDustSum += lnwire.NewMSatFromSatoshis(commitFee)
3,013✔
2848
        remoteDustSum += lnwire.NewMSatFromSatoshis(commitFee)
3,013✔
2849

3,013✔
2850
        // Calculate the additional fee increase if this is a non-dust HTLC.
3,013✔
2851
        weight := lntypes.WeightUnit(input.HTLCWeight)
3,013✔
2852
        additional := lnwire.NewMSatFromSatoshis(
3,013✔
2853
                feeRate.FeeForWeight(weight),
3,013✔
2854
        )
3,013✔
2855

3,013✔
2856
        if isLocalDust {
4,865✔
2857
                // If this is dust, it doesn't contribute to weight but does
1,852✔
2858
                // contribute to the overall dust sum.
1,852✔
2859
                localDustSum += lnwire.NewMSatFromSatoshis(amount)
1,852✔
2860
        } else {
3,016✔
2861
                // Account for the fee increase that comes with an increase in
1,164✔
2862
                // weight.
1,164✔
2863
                localDustSum += additional
1,164✔
2864
        }
1,164✔
2865

2866
        if localDustSum > l.cfg.MaxFeeExposure {
3,017✔
2867
                // The max fee exposure was exceeded.
4✔
2868
                return true
4✔
2869
        }
4✔
2870

2871
        if isRemoteDust {
4,858✔
2872
                // If this is dust, it doesn't contribute to weight but does
1,849✔
2873
                // contribute to the overall dust sum.
1,849✔
2874
                remoteDustSum += lnwire.NewMSatFromSatoshis(amount)
1,849✔
2875
        } else {
3,012✔
2876
                // Account for the fee increase that comes with an increase in
1,163✔
2877
                // weight.
1,163✔
2878
                remoteDustSum += additional
1,163✔
2879
        }
1,163✔
2880

2881
        return remoteDustSum > l.cfg.MaxFeeExposure
3,009✔
2882
}
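
// A runnable sketch of how one candidate HTLC feeds into the check above: a
// dust HTLC adds its full value to the dust sum, while a non-dust HTLC only
// adds the commitment fee for its extra weight. All numbers are hypothetical,
// the weight is close to lnd's input.HTLCWeight, and the dust test is
// deliberately simplified (the real lnwallet.HtlcIsDust also subtracts the
// second-level HTLC transaction fee before comparing to the dust limit).
package main

import "fmt"

func main() {
        const htlcWeightWU = 172 // per-HTLC weight, in weight units

        feePerKw := uint64(2_500)   // hypothetical worst-case feerate (sat/kw)
        dustLimitSat := uint64(546) // hypothetical dust limit
        htlcAmtSat := uint64(400)   // hypothetical HTLC amount

        dustSumMsat := uint64(100_000_000) // running dust sum before this HTLC

        // Fee for the additional weight a non-dust HTLC adds to the
        // commitment transaction: feePerKw * weight / 1000.
        weightFeeSat := feePerKw * htlcWeightWU / 1_000

        if htlcAmtSat < dustLimitSat {
                // Dust: if the commitment confirms, the whole amount goes to
                // fees, so count the full value.
                dustSumMsat += htlcAmtSat * 1_000
        } else {
                // Non-dust: only the added commitment fee counts.
                dustSumMsat += weightFeeSat * 1_000
        }

        fmt.Println("dust sum after HTLC (msat):", dustSumMsat)
}
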
2883

2884
// dustClosure is a function that evaluates whether an HTLC is dust. It returns
2885
// true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
2886
// the HTLC is incoming (i.e. one that the remote sent), a ChannelParty denoting
2887
// whether to evaluate on the local or remote commit, and finally an HTLC
2888
// amount to test.
2889
type dustClosure func(feerate chainfee.SatPerKWeight, incoming bool,
2890
        whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool
2891

2892
// dustHelper is used to construct the dustClosure.
2893
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
2894
        remoteDustLimit btcutil.Amount) dustClosure {
4,921✔
2895

4,921✔
2896
        isDust := func(feerate chainfee.SatPerKWeight, incoming bool,
4,921✔
2897
                whoseCommit lntypes.ChannelParty, amt btcutil.Amount) bool {
123,110✔
2898

118,189✔
2899
                var dustLimit btcutil.Amount
118,189✔
2900
                if whoseCommit.IsLocal() {
177,285✔
2901
                        dustLimit = localDustLimit
59,096✔
2902
                } else {
118,192✔
2903
                        dustLimit = remoteDustLimit
59,096✔
2904
                }
59,096✔
2905

2906
                return lnwallet.HtlcIsDust(
118,189✔
2907
                        chantype, incoming, whoseCommit, feerate, amt,
118,189✔
2908
                        dustLimit,
118,189✔
2909
                )
118,189✔
2910
        }
2911

2912
        return isDust
4,921✔
2913
}
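
// The closure pattern above captures the per-channel dust limits once, so
// callers such as the mailbox only supply per-HTLC inputs. The standalone
// sketch below keeps that shape with plain integers and a bare threshold
// comparison; the helper name and limits are hypothetical, and the real
// closure defers to lnwallet.HtlcIsDust, which also accounts for the channel
// type and the second-level HTLC transaction fee.
package main

import "fmt"

// newDustCheck mirrors dustHelper: capture the limits, return the evaluator.
func newDustCheck(localDustLimitSat, remoteDustLimitSat uint64) func(onLocalCommit bool, amtSat uint64) bool {
        return func(onLocalCommit bool, amtSat uint64) bool {
                limit := remoteDustLimitSat
                if onLocalCommit {
                        limit = localDustLimitSat
                }

                return amtSat < limit
        }
}

func main() {
        // Hypothetical dust limits for the local and remote commitments.
        isDust := newDustCheck(546, 354)

        fmt.Println(isDust(true, 500))  // true: below the local limit
        fmt.Println(isDust(false, 500)) // false: at or above the remote limit
}
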
2914

2915
// zeroConfConfirmed returns whether or not the zero-conf channel has
2916
// confirmed on-chain.
2917
//
2918
// Part of the scidAliasHandler interface.
2919
func (l *channelLink) zeroConfConfirmed() bool {
6✔
2920
        return l.channel.State().ZeroConfConfirmed()
6✔
2921
}
6✔
2922

2923
// confirmedScid returns the confirmed SCID for a zero-conf channel. This
2924
// should not be called for non-zero-conf channels.
2925
//
2926
// Part of the scidAliasHandler interface.
2927
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
6✔
2928
        return l.channel.State().ZeroConfRealScid()
6✔
2929
}
6✔
2930

2931
// isZeroConf returns whether or not the underlying channel is a zero-conf
2932
// channel.
2933
//
2934
// Part of the scidAliasHandler interface.
2935
func (l *channelLink) isZeroConf() bool {
214✔
2936
        return l.channel.State().IsZeroConf()
214✔
2937
}
214✔
2938

2939
// negotiatedAliasFeature returns whether or not the underlying channel has
2940
// negotiated the option-scid-alias feature bit. This will be true for both
2941
// option-scid-alias and zero-conf channel-types. It will also be true for
2942
// channels with the feature bit but without the above channel-types.
2943
//
2944
// Part of the scidAliasFeature interface.
2945
func (l *channelLink) negotiatedAliasFeature() bool {
373✔
2946
        return l.channel.State().NegotiatedAliasFeature()
373✔
2947
}
373✔
2948

2949
// getAliases returns the set of aliases for the underlying channel.
2950
//
2951
// Part of the scidAliasHandler interface.
2952
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
220✔
2953
        return l.cfg.GetAliases(l.ShortChanID())
220✔
2954
}
220✔
2955

2956
// attachFailAliasUpdate sets the link's FailAliasUpdate function.
2957
//
2958
// Part of the scidAliasHandler interface.
2959
func (l *channelLink) attachFailAliasUpdate(closure func(
2960
        sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate) {
215✔
2961

215✔
2962
        l.Lock()
215✔
2963
        l.cfg.FailAliasUpdate = closure
215✔
2964
        l.Unlock()
215✔
2965
}
215✔
2966

2967
// AttachMailBox updates the current mailbox used by this link, and hooks up
2968
// the mailbox's message and packet outboxes to the link's upstream and
2969
// downstream chans, respectively.
2970
func (l *channelLink) AttachMailBox(mailbox MailBox) {
214✔
2971
        l.Lock()
214✔
2972
        l.mailBox = mailbox
214✔
2973
        l.upstream = mailbox.MessageOutBox()
214✔
2974
        l.downstream = mailbox.PacketOutBox()
214✔
2975
        l.Unlock()
214✔
2976

214✔
2977
        // Set the mailbox's fee rate. This may be refreshing a feerate that was
214✔
2978
        // never committed.
214✔
2979
        l.mailBox.SetFeeRate(l.getFeeRate())
214✔
2980

214✔
2981
        // Also set the mailbox's dust closure so that it can query whether HTLC's
214✔
2982
        // are dust given the current feerate.
214✔
2983
        l.mailBox.SetDustClosure(l.getDustClosure())
214✔
2984
}
214✔
2985

2986
// UpdateForwardingPolicy updates the forwarding policy for the target
2987
// ChannelLink. Once updated, the link will use the new forwarding policy to
2988
// govern whether an incoming HTLC should be forwarded or not. We assume that
2989
// fields that are zero are intentionally set to zero, so we'll use newPolicy to
2990
// update all of the link's FwrdingPolicy's values.
2991
//
2992
// NOTE: Part of the ChannelLink interface.
2993
func (l *channelLink) UpdateForwardingPolicy(
2994
        newPolicy models.ForwardingPolicy) {
15✔
2995

15✔
2996
        l.Lock()
15✔
2997
        defer l.Unlock()
15✔
2998

15✔
2999
        l.cfg.FwrdingPolicy = newPolicy
15✔
3000
}
15✔
3001

3002
// CheckHtlcForward should return a nil error if the passed HTLC details
3003
// satisfy the current forwarding policy of the target link. Otherwise,
3004
// a LinkError with a valid protocol failure message should be returned
3005
// in order to signal to the source of the HTLC, the policy consistency
3006
// issue.
3007
//
3008
// NOTE: Part of the ChannelLink interface.
3009
func (l *channelLink) CheckHtlcForward(payHash [32]byte,
3010
        incomingHtlcAmt, amtToForward lnwire.MilliSatoshi,
3011
        incomingTimeout, outgoingTimeout uint32,
3012
        inboundFee models.InboundFee,
3013
        heightNow uint32, originalScid lnwire.ShortChannelID) *LinkError {
52✔
3014

52✔
3015
        l.RLock()
52✔
3016
        policy := l.cfg.FwrdingPolicy
52✔
3017
        l.RUnlock()
52✔
3018

52✔
3019
        // Using the outgoing HTLC amount, we'll calculate the outgoing
52✔
3020
        // fee this incoming HTLC must carry in order to satisfy the constraints
52✔
3021
        // of the outgoing link.
52✔
3022
        outFee := ExpectedFee(policy, amtToForward)
52✔
3023

52✔
3024
        // Then calculate the inbound fee that we charge based on the sum of
52✔
3025
        // outgoing HTLC amount and outgoing fee.
52✔
3026
        inFee := inboundFee.CalcFee(amtToForward + outFee)
52✔
3027

52✔
3028
        // Add up both fee components. It is important to calculate both fees
52✔
3029
        // separately. An alternative way of calculating is to first determine
52✔
3030
        // an aggregate fee and apply that to the outgoing HTLC amount. However,
52✔
3031
        // rounding may cause the result to be slightly higher than in the case
52✔
3032
        // of separately rounded fee components. This potentially causes failed
52✔
3033
        // forwards for senders and is something to be avoided.
52✔
3034
        expectedFee := inFee + int64(outFee)
52✔
3035

52✔
3036
        // If the actual fee is less than our expected fee, then we'll reject
52✔
3037
        // this HTLC as it didn't provide a sufficient amount of fees, or the
52✔
3038
        // values have been tampered with, or the send used incorrect/dated
52✔
3039
        // information to construct the forwarding information for this hop. In
52✔
3040
        // any case, we'll cancel this HTLC.
52✔
3041
        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
52✔
3042
        if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
61✔
3043
                l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
9✔
3044
                        "expected %v, got %v: incoming=%v, outgoing=%v, "+
9✔
3045
                        "inboundFee=%v",
9✔
3046
                        payHash[:], expectedFee, actualFee,
9✔
3047
                        incomingHtlcAmt, amtToForward, inboundFee,
9✔
3048
                )
9✔
3049

9✔
3050
                // As part of the returned error, we'll send our latest routing
9✔
3051
                // policy so the sending node obtains the most up to date data.
9✔
3052
                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
18✔
3053
                        return lnwire.NewFeeInsufficient(amtToForward, *upd)
9✔
3054
                }
9✔
3055
                failure := l.createFailureWithUpdate(false, originalScid, cb)
9✔
3056
                return NewLinkError(failure)
9✔
3057
        }
3058

3059
        // Check whether the outgoing htlc satisfies the channel policy.
3060
        err := l.canSendHtlc(
46✔
3061
                policy, payHash, amtToForward, outgoingTimeout, heightNow,
46✔
3062
                originalScid,
46✔
3063
        )
46✔
3064
        if err != nil {
62✔
3065
                return err
16✔
3066
        }
16✔
3067

3068
        // Finally, we'll ensure that the time-lock on the outgoing HTLC meets
3069
        // the following constraint: the incoming time-lock minus our time-lock
3070
                // delta should equal the outgoing time lock. Otherwise, either the
3071
        // sender messed up, or an intermediate node tampered with the HTLC.
3072
        timeDelta := policy.TimeLockDelta
33✔
3073
        if incomingTimeout < outgoingTimeout+timeDelta {
35✔
3074
                l.log.Warnf("incoming htlc(%x) has incorrect time-lock value: "+
2✔
3075
                        "expected at least %v block delta, got %v block delta",
2✔
3076
                        payHash[:], timeDelta, incomingTimeout-outgoingTimeout)
2✔
3077

2✔
3078
                // Grab the latest routing policy so the sending node is up to
2✔
3079
                // date with our current policy.
2✔
3080
                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
4✔
3081
                        return lnwire.NewIncorrectCltvExpiry(
2✔
3082
                                incomingTimeout, *upd,
2✔
3083
                        )
2✔
3084
                }
2✔
3085
                failure := l.createFailureWithUpdate(false, originalScid, cb)
2✔
3086
                return NewLinkError(failure)
2✔
3087
        }
3088

3089
        return nil
31✔
3090
}
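
// The fee check above boils down to computing the outbound and inbound fee
// components separately and comparing their sum against the fee actually
// offered. The sketch below uses the common base-plus-proportional-millionths
// formula with hypothetical policy values; it only approximates lnd's
// ExpectedFee and InboundFee.CalcFee helpers, and the separate rounding of
// the two components matches the comment in CheckHtlcForward.
package main

import "fmt"

func main() {
        // Hypothetical outbound policy: 1 sat base fee and 100 ppm.
        outBaseMsat, outRatePPM := int64(1_000), int64(100)

        // Hypothetical inbound fee, expressed the same way; inbound fees may
        // be negative (a discount).
        inBaseMsat, inRatePPM := int64(-500), int64(0)

        amtToForwardMsat := int64(2_000_000)
        incomingAmtMsat := int64(2_001_800)

        // Outgoing fee charged on the forwarded amount.
        outFee := outBaseMsat + amtToForwardMsat*outRatePPM/1_000_000

        // Inbound fee charged on the outgoing amount plus the outgoing fee.
        inFee := inBaseMsat + (amtToForwardMsat+outFee)*inRatePPM/1_000_000

        expectedFee := inFee + outFee
        actualFee := incomingAmtMsat - amtToForwardMsat

        // 700 msat expected against 1,800 msat offered: the HTLC pays enough.
        fmt.Println("expected fee (msat):", expectedFee)
        fmt.Println("actual fee (msat):  ", actualFee)
        fmt.Println("sufficient fee:", actualFee >= expectedFee)
}
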
3091

3092
// CheckHtlcTransit should return a nil error if the passed HTLC details
3093
// satisfy the current channel policy.  Otherwise, a LinkError with a
3094
// valid protocol failure message should be returned in order to signal
3095
// the violation. This call is intended to be used for locally initiated
3096
// payments for which there is no corresponding incoming htlc.
3097
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
3098
        amt lnwire.MilliSatoshi, timeout uint32,
3099
        heightNow uint32) *LinkError {
1,450✔
3100

1,450✔
3101
        l.RLock()
1,450✔
3102
        policy := l.cfg.FwrdingPolicy
1,450✔
3103
        l.RUnlock()
1,450✔
3104

1,450✔
3105
        // We pass in hop.Source here as this is only used in the Switch when
1,450✔
3106
        // trying to send over a local link. This causes the fallback mechanism
1,450✔
3107
        // to occur.
1,450✔
3108
        return l.canSendHtlc(
1,450✔
3109
                policy, payHash, amt, timeout, heightNow, hop.Source,
1,450✔
3110
        )
1,450✔
3111
}
1,450✔
3112

3113
// canSendHtlc checks whether the given htlc parameters satisfy
3114
// the channel's amount and time lock constraints.
3115
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
3116
        payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
3117
        heightNow uint32, originalScid lnwire.ShortChannelID) *LinkError {
1,493✔
3118

1,493✔
3119
        // As our first sanity check, we'll ensure that the passed HTLC isn't
1,493✔
3120
        // too small for the next hop. If so, then we'll cancel the HTLC
1,493✔
3121
        // directly.
1,493✔
3122
        if amt < policy.MinHTLCOut {
1,504✔
3123
                l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
11✔
3124
                        "htlc_value=%v", payHash[:], policy.MinHTLCOut,
11✔
3125
                        amt)
11✔
3126

11✔
3127
                // As part of the returned error, we'll send our latest routing
11✔
3128
                // policy so the sending node obtains the most up to date data.
11✔
3129
                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
22✔
3130
                        return lnwire.NewAmountBelowMinimum(amt, *upd)
11✔
3131
                }
11✔
3132
                failure := l.createFailureWithUpdate(false, originalScid, cb)
11✔
3133
                return NewLinkError(failure)
11✔
3134
        }
3135

3136
        // Next, ensure that the passed HTLC isn't too large. If so, we'll
3137
        // cancel the HTLC directly.
3138
        if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
1,491✔
3139
                l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
6✔
3140
                        "htlc_value=%v", payHash[:], policy.MaxHTLC, amt)
6✔
3141

6✔
3142
                // As part of the returned error, we'll send our latest routing
6✔
3143
                // policy so the sending node obtains the most up-to-date data.
6✔
3144
                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
12✔
3145
                        return lnwire.NewTemporaryChannelFailure(upd)
6✔
3146
                }
6✔
3147
                failure := l.createFailureWithUpdate(false, originalScid, cb)
6✔
3148
                return NewDetailedLinkError(failure, OutgoingFailureHTLCExceedsMax)
6✔
3149
        }
3150

3151
        // We want to avoid offering an HTLC which will expire in the near
3152
        // future, so we'll reject an HTLC if the outgoing expiration time is
3153
        // too close to the current height.
3154
        if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
1,484✔
3155
                l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
2✔
3156
                        "outgoing_expiry=%v, best_height=%v", payHash[:],
2✔
3157
                        timeout, heightNow)
2✔
3158

2✔
3159
                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
4✔
3160
                        return lnwire.NewExpiryTooSoon(*upd)
2✔
3161
                }
2✔
3162
                failure := l.createFailureWithUpdate(false, originalScid, cb)
2✔
3163
                return NewLinkError(failure)
2✔
3164
        }
3165

3166
        // Check absolute max delta.
3167
        if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
1,481✔
3168
                l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
1✔
3169
                        "the future: got %v, but maximum is %v", payHash[:],
1✔
3170
                        timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)
1✔
3171

1✔
3172
                return NewLinkError(&lnwire.FailExpiryTooFar{})
1✔
3173
        }
1✔
3174

3175
        // Check to see if there is enough balance in this channel.
3176
        if amt > l.Bandwidth() {
1,483✔
3177
                l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
4✔
3178
                        "larger than %v", amt, l.Bandwidth())
4✔
3179
                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
8✔
3180
                        return lnwire.NewTemporaryChannelFailure(upd)
4✔
3181
                }
4✔
3182
                failure := l.createFailureWithUpdate(false, originalScid, cb)
4✔
3183
                return NewDetailedLinkError(
4✔
3184
                        failure, OutgoingFailureInsufficientBalance,
4✔
3185
                )
4✔
3186
        }
3187

3188
        return nil
1,478✔
3189
}
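
// The policy gates above apply in a fixed order: minimum amount, maximum
// amount, expiry too soon, expiry too far, and finally available bandwidth.
// The sketch below is a plain-integer rendering of that order with
// hypothetical limits; the real method returns structured LinkErrors carrying
// lnwire failure messages rather than simple errors.
package main

import "fmt"

func canSend(amtMsat, minHTLCMsat, maxHTLCMsat, bandwidthMsat uint64,
        timeout, heightNow, rejectDelta, maxCltvExpiry uint32) error {

        switch {
        case amtMsat < minHTLCMsat:
                return fmt.Errorf("amount below minimum")

        case maxHTLCMsat != 0 && amtMsat > maxHTLCMsat:
                return fmt.Errorf("amount above maximum")

        case timeout <= heightNow+rejectDelta:
                return fmt.Errorf("expiry too soon")

        case timeout > heightNow+maxCltvExpiry:
                return fmt.Errorf("expiry too far")

        case amtMsat > bandwidthMsat:
                return fmt.Errorf("insufficient bandwidth")
        }

        return nil
}

func main() {
        // Hypothetical HTLC and channel parameters; all checks pass here.
        err := canSend(
                1_500_000, 1_000, 10_000_000, 5_000_000,
                800_150, 800_100, 13, 2016,
        )
        fmt.Println("policy check:", err)
}
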
3190

3191
// Stats returns the statistics of channel link.
3192
//
3193
// NOTE: Part of the ChannelLink interface.
3194
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
23✔
3195
        snapshot := l.channel.StateSnapshot()
23✔
3196

23✔
3197
        return snapshot.ChannelCommitment.CommitHeight,
23✔
3198
                snapshot.TotalMSatSent,
23✔
3199
                snapshot.TotalMSatReceived
23✔
3200
}
23✔
3201

3202
// String returns the string representation of channel link.
3203
//
3204
// NOTE: Part of the ChannelLink interface.
3205
func (l *channelLink) String() string {
×
3206
        return l.channel.ChannelPoint().String()
×
3207
}
×
3208

3209
// handleSwitchPacket handles switch packets. These packets might be
3210
// forwarded to us from another channel link when the htlc update came from
3211
// another peer, or they might have been created locally by the user.
3212
//
3213
// NOTE: Part of the packetHandler interface.
3214
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
1,522✔
3215
        l.log.Tracef("received switch packet inkey=%v, outkey=%v",
1,522✔
3216
                pkt.inKey(), pkt.outKey())
1,522✔
3217

1,522✔
3218
        return l.mailBox.AddPacket(pkt)
1,522✔
3219
}
1,522✔
3220

3221
// HandleChannelUpdate handles the htlc requests as settle/add/fail which were
3222
// sent to us from the remote peer we have a channel with.
3223
//
3224
// NOTE: Part of the ChannelLink interface.
3225
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
6,819✔
3226
        select {
6,819✔
3227
        case <-l.quit:
×
3228
                // Return early if the link is already in the process of
×
3229
                // quitting. It doesn't make sense to hand the message to the
×
3230
                // mailbox here.
×
3231
                return
×
3232
        default:
6,819✔
3233
        }
3234

3235
        err := l.mailBox.AddMessage(message)
6,819✔
3236
        if err != nil {
6,819✔
3237
                l.log.Errorf("failed to add Message to mailbox: %v", err)
×
3238
        }
×
3239
}
3240

3241
// updateChannelFee updates the commitment fee-per-kw on this channel by
3242
// committing to an update_fee message.
3243
func (l *channelLink) updateChannelFee(feePerKw chainfee.SatPerKWeight) error {
3✔
3244
        l.log.Infof("updating commit fee to %v", feePerKw)
3✔
3245

3✔
3246
        // We skip sending the UpdateFee message if the channel is not
3✔
3247
        // currently eligible to forward messages.
3✔
3248
        if !l.EligibleToUpdate() {
3✔
3249
                l.log.Debugf("skipping fee update for inactive channel")
×
3250
                return nil
×
3251
        }
×
3252

3253
        // Check and see if our proposed fee-rate would make us exceed the fee
3254
        // threshold.
3255
        thresholdExceeded, err := l.exceedsFeeExposureLimit(feePerKw)
3✔
3256
        if err != nil {
3✔
3257
                // This shouldn't typically happen. If it does, it indicates
×
3258
                // something is wrong with our channel state.
×
3259
                return err
×
3260
        }
×
3261

3262
        if thresholdExceeded {
3✔
3263
                return fmt.Errorf("link fee threshold exceeded")
×
3264
        }
×
3265

3266
        // First, we'll update the local fee on our commitment.
3267
        if err := l.channel.UpdateFee(feePerKw); err != nil {
3✔
3268
                return err
×
3269
        }
×
3270

3271
        // The fee passed the channel's validation checks, so we update the
3272
        // mailbox feerate.
3273
        l.mailBox.SetFeeRate(feePerKw)
3✔
3274

3✔
3275
        // We'll then attempt to send a new UpdateFee message, and also lock it
3✔
3276
        // in immediately by triggering a commitment update.
3✔
3277
        msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
3✔
3278
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
3✔
3279
                return err
×
3280
        }
×
3281
        return l.updateCommitTx()
3✔
3282
}
3283

3284
// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
3285
// after receiving a revocation from the remote party, and reprocesses them in
3286
// the context of the provided forwarding package. Any settles or fails that
3287
// have already been acknowledged in the forwarding package will not be sent to
3288
// the switch.
3289
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,
3290
        settleFails []*lnwallet.PaymentDescriptor) {
2,176✔
3291

2,176✔
3292
        if len(settleFails) == 0 {
3,605✔
3293
                return
1,429✔
3294
        }
1,429✔
3295

3296
        l.log.Debugf("settle-fail-filter %v", fwdPkg.SettleFailFilter)
750✔
3297

750✔
3298
        var switchPackets []*htlcPacket
750✔
3299
        for i, pd := range settleFails {
1,501✔
3300
                // Skip any settles or fails that have already been
751✔
3301
                // acknowledged by the incoming link that originated the
751✔
3302
                // forwarded Add.
751✔
3303
                if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
751✔
3304
                        continue
×
3305
                }
3306

3307
                // TODO(roasbeef): rework log entries to a shared
3308
                // interface.
3309

3310
                switch pd.EntryType {
751✔
3311

3312
                // A settle for an HTLC we previously forwarded has been
3313
                // received. So we'll forward the HTLC to the switch which will
3314
                // handle propagating the settle to the prior hop.
3315
                case lnwallet.Settle:
628✔
3316
                        // If hodl.SettleIncoming is requested, we will not
628✔
3317
                        // forward the SETTLE to the switch and will not signal
628✔
3318
                        // a free slot on the commitment transaction.
628✔
3319
                        if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
628✔
3320
                                l.log.Warnf(hodl.SettleIncoming.Warning())
×
3321
                                continue
×
3322
                        }
3323

3324
                        settlePacket := &htlcPacket{
628✔
3325
                                outgoingChanID: l.ShortChanID(),
628✔
3326
                                outgoingHTLCID: pd.ParentIndex,
628✔
3327
                                destRef:        pd.DestRef,
628✔
3328
                                htlc: &lnwire.UpdateFulfillHTLC{
628✔
3329
                                        PaymentPreimage: pd.RPreimage,
628✔
3330
                                },
628✔
3331
                        }
628✔
3332

628✔
3333
                        // Add the packet to the batch to be forwarded, and
628✔
3334
                        // notify the overflow queue that a spare spot has been
628✔
3335
                        // freed up within the commitment state.
628✔
3336
                        switchPackets = append(switchPackets, settlePacket)
628✔
3337

3338
                // A failureCode message for a previously forwarded HTLC has
3339
                // been received. As a result a new slot will be freed up in
3340
                // our commitment state, so we'll forward this to the switch so
3341
                // the backwards undo can continue.
3342
                case lnwallet.Fail:
126✔
3343
                        // If hodl.SettleIncoming is requested, we will not
126✔
3344
                        // forward the FAIL to the switch and will not signal a
126✔
3345
                        // free slot on the commitment transaction.
126✔
3346
                        if l.cfg.HodlMask.Active(hodl.FailIncoming) {
126✔
3347
                                l.log.Warnf(hodl.FailIncoming.Warning())
×
3348
                                continue
×
3349
                        }
3350

3351
                        // Fetch the reason the HTLC was canceled so we can
3352
                        // continue to propagate it. This failure originated
3353
                        // from another node, so the linkFailure field is not
3354
                        // set on the packet.
3355
                        failPacket := &htlcPacket{
126✔
3356
                                outgoingChanID: l.ShortChanID(),
126✔
3357
                                outgoingHTLCID: pd.ParentIndex,
126✔
3358
                                destRef:        pd.DestRef,
126✔
3359
                                htlc: &lnwire.UpdateFailHTLC{
126✔
3360
                                        Reason: lnwire.OpaqueReason(
126✔
3361
                                                pd.FailReason,
126✔
3362
                                        ),
126✔
3363
                                },
126✔
3364
                        }
126✔
3365

126✔
3366
                        l.log.Debugf("Failed to send %s", pd.Amount)
126✔
3367

126✔
3368
                        // If the failure message lacks an HMAC (but includes
126✔
3369
                        // the 4 bytes for encoding the message and padding
126✔
3370
                        // lengths), then this means that we received it as an
126✔
3371
                        // UpdateFailMalformedHTLC. As a result, we'll signal
126✔
3372
                        // that we need to convert this error within the switch
126✔
3373
                        // to an actual error, by encrypting it as if we were
126✔
3374
                        // the originating hop.
126✔
3375
                        convertedErrorSize := lnwire.FailureMessageLength + 4
126✔
3376
                        if len(pd.FailReason) == convertedErrorSize {
132✔
3377
                                failPacket.convertedError = true
6✔
3378
                        }
6✔
3379

3380
                        // Add the packet to the batch to be forwarded, and
3381
                        // notify the overflow queue that a spare spot has been
3382
                        // freed up within the commitment state.
3383
                        switchPackets = append(switchPackets, failPacket)
126✔
3384
                }
3385
        }
3386

3387
        // Only spawn the task to forward packets if we have a non-zero number.
3388
        if len(switchPackets) > 0 {
1,500✔
3389
                go l.forwardBatch(false, switchPackets...)
750✔
3390
        }
750✔
3391
}
3392

3393
// processRemoteAdds serially processes each of the Add payment descriptors
3394
// which have been "locked-in" by receiving a revocation from the remote party.
3395
// The forwarding package provided instructs how to process this batch,
3396
// indicating whether this is the first time these Adds are being processed, or
3397
// whether we are reprocessing as a result of a failure or restart. Adds that
3398
// have already been acknowledged in the forwarding package will be ignored.
3399
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
3400
        lockedInHtlcs []*lnwallet.PaymentDescriptor) {
2,179✔
3401

2,179✔
3402
        l.log.Tracef("processing %d remote adds for height %d",
2,179✔
3403
                len(lockedInHtlcs), fwdPkg.Height)
2,179✔
3404

2,179✔
3405
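        // Gather a decode request for every locked-in Add below so that all
        // of the onion blobs can be decoded, and checked for replays, in a
        // single batch.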
        decodeReqs := make(
2,179✔
3406
                []hop.DecodeHopIteratorRequest, 0, len(lockedInHtlcs),
2,179✔
3407
        )
2,179✔
3408
        for _, pd := range lockedInHtlcs {
3,672✔
3409
                switch pd.EntryType {
1,493✔
3410

3411
                // TODO(conner): remove type switch?
3412
                case lnwallet.Add:
1,493✔
3413
                        // Before adding the new htlc to the state machine,
1,493✔
3414
                        // parse the onion object in order to obtain the
1,493✔
3415
                        // routing information with the DecodeHopIterator function,
1,493✔
3416
                        // which processes the Sphinx packet.
1,493✔
3417
                        onionReader := bytes.NewReader(pd.OnionBlob)
1,493✔
3418

1,493✔
3419
                        req := hop.DecodeHopIteratorRequest{
1,493✔
3420
                                OnionReader:    onionReader,
1,493✔
3421
                                RHash:          pd.RHash[:],
1,493✔
3422
                                IncomingCltv:   pd.Timeout,
1,493✔
3423
                                IncomingAmount: pd.Amount,
1,493✔
3424
                                BlindingPoint:  pd.BlindingPoint,
1,493✔
3425
                        }
1,493✔
3426

1,493✔
3427
                        decodeReqs = append(decodeReqs, req)
1,493✔
3428
                }
3429
        }
3430

3431
        // Atomically decode the incoming htlcs, simultaneously checking for
3432
        // replay attempts. A particular index in the returned, spare list of
3433
        // channel iterators should only be used if the failure code at the
3434
        // same index is lnwire.FailCodeNone.
3435
        decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
2,179✔
3436
                fwdPkg.ID(), decodeReqs,
2,179✔
3437
        )
2,179✔
3438
        if sphinxErr != nil {
2,179✔
3439
                l.fail(LinkFailureError{code: ErrInternalError},
×
3440
                        "unable to decode hop iterators: %v", sphinxErr)
×
3441
                return
×
3442
        }
×
3443

3444
        var switchPackets []*htlcPacket
2,179✔
3445

2,179✔
3446
        for i, pd := range lockedInHtlcs {
3,672✔
3447
                idx := uint16(i)
1,493✔
3448

1,493✔
3449
                if fwdPkg.State == channeldb.FwdStateProcessed &&
1,493✔
3450
                        fwdPkg.AckFilter.Contains(idx) {
1,493✔
3451

×
3452
                        // If this index is already found in the ack filter,
×
3453
                        // the response to this forwarding decision has already
×
3454
                        // been committed by one of our commitment txns. ADDs
×
3455
                        // in this state are waiting for the rest of the fwding
×
3456
                        // package to get acked before being garbage collected.
×
3457
                        continue
×
3458
                }
3459

3460
                // An incoming HTLC add has been full-locked in. As a result we
3461
                // can now examine the forwarding details of the HTLC, and the
3462
                // HTLC itself to decide if: we should forward it, cancel it,
3463
                // or are able to settle it (and it adheres to our fee related
3464
                // constraints).
3465

3466
                // Fetch the onion blob that was included within this processed
3467
                // payment descriptor.
3468
                var onionBlob [lnwire.OnionPacketSize]byte
1,493✔
3469
                copy(onionBlob[:], pd.OnionBlob)
1,493✔
3470

1,493✔
3471
                // Before adding the new htlc to the state machine, parse the
1,493✔
3472
                // onion object in order to obtain the routing information with
1,493✔
3473
                // the DecodeHopIterator function, which processes the Sphinx packet.
1,493✔
3474
                chanIterator, failureCode := decodeResps[i].Result()
1,493✔
3475
                if failureCode != lnwire.CodeNone {
1,498✔
3476
                        // If we're unable to process the onion blob then we
5✔
3477
                        // should send the malformed htlc error to payment
5✔
3478
                        // sender.
5✔
3479
                        l.sendMalformedHTLCError(pd.HtlcIndex, failureCode,
5✔
3480
                                onionBlob[:], pd.SourceRef)
5✔
3481

5✔
3482
                        l.log.Errorf("unable to decode onion hop "+
5✔
3483
                                "iterator: %v", failureCode)
5✔
3484
                        continue
5✔
3485
                }
3486

3487
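                // Capture the current best block height; it is handed to the
                // invoice registry below when this HTLC terminates at our
                // node.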
                heightNow := l.cfg.BestHeight()
1,491✔
3488

1,491✔
3489
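                // Parse the hop payload from the onion, also noting which
                // role we play in the route, which drives the blinded-path
                // error handling below.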
                pld, routeRole, pldErr := chanIterator.HopPayload()
1,491✔
3490
                if pldErr != nil {
1,494✔
3491
                        // If we're unable to process the onion payload, or we
3✔
3492
                        // received invalid onion payload failure, then we
3✔
3493
                        // should send an error back to the caller so the HTLC
3✔
3494
                        // can be canceled.
3✔
3495
                        var failedType uint64
3✔
3496

3✔
3497
                        // We need to get the underlying error value, so we
3✔
3498
                        // can't use errors.As as suggested by the linter.
3✔
3499
                        //nolint:errorlint
3✔
3500
                        if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
3✔
3501
                                failedType = uint64(e.Type)
×
3502
                        }
×
3503

3504
                        // If we couldn't parse the payload, make our best
3505
                        // effort at creating an error encrypter that knows
3506
                        // what blinding type we were, but if we couldn't
3507
                        // parse the payload we have no way of knowing whether
3508
                        // we were the introduction node or not.
3509
                        //
3510
                        //nolint:lll
3511
                        obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
3✔
3512
                                l.cfg.ExtractErrorEncrypter,
3✔
3513
                                // We need our route role here because we
3✔
3514
                                // couldn't parse or validate the payload.
3✔
3515
                                routeRole == hop.RouteRoleIntroduction,
3✔
3516
                        )
3✔
3517
                        if failCode != lnwire.CodeNone {
3✔
3518
                                l.log.Errorf("could not extract error "+
×
3519
                                        "encrypter: %v", pldErr)
×
3520

×
3521
                                // We can't process this htlc, send back
×
3522
                                // malformed.
×
3523
                                l.sendMalformedHTLCError(
×
3524
                                        pd.HtlcIndex, failureCode,
×
3525
                                        onionBlob[:], pd.SourceRef,
×
3526
                                )
×
3527

×
3528
                                continue
×
3529
                        }
3530

3531
                        // TODO: currently none of the test unit infrastructure
3532
                        // is setup to handle TLV payloads, so testing this
3533
                        // would require implementing a separate mock iterator
3534
                        // for TLV payloads that also supports injecting invalid
3535
                        // payloads. Deferring this non-trivial effort till a
3536
                        // later date
3537
                        failure := lnwire.NewInvalidOnionPayload(failedType, 0)
3✔
3538
                        l.sendHTLCError(
3✔
3539
                                pd, NewLinkError(failure), obfuscator, false,
3✔
3540
                        )
3✔
3541

3✔
3542
                        l.log.Errorf("unable to decode forwarding "+
3✔
3543
                                "instructions: %v", pldErr)
3✔
3544

3✔
3545
                        continue
3✔
3546
                }
3547

3548
                // Retrieve onion obfuscator from onion blob in order to
3549
                // produce initial obfuscation of the onion failureCode.
3550
                obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
1,491✔
3551
                        l.cfg.ExtractErrorEncrypter,
1,491✔
3552
                        routeRole == hop.RouteRoleIntroduction,
1,491✔
3553
                )
1,491✔
3554
                if failureCode != lnwire.CodeNone {
1,492✔
3555
                        // If we're unable to process the onion blob then we
1✔
3556
                        // should send the malformed htlc error to payment
1✔
3557
                        // sender.
1✔
3558
                        l.sendMalformedHTLCError(
1✔
3559
                                pd.HtlcIndex, failureCode, onionBlob[:],
1✔
3560
                                pd.SourceRef,
1✔
3561
                        )
1✔
3562

1✔
3563
                        l.log.Errorf("unable to decode onion "+
1✔
3564
                                "obfuscator: %v", failureCode)
1✔
3565

1✔
3566
                        continue
1✔
3567
                }
3568

3569
                fwdInfo := pld.ForwardingInfo()
1,490✔
3570

1,490✔
3571
                // Check whether the payload we've just processed uses our
1,490✔
3572
                // node as the introduction point (gave us a blinding key in
1,490✔
3573
                // the payload itself) and fail it back if we don't support
1,490✔
3574
                // route blinding.
1,490✔
3575
                if fwdInfo.NextBlinding.IsSome() &&
1,490✔
3576
                        l.cfg.DisallowRouteBlinding {
1,493✔
3577

3✔
3578
                        failure := lnwire.NewInvalidBlinding(
3✔
3579
                                onionBlob[:],
3✔
3580
                        )
3✔
3581
                        l.sendHTLCError(
3✔
3582
                                pd, NewLinkError(failure), obfuscator, false,
3✔
3583
                        )
3✔
3584

3✔
3585
                        l.log.Error("rejected htlc that uses use as an " +
3✔
3586
                                "introduction point when we do not support " +
3✔
3587
                                "route blinding")
3✔
3588

3✔
3589
                        continue
3✔
3590
                }
3591

3592
                switch fwdInfo.NextHop {
1,490✔
3593
                case hop.Exit:
1,454✔
3594
                        err := l.processExitHop(
1,454✔
3595
                                pd, obfuscator, fwdInfo, heightNow, pld,
1,454✔
3596
                        )
1,454✔
3597
                        if err != nil {
1,454✔
3598
                                l.fail(LinkFailureError{code: ErrInternalError},
×
3599
                                        err.Error(),
×
3600
                                )
×
3601

×
3602
                                return
×
3603
                        }
×
3604

3605
                // There are additional channels left within this route. So
3606
                // we'll simply do some forwarding package book-keeping.
3607
                default:
39✔
3608
                        // If hodl.AddIncoming is requested, we will not
39✔
3609
                        // validate the forwarded ADD, nor will we send the
39✔
3610
                        // packet to the htlc switch.
39✔
3611
                        if l.cfg.HodlMask.Active(hodl.AddIncoming) {
39✔
3612
                                l.log.Warnf(hodl.AddIncoming.Warning())
×
3613
                                continue
×
3614
                        }
3615

3616
                        switch fwdPkg.State {
39✔
3617
                        case channeldb.FwdStateProcessed:
3✔
3618
                                // This add was not forwarded on the previous
3✔
3619
                                // processing phase, run it through our
3✔
3620
                                // validation pipeline to reproduce an error.
3✔
3621
                                // This may trigger a different error due to
3✔
3622
                                // expiring timelocks, but we expect that an
3✔
3623
                                // error will be reproduced.
3✔
3624
                                if !fwdPkg.FwdFilter.Contains(idx) {
3✔
3625
                                        break
×
3626
                                }
3627

3628
                                // Otherwise, it was already processed, we can
3629
                                // collect it and continue.
3630
                                addMsg := &lnwire.UpdateAddHTLC{
3✔
3631
                                        Expiry:        fwdInfo.OutgoingCTLV,
3✔
3632
                                        Amount:        fwdInfo.AmountToForward,
3✔
3633
                                        PaymentHash:   pd.RHash,
3✔
3634
                                        BlindingPoint: fwdInfo.NextBlinding,
3✔
3635
                                }
3✔
3636

3✔
3637
                                // Finally, we'll encode the onion packet for
3✔
3638
                                // the _next_ hop using the hop iterator
3✔
3639
                                // decoded for the current hop.
3✔
3640
                                buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
3✔
3641

3✔
3642
                                // We know this cannot fail, as this ADD
3✔
3643
                                // was marked forwarded in a previous
3✔
3644
                                // round of processing.
3✔
3645
                                chanIterator.EncodeNextHop(buf)
3✔
3646

3✔
3647
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee
3✔
3648

3✔
3649
                                updatePacket := &htlcPacket{
3✔
3650
                                        incomingChanID:  l.ShortChanID(),
3✔
3651
                                        incomingHTLCID:  pd.HtlcIndex,
3✔
3652
                                        outgoingChanID:  fwdInfo.NextHop,
3✔
3653
                                        sourceRef:       pd.SourceRef,
3✔
3654
                                        incomingAmount:  pd.Amount,
3✔
3655
                                        amount:          addMsg.Amount,
3✔
3656
                                        htlc:            addMsg,
3✔
3657
                                        obfuscator:      obfuscator,
3✔
3658
                                        incomingTimeout: pd.Timeout,
3✔
3659
                                        outgoingTimeout: fwdInfo.OutgoingCTLV,
3✔
3660
                                        customRecords:   pld.CustomRecords(),
3✔
3661
                                        inboundFee:      inboundFee,
3✔
3662
                                }
3✔
3663
                                switchPackets = append(
3✔
3664
                                        switchPackets, updatePacket,
3✔
3665
                                )
3✔
3666

3✔
3667
                                continue
3✔
3668
                        }
3669

3670
                        // TODO(roasbeef): ensure don't accept outrageous
3671
                        // timeout for htlc
3672

3673
                        // With all our forwarding constraints met, we'll
3674
                        // create the outgoing HTLC using the parameters as
3675
                        // specified in the forwarding info.
3676
                        addMsg := &lnwire.UpdateAddHTLC{
39✔
3677
                                Expiry:        fwdInfo.OutgoingCTLV,
39✔
3678
                                Amount:        fwdInfo.AmountToForward,
39✔
3679
                                PaymentHash:   pd.RHash,
39✔
3680
                                BlindingPoint: fwdInfo.NextBlinding,
39✔
3681
                        }
39✔
3682

39✔
3683
                        // Finally, we'll encode the onion packet for the
39✔
3684
                        // _next_ hop using the hop iterator decoded for the
39✔
3685
                        // current hop.
39✔
3686
                        buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
39✔
3687
                        err := chanIterator.EncodeNextHop(buf)
39✔
3688
                        if err != nil {
39✔
3689
                                l.log.Errorf("unable to encode the "+
×
3690
                                        "remaining route %v", err)
×
3691

×
3692
                                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
×
3693
                                        return lnwire.NewTemporaryChannelFailure(upd)
×
3694
                                }
×
3695

3696
                                failure := l.createFailureWithUpdate(
×
3697
                                        true, hop.Source, cb,
×
3698
                                )
×
3699

×
3700
                                l.sendHTLCError(
×
3701
                                        pd, NewLinkError(failure), obfuscator, false,
×
3702
                                )
×
3703
                                continue
×
3704
                        }
3705

3706
                        // Now that this add has been reprocessed, only append
3707
                        // it to our list of packets to forward to the switch
3708
                        // if this is the first time processing the add. If the
3709
                        // fwd pkg has already been processed, then we entered
3710
                        // the above section to recreate a previous error.  If
3711
                        // the packet had previously been forwarded, it would
3712
                        // have been added to switchPackets at the top of this
3713
                        // section.
3714
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
78✔
3715
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee
39✔
3716

39✔
3717
                                updatePacket := &htlcPacket{
39✔
3718
                                        incomingChanID:  l.ShortChanID(),
39✔
3719
                                        incomingHTLCID:  pd.HtlcIndex,
39✔
3720
                                        outgoingChanID:  fwdInfo.NextHop,
39✔
3721
                                        sourceRef:       pd.SourceRef,
39✔
3722
                                        incomingAmount:  pd.Amount,
39✔
3723
                                        amount:          addMsg.Amount,
39✔
3724
                                        htlc:            addMsg,
39✔
3725
                                        obfuscator:      obfuscator,
39✔
3726
                                        incomingTimeout: pd.Timeout,
39✔
3727
                                        outgoingTimeout: fwdInfo.OutgoingCTLV,
39✔
3728
                                        customRecords:   pld.CustomRecords(),
39✔
3729
                                        inboundFee:      inboundFee,
39✔
3730
                                }
39✔
3731

39✔
3732
                                fwdPkg.FwdFilter.Set(idx)
39✔
3733
                                switchPackets = append(switchPackets,
39✔
3734
                                        updatePacket)
39✔
3735
                        }
39✔
3736
                }
3737
        }
3738

3739
        // Commit the htlcs we are intending to forward if this package has not
3740
        // been fully processed.
3741
        if fwdPkg.State == channeldb.FwdStateLockedIn {
4,355✔
3742
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
2,176✔
3743
                if err != nil {
2,176✔
3744
                        l.fail(LinkFailureError{code: ErrInternalError},
×
3745
                                "unable to set fwd filter: %v", err)
×
3746
                        return
×
3747
                }
×
3748
        }
3749

3750
        if len(switchPackets) == 0 {
4,322✔
3751
                return
2,143✔
3752
        }
2,143✔
3753

3754
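        // A forwarding package that is no longer locked-in has been handed
        // to the switch before, so mark these packets as a replay.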
        replay := fwdPkg.State != channeldb.FwdStateLockedIn
39✔
3755

39✔
3756
        l.log.Debugf("forwarding %d packets to switch: replay=%v",
39✔
3757
                len(switchPackets), replay)
39✔
3758

39✔
3759
        // NOTE: This call is made synchronous so that we ensure all circuits
39✔
3760
        // are committed in the exact order that they are processed in the link.
39✔
3761
        // Failing to do this could cause reorderings/gaps in the range of
39✔
3762
        // opened circuits, which violates assumptions made by the circuit
39✔
3763
        // trimming.
39✔
3764
        l.forwardBatch(replay, switchPackets...)
39✔
3765
}
3766

3767
// processExitHop handles an htlc for which this link is the exit hop. It
3768
// returns an error if the htlc could not be processed.
3769
func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
3770
        obfuscator hop.ErrorEncrypter, fwdInfo hop.ForwardingInfo,
3771
        heightNow uint32, payload invoices.Payload) error {
1,454✔
3772

1,454✔
3773
        // If hodl.ExitSettle is requested, we will not validate the final hop's
1,454✔
3774
        // ADD, nor will we settle the corresponding invoice or respond with the
1,454✔
3775
        // preimage.
1,454✔
3776
        if l.cfg.HodlMask.Active(hodl.ExitSettle) {
2,172✔
3777
                l.log.Warnf(hodl.ExitSettle.Warning())
718✔
3778

718✔
3779
                return nil
718✔
3780
        }
718✔
3781

3782
        // As we're the exit hop, we'll double check the hop-payload included in
3783
        // the HTLC to ensure that it was crafted correctly by the sender and
3784
        // is compatible with the HTLC we were extended.
3785
        if pd.Amount < fwdInfo.AmountToForward {
839✔
3786
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
100✔
3787
                        "incompatible value: expected <=%v, got %v", pd.RHash,
100✔
3788
                        pd.Amount, fwdInfo.AmountToForward)
100✔
3789

100✔
3790
                failure := NewLinkError(
100✔
3791
                        lnwire.NewFinalIncorrectHtlcAmount(pd.Amount),
100✔
3792
                )
100✔
3793
                l.sendHTLCError(pd, failure, obfuscator, true)
100✔
3794

100✔
3795
                return nil
100✔
3796
        }
100✔
3797

3798
        // We'll also ensure that our time-lock value has been computed
3799
        // correctly.
3800
        if pd.Timeout < fwdInfo.OutgoingCTLV {
640✔
3801
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
1✔
3802
                        "incompatible time-lock: expected <=%v, got %v",
1✔
3803
                        pd.RHash[:], pd.Timeout, fwdInfo.OutgoingCTLV)
1✔
3804

1✔
3805
                failure := NewLinkError(
1✔
3806
                        lnwire.NewFinalIncorrectCltvExpiry(pd.Timeout),
1✔
3807
                )
1✔
3808
                l.sendHTLCError(pd, failure, obfuscator, true)
1✔
3809

1✔
3810
                return nil
1✔
3811
        }
1✔
3812

3813
        // Notify the invoiceRegistry of the exit hop htlc. If we crash right
3814
        // after this, this code will be re-executed after restart. We will
3815
        // receive back a resolution event.
3816
        invoiceHash := lntypes.Hash(pd.RHash)
638✔
3817

638✔
3818
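        // The circuit key (channel ID plus HTLC index) uniquely identifies
        // this incoming HTLC; it is used below to match any hodl resolution
        // back to the htlc.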
        circuitKey := models.CircuitKey{
638✔
3819
                ChanID: l.ShortChanID(),
638✔
3820
                HtlcID: pd.HtlcIndex,
638✔
3821
        }
638✔
3822

638✔
3823
        event, err := l.cfg.Registry.NotifyExitHopHtlc(
638✔
3824
                invoiceHash, pd.Amount, pd.Timeout, int32(heightNow),
638✔
3825
                circuitKey, l.hodlQueue.ChanIn(), payload,
638✔
3826
        )
638✔
3827
        if err != nil {
638✔
3828
                return err
×
3829
        }
×
3830

3831
        // Create a hodlHtlc struct and decide either resolved now or later.
3832
        htlc := hodlHtlc{
638✔
3833
                pd:         pd,
638✔
3834
                obfuscator: obfuscator,
638✔
3835
        }
638✔
3836

638✔
3837
        // If the event is nil, the invoice is being held, so we save payment
638✔
3838
        // descriptor for future reference.
638✔
3839
        if event == nil {
1,130✔
3840
                l.hodlMap[circuitKey] = htlc
492✔
3841
                return nil
492✔
3842
        }
492✔
3843

3844
        // Process the received resolution.
3845
        return l.processHtlcResolution(event, htlc)
149✔
3846
}
3847

3848
// settleHTLC settles the HTLC on the channel.
3849
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
3850
        pd *lnwallet.PaymentDescriptor) error {
633✔
3851

633✔
3852
        hash := preimage.Hash()
633✔
3853

633✔
3854
        l.log.Infof("settling htlc %v as exit hop", hash)
633✔
3855

633✔
3856
        err := l.channel.SettleHTLC(
633✔
3857
                preimage, pd.HtlcIndex, pd.SourceRef, nil, nil,
633✔
3858
        )
633✔
3859
        if err != nil {
633✔
3860
                return fmt.Errorf("unable to settle htlc: %w", err)
×
3861
        }
×
3862

3863
        // If the link is in hodl.BogusSettle mode, replace the preimage with a
3864
        // fake one before sending it to the peer.
3865
        if l.cfg.HodlMask.Active(hodl.BogusSettle) {
636✔
3866
                l.log.Warnf(hodl.BogusSettle.Warning())
3✔
3867
                preimage = [32]byte{}
3✔
3868
                copy(preimage[:], bytes.Repeat([]byte{2}, 32))
3✔
3869
        }
3✔
3870

3871
        // The HTLC was successfully settled locally; send a notification about it
3872
        // to the remote peer.
3873
        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
633✔
3874
                ChanID:          l.ChanID(),
633✔
3875
                ID:              pd.HtlcIndex,
633✔
3876
                PaymentPreimage: preimage,
633✔
3877
        })
633✔
3878

633✔
3879
        // Once we have successfully settled the htlc, notify a settle event.
633✔
3880
        l.cfg.HtlcNotifier.NotifySettleEvent(
633✔
3881
                HtlcKey{
633✔
3882
                        IncomingCircuit: models.CircuitKey{
633✔
3883
                                ChanID: l.ShortChanID(),
633✔
3884
                                HtlcID: pd.HtlcIndex,
633✔
3885
                        },
633✔
3886
                },
633✔
3887
                preimage,
633✔
3888
                HtlcEventTypeReceive,
633✔
3889
        )
633✔
3890

633✔
3891
        return nil
633✔
3892
}
3893

3894
// forwardBatch forwards the given htlcPackets to the switch, and waits on the
3895
// err chan for the individual responses. This method is intended to be spawned
3896
// as a goroutine so the responses can be handled in the background.
3897
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
1,444✔
3898
        // Don't forward packets for which we already have a response in our
1,444✔
3899
        // mailbox. This could happen if a packet fails and is buffered in the
1,444✔
3900
        // mailbox, and the incoming link flaps.
1,444✔
3901
        var filteredPkts = make([]*htlcPacket, 0, len(packets))
1,444✔
3902
        for _, pkt := range packets {
2,889✔
3903
                if l.mailBox.HasPacket(pkt.inKey()) {
1,448✔
3904
                        continue
3✔
3905
                }
3906

3907
                filteredPkts = append(filteredPkts, pkt)
1,445✔
3908
        }
3909

3910
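        // Hand the filtered packets off to the switch, passing along the
        // link's quit channel together with the replay flag.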
        err := l.cfg.ForwardPackets(l.quit, replay, filteredPkts...)
1,444✔
3911
        if err != nil {
1,455✔
3912
                log.Errorf("Unhandled error while reforwarding htlc "+
11✔
3913
                        "settle/fail over htlcswitch: %v", err)
11✔
3914
        }
11✔
3915
}
3916

3917
// sendHTLCError cancels the HTLC and sends a cancel message back to the
3918
// peer from which HTLC was received.
3919
func (l *channelLink) sendHTLCError(pd *lnwallet.PaymentDescriptor,
3920
        failure *LinkError, e hop.ErrorEncrypter, isReceive bool) {
108✔
3921

108✔
3922
        reason, err := e.EncryptFirstHop(failure.WireMessage())
108✔
3923
        if err != nil {
108✔
3924
                l.log.Errorf("unable to obfuscate error: %v", err)
×
3925
                return
×
3926
        }
×
3927

3928
        err = l.channel.FailHTLC(pd.HtlcIndex, reason, pd.SourceRef, nil, nil)
108✔
3929
        if err != nil {
108✔
3930
                l.log.Errorf("unable cancel htlc: %v", err)
×
3931
                return
×
3932
        }
×
3933

3934
        // Send the appropriate failure message depending on whether we're
3935
        // in a blinded route or not.
3936
        if err := l.sendIncomingHTLCFailureMsg(
108✔
3937
                pd.HtlcIndex, e, reason,
108✔
3938
        ); err != nil {
108✔
3939
                l.log.Errorf("unable to send HTLC failure: %v", err)
×
3940
                return
×
3941
        }
×
3942

3943
        // Notify a link failure on our incoming link. Outgoing htlc information
3944
        // is not available at this point, because we have not decrypted the
3945
        // onion, so it is excluded.
3946
        var eventType HtlcEventType
108✔
3947
        if isReceive {
216✔
3948
                eventType = HtlcEventTypeReceive
108✔
3949
        } else {
111✔
3950
                eventType = HtlcEventTypeForward
3✔
3951
        }
3✔
3952

3953
        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
108✔
3954
                HtlcKey{
108✔
3955
                        IncomingCircuit: models.CircuitKey{
108✔
3956
                                ChanID: l.ShortChanID(),
108✔
3957
                                HtlcID: pd.HtlcIndex,
108✔
3958
                        },
108✔
3959
                },
108✔
3960
                HtlcInfo{
108✔
3961
                        IncomingTimeLock: pd.Timeout,
108✔
3962
                        IncomingAmt:      pd.Amount,
108✔
3963
                },
108✔
3964
                eventType,
108✔
3965
                failure,
108✔
3966
                true,
108✔
3967
        )
108✔
3968
}
3969

3970
// sendIncomingHTLCFailureMsg handles sending an HTLC failure message back to the
3971
// peer from which the HTLC was received. This function is primarily used to
3972
// handle the special requirements of route blinding, specifically:
3973
// - Forwarding nodes must switch out any errors with MalformedFailHTLC
3974
// - Introduction nodes should return regular HTLC failure messages.
3975
//
3976
// It accepts the original opaque failure, which will be used in the case
3977
// that we're not part of a blinded route and an error encrypter that'll be
3978
// used if we are the introduction node and need to present an error as if
3979
// we're the failing party.
3980
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
3981
        e hop.ErrorEncrypter,
3982
        originalFailure lnwire.OpaqueReason) error {
124✔
3983

124✔
3984
        var msg lnwire.Message
124✔
3985
        switch {
124✔
3986
        // Our circuit's error encrypter will be nil if this was a locally
3987
        // initiated payment. We can only hit a blinded error for a locally
3988
        // initiated payment if we allow ourselves to be picked as the
3989
        // introduction node for our own payments and in that case we
3990
        // shouldn't reach this code. To prevent the HTLC getting stuck,
3991
        // we fail it back and log an error.
3992
3993
        case e == nil:
×
3994
                msg = &lnwire.UpdateFailHTLC{
×
3995
                        ChanID: l.ChanID(),
×
3996
                        ID:     htlcIndex,
×
3997
                        Reason: originalFailure,
×
3998
                }
×
3999

×
4000
                l.log.Errorf("Unexpected blinded failure when "+
×
4001
                        "we are the sending node, incoming htlc: %v(%v)",
×
4002
                        l.ShortChanID(), htlcIndex)
×
4003

4004
        // For cleartext hops (ie, non-blinded/normal) we don't need any
4005
        // transformation on the error message and can just send the original.
4006
        case !e.Type().IsBlinded():
124✔
4007
                msg = &lnwire.UpdateFailHTLC{
124✔
4008
                        ChanID: l.ChanID(),
124✔
4009
                        ID:     htlcIndex,
124✔
4010
                        Reason: originalFailure,
124✔
4011
                }
124✔
4012

4013
        // When we're the introduction node, we need to convert the error to
4014
        // a UpdateFailHTLC.
4015
        case e.Type() == hop.EncrypterTypeIntroduction:
3✔
4016
                l.log.Debugf("Introduction blinded node switching out failure "+
3✔
4017
                        "error: %v", htlcIndex)
3✔
4018

3✔
4019
                // The specification does not require that we set the onion
3✔
4020
                // blob.
3✔
4021
                failureMsg := lnwire.NewInvalidBlinding(nil)
3✔
4022
                reason, err := e.EncryptFirstHop(failureMsg)
3✔
4023
                if err != nil {
3✔
4024
                        return err
×
4025
                }
×
4026

4027
                msg = &lnwire.UpdateFailHTLC{
3✔
4028
                        ChanID: l.ChanID(),
3✔
4029
                        ID:     htlcIndex,
3✔
4030
                        Reason: reason,
3✔
4031
                }
3✔
4032

4033
        // If we are a relaying node, we need to switch out any error that
4034
        // we've received to a malformed HTLC error.
4035
        case e.Type() == hop.EncrypterTypeRelaying:
3✔
4036
                l.log.Debugf("Relaying blinded node switching out malformed "+
3✔
4037
                        "error: %v", htlcIndex)
3✔
4038

3✔
4039
                msg = &lnwire.UpdateFailMalformedHTLC{
3✔
4040
                        ChanID:      l.ChanID(),
3✔
4041
                        ID:          htlcIndex,
3✔
4042
                        FailureCode: lnwire.CodeInvalidBlinding,
3✔
4043
                }
3✔
4044

4045
        default:
×
4046
                return fmt.Errorf("unexpected encrypter: %d", e)
×
4047
        }
4048

4049
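        // Deliver the failure message to the remote peer. A send error is
        // only logged; it is not treated as fatal for the link.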
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
124✔
4050
                l.log.Warnf("Send update fail failed: %v", err)
×
4051
        }
×
4052

4053
        return nil
124✔
4054
}
4055

4056
// sendMalformedHTLCError is a helper function which sends the malformed HTLC update
4057
// to the payment sender.
4058
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
4059
        code lnwire.FailCode, onionBlob []byte, sourceRef *channeldb.AddRef) {
6✔
4060

6✔
4061
        shaOnionBlob := sha256.Sum256(onionBlob)
6✔
4062
        err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
6✔
4063
        if err != nil {
6✔
4064
                l.log.Errorf("unable cancel htlc: %v", err)
×
4065
                return
×
4066
        }
×
4067

4068
        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
6✔
4069
                ChanID:       l.ChanID(),
6✔
4070
                ID:           htlcIndex,
6✔
4071
                ShaOnionBlob: shaOnionBlob,
6✔
4072
                FailureCode:  code,
6✔
4073
        })
6✔
4074
}
4075

4076
// fail is a function which is used to encapsulate the action necessary for
4077
// properly failing the link. It takes a LinkFailureError, which will be passed
4078
// to the OnChannelFailure closure, in order for it to determine if we should
4079
// force close the channel, and if we should send an error message to the
4080
// remote peer.
4081
func (l *channelLink) fail(linkErr LinkFailureError,
4082
        format string, a ...interface{}) {
11✔
4083
        reason := fmt.Errorf(format, a...)
11✔
4084

11✔
4085
        // Return if we have already notified about a failure.
11✔
4086
        if l.failed {
11✔
4087
                l.log.Warnf("ignoring link failure (%v), as link already "+
×
4088
                        "failed", reason)
×
4089
                return
×
4090
        }
×
4091

4092
        l.log.Errorf("failing link: %s with error: %v", reason, linkErr)
11✔
4093

11✔
4094
        // Set failed, such that we won't process any more updates, and notify
11✔
4095
        // the peer about the failure.
11✔
4096
        l.failed = true
11✔
4097
        l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
11✔
4098
}