lightningnetwork / lnd, build 9617502354

21 Jun 2024 05:27PM UTC coverage: 58.414% (+0.004%) from 58.41%
Pull Request #8856: [docs] Update go instructions

Building current lnd `0.18` fails with older go (`1.19.7`).

* Updated go download path to 1.22.4
* Updated hashes
* Added `rm -rf` instructions as per [go.dev instructions](https://go.dev/doc/install)

123389 of 211233 relevant lines covered (58.41%)

28572.17 hits per line

Source File: /htlcswitch/link.go (80.16% covered)

package htlcswitch

import (
        "bytes"
        crand "crypto/rand"
        "crypto/sha256"
        "fmt"
        prand "math/rand"
        "sync"
        "sync/atomic"
        "time"

        "github.com/btcsuite/btcd/btcutil"
        "github.com/btcsuite/btcd/wire"
        "github.com/btcsuite/btclog"
        "github.com/davecgh/go-spew/spew"
        "github.com/go-errors/errors"
        "github.com/lightningnetwork/lnd/build"
        "github.com/lightningnetwork/lnd/channeldb"
        "github.com/lightningnetwork/lnd/channeldb/models"
        "github.com/lightningnetwork/lnd/contractcourt"
        "github.com/lightningnetwork/lnd/fn"
        "github.com/lightningnetwork/lnd/htlcswitch/hodl"
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
        "github.com/lightningnetwork/lnd/invoices"
        "github.com/lightningnetwork/lnd/lnpeer"
        "github.com/lightningnetwork/lnd/lntypes"
        "github.com/lightningnetwork/lnd/lnwallet"
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
        "github.com/lightningnetwork/lnd/lnwire"
        "github.com/lightningnetwork/lnd/queue"
        "github.com/lightningnetwork/lnd/ticker"
)

func init() {
        prand.Seed(time.Now().UnixNano())
}

const (
        // DefaultMaxOutgoingCltvExpiry is the maximum outgoing time lock that
        // the node accepts for forwarded payments. The value is relative to the
        // current block height. The reason to have a maximum is to prevent
        // funds getting locked up unreasonably long. Otherwise, an attacker
        // willing to lock its own funds too, could force the funds of this node
        // to be locked up for an indefinite (max int32) number of blocks.
        //
        // The value 2016 corresponds to on average two weeks worth of blocks
        // and is based on the maximum number of hops (20), the default CLTV
        // delta (40), and some extra margin to account for the other lightning
        // implementations and past lnd versions which used to have a default
        // CLTV delta of 144.
        DefaultMaxOutgoingCltvExpiry = 2016

        // DefaultMinLinkFeeUpdateTimeout represents the minimum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute

        // DefaultMaxLinkFeeUpdateTimeout represents the maximum interval in
        // which a link should propose to update its commitment fee rate.
        DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute

        // DefaultMaxLinkFeeAllocation is the highest allocation we'll allow
        // a channel's commitment fee to be of its balance. This only applies to
        // the initiator of the channel.
        DefaultMaxLinkFeeAllocation float64 = 0.5
)

// ExpectedFee computes the expected fee for a given htlc amount. The value
// returned from this function is to be used as a sanity check when forwarding
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
// forwarding policy.
//
// TODO(roasbeef): also add in current available channel bandwidth, inverse
// func
func ExpectedFee(f models.ForwardingPolicy,
        htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {

        return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
}
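
A quick worked example of this formula, mirrored with plain integers rather than the package's `lnwire.MilliSatoshi` type (the numbers are illustrative, not lnd defaults):

```go
package main

import "fmt"

// expectedFee mirrors ExpectedFee's arithmetic: all values are in
// millisatoshis, and feeRate is a proportional fee in parts-per-million.
func expectedFee(baseFee, feeRate, amt uint64) uint64 {
        return baseFee + (amt*feeRate)/1_000_000
}

func main() {
        // A 1,000 msat base fee plus 100 ppm on a 1,000,000 msat HTLC:
        // 1,000 + (1,000,000 * 100) / 1,000,000 = 1,100 msat.
        fmt.Println(expectedFee(1_000, 100, 1_000_000)) // 1100
}
```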

// ChannelLinkConfig defines the configuration for the channel link. ALL
// elements within the configuration MUST be non-nil for channel link to carry
// out its duties.
type ChannelLinkConfig struct {
        // FwrdingPolicy is the initial forwarding policy to be used when
        // deciding whether to forward incoming HTLC's or not. This value
        // can be updated with subsequent calls to UpdateForwardingPolicy
        // targeted at a given ChannelLink concrete interface implementation.
        FwrdingPolicy models.ForwardingPolicy

        // Circuits provides restricted access to the switch's circuit map,
        // allowing the link to open and close circuits.
        Circuits CircuitModifier

        // BestHeight returns the best known height.
        BestHeight func() uint32

        // ForwardPackets attempts to forward the batch of htlcs through the
        // switch. The function returns an error in case it fails to send one
        // or more packets. The link's quit signal should be provided to allow
        // cancellation of forwarding during link shutdown.
        ForwardPackets func(chan struct{}, bool, ...*htlcPacket) error

        // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
        // blobs, which are then used to inform how to forward an HTLC.
        //
        // NOTE: This function assumes the same set of readers and preimages
        // are always presented for the same identifier.
        DecodeHopIterators func([]byte, []hop.DecodeHopIteratorRequest) (
                []hop.DecodeHopIteratorResponse, error)

        // ExtractErrorEncrypter function is responsible for decoding HTLC
        // Sphinx onion blob, and creating onion failure obfuscator.
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter

        // FetchLastChannelUpdate retrieves the latest routing policy for a
        // target channel. This channel will typically be the outgoing channel
        // specified when we receive an incoming HTLC. This will be used to
        // provide payment senders our latest policy when sending encrypted
        // error messages.
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)

        // Peer is a lightning network node with which we have the channel link
        // opened.
        Peer lnpeer.Peer

        // Registry is a sub-system which is responsible for managing the
        // invoices in a thread-safe manner.
        Registry InvoiceDatabase

        // PreimageCache is a global witness beacon that houses any new
        // preimages discovered by other links. We'll use this to add new
        // witnesses that we discover which will notify any sub-systems
        // subscribed to new events.
        PreimageCache contractcourt.WitnessBeacon

        // OnChannelFailure is a function closure that we'll call if the
        // channel failed for some reason. Depending on the severity of the
        // error, the closure potentially must force close this channel and
        // disconnect the peer.
        //
        // NOTE: The method must return in order for the ChannelLink to be able
        // to shut down properly.
        OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
                LinkFailureError)

        // UpdateContractSignals is a function closure that we'll use to update
        // outside sub-systems with this channel's latest ShortChannelID.
        UpdateContractSignals func(*contractcourt.ContractSignals) error

        // NotifyContractUpdate is a function closure that we'll use to update
        // the contractcourt and more specifically the ChannelArbitrator of the
        // latest channel state.
        NotifyContractUpdate func(*contractcourt.ContractUpdate) error

        // ChainEvents is an active subscription to the chain watcher for this
        // channel to be notified of any on-chain activity related to this
        // channel.
        ChainEvents *contractcourt.ChainEventSubscription

        // FeeEstimator is an instance of a live fee estimator which will be
        // used to dynamically regulate the current fee of the commitment
        // transaction to ensure timely confirmation.
        FeeEstimator chainfee.Estimator

        // hodl.Mask is a bitvector composed of hodl.Flags, specifying breakpoints
        // for HTLC forwarding internal to the switch.
        //
        // NOTE: This should only be used for testing.
        HodlMask hodl.Mask

        // SyncStates is used to indicate that we need to send the channel
        // reestablishment message to the remote peer. It should be done if our
        // clients have been restarted, or the remote peer has been reconnected.
        SyncStates bool

        // BatchTicker is the ticker that determines the interval that we'll
        // use to check the batch to see if there are any updates we should
        // flush out. By batching updates into a single commit, we attempt to
        // increase throughput by maximizing the number of updates coalesced
        // into a single commit.
        BatchTicker ticker.Ticker

        // FwdPkgGCTicker is the ticker determining the frequency at which
        // garbage collection of forwarding packages occurs. We use a
        // time-based approach, as opposed to block epochs, as to not hinder
        // syncing.
        FwdPkgGCTicker ticker.Ticker

        // PendingCommitTicker is a ticker that allows the link to determine if
        // a locally initiated commitment dance gets stuck waiting for the
        // remote party to revoke.
        PendingCommitTicker ticker.Ticker

        // BatchSize is the max size of a batch of updates done to the link
        // before we do a state update.
        BatchSize uint32

        // UnsafeReplay will cause a link to replay the adds in its latest
        // commitment txn after the link is restarted. This should only be used
        // in testing, it is here to ensure the sphinx replay detection on the
        // receiving node is persistent.
        UnsafeReplay bool

        // MinUpdateTimeout represents the minimum interval in which a link
        // will propose to update its commitment fee rate. A random timeout will
        // be selected between this and MaxUpdateTimeout.
        MinUpdateTimeout time.Duration

        // MaxUpdateTimeout represents the maximum interval in which a link
        // will propose to update its commitment fee rate. A random timeout will
        // be selected between this and MinUpdateTimeout.
        MaxUpdateTimeout time.Duration

        // OutgoingCltvRejectDelta defines the number of blocks before expiry of
        // an htlc where we don't offer an htlc anymore. This should be at least
        // the outgoing broadcast delta, because in any case we don't want to
        // risk offering an htlc that triggers channel closure.
        OutgoingCltvRejectDelta uint32

        // TowerClient is an optional engine that manages the signing,
        // encrypting, and uploading of justice transactions to the daemon's
        // configured set of watchtowers for legacy channels.
        TowerClient TowerClient

        // MaxOutgoingCltvExpiry is the maximum outgoing timelock that the link
        // should accept for a forwarded HTLC. The value is relative to the
        // current block height.
        MaxOutgoingCltvExpiry uint32

        // MaxFeeAllocation is the highest allocation we'll allow a channel's
        // commitment fee to be of its balance. This only applies to the
        // initiator of the channel.
        MaxFeeAllocation float64

        // MaxAnchorsCommitFeeRate is the max commitment fee rate we'll use as
        // the initiator for channels of the anchor type.
        MaxAnchorsCommitFeeRate chainfee.SatPerKWeight

        // NotifyActiveLink allows the link to tell the ChannelNotifier when a
        // link is first started.
        NotifyActiveLink func(wire.OutPoint)

        // NotifyActiveChannel allows the link to tell the ChannelNotifier when
        // a channel becomes active.
        NotifyActiveChannel func(wire.OutPoint)

        // NotifyInactiveChannel allows the switch to tell the ChannelNotifier
        // when channels become inactive.
        NotifyInactiveChannel func(wire.OutPoint)

        // NotifyInactiveLinkEvent allows the switch to tell the
        // ChannelNotifier when a channel link becomes inactive.
        NotifyInactiveLinkEvent func(wire.OutPoint)

        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
        // events through.
        HtlcNotifier htlcNotifier

        // FailAliasUpdate is a function used to fail an HTLC for an
        // option_scid_alias channel.
        FailAliasUpdate func(sid lnwire.ShortChannelID,
                incoming bool) *lnwire.ChannelUpdate

        // GetAliases is used by the link and switch to fetch the set of
        // aliases for a given link.
        GetAliases func(base lnwire.ShortChannelID) []lnwire.ShortChannelID

        // PreviouslySentShutdown is an optional value that is set if, at the
        // time of the link being started, persisted shutdown info was found for
        // the channel. This value being set means that we previously sent a
        // Shutdown message to our peer, and so we should do so again on
        // re-establish and should not allow any more HTLC adds on the outgoing
        // direction of the link.
        PreviouslySentShutdown fn.Option[lnwire.Shutdown]

        // DisallowRouteBlinding adds the option to disable forwarding payments
        // in blinded routes by failing back any blinding-related payloads as
        // if they were invalid.
        DisallowRouteBlinding bool
}

// channelLink is the service which drives a channel's commitment update
// state-machine. In the event that an HTLC needs to be propagated to another
// link, the forward handler from config is used which sends HTLC to the
// switch. Additionally, the link encapsulates the logic of commitment protocol
// message ordering and updates.
type channelLink struct {
        // The following fields are only meant to be used *atomically*
        started       int32
        reestablished int32
        shutdown      int32

        // failed should be set to true in case a link error happens, making
        // sure we don't process any more updates.
        failed bool

        // keystoneBatch represents a volatile list of keystones that must be
        // written before attempting to sign the next commitment txn. These
        // represent all the HTLC's forwarded to the link from the switch. Once
        // we lock them into our outgoing commitment, then the circuit has a
        // keystone, and is fully opened.
        keystoneBatch []Keystone

        // openedCircuits is the set of all payment circuits that will be open
        // once we make our next commitment. After making the commitment we'll
        // ACK all these from our mailbox to ensure that they don't get
        // re-delivered if we reconnect.
        openedCircuits []CircuitKey

        // closedCircuits is the set of all payment circuits that will be
        // closed once we make our next commitment. After taking the commitment
        // we'll ACK all these to ensure that they don't get re-delivered if we
        // reconnect.
        closedCircuits []CircuitKey

        // channel is a lightning network channel to which we apply htlc
        // updates.
        channel *lnwallet.LightningChannel

        // cfg is a structure which carries all dependable fields/handlers
        // which may affect behaviour of the service.
        cfg ChannelLinkConfig

        // mailBox is the main interface between the outside world and the
        // link. All incoming messages will be sent over this mailBox. Messages
        // include new updates from our connected peer, and new packets to be
        // forwarded, sent by the switch.
        mailBox MailBox

        // upstream is a channel that new messages sent from the remote peer to
        // the local peer will be sent across.
        upstream chan lnwire.Message

        // downstream is a channel in which new multi-hop HTLC's to be
        // forwarded will be sent across. Messages from this channel are sent
        // by the HTLC switch.
        downstream chan *htlcPacket

        // updateFeeTimer is the timer responsible for updating the link's
        // commitment fee every time it fires.
        updateFeeTimer *time.Timer

        // uncommittedPreimages stores a list of all preimages that have been
        // learned since receiving the last CommitSig from the remote peer. The
        // batch will be flushed just before accepting the subsequent CommitSig
        // or on shutdown to avoid doing a write for each preimage received.
        uncommittedPreimages []lntypes.Preimage

        sync.RWMutex

        // hodlQueue is used to receive exit hop htlc resolutions from invoice
        // registry.
        hodlQueue *queue.ConcurrentQueue

        // hodlMap stores related htlc data for a circuit key. It allows
        // resolving those htlcs when we receive a message on hodlQueue.
        hodlMap map[models.CircuitKey]hodlHtlc

        // log is a link-specific logging instance.
        log btclog.Logger

        // isOutgoingAddBlocked tracks whether the channelLink can send an
        // UpdateAddHTLC.
        isOutgoingAddBlocked atomic.Bool

        // isIncomingAddBlocked tracks whether the channelLink can receive an
        // UpdateAddHTLC.
        isIncomingAddBlocked atomic.Bool

        // flushHooks is a hookMap that is triggered when we reach a channel
        // state with no live HTLCs.
        flushHooks hookMap

        // outgoingCommitHooks is a hookMap that is triggered after we send our
        // next CommitSig.
        outgoingCommitHooks hookMap

        // incomingCommitHooks is a hookMap that is triggered after we receive
        // our next CommitSig.
        incomingCommitHooks hookMap

        wg   sync.WaitGroup
        quit chan struct{}
}

// hookMap is a data structure that is used to track the hooks that need to be
// called in various parts of the channelLink's lifecycle.
//
// WARNING: NOT thread-safe.
type hookMap struct {
        // allocIdx keeps track of the next id we haven't yet allocated.
        allocIdx atomic.Uint64

        // transient is a map of hooks that are only called the next time invoke
        // is called. These hooks are deleted during invoke.
        transient map[uint64]func()

        // newTransients is a channel that we use to accept new hooks into the
        // hookMap.
        newTransients chan func()
}

// newHookMap initializes a new empty hookMap.
func newHookMap() hookMap {
        return hookMap{
                allocIdx:      atomic.Uint64{},
                transient:     make(map[uint64]func()),
                newTransients: make(chan func()),
        }
}

// alloc allocates space in the hook map for the supplied hook, storing it in
// the transient part of the hookMap, and returns the id under which it was
// registered.
func (m *hookMap) alloc(hook func()) uint64 {
        // We assume we never overflow a uint64. Seems OK.
        hookID := m.allocIdx.Add(1)
        if hookID == 0 {
                panic("hookMap allocIdx overflow")
        }
        m.transient[hookID] = hook

        return hookID
}

// invoke is used on a hook map to call all the registered hooks and then clear
// out the transient hooks so they are not called again.
func (m *hookMap) invoke() {
        for _, hook := range m.transient {
                hook()
        }

        m.transient = make(map[uint64]func())
}
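
The transient-hook mechanism can be summarized with a small, single-goroutine sketch (simplified: no atomic counter or newTransients channel, which the real hookMap relies on because hooks are handed over to the link's main goroutine):

```go
package main

import "fmt"

// hookSet is a stripped-down sketch of hookMap's transient hooks: each
// registered hook runs exactly once, on the next invoke, then is dropped.
type hookSet struct {
        nextID    uint64
        transient map[uint64]func()
}

func newHookSet() *hookSet {
        return &hookSet{transient: make(map[uint64]func())}
}

func (s *hookSet) alloc(hook func()) uint64 {
        s.nextID++
        s.transient[s.nextID] = hook
        return s.nextID
}

func (s *hookSet) invoke() {
        for _, hook := range s.transient {
                hook()
        }
        // Clearing the map is what makes the hooks one-shot.
        s.transient = make(map[uint64]func())
}

func main() {
        s := newHookSet()
        s.alloc(func() { fmt.Println("channel flushed") })
        s.invoke() // prints "channel flushed"
        s.invoke() // prints nothing; the hook was transient
}
```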

// hodlHtlc contains htlc data that is required for resolution.
type hodlHtlc struct {
        pd         *lnwallet.PaymentDescriptor
        obfuscator hop.ErrorEncrypter
}

// NewChannelLink creates a new instance of a ChannelLink given a configuration
// and active channel that will be used to verify/apply updates to.
func NewChannelLink(cfg ChannelLinkConfig,
        channel *lnwallet.LightningChannel) ChannelLink {

        logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())

        return &channelLink{
                cfg:                 cfg,
                channel:             channel,
                hodlMap:             make(map[models.CircuitKey]hodlHtlc),
                hodlQueue:           queue.NewConcurrentQueue(10),
                log:                 build.NewPrefixLog(logPrefix, log),
                flushHooks:          newHookMap(),
                outgoingCommitHooks: newHookMap(),
                incomingCommitHooks: newHookMap(),
                quit:                make(chan struct{}),
        }
}

// A compile time check to ensure channelLink implements the ChannelLink
// interface.
var _ ChannelLink = (*channelLink)(nil)
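
This `var _ Iface = (*impl)(nil)` line is the standard Go compile-time assertion idiom: it costs nothing at runtime but breaks the build if the type drifts out of sync with the interface. A self-contained illustration:

```go
package main

// greeter stands in for an interface like ChannelLink.
type greeter interface {
        Greet() string
}

type englishGreeter struct{}

func (g *englishGreeter) Greet() string { return "hello" }

// Assigning a typed nil pointer to a blank interface-typed variable forces
// the compiler to prove *englishGreeter implements greeter; nothing is
// allocated at runtime. Remove the Greet method and this line fails to build.
var _ greeter = (*englishGreeter)(nil)

func main() {}
```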

// Start starts all helper goroutines required for the operation of the channel
// link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Start() error {
        if !atomic.CompareAndSwapInt32(&l.started, 0, 1) {
                err := errors.Errorf("channel link(%v): already started", l)
                l.log.Warn("already started")
                return err
        }

        l.log.Info("starting")

        // If the config supplied a watchtower client, ensure the channel is
        // registered before trying to use it during operation.
        if l.cfg.TowerClient != nil {
                err := l.cfg.TowerClient.RegisterChannel(
                        l.ChanID(), l.channel.State().ChanType,
                )
                if err != nil {
                        return err
                }
        }

        l.mailBox.ResetMessages()
        l.hodlQueue.Start()

        // Before launching the htlcManager messages, revert any circuits that
        // were marked open in the switch's circuit map, but did not make it
        // into a commitment txn. We use the next local htlc index as the cut
        // off point, since all indexes below that are committed. This action
        // is only performed if the link's final short channel ID has been
        // assigned, otherwise we would try to trim the htlcs belonging to the
        // all-zero, hop.Source ID.
        if l.ShortChanID() != hop.Source {
                localHtlcIndex, err := l.channel.NextLocalHtlcIndex()
                if err != nil {
                        return fmt.Errorf("unable to retrieve next local "+
                                "htlc index: %v", err)
                }

                // NOTE: This is automatically done by the switch when it
                // starts up, but is necessary to prevent inconsistencies in
                // the case that the link flaps. This is a result of a link's
                // life-cycle being shorter than that of the switch.
                chanID := l.ShortChanID()
                err = l.cfg.Circuits.TrimOpenCircuits(chanID, localHtlcIndex)
                if err != nil {
                        return fmt.Errorf("unable to trim circuits above "+
                                "local htlc index %d: %v", localHtlcIndex, err)
                }

                // Since the link is live, before we start the link we'll update
                // the ChainArbitrator with the set of new channel signals for
                // this channel.
                //
                // TODO(roasbeef): split goroutines within channel arb to avoid
                go func() {
                        signals := &contractcourt.ContractSignals{
                                ShortChanID: l.channel.ShortChanID(),
                        }

                        err := l.cfg.UpdateContractSignals(signals)
                        if err != nil {
                                l.log.Errorf("unable to update signals")
                        }
                }()
        }

        l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())

        l.wg.Add(1)
        go l.htlcManager()

        return nil
}

// Stop gracefully stops all active helper goroutines, then waits until they've
// exited.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stop() {
        if !atomic.CompareAndSwapInt32(&l.shutdown, 0, 1) {
                l.log.Warn("already stopped")
                return
        }

        l.log.Info("stopping")

        // As the link is stopping, we are no longer interested in htlc
        // resolutions coming from the invoice registry.
        l.cfg.Registry.HodlUnsubscribeAll(l.hodlQueue.ChanIn())

        if l.cfg.ChainEvents.Cancel != nil {
                l.cfg.ChainEvents.Cancel()
        }

        // Ensure the channel for the timer is drained.
        if !l.updateFeeTimer.Stop() {
                select {
                case <-l.updateFeeTimer.C:
                default:
                }
        }

        l.hodlQueue.Stop()

        close(l.quit)
        l.wg.Wait()

        // Now that the htlcManager has completely exited, reset the packet
        // courier. This allows the mailbox to re-evaluate any lingering Adds that
        // were delivered but didn't make it on a commitment to be failed back
        // if the link is offline for an extended period of time. The error is
        // ignored since it can only fail when the daemon is exiting.
        _ = l.mailBox.ResetPackets()

        // As a final precaution, we will attempt to flush any uncommitted
        // preimages to the preimage cache. The preimages should be re-delivered
        // after channel reestablishment, however this adds an extra layer of
        // protection in case the peer never returns. Without this, we will be
        // unable to settle any contracts depending on the preimages even though
        // we had learned them at some point.
        err := l.cfg.PreimageCache.AddPreimages(l.uncommittedPreimages...)
        if err != nil {
                l.log.Errorf("unable to add preimages=%v to cache: %v",
                        l.uncommittedPreimages, err)
        }
}
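
The timer handling in Stop uses the standard stop-and-drain idiom: time.Timer.Stop reports false when the timer already fired (or was stopped), in which case a tick may still be buffered in the channel. A runnable sketch of just that idiom:

```go
package main

import (
        "fmt"
        "time"
)

func main() {
        t := time.NewTimer(10 * time.Millisecond)
        time.Sleep(20 * time.Millisecond) // let the timer fire first

        // Stop returns false if the timer already fired or was stopped. In
        // that case a tick may be sitting in t.C, so drain it with a
        // non-blocking receive, exactly as Stop does above.
        if !t.Stop() {
                select {
                case <-t.C:
                        fmt.Println("drained a pending tick")
                default:
                }
        }
}
```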

// WaitForShutdown blocks until the link finishes shutting down, which includes
// termination of all dependent goroutines.
func (l *channelLink) WaitForShutdown() {
        l.wg.Wait()
}

// EligibleToForward returns a bool indicating if the channel is able to
// actively accept requests to forward HTLC's. We're able to forward HTLC's if
// we are eligible to update AND the channel isn't currently flushing the
// outgoing half of the channel.
func (l *channelLink) EligibleToForward() bool {
        return l.EligibleToUpdate() &&
                !l.IsFlushing(Outgoing)
}

// EligibleToUpdate returns a bool indicating if the channel is able to update
// channel state. We're able to update channel state if we know the remote
// party's next revocation point. Otherwise, we can't initiate new channel
// state. We also require that the short channel ID not be the all-zero source
// ID, meaning that the channel has had its ID finalized.
func (l *channelLink) EligibleToUpdate() bool {
        return l.channel.RemoteNextRevocation() != nil &&
                l.ShortChanID() != hop.Source &&
                l.isReestablished()
}

// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in
// the specified direction. It returns true if the state was changed and false
// if the desired state was already set before the method was called.
func (l *channelLink) EnableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Swap(false)
        }

        return l.isIncomingAddBlocked.Swap(false)
}

// DisableAdds sets the ChannelUpdateHandler state to disallow UpdateAddHtlc's
// in the specified direction. It returns true if the state was changed and
// false if the desired state was already set before the method was called.
func (l *channelLink) DisableAdds(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return !l.isOutgoingAddBlocked.Swap(true)
        }

        return !l.isIncomingAddBlocked.Swap(true)
}

// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction of
// the argument.
func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
        if linkDirection == Outgoing {
                return l.isOutgoingAddBlocked.Load()
        }

        return l.isIncomingAddBlocked.Load()
}
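
EnableAdds, DisableAdds and IsFlushing all lean on atomic.Bool.Swap, which stores the new value and returns the old one, so a single atomic operation both flips the flag and reports whether anything actually changed. A minimal demonstration of that return-value convention:

```go
package main

import (
        "fmt"
        "sync/atomic"
)

func main() {
        var addsBlocked atomic.Bool // false: adds allowed

        // "Disable": Swap(true) returns the previous value, so negating it
        // tells us whether this call changed the state.
        fmt.Println(!addsBlocked.Swap(true)) // true: state changed
        fmt.Println(!addsBlocked.Swap(true)) // false: already disabled

        // "Enable": the old value itself says whether adds were blocked.
        fmt.Println(addsBlocked.Swap(false)) // true: state changed
}
```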

// OnFlushedOnce adds a hook that will be called the next time the channel
// state reaches zero htlcs. This hook will only ever be called once. If the
// channel state already has zero htlcs, then this will be called immediately.
func (l *channelLink) OnFlushedOnce(hook func()) {
        select {
        case l.flushHooks.newTransients <- hook:
        case <-l.quit:
        }
}

// OnCommitOnce adds a hook that will be called the next time a CommitSig
// message is sent in the argument's LinkDirection. This hook will only ever be
// called once. If no CommitSig is owed in the argument's LinkDirection, then
// this hook will be run immediately.
func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
        var queue chan func()

        if direction == Outgoing {
                queue = l.outgoingCommitHooks.newTransients
        } else {
                queue = l.incomingCommitHooks.newTransients
        }

        select {
        case queue <- hook:
        case <-l.quit:
        }
}

// isReestablished returns true if the link has successfully completed the
// channel reestablishment dance.
func (l *channelLink) isReestablished() bool {
        return atomic.LoadInt32(&l.reestablished) == 1
}

// markReestablished signals that the remote peer has successfully exchanged
// channel reestablish messages and that the channel is ready to process
// subsequent messages.
func (l *channelLink) markReestablished() {
        atomic.StoreInt32(&l.reestablished, 1)
}

// IsUnadvertised returns true if the underlying channel is unadvertised.
func (l *channelLink) IsUnadvertised() bool {
        state := l.channel.State()
        return state.ChannelFlags&lnwire.FFAnnounceChannel == 0
}

// sampleNetworkFee samples the current fee rate on the network to get into the
// chain in a timely manner. The returned value is expressed in fee-per-kw, as
// this is the native rate used when computing the fee for commitment
// transactions, and the second-level HTLC transactions.
func (l *channelLink) sampleNetworkFee() (chainfee.SatPerKWeight, error) {
        // We'll first query for the sat/kw recommended to be confirmed within 3
        // blocks.
        feePerKw, err := l.cfg.FeeEstimator.EstimateFeePerKW(3)
        if err != nil {
                return 0, err
        }

        l.log.Debugf("sampled fee rate for 3 block conf: %v sat/kw",
                int64(feePerKw))

        return feePerKw, nil
}

// shouldAdjustCommitFee returns true if we should update our commitment fee to
// match that of the network fee. We'll only update our commitment fee if the
// network fee differs from our commitment fee by at least 10% in either
// direction, or if our current commitment fee is below the minimum relay fee.
func shouldAdjustCommitFee(netFee, chanFee,
        minRelayFee chainfee.SatPerKWeight) bool {

        switch {
        // If the network fee is greater than our current commitment fee and
        // our current commitment fee is below the minimum relay fee then
        // we should switch to it no matter if it is less than a 10% increase.
        case netFee > chanFee && chanFee < minRelayFee:
                return true

        // If the network fee is greater than the commitment fee, then we'll
        // switch to it if it's at least 10% greater than the commit fee.
        case netFee > chanFee && netFee >= (chanFee+(chanFee*10)/100):
                return true

        // If the network fee is less than our commitment fee, then we'll
        // switch to it if it's at least 10% less than the commitment fee.
        case netFee < chanFee && netFee <= (chanFee-(chanFee*10)/100):
                return true

        // Otherwise, we won't modify our fee.
        default:
                return false
        }
}
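
To make the 10% band concrete, here is the same integer arithmetic run on a few sample fee rates (the 253 sat/kw minimum relay fee is only an illustrative floor, not a value taken from this file):

```go
package main

import "fmt"

// shouldAdjust replays shouldAdjustCommitFee's arithmetic with plain
// integers standing in for chainfee.SatPerKWeight.
func shouldAdjust(netFee, chanFee, minRelayFee int64) bool {
        switch {
        case netFee > chanFee && chanFee < minRelayFee:
                return true
        case netFee > chanFee && netFee >= chanFee+(chanFee*10)/100:
                return true
        case netFee < chanFee && netFee <= chanFee-(chanFee*10)/100:
                return true
        default:
                return false
        }
}

func main() {
        fmt.Println(shouldAdjust(1100, 1000, 253)) // true: exactly +10%
        fmt.Println(shouldAdjust(1099, 1000, 253)) // false: inside the band
        fmt.Println(shouldAdjust(900, 1000, 253))  // true: exactly -10%
        fmt.Println(shouldAdjust(300, 250, 253))   // true: below min relay fee
}
```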

// failCb is used to cut down on the argument verbosity.
type failCb func(update *lnwire.ChannelUpdate) lnwire.FailureMessage

// createFailureWithUpdate creates a ChannelUpdate when failing an incoming or
// outgoing HTLC. It may return a FailureMessage that references a channel's
// alias. If the channel does not have an alias, then the regular channel
// update from disk will be returned.
func (l *channelLink) createFailureWithUpdate(incoming bool,
        outgoingScid lnwire.ShortChannelID, cb failCb) lnwire.FailureMessage {

        // Determine which SCID to use in case we need to use aliases in the
        // ChannelUpdate.
        scid := outgoingScid
        if incoming {
                scid = l.ShortChanID()
        }

        // Try using the FailAliasUpdate function. If it returns nil, fallback
        // to the non-alias behavior.
        update := l.cfg.FailAliasUpdate(scid, incoming)
        if update == nil {
                // Fallback to the non-alias behavior.
                var err error
                update, err = l.cfg.FetchLastChannelUpdate(l.ShortChanID())
                if err != nil {
                        return &lnwire.FailTemporaryNodeFailure{}
                }
        }

        return cb(update)
}
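
The failCb indirection keeps call sites short: the caller supplies only the failure constructor, and createFailureWithUpdate decides which ChannelUpdate to embed (alias versus on-disk). A self-contained sketch of the same shape, with hypothetical stand-in types for lnwire.ChannelUpdate and lnwire.FailureMessage:

```go
package main

import "fmt"

// Stand-ins for lnwire.ChannelUpdate and lnwire.FailureMessage.
type channelUpdate struct{ timestamp uint32 }

type failureMessage interface{ failure() string }

type tempChannelFailure struct{ update *channelUpdate }

func (t *tempChannelFailure) failure() string { return "TemporaryChannelFailure" }

// failCb mirrors the callback type above: it receives the resolved update
// and wraps it into a concrete failure message.
type failCb func(update *channelUpdate) failureMessage

// createFailure plays the role of createFailureWithUpdate: it owns the
// update-selection logic and only invokes the callback at the end.
func createFailure(resolved *channelUpdate, cb failCb) failureMessage {
        return cb(resolved)
}

func main() {
        cb := func(upd *channelUpdate) failureMessage {
                return &tempChannelFailure{update: upd}
        }
        fmt.Println(createFailure(&channelUpdate{timestamp: 1}, cb).failure())
}
```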

// syncChanStates attempts to synchronize channel states with the remote party.
// This method is to be called upon reconnection after the initial funding
// flow. We'll compare our commitment chains with the remote party, and re-send
// either a dangling commit signature, a revocation, or both.
func (l *channelLink) syncChanStates() error {
        chanState := l.channel.State()

        l.log.Infof("Attempting to re-synchronize channel: %v", chanState)

        // First, we'll generate our ChanSync message to send to the other
        // side. Based on this message, the remote party will decide if they
        // need to retransmit any data or not.
        localChanSyncMsg, err := chanState.ChanSyncMsg()
        if err != nil {
                return fmt.Errorf("unable to generate chan sync message for "+
                        "ChannelPoint(%v)", l.channel.ChannelPoint())
        }
        if err := l.cfg.Peer.SendMessage(true, localChanSyncMsg); err != nil {
                return fmt.Errorf("unable to send chan sync message for "+
                        "ChannelPoint(%v): %v", l.channel.ChannelPoint(), err)
        }

        var msgsToReSend []lnwire.Message

        // Next, we'll wait indefinitely to receive the ChanSync message. The
        // first message sent MUST be the ChanSync message.
        select {
        case msg := <-l.upstream:
                l.log.Tracef("Received msg=%v from peer(%x)", msg.MsgType(),
                        l.cfg.Peer.PubKey())

                remoteChanSyncMsg, ok := msg.(*lnwire.ChannelReestablish)
                if !ok {
                        return fmt.Errorf("first message sent to sync "+
                                "should be ChannelReestablish, instead "+
                                "received: %T", msg)
                }

                // If the remote party indicates that they think we haven't
                // done any state updates yet, then we'll retransmit the
                // channel_ready message first. We do this, as at this point
                // we can't be sure if they've really received the
                // ChannelReady message.
                if remoteChanSyncMsg.NextLocalCommitHeight == 1 &&
                        localChanSyncMsg.NextLocalCommitHeight == 1 &&
                        !l.channel.IsPending() {

                        l.log.Infof("resending ChannelReady message to peer")

                        nextRevocation, err := l.channel.NextRevocationKey()
                        if err != nil {
                                return fmt.Errorf("unable to create next "+
                                        "revocation: %v", err)
                        }

                        channelReadyMsg := lnwire.NewChannelReady(
                                l.ChanID(), nextRevocation,
                        )

                        // If this is a taproot channel, then we'll send the
                        // very same nonce that we sent above, as they should
                        // take the latest verification nonce we send.
                        if chanState.ChanType.IsTaproot() {
                                //nolint:lll
                                channelReadyMsg.NextLocalNonce = localChanSyncMsg.LocalNonce
                        }

                        // For channels that negotiated the option-scid-alias
                        // feature bit, ensure that we send over the alias in
                        // the channel_ready message. We'll send the first
                        // alias we find for the channel since it does not
                        // matter which alias we send. We'll error out if no
                        // aliases are found.
                        if l.negotiatedAliasFeature() {
                                aliases := l.getAliases()
                                if len(aliases) == 0 {
                                        // This shouldn't happen since we
                                        // always add at least one alias before
                                        // the channel reaches the link.
                                        return fmt.Errorf("no aliases found")
                                }

                                // getAliases returns a copy of the alias slice
                                // so it is ok to use a pointer to the first
                                // entry.
                                channelReadyMsg.AliasScid = &aliases[0]
                        }

                        err = l.cfg.Peer.SendMessage(false, channelReadyMsg)
                        if err != nil {
                                return fmt.Errorf("unable to re-send "+
                                        "ChannelReady: %v", err)
                        }
                }

                // In any case, we'll then process their ChanSync message.
                l.log.Info("received re-establishment message from remote side")

                var (
                        openedCircuits []CircuitKey
                        closedCircuits []CircuitKey
                )

                // We've just received a ChanSync message from the remote
                // party, so we'll process the message in order to determine
                // if we need to re-transmit any messages to the remote party.
                msgsToReSend, openedCircuits, closedCircuits, err =
                        l.channel.ProcessChanSyncMsg(remoteChanSyncMsg)
                if err != nil {
                        return err
                }

                // Repopulate any identifiers for circuits that may have been
                // opened or unclosed. This may happen if we needed to
                // retransmit a commitment signature message.
                l.openedCircuits = openedCircuits
                l.closedCircuits = closedCircuits

                // Ensure that all packets have been removed from the link's
                // mailbox.
                if err := l.ackDownStreamPackets(); err != nil {
                        return err
                }

                if len(msgsToReSend) > 0 {
                        l.log.Infof("sending %v updates to synchronize the "+
                                "state", len(msgsToReSend))
                }

                // If we have any messages to retransmit, we'll do so
                // immediately so we return to a synchronized state as soon as
                // possible.
                for _, msg := range msgsToReSend {
                        l.cfg.Peer.SendMessage(false, msg)
                }

        case <-l.quit:
                return ErrLinkShuttingDown
        }

        return nil
}

// resolveFwdPkgs loads any forwarding packages for this link from disk, and
// reprocesses them in order. The primary goal is to make sure that any HTLCs
// we previously received are reinstated in memory, and forwarded to the switch
// if necessary. After a restart, this will also delete any previously
// completed packages.
func (l *channelLink) resolveFwdPkgs() error {
        fwdPkgs, err := l.channel.LoadFwdPkgs()
        if err != nil {
                return err
        }

        l.log.Debugf("loaded %d fwd pks", len(fwdPkgs))

        for _, fwdPkg := range fwdPkgs {
                if err := l.resolveFwdPkg(fwdPkg); err != nil {
                        return err
                }
        }

        // If any of our reprocessing steps require an update to the commitment
        // txn, we initiate a state transition to capture all relevant changes.
        if l.channel.PendingLocalUpdateCount() > 0 {
                return l.updateCommitTx()
        }

        return nil
}

// resolveFwdPkg interprets the FwdState of the provided package, either
// reprocesses any outstanding htlcs in the package, or performs garbage
// collection on the package.
func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
        // Remove any completed packages to clear up space.
        if fwdPkg.State == channeldb.FwdStateCompleted {
                l.log.Debugf("removing completed fwd pkg for height=%d",
                        fwdPkg.Height)

                err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
                if err != nil {
                        l.log.Errorf("unable to remove fwd pkg for height=%d: "+
                                "%v", fwdPkg.Height, err)
                        return err
                }
        }

        // Otherwise this is either a new package or one has gone through
        // processing, but contains htlcs that need to be restored in memory.
        // We replay this forwarding package to make sure our local mem state
        // is resurrected, we mimic any original responses back to the remote
        // party, and re-forward the relevant HTLCs to the switch.

        // If the package is fully acked but not completed, it must still have
        // settles and fails to propagate.
        if !fwdPkg.SettleFailFilter.IsFull() {
                settleFails, err := lnwallet.PayDescsFromRemoteLogUpdates(
                        fwdPkg.Source, fwdPkg.Height, fwdPkg.SettleFails,
                )
                if err != nil {
                        l.log.Errorf("unable to process remote log updates: %v",
                                err)
                        return err
                }
                l.processRemoteSettleFails(fwdPkg, settleFails)
        }

        // Finally, replay *ALL ADDS* in this forwarding package. The
        // downstream logic is able to filter out any duplicates, but we must
        // shove the entire, original set of adds down the pipeline so that the
        // batch of adds presented to the sphinx router does not ever change.
        if !fwdPkg.AckFilter.IsFull() {
                adds, err := lnwallet.PayDescsFromRemoteLogUpdates(
                        fwdPkg.Source, fwdPkg.Height, fwdPkg.Adds,
                )
                if err != nil {
                        l.log.Errorf("unable to process remote log updates: %v",
                                err)
                        return err
                }
                l.processRemoteAdds(fwdPkg, adds)

                // If the link failed during processing the adds, we must
                // return to ensure we won't attempt to update the state
                // further.
                if l.failed {
                        return fmt.Errorf("link failed while " +
                                "processing remote adds")
                }
        }

        return nil
}

// fwdPkgGarbager periodically reads all forwarding packages from disk and
// removes those that can be discarded. It is safe to do this entirely in the
// background, since all state is coordinated on disk. This also ensures the
// link can continue to process messages and interleave database accesses.
//
// NOTE: This MUST be run as a goroutine.
func (l *channelLink) fwdPkgGarbager() {
        defer l.wg.Done()

        l.cfg.FwdPkgGCTicker.Resume()
        defer l.cfg.FwdPkgGCTicker.Stop()

        if err := l.loadAndRemove(); err != nil {
                l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
        }

        for {
                select {
                case <-l.cfg.FwdPkgGCTicker.Ticks():
                        if err := l.loadAndRemove(); err != nil {
                                l.log.Warnf("unable to remove fwd pkgs: %v",
                                        err)
                                continue
                        }
                case <-l.quit:
                        return
                }
        }
}

// loadAndRemove loads all the channel's forwarding packages and determines if
// they can be removed. It is called once before the FwdPkgGCTicker ticks so that
// a longer tick interval can be used.
func (l *channelLink) loadAndRemove() error {
        fwdPkgs, err := l.channel.LoadFwdPkgs()
        if err != nil {
                return err
        }

        var removeHeights []uint64
        for _, fwdPkg := range fwdPkgs {
                if fwdPkg.State != channeldb.FwdStateCompleted {
                        continue
                }

                removeHeights = append(removeHeights, fwdPkg.Height)
        }

        // If removeHeights is empty, return early so we don't use a db
        // transaction.
        if len(removeHeights) == 0 {
                return nil
        }

        return l.channel.RemoveFwdPkgs(removeHeights...)
}
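
The garbager above follows a common lnd pattern: one sweep up front (so the ticker interval can stay long), then a sweep per tick until the quit channel closes. A generic, runnable reduction of that loop:

```go
package main

import (
        "fmt"
        "time"
)

// runGC performs one initial sweep, then sweeps on every tick until quit
// closes (the same shape as fwdPkgGarbager/loadAndRemove).
func runGC(ticks <-chan time.Time, quit <-chan struct{}, sweep func() error) {
        if err := sweep(); err != nil {
                fmt.Println("initial sweep failed:", err)
        }

        for {
                select {
                case <-ticks:
                        if err := sweep(); err != nil {
                                fmt.Println("sweep failed:", err)
                                continue
                        }
                case <-quit:
                        return
                }
        }
}

func main() {
        ticker := time.NewTicker(10 * time.Millisecond)
        defer ticker.Stop()

        quit := make(chan struct{})
        time.AfterFunc(35*time.Millisecond, func() { close(quit) })

        runGC(ticker.C, quit, func() error {
                fmt.Println("gc pass")
                return nil
        })
}
```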

1075
// htlcManager is the primary goroutine which drives a channel's commitment
1076
// update state-machine in response to messages received via several channels.
1077
// This goroutine reads messages from the upstream (remote) peer, and also from
1078
// downstream channel managed by the channel link. In the event that an htlc
1079
// needs to be forwarded, then send-only forward handler is used which sends
1080
// htlc packets to the switch. Additionally, this goroutine handles acting upon
1081
// all timeouts for any active HTLCs, manages the channel's revocation window,
1082
// and also the htlc trickle queue+timer for this active channels.
1083
//
1084
// NOTE: This MUST be run as a goroutine.
1085
func (l *channelLink) htlcManager() {
215✔
1086
        defer func() {
421✔
1087
                l.cfg.BatchTicker.Stop()
206✔
1088
                l.wg.Done()
206✔
1089
                l.log.Infof("exited")
206✔
1090
        }()
206✔
1091

1092
        l.log.Infof("HTLC manager started, bandwidth=%v", l.Bandwidth())
215✔
1093

215✔
1094
        // Notify any clients that the link is now in the switch via an
215✔
1095
        // ActiveLinkEvent. We'll also defer an inactive link notification for
215✔
1096
        // when the link exits to ensure that every active notification is
215✔
1097
        // matched by an inactive one.
        l.cfg.NotifyActiveLink(l.ChannelPoint())
        defer l.cfg.NotifyInactiveLinkEvent(l.ChannelPoint())

        // TODO(roasbeef): need to call wipe chan whenever D/C?

        // If this isn't the first time that this channel link has been
        // created, then we'll need to check to see if we need to
        // re-synchronize state with the remote peer. settledHtlcs is a map of
        // HTLC's that we re-settled as part of the channel state sync.
        if l.cfg.SyncStates {
                err := l.syncChanStates()
                if err != nil {
                        l.log.Warnf("error when syncing channel states: %v", err)

                        errDataLoss, localDataLoss :=
                                err.(*lnwallet.ErrCommitSyncLocalDataLoss)

                        switch {
                        case err == ErrLinkShuttingDown:
                                l.log.Debugf("unable to sync channel states, " +
                                        "link is shutting down")
                                return

                        // We failed syncing the commit chains, probably
                        // because the remote has lost state. We should force
                        // close the channel.
                        case err == lnwallet.ErrCommitSyncRemoteDataLoss:
                                fallthrough

                        // The remote sent us an invalid last commit secret, we
                        // should force close the channel.
                        // TODO(halseth): and permanently ban the peer?
                        case err == lnwallet.ErrInvalidLastCommitSecret:
                                fallthrough

                        // The remote sent us a commit point different from
                        // what they sent us before.
                        // TODO(halseth): ban peer?
                        case err == lnwallet.ErrInvalidLocalUnrevokedCommitPoint:
                                // We'll fail the link and tell the peer to
                                // force close the channel. Note that the
                                // database state is not updated here, but will
                                // be updated when the close transaction is
                                // ready to avoid that we go down before
                                // storing the transaction in the db.
                                l.fail(
                                        LinkFailureError{
                                                code:          ErrSyncError,
                                                FailureAction: LinkFailureForceClose, //nolint:lll
                                        },
                                        "unable to synchronize channel "+
                                                "states: %v", err,
                                )
                                return

                        // We have lost state and cannot safely force close the
                        // channel. Fail the channel and wait for the remote to
                        // hopefully force close it. The remote has sent us its
                        // latest unrevoked commitment point, and we'll store
                        // it in the database, such that we can attempt to
                        // recover the funds if the remote force closes the
                        // channel.
                        case localDataLoss:
                                err := l.channel.MarkDataLoss(
                                        errDataLoss.CommitPoint,
                                )
                                if err != nil {
                                        l.log.Errorf("unable to mark channel "+
                                                "data loss: %v", err)
                                }

                        // We determined the commit chains were not possible to
                        // sync. We cautiously fail the channel, but don't
                        // force close.
                        // TODO(halseth): can we safely force close in any
                        // cases where this error is returned?
                        case err == lnwallet.ErrCannotSyncCommitChains:
                                if err := l.channel.MarkBorked(); err != nil {
                                        l.log.Errorf("unable to mark channel "+
                                                "borked: %v", err)
                                }

                        // Other, unspecified error.
                        default:
                        }

                        l.fail(
                                LinkFailureError{
                                        code:          ErrRecoveryError,
                                        FailureAction: LinkFailureForceNone,
                                },
                                "unable to synchronize channel "+
                                        "states: %v", err,
                        )
                        return
                }
        }

        // If a shutdown message has previously been sent on this link, then we
        // need to make sure that we have disabled any HTLC adds on the
        // outgoing direction of the link, and that we re-send the same
        // shutdown message that we previously sent.
        l.cfg.PreviouslySentShutdown.WhenSome(func(shutdown lnwire.Shutdown) {
                // Immediately disallow any new outgoing HTLCs.
                if !l.DisableAdds(Outgoing) {
                        l.log.Warnf("Outgoing link adds already disabled")
                }

                // Re-send the shutdown message to the peer. Since
                // syncChanStates would have sent any outstanding CommitSig, it
                // is fine for us to immediately queue the shutdown message
                // now.
                err := l.cfg.Peer.SendMessage(false, &shutdown)
                if err != nil {
                        l.log.Warnf("Error sending shutdown message: %v", err)
                }
        })
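
        // PreviouslySentShutdown is an optional value, so the closure above
        // only runs when a shutdown message was actually persisted. A minimal
        // sketch of the pattern, assuming lnd's fn.Option API:
        //
        //      fn.Some(shutdownMsg).WhenSome(func(s lnwire.Shutdown) {
        //              // invoked, since the option holds a value
        //      })
        //      fn.None[lnwire.Shutdown]().WhenSome(func(s lnwire.Shutdown) {
        //              // never invoked
        //      })
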
        // We've successfully reestablished the channel, mark it as such to
        // allow the switch to forward HTLCs in the outbound direction.
        l.markReestablished()

        // Now that we've received both channel_ready and channel reestablish,
        // we can go ahead and send the active channel notification. We'll also
        // defer the inactive notification for when the link exits to ensure
        // that every active notification is matched by an inactive one.
        l.cfg.NotifyActiveChannel(l.ChannelPoint())
        defer l.cfg.NotifyInactiveChannel(l.ChannelPoint())

        // With the channel states synced, we now reset the mailbox to ensure
        // we start processing all unacked packets in order. This is done here
        // to ensure that all acknowledgments that occur during channel
        // resynchronization have taken effect, causing us only to pull unacked
        // packets after starting to read from the downstream mailbox.
        l.mailBox.ResetPackets()

        // After cleaning up any memory pertaining to incoming packets, we now
        // replay our forwarding packages to handle any htlcs that can be
        // processed locally, or need to be forwarded out to the switch. We
        // will only attempt to resolve packages if our short chan id indicates
        // that the channel is not pending, otherwise we should have no htlcs
        // to reforward.
        if l.ShortChanID() != hop.Source {
                err := l.resolveFwdPkgs()
                switch err {
                // No error was encountered, success.
                case nil:

                // If the duplicate keystone error was encountered, we'll fail
                // without sending an Error message to the peer.
                case ErrDuplicateKeystone:
                        l.fail(LinkFailureError{code: ErrCircuitError},
                                "temporary circuit error: %v", err)
                        return

                // A non-nil error was encountered, send an Error message to
                // the peer.
                default:
                        l.fail(LinkFailureError{code: ErrInternalError},
                                "unable to resolve fwd pkgs: %v", err)
                        return
                }

                // With our link's in-memory state fully reconstructed, spawn a
                // goroutine to manage the reclamation of disk space occupied
                // by completed forwarding packages.
                l.wg.Add(1)
                go l.fwdPkgGarbager()
        }

        for {
                // We must always check if we failed at some point processing
                // the last update before processing the next.
                if l.failed {
                        l.log.Errorf("link failed, exiting htlcManager")
                        return
                }

                // If the previous event resulted in a non-empty batch, resume
                // the batch ticker so that it can be cleared. Otherwise pause
                // the ticker to prevent waking up the htlcManager while the
                // batch is empty.
                if l.channel.PendingLocalUpdateCount() > 0 {
                        l.cfg.BatchTicker.Resume()
                        l.log.Tracef("BatchTicker resumed, "+
                                "PendingLocalUpdateCount=%d",
                                l.channel.PendingLocalUpdateCount())
                } else {
                        l.cfg.BatchTicker.Pause()
                        l.log.Trace("BatchTicker paused due to zero " +
                                "PendingLocalUpdateCount")
                }

                select {
                // We have a new hook that needs to be run when we reach a
                // clean channel state.
                case hook := <-l.flushHooks.newTransients:
                        if l.channel.IsChannelClean() {
                                hook()
                        } else {
                                l.flushHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when we have
                // committed all of our updates.
                case hook := <-l.outgoingCommitHooks.newTransients:
                        if !l.channel.OweCommitment() {
                                hook()
                        } else {
                                l.outgoingCommitHooks.alloc(hook)
                        }

                // We have a new hook that needs to be run when our peer has
                // committed all of their updates.
                case hook := <-l.incomingCommitHooks.newTransients:
                        if !l.channel.NeedCommitment() {
                                hook()
                        } else {
                                l.incomingCommitHooks.alloc(hook)
                        }

                // Our update fee timer has fired, so we'll check the network
                // fee to see if we should adjust our commitment fee.
                case <-l.updateFeeTimer.C:
                        l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())

                        // If we're not the initiator of the channel, we don't
                        // control the fees, so we can ignore this.
                        if !l.channel.IsInitiator() {
                                continue
                        }

                        // If we are the initiator, then we'll sample the
                        // current fee rate to get into the chain within 3
                        // blocks.
                        netFee, err := l.sampleNetworkFee()
                        if err != nil {
                                l.log.Errorf("unable to sample network fee: %v",
                                        err)
                                continue
                        }

                        minRelayFee := l.cfg.FeeEstimator.RelayFeePerKW()

                        newCommitFee := l.channel.IdealCommitFeeRate(
                                netFee, minRelayFee,
                                l.cfg.MaxAnchorsCommitFeeRate,
                                l.cfg.MaxFeeAllocation,
                        )

                        // We determine if we should adjust the commitment fee
                        // based on the current commitment fee, the suggested
                        // new commitment fee and the current minimum relay fee
                        // rate.
                        commitFee := l.channel.CommitFeeRate()
                        if !shouldAdjustCommitFee(
                                newCommitFee, commitFee, minRelayFee,
                        ) {
                                continue
                        }

                        // If we do, then we'll send a new UpdateFee message to
                        // the remote party, to be locked in with a new update.
                        if err := l.updateChannelFee(newCommitFee); err != nil {
                                l.log.Errorf("unable to update fee rate: %v",
                                        err)
                                continue
                        }

                // The underlying channel has notified us of a unilateral close
                // carried out by the remote peer. In the case of such an
                // event, we'll wipe the channel state from the peer, and mark
                // the contract as fully settled. Afterwards we can exit.
                //
                // TODO(roasbeef): add force closure? also breach?
                case <-l.cfg.ChainEvents.RemoteUnilateralClosure:
                        l.log.Warnf("remote peer has closed on-chain")

                        // TODO(roasbeef): remove all together
                        go func() {
                                chanPoint := l.channel.ChannelPoint()
                                l.cfg.Peer.WipeChannel(&chanPoint)
                        }()

                        return

                case <-l.cfg.BatchTicker.Ticks():
                        // Attempt to extend the remote commitment chain
                        // including all the currently pending entries. If the
                        // send was unsuccessful, then abandon the update,
                        // waiting for the revocation window to open up.
                        if !l.updateCommitTxOrFail() {
                                return
                        }

                case <-l.cfg.PendingCommitTicker.Ticks():
                        l.fail(
                                LinkFailureError{
                                        code:          ErrRemoteUnresponsive,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "unable to complete dance",
                        )
                        return

                // A message from the switch was just received. This indicates
                // that the link is an intermediate hop in a multi-hop HTLC
                // circuit.
                case pkt := <-l.downstream:
                        l.handleDownstreamPkt(pkt)

                // A message from the connected peer was just received. This
                // indicates that we have a new incoming HTLC, either directly
                // for us, or part of a multi-hop HTLC circuit.
                case msg := <-l.upstream:
                        l.handleUpstreamMsg(msg)

                // An htlc resolution is received. This means that we now have
                // a resolution for a previously accepted htlc.
                case hodlItem := <-l.hodlQueue.ChanOut():
                        htlcResolution := hodlItem.(invoices.HtlcResolution)
                        err := l.processHodlQueue(htlcResolution)
                        switch err {
                        // No error, success.
                        case nil:

                        // If the duplicate keystone error was encountered,
                        // fail back gracefully.
                        case ErrDuplicateKeystone:
                                l.fail(LinkFailureError{code: ErrCircuitError},
                                        fmt.Sprintf("process hodl queue: "+
                                                "temporary circuit error: %v",
                                                err,
                                        ),
                                )

                        // Send an Error message to the peer.
                        default:
                                l.fail(LinkFailureError{code: ErrInternalError},
                                        fmt.Sprintf("process hodl queue: "+
                                                "unable to update commitment:"+
                                                " %v", err),
                                )
                        }

                case <-l.quit:
                        return
                }
        }
}

// processHodlQueue processes a received htlc resolution and continues reading
// from the hodl queue until no more resolutions remain. When this function
// returns without an error, the commit tx should be updated.
func (l *channelLink) processHodlQueue(
        firstResolution invoices.HtlcResolution) error {

        // Try to read all waiting resolution messages, so that they can all be
        // processed in a single commitment tx update.
        htlcResolution := firstResolution
loop:
        for {
                // Look up the hodl htlc that can be failed or settled with
                // this event. The hodl htlc must be present in the map.
                circuitKey := htlcResolution.CircuitKey()
                hodlHtlc, ok := l.hodlMap[circuitKey]
                if !ok {
                        return fmt.Errorf("hodl htlc not found: %v", circuitKey)
                }

                if err := l.processHtlcResolution(htlcResolution, hodlHtlc); err != nil {
                        return err
                }

                // Clean up hodl map.
                delete(l.hodlMap, circuitKey)

                select {
                case item := <-l.hodlQueue.ChanOut():
                        htlcResolution = item.(invoices.HtlcResolution)
                default:
                        break loop
                }
        }

        // Update the commitment tx.
        if err := l.updateCommitTx(); err != nil {
                return err
        }

        return nil
}
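
// The drain loop above uses the standard non-blocking receive idiom to
// coalesce queued resolutions into a single commitment update: read with
// select/default until the queue is empty, then sign once. A minimal sketch
// of the idiom, with a hypothetical work channel:
//
//      for {
//              process(item)
//              select {
//              case item = <-queue:
//                      // more work queued, keep draining
//              default:
//                      return commit() // queue empty; one signature covers all
//              }
//      }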

// processHtlcResolution applies a received htlc resolution to the provided
// htlc. When this function returns without an error, the commit tx should be
// updated.
func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution,
        htlc hodlHtlc) error {

        circuitKey := resolution.CircuitKey()

        // Determine required action for the resolution based on the type of
        // resolution we have received.
        switch res := resolution.(type) {
        // Settle htlcs that returned a settle resolution using the preimage
        // in the resolution.
        case *invoices.HtlcSettleResolution:
                l.log.Debugf("received settle resolution for %v "+
                        "with outcome: %v", circuitKey, res.Outcome)

                return l.settleHTLC(res.Preimage, htlc.pd)

        // For htlc failures, we get the relevant failure message based
        // on the failure resolution and then fail the htlc.
        case *invoices.HtlcFailResolution:
                l.log.Debugf("received cancel resolution for "+
                        "%v with outcome: %v", circuitKey, res.Outcome)

                // Get the lnwire failure message based on the resolution
                // result.
                failure := getResolutionFailure(res, htlc.pd.Amount)

                l.sendHTLCError(
                        htlc.pd, failure, htlc.obfuscator, true,
                )
                return nil

        // Fail if we do not get a settle or fail resolution, since we
        // are only expecting to handle settles and fails.
        default:
                return fmt.Errorf("unknown htlc resolution type: %T",
                        resolution)
        }
}

// getResolutionFailure returns the wire message that a htlc resolution should
// be failed with.
func getResolutionFailure(resolution *invoices.HtlcFailResolution,
        amount lnwire.MilliSatoshi) *LinkError {

        // If the resolution has been resolved as part of a MPP timeout,
        // we need to fail the htlc with lnwire.FailMppTimeout.
        if resolution.Outcome == invoices.ResultMppTimeout {
                return NewDetailedLinkError(
                        &lnwire.FailMPPTimeout{}, resolution.Outcome,
                )
        }

        // If the htlc is not a MPP timeout, we fail it with
        // FailIncorrectDetails. This error is sent for invoice payment
        // failures such as underpayment, expiry too soon, and hodl invoices
        // (which return FailIncorrectDetails to avoid leaking information).
        incorrectDetails := lnwire.NewFailIncorrectDetails(
                amount, uint32(resolution.AcceptHeight),
        )

        return NewDetailedLinkError(incorrectDetails, resolution.Outcome)
}
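
// Both branches above funnel into NewDetailedLinkError; a rough sketch of the
// two failure shapes, with hypothetical amount and height values:
//
//      // MPP timeout: the partial payment set never completed in time.
//      NewDetailedLinkError(&lnwire.FailMPPTimeout{}, invoices.ResultMppTimeout)
//
//      // Everything else: deliberately generic, so a sender cannot tell a
//      // wrong amount apart from an unknown invoice.
//      NewDetailedLinkError(
//              lnwire.NewFailIncorrectDetails(amt, acceptHeight),
//              resolution.Outcome,
//      )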

// randomFeeUpdateTimeout returns a random timeout between the bounds defined
// within the link's configuration that will be used to determine when the link
// should propose an update to its commitment fee rate.
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
        lower := int64(l.cfg.MinUpdateTimeout)
        upper := int64(l.cfg.MaxUpdateTimeout)
        return time.Duration(prand.Int63n(upper-lower) + lower)
}
1565

1566
// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the
1567
// downstream HTLC Switch.
1568
func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error {
1,527✔
1569
        htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
1,527✔
1570
        if !ok {
1,527✔
1571
                return errors.New("not an UpdateAddHTLC packet")
×
1572
        }
×
1573

1574
        // If we are flushing the link in the outgoing direction we can't add
1575
        // new htlcs to the link and we need to bounce it
1576
        if l.IsFlushing(Outgoing) {
1,527✔
1577
                l.mailBox.FailAdd(pkt)
×
1578

×
1579
                return NewDetailedLinkError(
×
1580
                        &lnwire.FailPermanentChannelFailure{},
×
1581
                        OutgoingFailureLinkNotEligible,
×
1582
                )
×
1583
        }
×
1584

1585
        // If hodl.AddOutgoing mode is active, we exit early to simulate
1586
        // arbitrary delays between the switch adding an ADD to the
1587
        // mailbox, and the HTLC being added to the commitment state.
1588
        if l.cfg.HodlMask.Active(hodl.AddOutgoing) {
1,527✔
1589
                l.log.Warnf(hodl.AddOutgoing.Warning())
×
1590
                l.mailBox.AckPacket(pkt.inKey())
×
1591
                return nil
×
1592
        }
×
1593

1594
        // A new payment has been initiated via the downstream channel,
1595
        // so we add the new HTLC to our local log, then update the
1596
        // commitment chains.
1597
        htlc.ChanID = l.ChanID()
1,527✔
1598
        openCircuitRef := pkt.inKey()
1,527✔
1599

1,527✔
1600
        // We enforce the fee buffer for the commitment transaction because
1,527✔
1601
        // we are in control of adding this htlc. Nothing has locked-in yet so
1,527✔
1602
        // we can securely enforce the fee buffer which is only relevant if we
1,527✔
1603
        // are the initiator of the channel.
1,527✔
1604
        index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
1,527✔
1605
        if err != nil {
1,532✔
1606
                // The HTLC was unable to be added to the state machine,
5✔
1607
                // as a result, we'll signal the switch to cancel the
5✔
1608
                // pending payment.
5✔
1609
                l.log.Warnf("Unable to handle downstream add HTLC: %v",
5✔
1610
                        err)
5✔
1611

5✔
1612
                // Remove this packet from the link's mailbox, this
5✔
1613
                // prevents it from being reprocessed if the link
5✔
1614
                // restarts and resets it mailbox. If this response
5✔
1615
                // doesn't make it back to the originating link, it will
5✔
1616
                // be rejected upon attempting to reforward the Add to
5✔
1617
                // the switch, since the circuit was never fully opened,
5✔
1618
                // and the forwarding package shows it as
5✔
1619
                // unacknowledged.
5✔
1620
                l.mailBox.FailAdd(pkt)
5✔
1621

5✔
1622
                return NewDetailedLinkError(
5✔
1623
                        lnwire.NewTemporaryChannelFailure(nil),
5✔
1624
                        OutgoingFailureDownstreamHtlcAdd,
5✔
1625
                )
5✔
1626
        }
5✔
1627

1628
        l.log.Tracef("received downstream htlc: payment_hash=%x, "+
1,526✔
1629
                "local_log_index=%v, pend_updates=%v",
1,526✔
1630
                htlc.PaymentHash[:], index,
1,526✔
1631
                l.channel.PendingLocalUpdateCount())
1,526✔
1632

1,526✔
1633
        pkt.outgoingChanID = l.ShortChanID()
1,526✔
1634
        pkt.outgoingHTLCID = index
1,526✔
1635
        htlc.ID = index
1,526✔
1636

1,526✔
1637
        l.log.Debugf("queueing keystone of ADD open circuit: %s->%s",
1,526✔
1638
                pkt.inKey(), pkt.outKey())
1,526✔
1639

1,526✔
1640
        l.openedCircuits = append(l.openedCircuits, pkt.inKey())
1,526✔
1641
        l.keystoneBatch = append(l.keystoneBatch, pkt.keystone())
1,526✔
1642

1,526✔
1643
        _ = l.cfg.Peer.SendMessage(false, htlc)
1,526✔
1644

1,526✔
1645
        // Send a forward event notification to htlcNotifier.
1,526✔
1646
        l.cfg.HtlcNotifier.NotifyForwardingEvent(
1,526✔
1647
                newHtlcKey(pkt),
1,526✔
1648
                HtlcInfo{
1,526✔
1649
                        IncomingTimeLock: pkt.incomingTimeout,
1,526✔
1650
                        IncomingAmt:      pkt.incomingAmount,
1,526✔
1651
                        OutgoingTimeLock: htlc.Expiry,
1,526✔
1652
                        OutgoingAmt:      htlc.Amount,
1,526✔
1653
                },
1,526✔
1654
                getEventType(pkt),
1,526✔
1655
        )
1,526✔
1656

1,526✔
1657
        l.tryBatchUpdateCommitTx()
1,526✔
1658

1,526✔
1659
        return nil
1,526✔
1660
}
1661
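
// The in and out keys queued above make up the circuit's keystone: the
// mapping from the incoming (channel, HTLC index) pair to the outgoing one.
// A rough sketch of the two halves, with a hypothetical incoming channel id:
//
//      in := models.CircuitKey{ChanID: incomingChanID, HtlcID: pkt.incomingHTLCID}
//      out := models.CircuitKey{ChanID: l.ShortChanID(), HtlcID: index}
//
// The batched keystones are persisted together with the next commitment
// update, so after a restart the switch either sees the fully opened circuit
// or none of it.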

// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
// cleared HTLCs with the upstream peer.
//
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
        switch htlc := pkt.htlc.(type) {
        case *lnwire.UpdateAddHTLC:
                // Handle add message. The returned error can be ignored,
                // because it is also sent through the mailbox.
                _ = l.handleDownstreamUpdateAdd(pkt)

        case *lnwire.UpdateFulfillHTLC:
                // If hodl.SettleOutgoing mode is active, we exit early to
                // simulate arbitrary delays between the switch adding the
                // SETTLE to the mailbox, and the HTLC being added to the
                // commitment state.
                if l.cfg.HodlMask.Active(hodl.SettleOutgoing) {
                        l.log.Warnf(hodl.SettleOutgoing.Warning())
                        l.mailBox.AckPacket(pkt.inKey())
                        return
                }

                // An HTLC we forward to the switch has just settled somewhere
                // upstream. Therefore we settle the HTLC within our local
                // state machine.
                inKey := pkt.inKey()
                err := l.channel.SettleHTLC(
                        htlc.PaymentPreimage,
                        pkt.incomingHTLCID,
                        pkt.sourceRef,
                        pkt.destRef,
                        &inKey,
                )
                if err != nil {
                        l.log.Errorf("unable to settle incoming HTLC for "+
                                "circuit-key=%v: %v", inKey, err)

                        // If the HTLC index for Settle response was not known
                        // to our commitment state, it has already been
                        // cleaned up by a prior response. We'll thus try to
                        // clean up any lingering state to ensure we don't
                        // continue reforwarding.
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
                                l.cleanupSpuriousResponse(pkt)
                        }

                        // Remove the packet from the link's mailbox to ensure
                        // it doesn't get replayed after a reconnection.
                        l.mailBox.AckPacket(inKey)

                        return
                }

                l.log.Debugf("queueing removal of SETTLE closed circuit: "+
                        "%s->%s", pkt.inKey(), pkt.outKey())

                l.closedCircuits = append(l.closedCircuits, pkt.inKey())

                // With the HTLC settled, we'll need to populate the wire
                // message to target the specific channel and HTLC to be
                // canceled.
                htlc.ChanID = l.ChanID()
                htlc.ID = pkt.incomingHTLCID

                // Then we send the HTLC settle message to the connected peer
                // so we can continue the propagation of the settle message.
                l.cfg.Peer.SendMessage(false, htlc)

                // Send a settle event notification to htlcNotifier.
                l.cfg.HtlcNotifier.NotifySettleEvent(
                        newHtlcKey(pkt),
                        htlc.PaymentPreimage,
                        getEventType(pkt),
                )

                // Immediately update the commitment tx to minimize latency.
                l.updateCommitTxOrFail()

        case *lnwire.UpdateFailHTLC:
                // If hodl.FailOutgoing mode is active, we exit early to
                // simulate arbitrary delays between the switch adding a FAIL
                // to the mailbox, and the HTLC being added to the commitment
                // state.
                if l.cfg.HodlMask.Active(hodl.FailOutgoing) {
                        l.log.Warnf(hodl.FailOutgoing.Warning())
                        l.mailBox.AckPacket(pkt.inKey())
                        return
                }

                // An HTLC cancellation has been triggered somewhere upstream,
                // so we'll remove the HTLC from our local state machine.
                inKey := pkt.inKey()
                err := l.channel.FailHTLC(
                        pkt.incomingHTLCID,
                        htlc.Reason,
                        pkt.sourceRef,
                        pkt.destRef,
                        &inKey,
                )
                if err != nil {
                        l.log.Errorf("unable to cancel incoming HTLC for "+
                                "circuit-key=%v: %v", inKey, err)

                        // If the HTLC index for Fail response was not known to
                        // our commitment state, it has already been cleaned up
                        // by a prior response. We'll thus try to clean up any
                        // lingering state to ensure we don't continue
                        // reforwarding.
                        if _, ok := err.(lnwallet.ErrUnknownHtlcIndex); ok {
                                l.cleanupSpuriousResponse(pkt)
                        }

                        // Remove the packet from the link's mailbox to ensure
                        // it doesn't get replayed after a reconnection.
                        l.mailBox.AckPacket(inKey)

                        return
                }

                l.log.Debugf("queueing removal of FAIL closed circuit: %s->%s",
                        pkt.inKey(), pkt.outKey())

                l.closedCircuits = append(l.closedCircuits, pkt.inKey())

                // With the HTLC removed, we'll need to populate the wire
                // message to target the specific channel and HTLC to be
                // canceled. The "Reason" field will have already been set
                // within the switch.
                htlc.ChanID = l.ChanID()
                htlc.ID = pkt.incomingHTLCID

                // We send the HTLC message to the peer which initially created
                // the HTLC. If the incoming blinding point is non-nil, we
                // know that we are a relaying node in a blinded path.
                // Otherwise, we're either an introduction node or not part of
                // a blinded path at all.
                if err := l.sendIncomingHTLCFailureMsg(
                        htlc.ID,
                        pkt.obfuscator,
                        htlc.Reason,
                ); err != nil {
                        l.log.Errorf("unable to send HTLC failure: %v",
                                err)

                        return
                }

                // If the packet does not have a link failure set, it failed
                // further down the route so we notify a forwarding failure.
                // Otherwise, we notify a link failure because it failed at our
                // node.
                if pkt.linkFailure != nil {
                        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
                                newHtlcKey(pkt),
                                newHtlcInfo(pkt),
                                getEventType(pkt),
                                pkt.linkFailure,
                                false,
                        )
                } else {
                        l.cfg.HtlcNotifier.NotifyForwardingFailEvent(
                                newHtlcKey(pkt), getEventType(pkt),
                        )
                }

                // Immediately update the commitment tx to minimize latency.
                l.updateCommitTxOrFail()
        }
}

// tryBatchUpdateCommitTx updates the commitment transaction if the batch is
// full.
func (l *channelLink) tryBatchUpdateCommitTx() {
        if l.channel.PendingLocalUpdateCount() < uint64(l.cfg.BatchSize) {
                return
        }

        l.updateCommitTxOrFail()
}
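
// In other words, an add only triggers an eager signature once a full batch
// has accumulated; smaller batches wait for the BatchTicker to fire. A rough
// sketch of the threshold, assuming a BatchSize of 10:
//
//      if l.channel.PendingLocalUpdateCount() < 10 {
//              return // partial batch: let the ticker flush it
//      }
//      l.updateCommitTxOrFail() // full batch: sign immediately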

// cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef
// associated with this packet. If successful in doing so, it will also purge
// the open circuit from the circuit map and remove the packet from the link's
// mailbox.
func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) {
        inKey := pkt.inKey()

        l.log.Debugf("cleaning up spurious response for incoming "+
                "circuit-key=%v", inKey)

        // If the htlc packet doesn't have a source reference, it is unsafe to
        // proceed, as skipping this ack may cause the htlc to be reforwarded.
        if pkt.sourceRef == nil {
                l.log.Errorf("unable to cleanup response for incoming "+
                        "circuit-key=%v, does not contain source reference",
                        inKey)
                return
        }

        // If the source reference is present, we will try to prevent this link
        // from resending the packet to the switch. To do so, we ack the AddRef
        // of the incoming HTLC belonging to this link.
        err := l.channel.AckAddHtlcs(*pkt.sourceRef)
        if err != nil {
                l.log.Errorf("unable to ack AddRef for incoming "+
                        "circuit-key=%v: %v", inKey, err)

                // If this operation failed, it is unsafe to attempt removal of
                // the destination reference or circuit, so we exit early. The
                // cleanup may proceed with a different packet in the future
                // that succeeds on this step.
                return
        }

        // Now that we know this link will stop retransmitting Adds to the
        // switch, we can begin to teardown the response reference and circuit
        // map.
        //
        // If the packet includes a destination reference, then a response for
        // this HTLC was locked into the outgoing channel. Attempt to remove
        // this reference, so we stop retransmitting the response internally.
        // Even if this fails, we will proceed in trying to delete the circuit.
        // When retransmitting responses, the destination references will be
        // cleaned up if an open circuit is not found in the circuit map.
        if pkt.destRef != nil {
                err := l.channel.AckSettleFails(*pkt.destRef)
                if err != nil {
                        l.log.Errorf("unable to ack SettleFailRef "+
                                "for incoming circuit-key=%v: %v",
                                inKey, err)
                }
        }

        l.log.Debugf("deleting circuit for incoming circuit-key=%x", inKey)

        // With all known references acked, we can now safely delete the
        // circuit from the switch's circuit map, as the state is no longer
        // needed.
        err = l.cfg.Circuits.DeleteCircuits(inKey)
        if err != nil {
                l.log.Errorf("unable to delete circuit for "+
                        "circuit-key=%v: %v", inKey, err)
        }
}

1908
// handleUpstreamMsg processes wire messages related to commitment state
1909
// updates from the upstream peer. The upstream peer is the peer whom we have a
1910
// direct channel with, updating our respective commitment chains.
1911
func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
7,016✔
1912
        switch msg := msg.(type) {
7,016✔
1913

1914
        case *lnwire.UpdateAddHTLC:
1,501✔
1915
                if l.IsFlushing(Incoming) {
1,501✔
1916
                        // This is forbidden by the protocol specification.
×
1917
                        // The best chance we have to deal with this is to drop
×
1918
                        // the connection. This should roll back the channel
×
1919
                        // state to the last CommitSig. If the remote has
×
1920
                        // already sent a CommitSig we haven't received yet,
×
1921
                        // channel state will be re-synchronized with a
×
1922
                        // ChannelReestablish message upon reconnection and the
×
1923
                        // protocol state that caused us to flush the link will
×
1924
                        // be rolled back. In the event that there was some
×
1925
                        // non-deterministic behavior in the remote that caused
×
1926
                        // them to violate the protocol, we have a decent shot
×
1927
                        // at correcting it this way, since reconnecting will
×
1928
                        // put us in the cleanest possible state to try again.
×
1929
                        //
×
1930
                        // In addition to the above, it is possible for us to
×
1931
                        // hit this case in situations where we improperly
×
1932
                        // handle message ordering due to concurrency choices.
×
1933
                        // An issue has been filed to address this here:
×
1934
                        // https://github.com/lightningnetwork/lnd/issues/8393
×
1935
                        l.fail(
×
1936
                                LinkFailureError{
×
1937
                                        code:             ErrInvalidUpdate,
×
1938
                                        FailureAction:    LinkFailureDisconnect,
×
1939
                                        PermanentFailure: false,
×
1940
                                        Warning:          true,
×
1941
                                },
×
1942
                                "received add while link is flushing",
×
1943
                        )
×
1944

×
1945
                        return
×
1946
                }
×
1947

1948
                // Disallow htlcs with blinding points set if we haven't
1949
                // enabled the feature. This saves us from having to process
1950
                // the onion at all, but will only catch blinded payments
1951
                // where we are a relaying node (as the blinding point will
1952
                // be in the payload when we're the introduction node).
1953
                if msg.BlindingPoint.IsSome() && l.cfg.DisallowRouteBlinding {
1,501✔
1954
                        l.fail(LinkFailureError{code: ErrInvalidUpdate},
×
1955
                                "blinding point included when route blinding "+
×
1956
                                        "is disabled")
×
1957

×
1958
                        return
×
1959
                }
×
1960

1961
                // We just received an add request from an upstream peer, so we
1962
                // add it to our state machine, then add the HTLC to our
1963
                // "settle" list in the event that we know the preimage.
1964
                index, err := l.channel.ReceiveHTLC(msg)
1,501✔
1965
                if err != nil {
1,501✔
1966
                        l.fail(LinkFailureError{code: ErrInvalidUpdate},
×
1967
                                "unable to handle upstream add HTLC: %v", err)
×
1968
                        return
×
1969
                }
×
1970

1971
                l.log.Tracef("receive upstream htlc with payment hash(%x), "+
1,501✔
1972
                        "assigning index: %v", msg.PaymentHash[:], index)
1,501✔
1973

1974
        case *lnwire.UpdateFulfillHTLC:
664✔
1975
                pre := msg.PaymentPreimage
664✔
1976
                idx := msg.ID
664✔
1977

664✔
1978
                // Before we pipeline the settle, we'll check the set of active
664✔
1979
                // htlc's to see if the related UpdateAddHTLC has been fully
664✔
1980
                // locked-in.
664✔
1981
                var lockedin bool
664✔
1982
                htlcs := l.channel.ActiveHtlcs()
664✔
1983
                for _, add := range htlcs {
75,188✔
1984
                        // The HTLC will be outgoing and match idx.
74,524✔
1985
                        if !add.Incoming && add.HtlcIndex == idx {
75,186✔
1986
                                lockedin = true
662✔
1987
                                break
662✔
1988
                        }
1989
                }
1990

1991
                if !lockedin {
666✔
1992
                        l.fail(
2✔
1993
                                LinkFailureError{code: ErrInvalidUpdate},
2✔
1994
                                "unable to handle upstream settle",
2✔
1995
                        )
2✔
1996
                        return
2✔
1997
                }
2✔
1998

1999
                if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
666✔
2000
                        l.fail(
4✔
2001
                                LinkFailureError{
4✔
2002
                                        code:          ErrInvalidUpdate,
4✔
2003
                                        FailureAction: LinkFailureForceClose,
4✔
2004
                                },
4✔
2005
                                "unable to handle upstream settle HTLC: %v", err,
4✔
2006
                        )
4✔
2007
                        return
4✔
2008
                }
4✔
2009

2010
                settlePacket := &htlcPacket{
662✔
2011
                        outgoingChanID: l.ShortChanID(),
662✔
2012
                        outgoingHTLCID: idx,
662✔
2013
                        htlc: &lnwire.UpdateFulfillHTLC{
662✔
2014
                                PaymentPreimage: pre,
662✔
2015
                        },
662✔
2016
                }
662✔
2017

662✔
2018
                // Add the newly discovered preimage to our growing list of
662✔
2019
                // uncommitted preimage. These will be written to the witness
662✔
2020
                // cache just before accepting the next commitment signature
662✔
2021
                // from the remote peer.
662✔
2022
                l.uncommittedPreimages = append(l.uncommittedPreimages, pre)
662✔
2023

662✔
2024
                // Pipeline this settle, send it to the switch.
662✔
2025
                go l.forwardBatch(false, settlePacket)
662✔
2026

2027
        case *lnwire.UpdateFailMalformedHTLC:
7✔
2028
                // Convert the failure type encoded within the HTLC fail
7✔
2029
                // message to the proper generic lnwire error code.
7✔
2030
                var failure lnwire.FailureMessage
7✔
2031
                switch msg.FailureCode {
7✔
2032
                case lnwire.CodeInvalidOnionVersion:
5✔
2033
                        failure = &lnwire.FailInvalidOnionVersion{
5✔
2034
                                OnionSHA256: msg.ShaOnionBlob,
5✔
2035
                        }
5✔
2036
                case lnwire.CodeInvalidOnionHmac:
×
2037
                        failure = &lnwire.FailInvalidOnionHmac{
×
2038
                                OnionSHA256: msg.ShaOnionBlob,
×
2039
                        }
×
2040

2041
                case lnwire.CodeInvalidOnionKey:
×
2042
                        failure = &lnwire.FailInvalidOnionKey{
×
2043
                                OnionSHA256: msg.ShaOnionBlob,
×
2044
                        }
×
2045

2046
                // Handle malformed errors that are part of a blinded route.
2047
                // This case is slightly different, because we expect every
2048
                // relaying node in the blinded portion of the route to send
2049
                // malformed errors. If we're also a relaying node, we're
2050
                // likely going to switch this error out anyway for our own
2051
                // malformed error, but we handle the case here for
2052
                // completeness.
2053
                case lnwire.CodeInvalidBlinding:
4✔
2054
                        failure = &lnwire.FailInvalidBlinding{
4✔
2055
                                OnionSHA256: msg.ShaOnionBlob,
4✔
2056
                        }
4✔
2057

2058
                default:
2✔
2059
                        l.log.Warnf("unexpected failure code received in "+
2✔
2060
                                "UpdateFailMailformedHTLC: %v", msg.FailureCode)
2✔
2061

2✔
2062
                        // We don't just pass back the error we received from
2✔
2063
                        // our successor. Otherwise we might report a failure
2✔
2064
                        // that penalizes us more than needed. If the onion that
2✔
2065
                        // we forwarded was correct, the node should have been
2✔
2066
                        // able to send back its own failure. The node did not
2✔
2067
                        // send back its own failure, so we assume there was a
2✔
2068
                        // problem with the onion and report that back. We reuse
2✔
2069
                        // the invalid onion key failure because there is no
2✔
2070
                        // specific error for this case.
2✔
2071
                        failure = &lnwire.FailInvalidOnionKey{
2✔
2072
                                OnionSHA256: msg.ShaOnionBlob,
2✔
2073
                        }
2✔
2074
                }
2075

2076
                // With the error parsed, we'll convert the into it's opaque
2077
                // form.
2078
                var b bytes.Buffer
7✔
2079
                if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
7✔
2080
                        l.log.Errorf("unable to encode malformed error: %v", err)
×
2081
                        return
×
2082
                }
×
2083

2084
                // If remote side have been unable to parse the onion blob we
2085
                // have sent to it, than we should transform the malformed HTLC
2086
                // message to the usual HTLC fail message.
2087
                err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
7✔
2088
                if err != nil {
7✔
2089
                        l.fail(LinkFailureError{code: ErrInvalidUpdate},
×
2090
                                "unable to handle upstream fail HTLC: %v", err)
×
2091
                        return
×
2092
                }
×
2093

2094
        case *lnwire.UpdateFailHTLC:
124✔
2095
                // Verify that the failure reason is at least 256 bytes plus
124✔
2096
                // overhead.
124✔
2097
                const minimumFailReasonLength = lnwire.FailureMessageLength +
124✔
2098
                        2 + 2 + 32
124✔
2099

124✔
2100
                if len(msg.Reason) < minimumFailReasonLength {
125✔
2101
                        // We've received a reason with a non-compliant length.
1✔
2102
                        // Older nodes happily relay back these failures that
1✔
2103
                        // may originate from a node further downstream.
1✔
2104
                        // Therefore we can't just fail the channel.
1✔
2105
                        //
1✔
2106
                        // We want to be compliant ourselves, so we also can't
1✔
2107
                        // pass back the reason unmodified. And we must make
1✔
2108
                        // sure that we don't hit the magic length check of 260
1✔
2109
                        // bytes in processRemoteSettleFails either.
1✔
2110
                        //
1✔
2111
                        // Because the reason is unreadable for the payer
1✔
2112
                        // anyway, we just replace it by a compliant-length
1✔
2113
                        // series of random bytes.
1✔
2114
                        msg.Reason = make([]byte, minimumFailReasonLength)
1✔
2115
                        _, err := crand.Read(msg.Reason[:])
1✔
2116
                        if err != nil {
1✔
2117
                                l.log.Errorf("Random generation error: %v", err)
×
2118

×
2119
                                return
×
2120
                        }
×
2121
                }
2122

2123
                // Add fail to the update log.
2124
                idx := msg.ID
124✔
2125
                err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
124✔
2126
                if err != nil {
124✔
2127
                        l.fail(LinkFailureError{code: ErrInvalidUpdate},
×
2128
                                "unable to handle upstream fail HTLC: %v", err)
×
2129
                        return
×
2130
                }
×
2131

2132
        case *lnwire.CommitSig:
2,375✔
2133
                // Since we may have learned new preimages for the first time,
2,375✔
2134
                // we'll add them to our preimage cache. By doing this, we
2,375✔
2135
                // ensure any contested contracts watched by any on-chain
2,375✔
2136
                // arbitrators can now sweep this HTLC on-chain. We delay
2,375✔
2137
                // committing the preimages until just before accepting the new
2,375✔
2138
                // remote commitment, as afterwards the peer won't resend the
2,375✔
2139
                // Settle messages on the next channel reestablishment. Doing so
2,375✔
2140
                // allows us to more effectively batch this operation, instead
2,375✔
2141
                // of doing a single write per preimage.
2,375✔
2142
                err := l.cfg.PreimageCache.AddPreimages(
2,375✔
2143
                        l.uncommittedPreimages...,
2,375✔
2144
                )
2,375✔
2145
                if err != nil {
2,375✔
2146
                        l.fail(
×
2147
                                LinkFailureError{code: ErrInternalError},
×
2148
                                "unable to add preimages=%v to cache: %v",
×
2149
                                l.uncommittedPreimages, err,
×
2150
                        )
×
2151
                        return
×
2152
                }
×
2153

2154
                // Instead of truncating the slice to conserve memory
2155
                // allocations, we simply set the uncommitted preimage slice to
2156
                // nil so that a new one will be initialized if any more
2157
                // witnesses are discovered. We do this because the maximum size
2158
                // that the slice can occupy is 15KB, and we want to ensure we
2159
                // release that memory back to the runtime.
2160
                l.uncommittedPreimages = nil
2,375✔
2161

2,375✔
2162
                // We just received new updates to our local commitment chain.
                // Validate this new commitment, closing the link if it is
                // invalid.
                err = l.channel.ReceiveNewCommitment(&lnwallet.CommitSigs{
                        CommitSig:  msg.CommitSig,
                        HtlcSigs:   msg.HtlcSigs,
                        PartialSig: msg.PartialSig,
                })
                if err != nil {
                        // If we were unable to reconstruct their proposed
                        // commitment, then we'll examine the type of error. If
                        // it's an InvalidCommitSigError, then we'll send a
                        // direct error.
                        var sendData []byte
                        switch err.(type) {
                        case *lnwallet.InvalidCommitSigError:
                                sendData = []byte(err.Error())
                        case *lnwallet.InvalidHtlcSigError:
                                sendData = []byte(err.Error())
                        }
                        l.fail(
                                LinkFailureError{
                                        code:          ErrInvalidCommitment,
                                        FailureAction: LinkFailureForceClose,
                                        SendData:      sendData,
                                },
                                "ChannelPoint(%v): unable to accept new "+
                                        "commitment: %v",
                                l.channel.ChannelPoint(), err,
                        )
                        return
                }

                // As we've just accepted a new state, we'll now
                // immediately send the remote peer a revocation for our prior
                // state.
                nextRevocation, currentHtlcs, finalHTLCs, err :=
                        l.channel.RevokeCurrentCommitment()
                if err != nil {
                        l.log.Errorf("unable to revoke commitment: %v", err)

                        // We need to fail the channel in case revoking our
                        // local commitment does not succeed. We might have
                        // already advanced our channel state which would lead
                        // us to proceed with an unclean state.
                        //
                        // NOTE: We do not trigger a force close because this
                        // could resolve itself in case our db was just busy
                        // not accepting new transactions.
                        l.fail(
                                LinkFailureError{
                                        code:          ErrInternalError,
                                        Warning:       true,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "ChannelPoint(%v): unable to accept new "+
                                        "commitment: %v",
                                l.channel.ChannelPoint(), err,
                        )
                        return
                }

                // As soon as we are ready to send our next revocation, we can
                // invoke the incoming commit hooks.
                l.RWMutex.Lock()
                l.incomingCommitHooks.invoke()
                l.RWMutex.Unlock()

                l.cfg.Peer.SendMessage(false, nextRevocation)

                // Notify the incoming htlcs whose resolutions were just
                // locked in.
                for id, settled := range finalHTLCs {
                        l.cfg.HtlcNotifier.NotifyFinalHtlcEvent(
                                models.CircuitKey{
                                        ChanID: l.ShortChanID(),
                                        HtlcID: id,
                                },
                                channeldb.FinalHtlcInfo{
                                        Settled:  settled,
                                        Offchain: true,
                                },
                        )
                }

                // Since we just revoked our commitment, we may have a new set
                // of HTLC's on our commitment, so we'll send them using our
                // function closure NotifyContractUpdate.
                newUpdate := &contractcourt.ContractUpdate{
                        HtlcKey: contractcourt.LocalHtlcSet,
                        Htlcs:   currentHtlcs,
                }
                err = l.cfg.NotifyContractUpdate(newUpdate)
                if err != nil {
                        l.log.Errorf("unable to notify contract update: %v",
                                err)
                        return
                }

                select {
                case <-l.quit:
                        return
                default:
                }

                // If the remote party initiated the state transition,
                // we'll reply with a signature to provide them with their
                // version of the latest commitment. Otherwise, both commitment
                // chains are fully synced from our PoV, so we don't need to
                // reply with a signature as both sides already have a
                // commitment with the latest accepted.
                if l.channel.OweCommitment() {
                        if !l.updateCommitTxOrFail() {
                                return
                        }
                }

                // Now that we have finished processing the incoming CommitSig
                // and sent out our RevokeAndAck, we invoke the flushHooks if
                // the channel state is clean.
                l.RWMutex.Lock()
                if l.channel.IsChannelClean() {
                        l.flushHooks.invoke()
                }
                l.RWMutex.Unlock()

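        // Editorial note (not part of the original source): the CommitSig
        // handling above and the RevokeAndAck handling below are the two
        // halves of the commitment dance. Receiving a CommitSig triggers our
        // RevokeAndAck, and receiving a RevokeAndAck widens the revocation
        // window so that a queued commitment update of our own can be sent
        // via updateCommitTxOrFail.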
        case *lnwire.RevokeAndAck:
                // We've received a revocation from the remote chain. If valid,
                // this moves the remote chain forward and expands our
                // revocation window.

                // We now process the message and advance our remote commit
                // chain.
                fwdPkg, adds, settleFails, remoteHTLCs, err := l.channel.
                        ReceiveRevocation(msg)
                if err != nil {
                        // TODO(halseth): force close?
                        l.fail(
                                LinkFailureError{
                                        code:          ErrInvalidRevocation,
                                        FailureAction: LinkFailureDisconnect,
                                },
                                "unable to accept revocation: %v", err,
                        )
                        return
                }

                // The remote party now has a new primary commitment, so we'll
                // update the contract court to be aware of this new set (the
                // prior old remote pending).
                newUpdate := &contractcourt.ContractUpdate{
                        HtlcKey: contractcourt.RemoteHtlcSet,
                        Htlcs:   remoteHTLCs,
                }
                err = l.cfg.NotifyContractUpdate(newUpdate)
                if err != nil {
                        l.log.Errorf("unable to notify contract update: %v",
                                err)
                        return
                }

                select {
                case <-l.quit:
                        return
                default:
                }

                // If we have a tower client for this channel type, we'll
                // create a backup for the current state.
                if l.cfg.TowerClient != nil {
                        state := l.channel.State()
                        chanID := l.ChanID()

                        err = l.cfg.TowerClient.BackupState(
                                &chanID, state.RemoteCommitment.CommitHeight-1,
                        )
                        if err != nil {
                                l.fail(LinkFailureError{code: ErrInternalError},
                                        "unable to queue breach backup: %v",
                                        err)
                                return
                        }
                }

                l.processRemoteSettleFails(fwdPkg, settleFails)
                l.processRemoteAdds(fwdPkg, adds)

                // If the link failed while processing the adds, we must
                // return to ensure we won't attempt to update the state
                // further.
                if l.failed {
                        return
                }

                // The revocation window opened up. If there are pending local
                // updates, try to update the commit tx. Pending updates could
                // already have been present because of a previously failed
                // update to the commit tx or freshly added in by
                // processRemoteAdds. Also in case there are no local updates,
                // but there are still remote updates that are not in the remote
                // commit tx yet, send out an update.
                if l.channel.OweCommitment() {
                        if !l.updateCommitTxOrFail() {
                                return
                        }
                }

                // Now that we have finished processing the RevokeAndAck, we
                // can invoke the flushHooks if the channel state is clean.
                l.RWMutex.Lock()
                if l.channel.IsChannelClean() {
                        l.flushHooks.invoke()
                }
                l.RWMutex.Unlock()

        case *lnwire.UpdateFee:
                // We received a fee update from our peer. If we are the
                // initiator we will fail the channel; if not, we will apply
                // the update.
                fee := chainfee.SatPerKWeight(msg.FeePerKw)
                if err := l.channel.ReceiveUpdateFee(fee); err != nil {
                        l.fail(LinkFailureError{code: ErrInvalidUpdate},
                                "error receiving fee update: %v", err)
                        return
                }

                // Update the mailbox's feerate as well.
                l.mailBox.SetFeeRate(fee)

        // In the case where we receive a warning message from our peer, just
        // log it and move on. We choose not to disconnect from our peer,
        // although we "MAY" do so according to the specification.
        case *lnwire.Warning:
                l.log.Warnf("received warning message from peer: %v",
                        msg.Warning())

        case *lnwire.Error:
                // Error received from remote, MUST fail channel, but should
                // only print the contents of the error message if all
                // characters are printable ASCII.
                l.fail(
                        LinkFailureError{
                                code: ErrRemoteError,

                                // TODO(halseth): we currently don't fail the
                                // channel permanently, as there are some sync
                                // issues with other implementations that will
                                // lead to them sending an error message, but
                                // we can recover from on next connection. See
                                // https://github.com/ElementsProject/lightning/issues/4212
                                PermanentFailure: false,
                        },
                        "ChannelPoint(%v): received error from peer: %v",
                        l.channel.ChannelPoint(), msg.Error(),
                )
        default:
                l.log.Warnf("received unknown message of type %T", msg)
        }

}

// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
// for packets delivered from the switch, and for cleaning up any circuits
// closed by signing a previous commitment txn. This method ensures that the
// circuits are removed from the circuit map before removing them from the
// link's mailbox, otherwise it could be possible for some circuit to be missed
// if this link flaps.
func (l *channelLink) ackDownStreamPackets() error {
        // First, remove the downstream Add packets that were included in the
        // previous commitment signature. This will prevent the Adds from being
        // replayed if this link disconnects.
        for _, inKey := range l.openedCircuits {
                // In order to test the sphinx replay logic of the remote
                // party, unsafe replay does not acknowledge the packets from
                // the mailbox. We can then force a replay of any Add packets
                // held in memory by disconnecting and reconnecting the link.
                if l.cfg.UnsafeReplay {
                        continue
                }

                l.log.Debugf("removing Add packet %s from mailbox", inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Now, we will delete all circuits closed by the previous commitment
        // signature, which is the result of downstream Settle/Fail packets. We
        // batch them here to ensure circuits are closed atomically and for
        // performance.
        err := l.cfg.Circuits.DeleteCircuits(l.closedCircuits...)
        switch err {
        case nil:
                // Successful deletion.

        default:
                l.log.Errorf("unable to delete %d circuits: %v",
                        len(l.closedCircuits), err)
                return err
        }

        // With the circuits removed from memory and disk, we now ack any
        // Settle/Fails in the mailbox to ensure they do not get redelivered
        // after startup. If forgive is enabled and we've reached this point,
        // the circuits must have been removed at some point, so it is now safe
        // to un-queue the corresponding Settle/Fails.
        for _, inKey := range l.closedCircuits {
                l.log.Debugf("removing Fail/Settle packet %s from mailbox",
                        inKey)
                l.mailBox.AckPacket(inKey)
        }

        // Lastly, reset our buffers to be empty while keeping any acquired
        // growth in the backing array.
        l.openedCircuits = l.openedCircuits[:0]
        l.closedCircuits = l.closedCircuits[:0]

        return nil
}

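// Editorial note (not part of the original source): the ordering enforced by
// ackDownStreamPackets above matters. If the mailbox acks ran before
// DeleteCircuits and the link flapped in between, the Settle/Fail packets
// that drive circuit deletion would never be redelivered, stranding entries
// in the circuit map. With the current order, a flap between the two steps
// only causes those Settle/Fails to be redelivered, which the link can
// safely reprocess.
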
// updateCommitTxOrFail updates the commitment tx and if that fails, it fails
// the link.
func (l *channelLink) updateCommitTxOrFail() bool {
        err := l.updateCommitTx()
        switch err {
        // No error encountered, success.
        case nil:

        // A duplicate keystone error should be resolved and is not fatal, so
        // we won't send an Error message to the peer.
        case ErrDuplicateKeystone:
                l.fail(LinkFailureError{code: ErrCircuitError},
                        "temporary circuit error: %v", err)
                return false

        // Any other error results in an Error message being sent to
        // the peer.
        default:
                l.fail(LinkFailureError{code: ErrInternalError},
                        "unable to update commitment: %v", err)
                return false
        }

        return true
}

// updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point.
func (l *channelLink) updateCommitTx() error {
        // Preemptively write all pending keystones to disk, just in case the
        // HTLCs we have in memory are included in the subsequent attempt to
        // sign a commitment state.
        err := l.cfg.Circuits.OpenCircuits(l.keystoneBatch...)
        if err != nil {
                // If ErrDuplicateKeystone is returned, the caller will catch
                // it.
                return err
        }

        // Reset the batch, but keep the backing buffer to avoid reallocating.
        l.keystoneBatch = l.keystoneBatch[:0]

        // If hodl.Commit mode is active, we will refrain from attempting to
        // commit any in-memory modifications to the channel state. Exiting here
        // permits testing of either the switch or link's ability to trim
        // circuits that have been opened, but unsuccessfully committed.
        if l.cfg.HodlMask.Active(hodl.Commit) {
                l.log.Warnf(hodl.Commit.Warning())
                return nil
        }

        newCommit, err := l.channel.SignNextCommitment()
        if err == lnwallet.ErrNoWindow {
                l.cfg.PendingCommitTicker.Resume()
                l.log.Trace("PendingCommitTicker resumed")

                l.log.Tracef("revocation window exhausted, unable to send: "+
                        "%v, pend_updates=%v, dangling_closes=%v",
                        l.channel.PendingLocalUpdateCount(),
                        newLogClosure(func() string {
                                return spew.Sdump(l.openedCircuits)
                        }),
                        newLogClosure(func() string {
                                return spew.Sdump(l.closedCircuits)
                        }),
                )
                return nil
        } else if err != nil {
                return err
        }

        if err := l.ackDownStreamPackets(); err != nil {
                return err
        }

        l.cfg.PendingCommitTicker.Pause()
        l.log.Trace("PendingCommitTicker paused after ackDownStreamPackets")

        // The remote party now has a new pending commitment, so we'll update
        // the contract court to be aware of this new set (the prior old remote
        // pending).
        newUpdate := &contractcourt.ContractUpdate{
                HtlcKey: contractcourt.RemotePendingHtlcSet,
                Htlcs:   newCommit.PendingHTLCs,
        }
        err = l.cfg.NotifyContractUpdate(newUpdate)
        if err != nil {
                l.log.Errorf("unable to notify contract update: %v", err)
                return err
        }

        select {
        case <-l.quit:
                return ErrLinkShuttingDown
        default:
        }

        commitSig := &lnwire.CommitSig{
                ChanID:     l.ChanID(),
                CommitSig:  newCommit.CommitSig,
                HtlcSigs:   newCommit.HtlcSigs,
                PartialSig: newCommit.PartialSig,
        }
        l.cfg.Peer.SendMessage(false, commitSig)

        // Now that we have sent out a new CommitSig, we invoke the outgoing set
        // of commit hooks.
        l.RWMutex.Lock()
        l.outgoingCommitHooks.invoke()
        l.RWMutex.Unlock()

        return nil
}

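// Editorial summary (not part of the original source): a successful pass
// through updateCommitTx above performs, in order: (1) OpenCircuits to
// persist pending keystones, (2) SignNextCommitment to produce the new
// remote commitment signatures, (3) ackDownStreamPackets to ack Adds and
// delete closed circuits, (4) NotifyContractUpdate to inform the contract
// court of the remote pending HTLC set, and (5) SendMessage(CommitSig) to
// deliver the signatures. If the revocation window is exhausted
// (lnwallet.ErrNoWindow), steps 3-5 are skipped and the PendingCommitTicker
// is resumed instead.
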
// PeerPubKey returns the public key of the remote peer with which we have the
// channel link opened.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) PeerPubKey() [33]byte {
        return l.cfg.Peer.PubKey()
}

// ChannelPoint returns the channel outpoint for the channel link.
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChannelPoint() wire.OutPoint {
        return l.channel.ChannelPoint()
}

// ShortChanID returns the short channel ID for the channel link. The short
// channel ID encodes the exact location in the main chain that the original
// funding output can be found.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ShortChanID() lnwire.ShortChannelID {
        l.RLock()
        defer l.RUnlock()

        return l.channel.ShortChanID()
}

// UpdateShortChanID updates the short channel ID for a link. This may be
// required in the event that a link is created before the short chan ID for it
// is known, or a re-org occurs, and the funding transaction changes location
// within the chain.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
        chanID := l.ChanID()

        // Refresh the channel state's short channel ID by loading it from disk.
        // This ensures that the channel state accurately reflects the updated
        // short channel ID.
        err := l.channel.State().Refresh()
        if err != nil {
                l.log.Errorf("unable to refresh short_chan_id for chan_id=%v: "+
                        "%v", chanID, err)
                return hop.Source, err
        }

        return hop.Source, nil
}

// ChanID returns the channel ID for the channel link. The channel ID is a more
// compact representation of a channel's full outpoint.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) ChanID() lnwire.ChannelID {
        return lnwire.NewChanIDFromOutPoint(l.channel.ChannelPoint())
}

// Bandwidth returns the total amount that can flow through the channel link at
// this given instance. The value returned is expressed in millisatoshi and can
// be used by callers when making forwarding decisions to determine if a link
// can accept an HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
        // Get the balance available on the channel for new HTLCs. This takes
        // the channel reserve into account so HTLCs up to this value won't
        // violate it.
        return l.channel.AvailableBalance()
}

// MayAddOutgoingHtlc indicates whether we can add an outgoing htlc with the
// amount provided to the link. This check does not reserve a space, since
// forwards or other payments may use the available slot, so it should be
// considered best-effort.
func (l *channelLink) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error {
        return l.channel.MayAddOutgoingHtlc(amt)
}

// getDustSum is a wrapper method that calls the underlying channel's dust sum
// method.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustSum(remote bool) lnwire.MilliSatoshi {
        return l.channel.GetDustSum(remote)
}

// getFeeRate is a wrapper method that retrieves the underlying channel's
// feerate.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getFeeRate() chainfee.SatPerKWeight {
        return l.channel.CommitFeeRate()
}

// getDustClosure returns a closure that can be used by the switch or mailbox
// to evaluate whether a given HTLC is dust.
//
// NOTE: Part of the dustHandler interface.
func (l *channelLink) getDustClosure() dustClosure {
        localDustLimit := l.channel.State().LocalChanCfg.DustLimit
        remoteDustLimit := l.channel.State().RemoteChanCfg.DustLimit
        chanType := l.channel.State().ChanType

        return dustHelper(chanType, localDustLimit, remoteDustLimit)
}

// dustClosure is a function that evaluates whether an HTLC is dust. It returns
// true if the HTLC is dust. It takes in a feerate, a boolean denoting whether
// the HTLC is incoming (i.e. one that the remote sent), a boolean denoting
// whether to evaluate on the local or remote commit, and finally an HTLC
// amount to test.
type dustClosure func(chainfee.SatPerKWeight, bool, bool, btcutil.Amount) bool

// dustHelper is used to construct the dustClosure.
func dustHelper(chantype channeldb.ChannelType, localDustLimit,
        remoteDustLimit btcutil.Amount) dustClosure {

        isDust := func(feerate chainfee.SatPerKWeight, incoming,
                localCommit bool, amt btcutil.Amount) bool {

                if localCommit {
                        return lnwallet.HtlcIsDust(
                                chantype, incoming, true, feerate, amt,
                                localDustLimit,
                        )
                }

                return lnwallet.HtlcIsDust(
                        chantype, incoming, false, feerate, amt,
                        remoteDustLimit,
                )
        }

        return isDust
}

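// Editorial example (not part of the original source): a sketch of how a
// caller such as the mailbox might use the closure returned by dustHelper.
// The argument order follows the dustClosure type above: feerate, incoming,
// localCommit, amount; the concrete numbers are hypothetical.
//
//      isDust := dustHelper(chanType, localDustLimit, remoteDustLimit)
//
//      // Is a 400 sat incoming HTLC dust on our local commitment at the
//      // current fee rate?
//      if isDust(feerate, true, true, btcutil.Amount(400)) {
//              // Treat as dust: it has no output on the commitment tx.
//      }
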
// zeroConfConfirmed returns whether or not the zero-conf channel has
// confirmed on-chain.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) zeroConfConfirmed() bool {
        return l.channel.State().ZeroConfConfirmed()
}

// confirmedScid returns the confirmed SCID for a zero-conf channel. This
// should not be called for non-zero-conf channels.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) confirmedScid() lnwire.ShortChannelID {
        return l.channel.State().ZeroConfRealScid()
}

// isZeroConf returns whether or not the underlying channel is a zero-conf
// channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) isZeroConf() bool {
        return l.channel.State().IsZeroConf()
}

// negotiatedAliasFeature returns whether or not the underlying channel has
// negotiated the option-scid-alias feature bit. This will be true for both
// option-scid-alias and zero-conf channel-types. It will also be true for
// channels with the feature bit but without the above channel-types.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) negotiatedAliasFeature() bool {
        return l.channel.State().NegotiatedAliasFeature()
}

// getAliases returns the set of aliases for the underlying channel.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) getAliases() []lnwire.ShortChannelID {
        return l.cfg.GetAliases(l.ShortChanID())
}

// attachFailAliasUpdate sets the link's FailAliasUpdate function.
//
// Part of the scidAliasHandler interface.
func (l *channelLink) attachFailAliasUpdate(closure func(
        sid lnwire.ShortChannelID, incoming bool) *lnwire.ChannelUpdate) {

        l.Lock()
        l.cfg.FailAliasUpdate = closure
        l.Unlock()
}

// AttachMailBox updates the current mailbox used by this link, and hooks up
// the mailbox's message and packet outboxes to the link's upstream and
// downstream chans, respectively.
func (l *channelLink) AttachMailBox(mailbox MailBox) {
        l.Lock()
        l.mailBox = mailbox
        l.upstream = mailbox.MessageOutBox()
        l.downstream = mailbox.PacketOutBox()
        l.Unlock()

        // Set the mailbox's fee rate. This may be refreshing a feerate that was
        // never committed.
        l.mailBox.SetFeeRate(l.getFeeRate())

        // Also set the mailbox's dust closure so that it can query whether HTLC's
        // are dust given the current feerate.
        l.mailBox.SetDustClosure(l.getDustClosure())
}

// UpdateForwardingPolicy updates the forwarding policy for the target
// ChannelLink. Once updated, the link will use the new forwarding policy to
// govern whether an incoming HTLC should be forwarded or not. We assume that
// fields that are zero are intentionally set to zero, so we'll use newPolicy to
// update all of the link's FwrdingPolicy's values.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateForwardingPolicy(
        newPolicy models.ForwardingPolicy) {

        l.Lock()
        defer l.Unlock()

        l.cfg.FwrdingPolicy = newPolicy
}

// CheckHtlcForward should return a nil error if the passed HTLC details
// satisfy the current forwarding policy of the target link. Otherwise,
// a LinkError with a valid protocol failure message should be returned
// in order to signal the policy consistency issue to the source of the
// HTLC.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) CheckHtlcForward(payHash [32]byte,
        incomingHtlcAmt, amtToForward lnwire.MilliSatoshi,
        incomingTimeout, outgoingTimeout uint32,
        inboundFee models.InboundFee,
        heightNow uint32, originalScid lnwire.ShortChannelID) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // Using the outgoing HTLC amount, we'll calculate the outgoing
        // fee this incoming HTLC must carry in order to satisfy the constraints
        // of the outgoing link.
        outFee := ExpectedFee(policy, amtToForward)

        // Then calculate the inbound fee that we charge based on the sum of
        // outgoing HTLC amount and outgoing fee.
        inFee := inboundFee.CalcFee(amtToForward + outFee)

        // Add up both fee components. It is important to calculate both fees
        // separately. An alternative way of calculating is to first determine
        // an aggregate fee and apply that to the outgoing HTLC amount. However,
        // rounding may cause the result to be slightly higher than in the case
        // of separately rounded fee components. This potentially causes failed
        // forwards for senders and is something to be avoided.
        expectedFee := inFee + int64(outFee)

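        // Editorial example (not part of the original source), with
        // hypothetical numbers: suppose the outgoing fee works out to
        // outFee = 1100 msat and the (possibly negative) inbound fee to
        // inFee = -100 msat, so expectedFee = 1000 msat. An incoming HTLC of
        // 101,000 msat forwarding 100,000 msat yields actualFee = 1000 msat,
        // which satisfies the check below; an incoming HTLC of 100,999 msat
        // would be failed with FeeInsufficient.
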
        // If the actual fee is less than our expected fee, then we'll reject
        // this HTLC as it didn't provide a sufficient amount of fees, or the
        // values have been tampered with, or the send used incorrect/dated
        // information to construct the forwarding information for this hop. In
        // any case, we'll cancel this HTLC.
        actualFee := int64(incomingHtlcAmt) - int64(amtToForward)
        if incomingHtlcAmt < amtToForward || actualFee < expectedFee {
                l.log.Warnf("outgoing htlc(%x) has insufficient fee: "+
                        "expected %v, got %v: incoming=%v, outgoing=%v, "+
                        "inboundFee=%v",
                        payHash[:], expectedFee, actualFee,
                        incomingHtlcAmt, amtToForward, inboundFee,
                )

                // As part of the returned error, we'll send our latest routing
                // policy so the sending node obtains the most up to date data.
                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
                        return lnwire.NewFeeInsufficient(amtToForward, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check whether the outgoing htlc satisfies the channel policy.
        err := l.canSendHtlc(
                policy, payHash, amtToForward, outgoingTimeout, heightNow,
                originalScid,
        )
        if err != nil {
                return err
        }

        // Finally, we'll ensure that the time-lock on the outgoing HTLC meets
        // the following constraint: the incoming time-lock minus our time-lock
        // delta should equal the outgoing time lock. Otherwise, either the
        // sender messed up, or an intermediate node tampered with the HTLC.
        timeDelta := policy.TimeLockDelta
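        // Editorial example (not part of the original source), with
        // hypothetical numbers: with TimeLockDelta = 40, an incoming expiry
        // of height 840 and an outgoing expiry of height 800 passes the check
        // below (840 >= 800 + 40), while an incoming expiry of 839 is failed
        // with IncorrectCltvExpiry.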
        if incomingTimeout < outgoingTimeout+timeDelta {
                l.log.Warnf("incoming htlc(%x) has incorrect time-lock value: "+
                        "expected at least %v block delta, got %v block delta",
                        payHash[:], timeDelta, incomingTimeout-outgoingTimeout)

                // Grab the latest routing policy so the sending node is up to
                // date with our current policy.
                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
                        return lnwire.NewIncorrectCltvExpiry(
                                incomingTimeout, *upd,
                        )
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        return nil
}

// CheckHtlcTransit should return a nil error if the passed HTLC details
// satisfy the current channel policy. Otherwise, a LinkError with a
// valid protocol failure message should be returned in order to signal
// the violation. This call is intended to be used for locally initiated
// payments for which there is no corresponding incoming htlc.
func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
        amt lnwire.MilliSatoshi, timeout uint32,
        heightNow uint32) *LinkError {

        l.RLock()
        policy := l.cfg.FwrdingPolicy
        l.RUnlock()

        // We pass in hop.Source here as this is only used in the Switch when
        // trying to send over a local link. This causes the fallback mechanism
        // to occur.
        return l.canSendHtlc(
                policy, payHash, amt, timeout, heightNow, hop.Source,
        )
}

// canSendHtlc checks whether the given htlc parameters satisfy
// the channel's amount and time lock constraints.
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
        payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
        heightNow uint32, originalScid lnwire.ShortChannelID) *LinkError {

        // As our first sanity check, we'll ensure that the passed HTLC isn't
        // too small for the next hop. If so, then we'll cancel the HTLC
        // directly.
        if amt < policy.MinHTLCOut {
                l.log.Warnf("outgoing htlc(%x) is too small: min_htlc=%v, "+
                        "htlc_value=%v", payHash[:], policy.MinHTLCOut,
                        amt)

                // As part of the returned error, we'll send our latest routing
                // policy so the sending node obtains the most up to date data.
                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
                        return lnwire.NewAmountBelowMinimum(amt, *upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Next, ensure that the passed HTLC isn't too large. If so, we'll
        // cancel the HTLC directly.
        if policy.MaxHTLC != 0 && amt > policy.MaxHTLC {
                l.log.Warnf("outgoing htlc(%x) is too large: max_htlc=%v, "+
                        "htlc_value=%v", payHash[:], policy.MaxHTLC, amt)

                // As part of the returned error, we'll send our latest routing
                // policy so the sending node obtains the most up-to-date data.
                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewDetailedLinkError(failure, OutgoingFailureHTLCExceedsMax)
        }

        // We want to avoid offering an HTLC which will expire in the near
        // future, so we'll reject an HTLC if the outgoing expiration time is
        // too close to the current height.
        if timeout <= heightNow+l.cfg.OutgoingCltvRejectDelta {
                l.log.Warnf("htlc(%x) has an expiry that's too soon: "+
                        "outgoing_expiry=%v, best_height=%v", payHash[:],
                        timeout, heightNow)

                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
                        return lnwire.NewExpiryTooSoon(*upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewLinkError(failure)
        }

        // Check absolute max delta.
        if timeout > l.cfg.MaxOutgoingCltvExpiry+heightNow {
                l.log.Warnf("outgoing htlc(%x) has a time lock too far in "+
                        "the future: got %v, but maximum is %v", payHash[:],
                        timeout-heightNow, l.cfg.MaxOutgoingCltvExpiry)

                return NewLinkError(&lnwire.FailExpiryTooFar{})
        }

        // Check to see if there is enough balance in this channel.
        if amt > l.Bandwidth() {
                l.log.Warnf("insufficient bandwidth to route htlc: %v is "+
                        "larger than %v", amt, l.Bandwidth())
                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
                        return lnwire.NewTemporaryChannelFailure(upd)
                }
                failure := l.createFailureWithUpdate(false, originalScid, cb)
                return NewDetailedLinkError(
                        failure, OutgoingFailureInsufficientBalance,
                )
        }

        return nil
}

// Stats returns the statistics of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) Stats() (uint64, lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
        snapshot := l.channel.StateSnapshot()

        return snapshot.ChannelCommitment.CommitHeight,
                snapshot.TotalMSatSent,
                snapshot.TotalMSatReceived
}

// String returns the string representation of the channel link.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) String() string {
        return l.channel.ChannelPoint().String()
}

// handleSwitchPacket handles the switch packets. These packets might be
// forwarded to us from another channel link in case the htlc update came from
// another peer, or if the update was created by the user.
//
// NOTE: Part of the packetHandler interface.
func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
        l.log.Tracef("received switch packet inkey=%v, outkey=%v",
                pkt.inKey(), pkt.outKey())

        return l.mailBox.AddPacket(pkt)
}

// HandleChannelUpdate handles the htlc requests (settle/add/fail) sent to us
// from the remote peer we have a channel with.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
        select {
        case <-l.quit:
                // Return early if the link is already in the process of
                // quitting. It doesn't make sense to hand the message to the
                // mailbox here.
                return
        default:
        }

        err := l.mailBox.AddMessage(message)
        if err != nil {
                l.log.Errorf("failed to add Message to mailbox: %v", err)
        }
}

// updateChannelFee updates the commitment fee-per-kw on this channel by
// committing to an update_fee message.
func (l *channelLink) updateChannelFee(feePerKw chainfee.SatPerKWeight) error {
        l.log.Infof("updating commit fee to %v", feePerKw)

        // We skip sending the UpdateFee message if the channel is not
        // currently eligible to forward messages.
        if !l.EligibleToUpdate() {
                l.log.Debugf("skipping fee update for inactive channel")
                return nil
        }

        // First, we'll update the local fee on our commitment.
        if err := l.channel.UpdateFee(feePerKw); err != nil {
                return err
        }

        // The fee passed the channel's validation checks, so we update the
        // mailbox feerate.
        l.mailBox.SetFeeRate(feePerKw)

        // We'll then attempt to send a new UpdateFee message, and also lock it
        // in immediately by triggering a commitment update.
        msg := lnwire.NewUpdateFee(l.ChanID(), uint32(feePerKw))
        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                return err
        }
        return l.updateCommitTx()
}

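// Editorial note (not part of the original source): the fee rates passed to
// updateChannelFee above are expressed in satoshis per kiloweight (sat/kw).
// One virtual byte corresponds to four weight units, so a familiar
// 10 sat/vbyte estimate is 2,500 sat/kw, e.g.
// updateChannelFee(chainfee.SatPerKWeight(2500)).
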
// processRemoteSettleFails accepts a batch of settle/fail payment descriptors
// after receiving a revocation from the remote party, and reprocesses them in
// the context of the provided forwarding package. Any settles or fails that
// have already been acknowledged in the forwarding package will not be sent to
// the switch.
func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,
        settleFails []*lnwallet.PaymentDescriptor) {

        if len(settleFails) == 0 {
                return
        }

        l.log.Debugf("settle-fail-filter %v", fwdPkg.SettleFailFilter)

        var switchPackets []*htlcPacket
        for i, pd := range settleFails {
                // Skip any settles or fails that have already been
                // acknowledged by the incoming link that originated the
                // forwarded Add.
                if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
                        continue
                }

                // TODO(roasbeef): rework log entries to a shared
                // interface.

                switch pd.EntryType {

                // A settle for an HTLC we previously forwarded has been
                // received. So we'll forward the HTLC to the switch which will
                // handle propagating the settle to the prior hop.
                case lnwallet.Settle:
                        // If hodl.SettleIncoming is requested, we will not
                        // forward the SETTLE to the switch and will not signal
                        // a free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.SettleIncoming) {
                                l.log.Warnf(hodl.SettleIncoming.Warning())
                                continue
                        }

                        settlePacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: pd.ParentIndex,
                                destRef:        pd.DestRef,
                                htlc: &lnwire.UpdateFulfillHTLC{
                                        PaymentPreimage: pd.RPreimage,
                                },
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has been
                        // freed up within the commitment state.
                        switchPackets = append(switchPackets, settlePacket)

                // A failureCode message for a previously forwarded HTLC has
                // been received. As a result a new slot will be freed up in
                // our commitment state, so we'll forward this to the switch so
                // the backwards undo can continue.
                case lnwallet.Fail:
                        // If hodl.FailIncoming is requested, we will not
                        // forward the FAIL to the switch and will not signal a
                        // free slot on the commitment transaction.
                        if l.cfg.HodlMask.Active(hodl.FailIncoming) {
                                l.log.Warnf(hodl.FailIncoming.Warning())
                                continue
                        }

                        // Fetch the reason the HTLC was canceled so we can
                        // continue to propagate it. This failure originated
                        // from another node, so the linkFailure field is not
                        // set on the packet.
                        failPacket := &htlcPacket{
                                outgoingChanID: l.ShortChanID(),
                                outgoingHTLCID: pd.ParentIndex,
                                destRef:        pd.DestRef,
                                htlc: &lnwire.UpdateFailHTLC{
                                        Reason: lnwire.OpaqueReason(
                                                pd.FailReason,
                                        ),
                                },
                        }

                        l.log.Debugf("Failed to send %s", pd.Amount)

                        // If the failure message lacks an HMAC (but includes
                        // the 4 bytes for encoding the message and padding
                        // lengths), then this means that we received it as an
                        // UpdateFailMalformedHTLC. As a result, we'll signal
                        // that we need to convert this error within the switch
                        // to an actual error, by encrypting it as if we were
                        // the originating hop.
                        convertedErrorSize := lnwire.FailureMessageLength + 4
                        if len(pd.FailReason) == convertedErrorSize {
                                failPacket.convertedError = true
                        }

                        // Add the packet to the batch to be forwarded, and
                        // notify the overflow queue that a spare spot has been
                        // freed up within the commitment state.
                        switchPackets = append(switchPackets, failPacket)
                }
        }

        // Only spawn the task to forward packets if we have a non-zero number.
        if len(switchPackets) > 0 {
                go l.forwardBatch(false, switchPackets...)
        }
}

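// Editorial note (not part of the original source): the convertedError check
// in processRemoteSettleFails relies on length alone. A normal onion failure
// reason carries an HMAC, while a reason of exactly
// lnwire.FailureMessageLength + 4 bytes (the failure message plus the two
// 2-byte length prefixes described in the comment above, with no HMAC) can
// only have come from an UpdateFailMalformedHTLC that the switch must
// re-encrypt as if we were the originating hop.
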
// processRemoteAdds serially processes each of the Add payment descriptors
3195
// which have been "locked-in" by receiving a revocation from the remote party.
3196
// The forwarding package provided instructs how to process this batch,
3197
// indicating whether this is the first time these Adds are being processed, or
3198
// whether we are reprocessing as a result of a failure or restart. Adds that
3199
// have already been acknowledged in the forwarding package will be ignored.
3200
func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
3201
        lockedInHtlcs []*lnwallet.PaymentDescriptor) {
2,361✔
3202

2,361✔
3203
        l.log.Tracef("processing %d remote adds for height %d",
2,361✔
3204
                len(lockedInHtlcs), fwdPkg.Height)
2,361✔
3205

2,361✔
3206
        decodeReqs := make(
2,361✔
3207
                []hop.DecodeHopIteratorRequest, 0, len(lockedInHtlcs),
2,361✔
3208
        )
2,361✔
3209
        for _, pd := range lockedInHtlcs {
3,862✔
3210
                switch pd.EntryType {
1,501✔
3211

3212
                // TODO(conner): remove type switch?
3213
                case lnwallet.Add:
1,501✔
3214
                        // Before adding the new htlc to the state machine,
1,501✔
3215
                        // parse the onion object in order to obtain the
1,501✔
3216
                        // routing information with DecodeHopIterator function
1,501✔
3217
                        // which process the Sphinx packet.
1,501✔
3218
                        onionReader := bytes.NewReader(pd.OnionBlob)
1,501✔
3219

1,501✔
3220
                        req := hop.DecodeHopIteratorRequest{
1,501✔
3221
                                OnionReader:    onionReader,
1,501✔
3222
                                RHash:          pd.RHash[:],
1,501✔
3223
                                IncomingCltv:   pd.Timeout,
1,501✔
3224
                                IncomingAmount: pd.Amount,
1,501✔
3225
                                BlindingPoint:  pd.BlindingPoint,
1,501✔
3226
                        }
1,501✔
3227

1,501✔
3228
                        decodeReqs = append(decodeReqs, req)
1,501✔
3229
                }
3230
        }
3231

3232
        // Atomically decode the incoming htlcs, simultaneously checking for
3233
        // replay attempts. A particular index in the returned, spare list of
3234
        // channel iterators should only be used if the failure code at the
3235
        // same index is lnwire.FailCodeNone.
3236
        decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
2,361✔
3237
                fwdPkg.ID(), decodeReqs,
2,361✔
3238
        )
2,361✔
3239
        if sphinxErr != nil {
2,361✔
3240
                l.fail(LinkFailureError{code: ErrInternalError},
×
3241
                        "unable to decode hop iterators: %v", sphinxErr)
×
3242
                return
×
3243
        }
×
3244

3245
        var switchPackets []*htlcPacket
2,361✔
3246

2,361✔
3247
        for i, pd := range lockedInHtlcs {
3,862✔
3248
                idx := uint16(i)
1,501✔
3249

1,501✔
3250
                if fwdPkg.State == channeldb.FwdStateProcessed &&
1,501✔
3251
                        fwdPkg.AckFilter.Contains(idx) {
1,501✔
3252

×
3253
                        // If this index is already found in the ack filter,
×
3254
                        // the response to this forwarding decision has already
×
3255
                        // been committed by one of our commitment txns. ADDs
×
3256
                        // in this state are waiting for the rest of the fwding
×
3257
                        // package to get acked before being garbage collected.
×
3258
                        continue
×
3259
                }
3260

3261
                // An incoming HTLC add has been full-locked in. As a result we
3262
                // can now examine the forwarding details of the HTLC, and the
3263
                // HTLC itself to decide if: we should forward it, cancel it,
3264
                // or are able to settle it (and it adheres to our fee related
3265
                // constraints).
3266

3267
                // Fetch the onion blob that was included within this processed
3268
                // payment descriptor.
3269
                var onionBlob [lnwire.OnionPacketSize]byte
1,501✔
3270
                copy(onionBlob[:], pd.OnionBlob)
1,501✔
3271

1,501✔
3272
                // Before adding the new htlc to the state machine, parse the
1,501✔
3273
                // onion object in order to obtain the routing information with
1,501✔
3274
                // DecodeHopIterator function which process the Sphinx packet.
1,501✔
3275
                chanIterator, failureCode := decodeResps[i].Result()
1,501✔
3276
                if failureCode != lnwire.CodeNone {
1,507✔
3277
                        // If we're unable to process the onion blob then we
6✔
3278
                        // should send the malformed htlc error to payment
6✔
3279
                        // sender.
6✔
3280
                        l.sendMalformedHTLCError(pd.HtlcIndex, failureCode,
6✔
3281
                                onionBlob[:], pd.SourceRef)
6✔
3282

6✔
3283
                        l.log.Errorf("unable to decode onion hop "+
6✔
3284
                                "iterator: %v", failureCode)
6✔
3285
                        continue
6✔
3286
                }

                heightNow := l.cfg.BestHeight()

                pld, routeRole, pldErr := chanIterator.HopPayload()
                if pldErr != nil {
                        // If we're unable to process the onion payload, or we
                        // received an invalid onion payload failure, then we
                        // should send an error back to the caller so the HTLC
                        // can be canceled.
                        var failedType uint64

                        // We need to get the underlying error value, so we
                        // can't use errors.As as suggested by the linter.
                        //nolint:errorlint
                        if e, ok := pldErr.(hop.ErrInvalidPayload); ok {
                                failedType = uint64(e.Type)
                        }

                        // If we couldn't parse the payload, make our best
                        // effort at creating an error encrypter that knows
                        // which blinding type applied to us; without a parsed
                        // payload we have no way of knowing whether we were
                        // the introduction node or not.
                        //
                        //nolint:lll
                        obfuscator, failCode := chanIterator.ExtractErrorEncrypter(
                                l.cfg.ExtractErrorEncrypter,
                                // We need our route role here because we
                                // couldn't parse or validate the payload.
                                routeRole == hop.RouteRoleIntroduction,
                        )
                        if failCode != lnwire.CodeNone {
                                l.log.Errorf("could not extract error "+
                                        "encrypter: %v", pldErr)

                                // We can't process this htlc, send back
                                // malformed.
                                l.sendMalformedHTLCError(
                                        pd.HtlcIndex, failCode,
                                        onionBlob[:], pd.SourceRef,
                                )

                                continue
                        }

                        // TODO: currently none of the test unit infrastructure
                        // is setup to handle TLV payloads, so testing this
                        // would require implementing a separate mock iterator
                        // for TLV payloads that also supports injecting invalid
                        // payloads. Deferring this non-trivial effort till a
                        // later date.
                        failure := lnwire.NewInvalidOnionPayload(failedType, 0)
                        l.sendHTLCError(
                                pd, NewLinkError(failure), obfuscator, false,
                        )

                        l.log.Errorf("unable to decode forwarding "+
                                "instructions: %v", pldErr)

                        continue
                }

                // Retrieve the onion obfuscator from the onion blob in order
                // to produce the initial obfuscation of the onion failureCode.
                obfuscator, failureCode := chanIterator.ExtractErrorEncrypter(
                        l.cfg.ExtractErrorEncrypter,
                        routeRole == hop.RouteRoleIntroduction,
                )
                if failureCode != lnwire.CodeNone {
                        // If we're unable to process the onion blob then we
                        // should send the malformed htlc error to the payment
                        // sender.
                        l.sendMalformedHTLCError(
                                pd.HtlcIndex, failureCode, onionBlob[:],
                                pd.SourceRef,
                        )

                        l.log.Errorf("unable to decode onion "+
                                "obfuscator: %v", failureCode)

                        continue
                }
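
                // An illustrative note (not part of the original source): the
                // error encrypter ("obfuscator") wraps any downstream failure
                // in this hop's layer of onion encryption before it travels
                // back, so only the original sender can decrypt and attribute
                // it. Later failures on this HTLC reuse it via sendHTLCError.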

                fwdInfo := pld.ForwardingInfo()

                // Check whether the payload we've just processed uses our
                // node as the introduction point (gave us a blinding key in
                // the payload itself) and fail it back if we don't support
                // route blinding.
                if fwdInfo.NextBlinding.IsSome() &&
                        l.cfg.DisallowRouteBlinding {

                        failure := lnwire.NewInvalidBlinding(
                                onionBlob[:],
                        )
                        l.sendHTLCError(
                                pd, NewLinkError(failure), obfuscator, false,
                        )

                        l.log.Error("rejected htlc that uses us as an " +
                                "introduction point when we do not support " +
                                "route blinding")

                        continue
                }

                switch fwdInfo.NextHop {
                case hop.Exit:
                        err := l.processExitHop(
                                pd, obfuscator, fwdInfo, heightNow, pld,
                        )
                        if err != nil {
                                l.fail(LinkFailureError{code: ErrInternalError},
                                        err.Error(),
                                )

                                return
                        }

                // There are additional channels left within this route. So
                // we'll simply do some forwarding package book-keeping.
                default:
                        // If hodl.AddIncoming is requested, we will not
                        // validate the forwarded ADD, nor will we send the
                        // packet to the htlc switch.
                        if l.cfg.HodlMask.Active(hodl.AddIncoming) {
                                l.log.Warnf(hodl.AddIncoming.Warning())
                                continue
                        }

                        switch fwdPkg.State {
                        case channeldb.FwdStateProcessed:
                                // This add was not forwarded on the previous
                                // processing phase, run it through our
                                // validation pipeline to reproduce an error.
                                // This may trigger a different error due to
                                // expiring timelocks, but we expect that an
                                // error will be reproduced.
                                if !fwdPkg.FwdFilter.Contains(idx) {
                                        break
                                }

                                // Otherwise, it was already processed, so we
                                // can collect it and continue.
                                addMsg := &lnwire.UpdateAddHTLC{
                                        Expiry:        fwdInfo.OutgoingCTLV,
                                        Amount:        fwdInfo.AmountToForward,
                                        PaymentHash:   pd.RHash,
                                        BlindingPoint: fwdInfo.NextBlinding,
                                }

                                // Finally, we'll encode the onion packet for
                                // the _next_ hop using the hop iterator
                                // decoded for the current hop.
                                buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])

                                // We know this cannot fail, as this ADD
                                // was marked forwarded in a previous
                                // round of processing.
                                chanIterator.EncodeNextHop(buf)

                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                updatePacket := &htlcPacket{
                                        incomingChanID:  l.ShortChanID(),
                                        incomingHTLCID:  pd.HtlcIndex,
                                        outgoingChanID:  fwdInfo.NextHop,
                                        sourceRef:       pd.SourceRef,
                                        incomingAmount:  pd.Amount,
                                        amount:          addMsg.Amount,
                                        htlc:            addMsg,
                                        obfuscator:      obfuscator,
                                        incomingTimeout: pd.Timeout,
                                        outgoingTimeout: fwdInfo.OutgoingCTLV,
                                        customRecords:   pld.CustomRecords(),
                                        inboundFee:      inboundFee,
                                }
                                switchPackets = append(
                                        switchPackets, updatePacket,
                                )

                                continue
                        }
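
                        // An illustrative note (not part of the original
                        // source): on replay the packet above is rebuilt
                        // deterministically from the forwarding package
                        // rather than loaded from disk, so the switch sees
                        // the same forwarding decision across restarts.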

                        // TODO(roasbeef): ensure don't accept outrageous
                        // timeout for htlc

                        // With all our forwarding constraints met, we'll
                        // create the outgoing HTLC using the parameters as
                        // specified in the forwarding info.
                        addMsg := &lnwire.UpdateAddHTLC{
                                Expiry:        fwdInfo.OutgoingCTLV,
                                Amount:        fwdInfo.AmountToForward,
                                PaymentHash:   pd.RHash,
                                BlindingPoint: fwdInfo.NextBlinding,
                        }

                        // Finally, we'll encode the onion packet for the
                        // _next_ hop using the hop iterator decoded for the
                        // current hop.
                        buf := bytes.NewBuffer(addMsg.OnionBlob[0:0])
                        err := chanIterator.EncodeNextHop(buf)
                        if err != nil {
                                l.log.Errorf("unable to encode the "+
                                        "remaining route %v", err)

                                cb := func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
                                        return lnwire.NewTemporaryChannelFailure(upd)
                                }

                                failure := l.createFailureWithUpdate(
                                        true, hop.Source, cb,
                                )

                                l.sendHTLCError(
                                        pd, NewLinkError(failure), obfuscator, false,
                                )
                                continue
                        }

                        // Now that this add has been reprocessed, only append
                        // it to our list of packets to forward to the switch
                        // if this is the first time processing the add. If
                        // the fwd pkg has already been processed, then we
                        // entered the above section to recreate a previous
                        // error. If the packet had previously been forwarded,
                        // it would have been added to switchPackets at the
                        // top of this section.
                        if fwdPkg.State == channeldb.FwdStateLockedIn {
                                inboundFee := l.cfg.FwrdingPolicy.InboundFee

                                updatePacket := &htlcPacket{
                                        incomingChanID:  l.ShortChanID(),
                                        incomingHTLCID:  pd.HtlcIndex,
                                        outgoingChanID:  fwdInfo.NextHop,
                                        sourceRef:       pd.SourceRef,
                                        incomingAmount:  pd.Amount,
                                        amount:          addMsg.Amount,
                                        htlc:            addMsg,
                                        obfuscator:      obfuscator,
                                        incomingTimeout: pd.Timeout,
                                        outgoingTimeout: fwdInfo.OutgoingCTLV,
                                        customRecords:   pld.CustomRecords(),
                                        inboundFee:      inboundFee,
                                }

                                fwdPkg.FwdFilter.Set(idx)
                                switchPackets = append(switchPackets,
                                        updatePacket)
                        }
                }
        }

        // Commit the htlcs we are intending to forward if this package has
        // not been fully processed.
        if fwdPkg.State == channeldb.FwdStateLockedIn {
                err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
                if err != nil {
                        l.fail(LinkFailureError{code: ErrInternalError},
                                "unable to set fwd filter: %v", err)
                        return
                }
        }

        if len(switchPackets) == 0 {
                return
        }

        replay := fwdPkg.State != channeldb.FwdStateLockedIn

        l.log.Debugf("forwarding %d packets to switch: replay=%v",
                len(switchPackets), replay)

        // NOTE: This call is made synchronous so that we ensure all circuits
        // are committed in the exact order that they are processed in the
        // link. Failing to do this could cause reorderings/gaps in the range
        // of opened circuits, which violates assumptions made by the circuit
        // trimming.
        l.forwardBatch(replay, switchPackets...)
}

// processExitHop handles an htlc for which this link is the exit hop. It
// returns an error if the htlc could not be processed, in which case the
// caller fails the link; otherwise the htlc is failed back, held, or
// resolved, and nil is returned.
func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
        obfuscator hop.ErrorEncrypter, fwdInfo hop.ForwardingInfo,
        heightNow uint32, payload invoices.Payload) error {

        // If hodl.ExitSettle is requested, we will not validate the final
        // hop's ADD, nor will we settle the corresponding invoice or respond
        // with the preimage.
        if l.cfg.HodlMask.Active(hodl.ExitSettle) {
                l.log.Warnf(hodl.ExitSettle.Warning())

                return nil
        }

        // As we're the exit hop, we'll double check the hop-payload included
        // in the HTLC to ensure that it was crafted correctly by the sender
        // and is compatible with the HTLC we were extended.
        if pd.Amount < fwdInfo.AmountToForward {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible value: expected <=%v, got %v", pd.RHash,
                        pd.Amount, fwdInfo.AmountToForward)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectHtlcAmount(pd.Amount),
                )
                l.sendHTLCError(pd, failure, obfuscator, true)

                return nil
        }
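
        // A hypothetical example (not part of the original source): if the
        // incoming HTLC delivers pd.Amount = 1,000,000 msat while the onion
        // payload claims AmountToForward = 1,200,000 msat, the sender
        // under-paid relative to its own payload, and we fail the htlc with
        // final_incorrect_htlc_amount above.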

        // We'll also ensure that our time-lock value has been computed
        // correctly.
        if pd.Timeout < fwdInfo.OutgoingCTLV {
                l.log.Errorf("onion payload of incoming htlc(%x) has "+
                        "incompatible time-lock: expected <=%v, got %v",
                        pd.RHash[:], pd.Timeout, fwdInfo.OutgoingCTLV)

                failure := NewLinkError(
                        lnwire.NewFinalIncorrectCltvExpiry(pd.Timeout),
                )
                l.sendHTLCError(pd, failure, obfuscator, true)

                return nil
        }

        // Notify the invoiceRegistry of the exit hop htlc. If we crash right
        // after this, this code will be re-executed after restart. We will
        // receive back a resolution event.
        invoiceHash := lntypes.Hash(pd.RHash)

        circuitKey := models.CircuitKey{
                ChanID: l.ShortChanID(),
                HtlcID: pd.HtlcIndex,
        }

        event, err := l.cfg.Registry.NotifyExitHopHtlc(
                invoiceHash, pd.Amount, pd.Timeout, int32(heightNow),
                circuitKey, l.hodlQueue.ChanIn(), payload,
        )
        if err != nil {
                return err
        }

        // Create a hodlHtlc struct and decide whether to resolve it now or
        // later.
        htlc := hodlHtlc{
                pd:         pd,
                obfuscator: obfuscator,
        }

        // If the event is nil, the invoice is being held, so we save the
        // payment descriptor for future reference.
        if event == nil {
                l.hodlMap[circuitKey] = htlc
                return nil
        }

        // Process the received resolution.
        return l.processHtlcResolution(event, htlc)
}
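
// An illustrative note (not part of the original source): a nil event from
// NotifyExitHopHtlc means the invoice registry is holding the htlc (e.g. a
// hodl invoice still awaiting its preimage). The htlc is parked in hodlMap
// keyed by its circuit, and the eventual resolution arrives asynchronously
// over the hodlQueue channel passed in above.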

// settleHTLC settles the HTLC on the channel.
func (l *channelLink) settleHTLC(preimage lntypes.Preimage,
        pd *lnwallet.PaymentDescriptor) error {

        hash := preimage.Hash()

        l.log.Infof("settling htlc %v as exit hop", hash)

        err := l.channel.SettleHTLC(
                preimage, pd.HtlcIndex, pd.SourceRef, nil, nil,
        )
        if err != nil {
                return fmt.Errorf("unable to settle htlc: %w", err)
        }

        // If the link is in hodl.BogusSettle mode, replace the preimage with
        // a fake one before sending it to the peer.
        if l.cfg.HodlMask.Active(hodl.BogusSettle) {
                l.log.Warnf(hodl.BogusSettle.Warning())
                preimage = [32]byte{}
                copy(preimage[:], bytes.Repeat([]byte{2}, 32))
        }

        // The HTLC was successfully settled locally, so send a notification
        // about it to the remote peer.
        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFulfillHTLC{
                ChanID:          l.ChanID(),
                ID:              pd.HtlcIndex,
                PaymentPreimage: preimage,
        })

        // Once we have successfully settled the htlc, notify a settle event.
        l.cfg.HtlcNotifier.NotifySettleEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: pd.HtlcIndex,
                        },
                },
                preimage,
                HtlcEventTypeReceive,
        )

        return nil
}

// forwardBatch forwards the given htlcPackets to the switch, and waits on the
// err chan for the individual responses. This method is intended to be
// spawned as a goroutine so the responses can be handled in the background.
func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
        // Don't forward packets for which we already have a response in our
        // mailbox. This could happen if a packet fails and is buffered in the
        // mailbox, and the incoming link flaps.
        var filteredPkts = make([]*htlcPacket, 0, len(packets))
        for _, pkt := range packets {
                if l.mailBox.HasPacket(pkt.inKey()) {
                        continue
                }

                filteredPkts = append(filteredPkts, pkt)
        }

        err := l.cfg.ForwardPackets(l.quit, replay, filteredPkts...)
        if err != nil {
                log.Errorf("Unhandled error while reforwarding htlc "+
                        "settle/fail over htlcswitch: %v", err)
        }
}
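
// An illustrative note (not part of the original source): the replay flag
// signals the switch that these packets may have been forwarded on a prior
// run, so circuits that already exist in the circuit map should be treated
// as resumptions rather than as collisions.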

// sendHTLCError cancels the HTLC and sends a cancel message back to the peer
// from which the HTLC was received.
func (l *channelLink) sendHTLCError(pd *lnwallet.PaymentDescriptor,
        failure *LinkError, e hop.ErrorEncrypter, isReceive bool) {

        reason, err := e.EncryptFirstHop(failure.WireMessage())
        if err != nil {
                l.log.Errorf("unable to obfuscate error: %v", err)
                return
        }

        err = l.channel.FailHTLC(pd.HtlcIndex, reason, pd.SourceRef, nil, nil)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        // Send the appropriate failure message depending on whether we're
        // in a blinded route or not.
        if err := l.sendIncomingHTLCFailureMsg(
                pd.HtlcIndex, e, reason,
        ); err != nil {
                l.log.Errorf("unable to send HTLC failure: %v", err)
                return
        }

        // Notify a link failure on our incoming link. Outgoing htlc
        // information is not available at this point, because we have not
        // decrypted the onion, so it is excluded.
        var eventType HtlcEventType
        if isReceive {
                eventType = HtlcEventTypeReceive
        } else {
                eventType = HtlcEventTypeForward
        }

        l.cfg.HtlcNotifier.NotifyLinkFailEvent(
                HtlcKey{
                        IncomingCircuit: models.CircuitKey{
                                ChanID: l.ShortChanID(),
                                HtlcID: pd.HtlcIndex,
                        },
                },
                HtlcInfo{
                        IncomingTimeLock: pd.Timeout,
                        IncomingAmt:      pd.Amount,
                },
                eventType,
                failure,
                true,
        )
}

// sendIncomingHTLCFailureMsg handles sending an HTLC failure message back to
// the peer from which the HTLC was received. This function is primarily used
// to handle the special requirements of route blinding, specifically:
// - Forwarding nodes must switch out any errors with MalformedFailHTLC.
// - Introduction nodes should return regular HTLC failure messages.
//
// It accepts the original opaque failure, which will be used in the case
// that we're not part of a blinded route, and an error encrypter that'll be
// used if we are the introduction node and need to present an error as if
// we're the failing party.
//
// Note: this function does not yet handle special error cases for receiving
// nodes in blinded paths, as LND does not support blinded receives.
func (l *channelLink) sendIncomingHTLCFailureMsg(htlcIndex uint64,
        e hop.ErrorEncrypter,
        originalFailure lnwire.OpaqueReason) error {

        var msg lnwire.Message
        switch {
        // Our circuit's error encrypter will be nil if this was a locally
        // initiated payment. We can only hit a blinded error for a locally
        // initiated payment if we allow ourselves to be picked as the
        // introduction node for our own payments, and in that case we
        // shouldn't reach this code. To prevent the HTLC getting stuck,
        // we fail it back and log an error.
        case e == nil:
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

                l.log.Errorf("Unexpected blinded failure when "+
                        "we are the sending node, incoming htlc: %v(%v)",
                        l.ShortChanID(), htlcIndex)

        // For cleartext hops (ie, non-blinded/normal) we don't need any
        // transformation on the error message and can just send the original.
        case !e.Type().IsBlinded():
                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: originalFailure,
                }

        // When we're the introduction node, we need to convert the error to
        // an UpdateFailHTLC that carries an invalid blinding failure.
        case e.Type() == hop.EncrypterTypeIntroduction:
                l.log.Debugf("Introduction blinded node switching out failure "+
                        "error: %v", htlcIndex)

                // The specification does not require that we set the onion
                // blob.
                failureMsg := lnwire.NewInvalidBlinding(nil)
                reason, err := e.EncryptFirstHop(failureMsg)
                if err != nil {
                        return err
                }

                msg = &lnwire.UpdateFailHTLC{
                        ChanID: l.ChanID(),
                        ID:     htlcIndex,
                        Reason: reason,
                }

        // If we are a relaying node, we need to switch out any error that
        // we've received to a malformed HTLC error.
        case e.Type() == hop.EncrypterTypeRelaying:
                l.log.Debugf("Relaying blinded node switching out malformed "+
                        "error: %v", htlcIndex)

                msg = &lnwire.UpdateFailMalformedHTLC{
                        ChanID:      l.ChanID(),
                        ID:          htlcIndex,
                        FailureCode: lnwire.CodeInvalidBlinding,
                }

        default:
                return fmt.Errorf("unexpected encrypter: %v", e)
        }

        if err := l.cfg.Peer.SendMessage(false, msg); err != nil {
                l.log.Warnf("Send update fail failed: %v", err)
        }

        return nil
}
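
// An illustrative summary (not part of the original source) of the cases
// above:
//
//	encrypter                  message sent to the incoming peer
//	-------------------------  ---------------------------------------------
//	nil (locally initiated)    UpdateFailHTLC with the original reason
//	cleartext (not blinded)    UpdateFailHTLC with the original reason
//	introduction node          UpdateFailHTLC wrapping an invalid blinding
//	                           failure re-encrypted for the sender
//	relaying blinded node      UpdateFailMalformedHTLC (CodeInvalidBlinding)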

// sendMalformedHTLCError is a helper function which sends the malformed HTLC
// update to the payment sender.
func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
        code lnwire.FailCode, onionBlob []byte, sourceRef *channeldb.AddRef) {

        shaOnionBlob := sha256.Sum256(onionBlob)
        err := l.channel.MalformedFailHTLC(htlcIndex, code, shaOnionBlob, sourceRef)
        if err != nil {
                l.log.Errorf("unable to cancel htlc: %v", err)
                return
        }

        l.cfg.Peer.SendMessage(false, &lnwire.UpdateFailMalformedHTLC{
                ChanID:       l.ChanID(),
                ID:           htlcIndex,
                ShaOnionBlob: shaOnionBlob,
                FailureCode:  code,
        })
}

// fail is a function which is used to encapsulate the action necessary for
// properly failing the link. It takes a LinkFailureError, which will be
// passed to the OnChannelFailure closure, in order for it to determine if we
// should force close the channel, and if we should send an error message to
// the remote peer.
func (l *channelLink) fail(linkErr LinkFailureError,
        format string, a ...interface{}) {

        reason := errors.Errorf(format, a...)

        // Return if we have already notified about a failure.
        if l.failed {
                l.log.Warnf("ignoring link failure (%v), as link already "+
                        "failed", reason)
                return
        }

        l.log.Errorf("failing link: %s with error: %v", reason, linkErr)

        // Set failed, such that we won't process any more updates, and notify
        // the peer about the failure.
        l.failed = true
        l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
}