• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13536249039

26 Feb 2025 03:42AM UTC coverage: 57.462% (-1.4%) from 58.835%
13536249039

Pull #8453

github

Roasbeef
peer: update chooseDeliveryScript to gen script if needed

In this commit, we update `chooseDeliveryScript` to generate a new
script if needed. This allows us to fold in a few other lines that
always followed this function into this expanded function.

The tests have been updated accordingly.
Pull Request #8453: [4/4] - multi: integrate new rbf coop close FSM into the existing peer flow

275 of 1318 new or added lines in 22 files covered. (20.86%)

19521 existing lines in 257 files now uncovered.

103858 of 180741 relevant lines covered (57.46%)

24750.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.79
/htlcswitch/switch.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "math/rand"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
14
        "github.com/btcsuite/btcd/btcutil"
15
        "github.com/btcsuite/btcd/wire"
16
        "github.com/davecgh/go-spew/spew"
17
        "github.com/lightningnetwork/lnd/chainntnfs"
18
        "github.com/lightningnetwork/lnd/channeldb"
19
        "github.com/lightningnetwork/lnd/clock"
20
        "github.com/lightningnetwork/lnd/contractcourt"
21
        "github.com/lightningnetwork/lnd/fn/v2"
22
        "github.com/lightningnetwork/lnd/graph/db/models"
23
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
24
        "github.com/lightningnetwork/lnd/kvdb"
25
        "github.com/lightningnetwork/lnd/lntypes"
26
        "github.com/lightningnetwork/lnd/lnutils"
27
        "github.com/lightningnetwork/lnd/lnwallet"
28
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
29
        "github.com/lightningnetwork/lnd/lnwire"
30
        "github.com/lightningnetwork/lnd/ticker"
31
)
32

33
const (
34
        // DefaultFwdEventInterval is the duration between attempts to flush
35
        // pending forwarding events to disk.
36
        DefaultFwdEventInterval = 15 * time.Second
37

38
        // DefaultLogInterval is the duration between attempts to log statistics
39
        // about forwarding events.
40
        DefaultLogInterval = 10 * time.Second
41

42
        // DefaultAckInterval is the duration between attempts to ack any settle
43
        // fails in a forwarding package.
44
        DefaultAckInterval = 15 * time.Second
45

46
        // DefaultMailboxDeliveryTimeout is the duration after which Adds will
47
        // be cancelled if they could not get added to an outgoing commitment.
48
        DefaultMailboxDeliveryTimeout = time.Minute
49
)
50

51
var (
52
        // ErrChannelLinkNotFound is used when channel link hasn't been found.
53
        ErrChannelLinkNotFound = errors.New("channel link not found")
54

55
        // ErrDuplicateAdd signals that the ADD htlc was already forwarded
56
        // through the switch and is locked into another commitment txn.
57
        ErrDuplicateAdd = errors.New("duplicate add HTLC detected")
58

59
        // ErrUnknownErrorDecryptor signals that we were unable to locate the
60
        // error decryptor for this payment. This is likely due to restarting
61
        // the daemon.
62
        ErrUnknownErrorDecryptor = errors.New("unknown error decryptor")
63

64
        // ErrSwitchExiting signaled when the switch has received a shutdown
65
        // request.
66
        ErrSwitchExiting = errors.New("htlcswitch shutting down")
67

68
        // ErrNoLinksFound is an error returned when we attempt to retrieve the
69
        // active links in the switch for a specific destination.
70
        ErrNoLinksFound = errors.New("no channel links found")
71

72
        // ErrUnreadableFailureMessage is returned when the failure message
73
        // cannot be decrypted.
74
        ErrUnreadableFailureMessage = errors.New("unreadable failure message")
75

76
        // ErrLocalAddFailed signals that the ADD htlc for a local payment
77
        // failed to be processed.
78
        ErrLocalAddFailed = errors.New("local add HTLC failed")
79

80
        // errFeeExposureExceeded is only surfaced to callers of SendHTLC and
81
        // signals that sending the HTLC would exceed the outgoing link's fee
82
        // exposure threshold.
83
        errFeeExposureExceeded = errors.New("fee exposure exceeded")
84

85
        // DefaultMaxFeeExposure is the default threshold after which we'll
86
        // fail payments if they increase our fee exposure. This is currently
87
        // set to 500m msats.
88
        DefaultMaxFeeExposure = lnwire.MilliSatoshi(500_000_000)
89
)
90

91
// plexPacket encapsulates switch packet and adds error channel to receive
92
// error from request handler.
93
type plexPacket struct {
94
        pkt *htlcPacket
95
        err chan error
96
}
97

98
// ChanClose represents a request which close a particular channel specified by
99
// its id.
100
type ChanClose struct {
101
        // CloseType is a variable which signals the type of channel closure the
102
        // peer should execute.
103
        CloseType contractcourt.ChannelCloseType
104

105
        // ChanPoint represent the id of the channel which should be closed.
106
        ChanPoint *wire.OutPoint
107

108
        // TargetFeePerKw is the ideal fee that was specified by the caller.
109
        // This value is only utilized if the closure type is CloseRegular.
110
        // This will be the starting offered fee when the fee negotiation
111
        // process for the cooperative closure transaction kicks off.
112
        TargetFeePerKw chainfee.SatPerKWeight
113

114
        // MaxFee is the highest fee the caller is willing to pay.
115
        //
116
        // NOTE: This field is only respected if the caller is the initiator of
117
        // the channel.
118
        MaxFee chainfee.SatPerKWeight
119

120
        // DeliveryScript is an optional delivery script to pay funds out to.
121
        DeliveryScript lnwire.DeliveryAddress
122

123
        // Updates is used by request creator to receive the notifications about
124
        // execution of the close channel request.
125
        Updates chan interface{}
126

127
        // Err is used by request creator to receive request execution error.
128
        Err chan error
129

130
        // Ctx is a context linked to the lifetime of the caller.
131
        Ctx context.Context //nolint:containedctx
132
}
133

134
// Config defines the configuration for the service. ALL elements within the
135
// configuration MUST be non-nil for the service to carry out its duties.
136
type Config struct {
137
        // FwdingLog is an interface that will be used by the switch to log
138
        // forwarding events. A forwarding event happens each time a payment
139
        // circuit is successfully completed. So when we forward an HTLC, and a
140
        // settle is eventually received.
141
        FwdingLog ForwardingLog
142

143
        // LocalChannelClose kicks-off the workflow to execute a cooperative or
144
        // forced unilateral closure of the channel initiated by a local
145
        // subsystem.
146
        LocalChannelClose func(pubKey []byte, request *ChanClose)
147

148
        // DB is the database backend that will be used to back the switch's
149
        // persistent circuit map.
150
        DB kvdb.Backend
151

152
        // FetchAllOpenChannels is a function that fetches all currently open
153
        // channels from the channel database.
154
        FetchAllOpenChannels func() ([]*channeldb.OpenChannel, error)
155

156
        // FetchAllChannels is a function that fetches all pending open, open,
157
        // and waiting close channels from the database.
158
        FetchAllChannels func() ([]*channeldb.OpenChannel, error)
159

160
        // FetchClosedChannels is a function that fetches all closed channels
161
        // from the channel database.
162
        FetchClosedChannels func(
163
                pendingOnly bool) ([]*channeldb.ChannelCloseSummary, error)
164

165
        // SwitchPackager provides access to the forwarding packages of all
166
        // active channels. This gives the switch the ability to read arbitrary
167
        // forwarding packages, and ack settles and fails contained within them.
168
        SwitchPackager channeldb.FwdOperator
169

170
        // ExtractErrorEncrypter is an interface allowing switch to reextract
171
        // error encrypters stored in the circuit map on restarts, since they
172
        // are not stored directly within the database.
173
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter
174

175
        // FetchLastChannelUpdate retrieves the latest routing policy for a
176
        // target channel. This channel will typically be the outgoing channel
177
        // specified when we receive an incoming HTLC.  This will be used to
178
        // provide payment senders our latest policy when sending encrypted
179
        // error messages.
180
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
181
                *lnwire.ChannelUpdate1, error)
182

183
        // Notifier is an instance of a chain notifier that we'll use to signal
184
        // the switch when a new block has arrived.
185
        Notifier chainntnfs.ChainNotifier
186

187
        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
188
        // events through.
189
        HtlcNotifier htlcNotifier
190

191
        // FwdEventTicker is a signal that instructs the htlcswitch to flush any
192
        // pending forwarding events.
193
        FwdEventTicker ticker.Ticker
194

195
        // LogEventTicker is a signal instructing the htlcswitch to log
196
        // aggregate stats about it's forwarding during the last interval.
197
        LogEventTicker ticker.Ticker
198

199
        // AckEventTicker is a signal instructing the htlcswitch to ack any settle
200
        // fails in forwarding packages.
201
        AckEventTicker ticker.Ticker
202

203
        // AllowCircularRoute is true if the user has configured their node to
204
        // allow forwards that arrive and depart our node over the same channel.
205
        AllowCircularRoute bool
206

207
        // RejectHTLC is a flag that instructs the htlcswitch to reject any
208
        // HTLCs that are not from the source hop.
209
        RejectHTLC bool
210

211
        // Clock is a time source for the switch.
212
        Clock clock.Clock
213

214
        // MailboxDeliveryTimeout is the interval after which Adds will be
215
        // cancelled if they have not been yet been delivered to a link. The
216
        // computed deadline will expiry this long after the Adds are added to
217
        // a mailbox via AddPacket.
218
        MailboxDeliveryTimeout time.Duration
219

220
        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
221
        // fail incoming or outgoing payments for a particular channel.
222
        MaxFeeExposure lnwire.MilliSatoshi
223

224
        // SignAliasUpdate is used when sending FailureMessages backwards for
225
        // option_scid_alias channels. This avoids a potential privacy leak by
226
        // replacing the public, confirmed SCID with the alias in the
227
        // ChannelUpdate.
228
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
229
                error)
230

231
        // IsAlias returns whether or not a given SCID is an alias.
232
        IsAlias func(scid lnwire.ShortChannelID) bool
233
}
234

235
// Switch is the central messaging bus for all incoming/outgoing HTLCs.
236
// Connected peers with active channels are treated as named interfaces which
237
// refer to active channels as links. A link is the switch's message
238
// communication point with the goroutine that manages an active channel. New
239
// links are registered each time a channel is created, and unregistered once
240
// the channel is closed. The switch manages the hand-off process for multi-hop
241
// HTLCs, forwarding HTLCs initiated from within the daemon, and finally
242
// notifies users local-systems concerning their outstanding payment requests.
243
type Switch struct {
244
        started  int32 // To be used atomically.
245
        shutdown int32 // To be used atomically.
246

247
        // bestHeight is the best known height of the main chain. The links will
248
        // be used this information to govern decisions based on HTLC timeouts.
249
        // This will be retrieved by the registered links atomically.
250
        bestHeight uint32
251

252
        wg   sync.WaitGroup
253
        quit chan struct{}
254

255
        // cfg is a copy of the configuration struct that the htlc switch
256
        // service was initialized with.
257
        cfg *Config
258

259
        // networkResults stores the results of payments initiated by the user.
260
        // The store is used to later look up the payments and notify the
261
        // user of the result when they are complete. Each payment attempt
262
        // should be given a unique integer ID when it is created, otherwise
263
        // results might be overwritten.
264
        networkResults *networkResultStore
265

266
        // circuits is storage for payment circuits which are used to
267
        // forward the settle/fail htlc updates back to the add htlc initiator.
268
        circuits CircuitMap
269

270
        // mailOrchestrator manages the lifecycle of mailboxes used throughout
271
        // the switch, and facilitates delayed delivery of packets to links that
272
        // later come online.
273
        mailOrchestrator *mailOrchestrator
274

275
        // indexMtx is a read/write mutex that protects the set of indexes
276
        // below.
277
        indexMtx sync.RWMutex
278

279
        // pendingLinkIndex holds links that have not had their final, live
280
        // short_chan_id assigned.
281
        pendingLinkIndex map[lnwire.ChannelID]ChannelLink
282

283
        // links is a map of channel id and channel link which manages
284
        // this channel.
285
        linkIndex map[lnwire.ChannelID]ChannelLink
286

287
        // forwardingIndex is an index which is consulted by the switch when it
288
        // needs to locate the next hop to forward an incoming/outgoing HTLC
289
        // update to/from.
290
        //
291
        // TODO(roasbeef): eventually add a NetworkHop mapping before the
292
        // ChannelLink
293
        forwardingIndex map[lnwire.ShortChannelID]ChannelLink
294

295
        // interfaceIndex maps the compressed public key of a peer to all the
296
        // channels that the switch maintains with that peer.
297
        interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink
298

299
        // linkStopIndex stores the currently stopping ChannelLinks,
300
        // represented by their ChannelID. The key is the link's ChannelID and
301
        // the value is a chan that is closed when the link has fully stopped.
302
        // This map is only added to if RemoveLink is called and is not added
303
        // to when the Switch is shutting down and calls Stop() on each link.
304
        //
305
        // MUST be used with the indexMtx.
306
        linkStopIndex map[lnwire.ChannelID]chan struct{}
307

308
        // htlcPlex is the channel which all connected links use to coordinate
309
        // the setup/teardown of Sphinx (onion routing) payment circuits.
310
        // Active links forward any add/settle messages over this channel each
311
        // state transition, sending new adds/settles which are fully locked
312
        // in.
313
        htlcPlex chan *plexPacket
314

315
        // chanCloseRequests is used to transfer the channel close request to
316
        // the channel close handler.
317
        chanCloseRequests chan *ChanClose
318

319
        // resolutionMsgs is the channel that all external contract resolution
320
        // messages will be sent over.
321
        resolutionMsgs chan *resolutionMsg
322

323
        // pendingFwdingEvents is the set of forwarding events which have been
324
        // collected during the current interval, but hasn't yet been written
325
        // to the forwarding log.
326
        fwdEventMtx         sync.Mutex
327
        pendingFwdingEvents []channeldb.ForwardingEvent
328

329
        // blockEpochStream is an active block epoch event stream backed by an
330
        // active ChainNotifier instance. This will be used to retrieve the
331
        // latest height of the chain.
332
        blockEpochStream *chainntnfs.BlockEpochEvent
333

334
        // pendingSettleFails is the set of settle/fail entries that we need to
335
        // ack in the forwarding package of the outgoing link. This was added to
336
        // make pipelining settles more efficient.
337
        pendingSettleFails []channeldb.SettleFailRef
338

339
        // resMsgStore is used to store the set of ResolutionMsg that come from
340
        // contractcourt. This is used so the Switch can properly forward them,
341
        // even on restarts.
342
        resMsgStore *resolutionStore
343

344
        // aliasToReal is a map used for option-scid-alias feature-bit links.
345
        // The alias SCID is the key and the real, confirmed SCID is the value.
346
        // If the channel is unconfirmed, there will not be a mapping for it.
347
        // Since channels can have multiple aliases, this map is essentially a
348
        // N->1 mapping for a channel. This MUST be accessed with the indexMtx.
349
        aliasToReal map[lnwire.ShortChannelID]lnwire.ShortChannelID
350

351
        // baseIndex is a map used for option-scid-alias feature-bit links.
352
        // The value is the SCID of the link's ShortChannelID. This value may
353
        // be an alias for zero-conf channels or a confirmed SCID for
354
        // non-zero-conf channels with the option-scid-alias feature-bit. The
355
        // key includes the value itself and also any other aliases. This MUST
356
        // be accessed with the indexMtx.
357
        baseIndex map[lnwire.ShortChannelID]lnwire.ShortChannelID
358
}
359

360
// New creates the new instance of htlc switch.
361
func New(cfg Config, currentHeight uint32) (*Switch, error) {
341✔
362
        resStore := newResolutionStore(cfg.DB)
341✔
363

341✔
364
        circuitMap, err := NewCircuitMap(&CircuitMapConfig{
341✔
365
                DB:                    cfg.DB,
341✔
366
                FetchAllOpenChannels:  cfg.FetchAllOpenChannels,
341✔
367
                FetchClosedChannels:   cfg.FetchClosedChannels,
341✔
368
                ExtractErrorEncrypter: cfg.ExtractErrorEncrypter,
341✔
369
                CheckResolutionMsg:    resStore.checkResolutionMsg,
341✔
370
        })
341✔
371
        if err != nil {
341✔
372
                return nil, err
×
373
        }
×
374

375
        s := &Switch{
341✔
376
                bestHeight:        currentHeight,
341✔
377
                cfg:               &cfg,
341✔
378
                circuits:          circuitMap,
341✔
379
                linkIndex:         make(map[lnwire.ChannelID]ChannelLink),
341✔
380
                forwardingIndex:   make(map[lnwire.ShortChannelID]ChannelLink),
341✔
381
                interfaceIndex:    make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
341✔
382
                pendingLinkIndex:  make(map[lnwire.ChannelID]ChannelLink),
341✔
383
                linkStopIndex:     make(map[lnwire.ChannelID]chan struct{}),
341✔
384
                networkResults:    newNetworkResultStore(cfg.DB),
341✔
385
                htlcPlex:          make(chan *plexPacket),
341✔
386
                chanCloseRequests: make(chan *ChanClose),
341✔
387
                resolutionMsgs:    make(chan *resolutionMsg),
341✔
388
                resMsgStore:       resStore,
341✔
389
                quit:              make(chan struct{}),
341✔
390
        }
341✔
391

341✔
392
        s.aliasToReal = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
341✔
393
        s.baseIndex = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
341✔
394

341✔
395
        s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{
341✔
396
                forwardPackets:    s.ForwardPackets,
341✔
397
                clock:             s.cfg.Clock,
341✔
398
                expiry:            s.cfg.MailboxDeliveryTimeout,
341✔
399
                failMailboxUpdate: s.failMailboxUpdate,
341✔
400
        })
341✔
401

341✔
402
        return s, nil
341✔
403
}
404

405
// resolutionMsg is a struct that wraps an existing ResolutionMsg with a done
406
// channel. We'll use this channel to synchronize delivery of the message with
407
// the caller.
408
type resolutionMsg struct {
409
        contractcourt.ResolutionMsg
410

411
        errChan chan error
412
}
413

414
// ProcessContractResolution is called by active contract resolvers once a
415
// contract they are watching over has been fully resolved. The message carries
416
// an external signal that *would* have been sent if the outgoing channel
417
// didn't need to go to the chain in order to fulfill a contract. We'll process
418
// this message just as if it came from an active outgoing channel.
419
func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) error {
1✔
420
        errChan := make(chan error, 1)
1✔
421

1✔
422
        select {
1✔
423
        case s.resolutionMsgs <- &resolutionMsg{
424
                ResolutionMsg: msg,
425
                errChan:       errChan,
426
        }:
1✔
427
        case <-s.quit:
×
428
                return ErrSwitchExiting
×
429
        }
430

431
        select {
1✔
432
        case err := <-errChan:
1✔
433
                return err
1✔
434
        case <-s.quit:
×
435
                return ErrSwitchExiting
×
436
        }
437
}
438

439
// HasAttemptResult reads the network result store to fetch the specified
440
// attempt. Returns true if the attempt result exists.
UNCOV
441
func (s *Switch) HasAttemptResult(attemptID uint64) (bool, error) {
×
UNCOV
442
        _, err := s.networkResults.getResult(attemptID)
×
UNCOV
443
        if err == nil {
×
444
                return true, nil
×
445
        }
×
446

UNCOV
447
        if !errors.Is(err, ErrPaymentIDNotFound) {
×
448
                return false, err
×
449
        }
×
450

UNCOV
451
        return false, nil
×
452
}
453

454
// GetAttemptResult returns the result of the HTLC attempt with the given
455
// attemptID. The paymentHash should be set to the payment's overall hash, or
456
// in case of AMP payments the payment's unique identifier.
457
//
458
// The method returns a channel where the HTLC attempt result will be sent when
459
// available, or an error is encountered during forwarding. When a result is
460
// received on the channel, the HTLC is guaranteed to no longer be in flight.
461
// The switch shutting down is signaled by closing the channel. If the
462
// attemptID is unknown, ErrPaymentIDNotFound will be returned.
463
func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
464
        deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) {
307✔
465

307✔
466
        var (
307✔
467
                nChan <-chan *networkResult
307✔
468
                err   error
307✔
469
                inKey = CircuitKey{
307✔
470
                        ChanID: hop.Source,
307✔
471
                        HtlcID: attemptID,
307✔
472
                }
307✔
473
        )
307✔
474

307✔
475
        // If the HTLC is not found in the circuit map, check whether a result
307✔
476
        // is already available.
307✔
477
        // Assumption: no one will add this attempt ID other than the caller.
307✔
478
        if s.circuits.LookupCircuit(inKey) == nil {
312✔
479
                res, err := s.networkResults.getResult(attemptID)
5✔
480
                if err != nil {
7✔
481
                        return nil, err
2✔
482
                }
2✔
483
                c := make(chan *networkResult, 1)
3✔
484
                c <- res
3✔
485
                nChan = c
3✔
486
        } else {
302✔
487
                // The HTLC was committed to the circuits, subscribe for a
302✔
488
                // result.
302✔
489
                nChan, err = s.networkResults.subscribeResult(attemptID)
302✔
490
                if err != nil {
302✔
491
                        return nil, err
×
492
                }
×
493
        }
494

495
        resultChan := make(chan *PaymentResult, 1)
305✔
496

305✔
497
        // Since the attempt was known, we can start a goroutine that can
305✔
498
        // extract the result when it is available, and pass it on to the
305✔
499
        // caller.
305✔
500
        s.wg.Add(1)
305✔
501
        go func() {
610✔
502
                defer s.wg.Done()
305✔
503

305✔
504
                var n *networkResult
305✔
505
                select {
305✔
506
                case n = <-nChan:
301✔
507
                case <-s.quit:
4✔
508
                        // We close the result channel to signal a shutdown. We
4✔
509
                        // don't send any result in this case since the HTLC is
4✔
510
                        // still in flight.
4✔
511
                        close(resultChan)
4✔
512
                        return
4✔
513
                }
514

515
                log.Debugf("Received network result %T for attemptID=%v", n.msg,
301✔
516
                        attemptID)
301✔
517

301✔
518
                // Extract the result and pass it to the result channel.
301✔
519
                result, err := s.extractResult(
301✔
520
                        deobfuscator, n, attemptID, paymentHash,
301✔
521
                )
301✔
522
                if err != nil {
301✔
523
                        e := fmt.Errorf("unable to extract result: %w", err)
×
524
                        log.Error(e)
×
525
                        resultChan <- &PaymentResult{
×
526
                                Error: e,
×
527
                        }
×
528
                        return
×
529
                }
×
530
                resultChan <- result
301✔
531
        }()
532

533
        return resultChan, nil
305✔
534
}
535

536
// CleanStore calls the underlying result store, telling it is safe to delete
537
// all entries except the ones in the keepPids map. This should be called
538
// preiodically to let the switch clean up payment results that we have
539
// handled.
UNCOV
540
func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error {
×
UNCOV
541
        return s.networkResults.cleanStore(keepPids)
×
UNCOV
542
}
×
543

544
// SendHTLC is used by other subsystems which aren't belong to htlc switch
545
// package in order to send the htlc update. The attemptID used MUST be unique
546
// for this HTLC, and MUST be used only once, otherwise the switch might reject
547
// it.
548
func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64,
549
        htlc *lnwire.UpdateAddHTLC) error {
416✔
550

416✔
551
        // Generate and send new update packet, if error will be received on
416✔
552
        // this stage it means that packet haven't left boundaries of our
416✔
553
        // system and something wrong happened.
416✔
554
        packet := &htlcPacket{
416✔
555
                incomingChanID: hop.Source,
416✔
556
                incomingHTLCID: attemptID,
416✔
557
                outgoingChanID: firstHop,
416✔
558
                htlc:           htlc,
416✔
559
                amount:         htlc.Amount,
416✔
560
        }
416✔
561

416✔
562
        // Attempt to fetch the target link before creating a circuit so that
416✔
563
        // we don't leave dangling circuits. The getLocalLink method does not
416✔
564
        // require the circuit variable to be set on the *htlcPacket.
416✔
565
        link, linkErr := s.getLocalLink(packet, htlc)
416✔
566
        if linkErr != nil {
420✔
567
                // Notify the htlc notifier of a link failure on our outgoing
4✔
568
                // link. Incoming timelock/amount values are not set because
4✔
569
                // they are not present for local sends.
4✔
570
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
4✔
571
                        newHtlcKey(packet),
4✔
572
                        HtlcInfo{
4✔
573
                                OutgoingTimeLock: htlc.Expiry,
4✔
574
                                OutgoingAmt:      htlc.Amount,
4✔
575
                        },
4✔
576
                        HtlcEventTypeSend,
4✔
577
                        linkErr,
4✔
578
                        false,
4✔
579
                )
4✔
580

4✔
581
                return linkErr
4✔
582
        }
4✔
583

584
        // Evaluate whether this HTLC would bypass our fee exposure. If it
585
        // does, don't send it out and instead return an error.
586
        if s.dustExceedsFeeThreshold(link, htlc.Amount, false) {
413✔
587
                // Notify the htlc notifier of a link failure on our outgoing
1✔
588
                // link. We use the FailTemporaryChannelFailure in place of a
1✔
589
                // more descriptive error message.
1✔
590
                linkErr := NewLinkError(
1✔
591
                        &lnwire.FailTemporaryChannelFailure{},
1✔
592
                )
1✔
593
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
1✔
594
                        newHtlcKey(packet),
1✔
595
                        HtlcInfo{
1✔
596
                                OutgoingTimeLock: htlc.Expiry,
1✔
597
                                OutgoingAmt:      htlc.Amount,
1✔
598
                        },
1✔
599
                        HtlcEventTypeSend,
1✔
600
                        linkErr,
1✔
601
                        false,
1✔
602
                )
1✔
603

1✔
604
                return errFeeExposureExceeded
1✔
605
        }
1✔
606

607
        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
411✔
608
        actions, err := s.circuits.CommitCircuits(circuit)
411✔
609
        if err != nil {
411✔
610
                log.Errorf("unable to commit circuit in switch: %v", err)
×
611
                return err
×
612
        }
×
613

614
        // Drop duplicate packet if it has already been seen.
615
        switch {
411✔
616
        case len(actions.Drops) == 1:
1✔
617
                return ErrDuplicateAdd
1✔
618

619
        case len(actions.Fails) == 1:
×
620
                return ErrLocalAddFailed
×
621
        }
622

623
        // Give the packet to the link's mailbox so that HTLC's are properly
624
        // canceled back if the mailbox timeout elapses.
625
        packet.circuit = circuit
410✔
626

410✔
627
        return link.handleSwitchPacket(packet)
410✔
628
}
629

630
// UpdateForwardingPolicies sends a message to the switch to update the
631
// forwarding policies for the set of target channels, keyed in chanPolicies.
632
//
633
// NOTE: This function is synchronous and will block until either the
634
// forwarding policies for all links have been updated, or the switch shuts
635
// down.
636
func (s *Switch) UpdateForwardingPolicies(
UNCOV
637
        chanPolicies map[wire.OutPoint]models.ForwardingPolicy) {
×
UNCOV
638

×
UNCOV
639
        log.Tracef("Updating link policies: %v", lnutils.SpewLogClosure(
×
UNCOV
640
                chanPolicies))
×
UNCOV
641

×
UNCOV
642
        s.indexMtx.RLock()
×
UNCOV
643

×
UNCOV
644
        // Update each link in chanPolicies.
×
UNCOV
645
        for targetLink, policy := range chanPolicies {
×
UNCOV
646
                cid := lnwire.NewChanIDFromOutPoint(targetLink)
×
UNCOV
647

×
UNCOV
648
                link, ok := s.linkIndex[cid]
×
UNCOV
649
                if !ok {
×
650
                        log.Debugf("Unable to find ChannelPoint(%v) to update "+
×
651
                                "link policy", targetLink)
×
652
                        continue
×
653
                }
654

UNCOV
655
                link.UpdateForwardingPolicy(policy)
×
656
        }
657

UNCOV
658
        s.indexMtx.RUnlock()
×
659
}
660

661
// IsForwardedHTLC checks for a given channel and htlc index if it is related
662
// to an opened circuit that represents a forwarded payment.
663
func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID,
664
        htlcIndex uint64) bool {
2✔
665

2✔
666
        circuit := s.circuits.LookupOpenCircuit(models.CircuitKey{
2✔
667
                ChanID: chanID,
2✔
668
                HtlcID: htlcIndex,
2✔
669
        })
2✔
670
        return circuit != nil && circuit.Incoming.ChanID != hop.Source
2✔
671
}
2✔
672

673
// ForwardPackets adds a list of packets to the switch for processing. Fails
674
// and settles are added on a first past, simultaneously constructing circuits
675
// for any adds. After persisting the circuits, another pass of the adds is
676
// given to forward them through the router. The sending link's quit channel is
677
// used to prevent deadlocks when the switch stops a link in the midst of
678
// forwarding.
679
func (s *Switch) ForwardPackets(linkQuit <-chan struct{},
680
        packets ...*htlcPacket) error {
869✔
681

869✔
682
        var (
869✔
683
                // fwdChan is a buffered channel used to receive err msgs from
869✔
684
                // the htlcPlex when forwarding this batch.
869✔
685
                fwdChan = make(chan error, len(packets))
869✔
686

869✔
687
                // numSent keeps a running count of how many packets are
869✔
688
                // forwarded to the switch, which determines how many responses
869✔
689
                // we will wait for on the fwdChan..
869✔
690
                numSent int
869✔
691
        )
869✔
692

869✔
693
        // No packets, nothing to do.
869✔
694
        if len(packets) == 0 {
1,089✔
695
                return nil
220✔
696
        }
220✔
697

698
        // Setup a barrier to prevent the background tasks from processing
699
        // responses until this function returns to the user.
700
        var wg sync.WaitGroup
649✔
701
        wg.Add(1)
649✔
702
        defer wg.Done()
649✔
703

649✔
704
        // Before spawning the following goroutine to proxy our error responses,
649✔
705
        // check to see if we have already been issued a shutdown request. If
649✔
706
        // so, we exit early to avoid incrementing the switch's waitgroup while
649✔
707
        // it is already in the process of shutting down.
649✔
708
        select {
649✔
709
        case <-linkQuit:
×
710
                return nil
×
711
        case <-s.quit:
1✔
712
                return nil
1✔
713
        default:
648✔
714
                // Spawn a goroutine to log the errors returned from failed packets.
648✔
715
                s.wg.Add(1)
648✔
716
                go s.logFwdErrs(&numSent, &wg, fwdChan)
648✔
717
        }
718

719
        // Make a first pass over the packets, forwarding any settles or fails.
720
        // As adds are found, we create a circuit and append it to our set of
721
        // circuits to be written to disk.
722
        var circuits []*PaymentCircuit
648✔
723
        var addBatch []*htlcPacket
648✔
724
        for _, packet := range packets {
1,296✔
725
                switch htlc := packet.htlc.(type) {
648✔
726
                case *lnwire.UpdateAddHTLC:
87✔
727
                        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
87✔
728
                        packet.circuit = circuit
87✔
729
                        circuits = append(circuits, circuit)
87✔
730
                        addBatch = append(addBatch, packet)
87✔
731
                default:
561✔
732
                        err := s.routeAsync(packet, fwdChan, linkQuit)
561✔
733
                        if err != nil {
571✔
734
                                return fmt.Errorf("failed to forward packet %w",
10✔
735
                                        err)
10✔
736
                        }
10✔
737
                        numSent++
536✔
738
                }
739
        }
740

741
        // If this batch did not contain any circuits to commit, we can return
742
        // early.
743
        if len(circuits) == 0 {
1,159✔
744
                return nil
536✔
745
        }
536✔
746

747
        // Write any circuits that we found to disk.
748
        actions, err := s.circuits.CommitCircuits(circuits...)
87✔
749
        if err != nil {
87✔
750
                log.Errorf("unable to commit circuits in switch: %v", err)
×
751
        }
×
752

753
        // Split the htlc packets by comparing an in-order seek to the head of
754
        // the added, dropped, or failed circuits.
755
        //
756
        // NOTE: This assumes each list is guaranteed to be a subsequence of the
757
        // circuits, and that the union of the sets results in the original set
758
        // of circuits.
759
        var addedPackets, failedPackets []*htlcPacket
87✔
760
        for _, packet := range addBatch {
174✔
761
                switch {
87✔
762
                case len(actions.Adds) > 0 && packet.circuit == actions.Adds[0]:
83✔
763
                        addedPackets = append(addedPackets, packet)
83✔
764
                        actions.Adds = actions.Adds[1:]
83✔
765

766
                case len(actions.Drops) > 0 && packet.circuit == actions.Drops[0]:
1✔
767
                        actions.Drops = actions.Drops[1:]
1✔
768

769
                case len(actions.Fails) > 0 && packet.circuit == actions.Fails[0]:
3✔
770
                        failedPackets = append(failedPackets, packet)
3✔
771
                        actions.Fails = actions.Fails[1:]
3✔
772
                }
773
        }
774

775
        // Now, forward any packets for circuits that were successfully added to
776
        // the switch's circuit map.
777
        for _, packet := range addedPackets {
170✔
778
                err := s.routeAsync(packet, fwdChan, linkQuit)
83✔
779
                if err != nil {
84✔
780
                        return fmt.Errorf("failed to forward packet %w", err)
1✔
781
                }
1✔
782
                numSent++
82✔
783
        }
784

785
        // Lastly, for any packets that failed, this implies that they were
786
        // left in a half added state, which can happen when recovering from
787
        // failures.
788
        if len(failedPackets) > 0 {
89✔
789
                var failure lnwire.FailureMessage
3✔
790
                incomingID := failedPackets[0].incomingChanID
3✔
791

3✔
792
                // If the incoming channel is an option_scid_alias channel,
3✔
793
                // then we'll need to replace the SCID in the ChannelUpdate.
3✔
794
                update := s.failAliasUpdate(incomingID, true)
3✔
795
                if update == nil {
4✔
796
                        // Fallback to the original non-option behavior.
1✔
797
                        update, err := s.cfg.FetchLastChannelUpdate(
1✔
798
                                incomingID,
1✔
799
                        )
1✔
800
                        if err != nil {
1✔
801
                                failure = &lnwire.FailTemporaryNodeFailure{}
×
802
                        } else {
1✔
803
                                failure = lnwire.NewTemporaryChannelFailure(
1✔
804
                                        update,
1✔
805
                                )
1✔
806
                        }
1✔
807
                } else {
2✔
808
                        // This is an option_scid_alias channel.
2✔
809
                        failure = lnwire.NewTemporaryChannelFailure(update)
2✔
810
                }
2✔
811

812
                linkError := NewDetailedLinkError(
3✔
813
                        failure, OutgoingFailureIncompleteForward,
3✔
814
                )
3✔
815

3✔
816
                for _, packet := range failedPackets {
6✔
817
                        // We don't handle the error here since this method
3✔
818
                        // always returns an error.
3✔
819
                        _ = s.failAddPacket(packet, linkError)
3✔
820
                }
3✔
821
        }
822

823
        return nil
86✔
824
}
825

826
// logFwdErrs logs any errors received on `fwdChan`.
827
func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) {
648✔
828
        defer s.wg.Done()
648✔
829

648✔
830
        // Wait here until the outer function has finished persisting
648✔
831
        // and routing the packets. This guarantees we don't read from num until
648✔
832
        // the value is accurate.
648✔
833
        wg.Wait()
648✔
834

648✔
835
        numSent := *num
648✔
836
        for i := 0; i < numSent; i++ {
1,266✔
837
                select {
618✔
838
                case err := <-fwdChan:
616✔
839
                        if err != nil {
639✔
840
                                log.Errorf("Unhandled error while reforwarding htlc "+
23✔
841
                                        "settle/fail over htlcswitch: %v", err)
23✔
842
                        }
23✔
843
                case <-s.quit:
2✔
844
                        log.Errorf("unable to forward htlc packet " +
2✔
845
                                "htlc switch was stopped")
2✔
846
                        return
2✔
847
                }
848
        }
849
}
850

851
// routeAsync sends a packet through the htlc switch, using the provided err
852
// chan to propagate errors back to the caller. The link's quit channel is
853
// provided so that the send can be canceled if either the link or the switch
854
// receive a shutdown requuest. This method does not wait for a response from
855
// the htlcForwarder before returning.
856
func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error,
857
        linkQuit <-chan struct{}) error {
644✔
858

644✔
859
        command := &plexPacket{
644✔
860
                pkt: packet,
644✔
861
                err: errChan,
644✔
862
        }
644✔
863

644✔
864
        select {
644✔
865
        case s.htlcPlex <- command:
618✔
866
                return nil
618✔
867
        case <-linkQuit:
11✔
868
                return ErrLinkShuttingDown
11✔
869
        case <-s.quit:
×
870
                return errors.New("htlc switch was stopped")
×
871
        }
872
}
873

874
// getLocalLink handles the addition of a htlc for a send that originates from
875
// our node. It returns the link that the htlc should be forwarded outwards on,
876
// and a link error if the htlc cannot be forwarded.
877
func (s *Switch) getLocalLink(pkt *htlcPacket, htlc *lnwire.UpdateAddHTLC) (
878
        ChannelLink, *LinkError) {
416✔
879

416✔
880
        // Try to find links by node destination.
416✔
881
        s.indexMtx.RLock()
416✔
882
        link, err := s.getLinkByShortID(pkt.outgoingChanID)
416✔
883
        defer s.indexMtx.RUnlock()
416✔
884
        if err != nil {
417✔
885
                // If the link was not found for the outgoingChanID, an outside
1✔
886
                // subsystem may be using the confirmed SCID of a zero-conf
1✔
887
                // channel. In this case, we'll consult the Switch maps to see
1✔
888
                // if an alias exists and use the alias to lookup the link.
1✔
889
                // This extra step is a consequence of not updating the Switch
1✔
890
                // forwardingIndex when a zero-conf channel is confirmed. We
1✔
891
                // don't need to change the outgoingChanID since the link will
1✔
892
                // do that upon receiving the packet.
1✔
893
                baseScid, ok := s.baseIndex[pkt.outgoingChanID]
1✔
894
                if !ok {
1✔
UNCOV
895
                        log.Errorf("Link %v not found", pkt.outgoingChanID)
×
UNCOV
896
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
×
UNCOV
897
                }
×
898

899
                // The base SCID was found, so we'll use that to fetch the
900
                // link.
901
                link, err = s.getLinkByShortID(baseScid)
1✔
902
                if err != nil {
1✔
903
                        log.Errorf("Link %v not found", baseScid)
×
904
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
×
905
                }
×
906
        }
907

908
        if !link.EligibleToForward() {
417✔
909
                log.Errorf("Link %v is not available to forward",
1✔
910
                        pkt.outgoingChanID)
1✔
911

1✔
912
                // The update does not need to be populated as the error
1✔
913
                // will be returned back to the router.
1✔
914
                return nil, NewDetailedLinkError(
1✔
915
                        lnwire.NewTemporaryChannelFailure(nil),
1✔
916
                        OutgoingFailureLinkNotEligible,
1✔
917
                )
1✔
918
        }
1✔
919

920
        // Ensure that the htlc satisfies the outgoing channel policy.
921
        currentHeight := atomic.LoadUint32(&s.bestHeight)
415✔
922
        htlcErr := link.CheckHtlcTransit(
415✔
923
                htlc.PaymentHash, htlc.Amount, htlc.Expiry, currentHeight,
415✔
924
                htlc.CustomRecords,
415✔
925
        )
415✔
926
        if htlcErr != nil {
418✔
927
                log.Errorf("Link %v policy for local forward not "+
3✔
928
                        "satisfied", pkt.outgoingChanID)
3✔
929
                return nil, htlcErr
3✔
930
        }
3✔
931
        return link, nil
412✔
932
}
933

934
// handleLocalResponse processes a Settle or Fail responding to a
935
// locally-initiated payment. This is handled asynchronously to avoid blocking
936
// the main event loop within the switch, as these operations can require
937
// multiple db transactions. The guarantees of the circuit map are stringent
938
// enough such that we are able to tolerate reordering of these operations
939
// without side effects. The primary operations handled are:
940
//  1. Save the payment result to the pending payment store.
941
//  2. Notify subscribers about the payment result.
942
//  3. Ack settle/fail references, to avoid resending this response internally
943
//  4. Teardown the closing circuit in the circuit map
944
//
945
// NOTE: This method MUST be spawned as a goroutine.
946
func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
304✔
947
        defer s.wg.Done()
304✔
948

304✔
949
        attemptID := pkt.incomingHTLCID
304✔
950

304✔
951
        // The error reason will be unencypted in case this a local
304✔
952
        // failure or a converted error.
304✔
953
        unencrypted := pkt.localFailure || pkt.convertedError
304✔
954
        n := &networkResult{
304✔
955
                msg:          pkt.htlc,
304✔
956
                unencrypted:  unencrypted,
304✔
957
                isResolution: pkt.isResolution,
304✔
958
        }
304✔
959

304✔
960
        // Store the result to the db. This will also notify subscribers about
304✔
961
        // the result.
304✔
962
        if err := s.networkResults.storeResult(attemptID, n); err != nil {
304✔
963
                log.Errorf("Unable to store attempt result for pid=%v: %v",
×
964
                        attemptID, err)
×
965
                return
×
966
        }
×
967

968
        // First, we'll clean up any fwdpkg references, circuit entries, and
969
        // mark in our db that the payment for this payment hash has either
970
        // succeeded or failed.
971
        //
972
        // If this response is contained in a forwarding package, we'll start by
973
        // acking the settle/fail so that we don't continue to retransmit the
974
        // HTLC internally.
975
        if pkt.destRef != nil {
422✔
976
                if err := s.ackSettleFail(*pkt.destRef); err != nil {
118✔
977
                        log.Warnf("Unable to ack settle/fail reference: %s: %v",
×
978
                                *pkt.destRef, err)
×
979
                        return
×
980
                }
×
981
        }
982

983
        // Next, we'll remove the circuit since we are about to complete an
984
        // fulfill/fail of this HTLC. Since we've already removed the
985
        // settle/fail fwdpkg reference, the response from the peer cannot be
986
        // replayed internally if this step fails. If this happens, this logic
987
        // will be executed when a provided resolution message comes through.
988
        // This can only happen if the circuit is still open, which is why this
989
        // ordering is chosen.
990
        if err := s.teardownCircuit(pkt); err != nil {
304✔
991
                log.Errorf("Unable to teardown circuit %s: %v",
×
992
                        pkt.inKey(), err)
×
993
                return
×
994
        }
×
995

996
        // Finally, notify on the htlc failure or success that has been handled.
997
        key := newHtlcKey(pkt)
304✔
998
        eventType := getEventType(pkt)
304✔
999

304✔
1000
        switch htlc := pkt.htlc.(type) {
304✔
1001
        case *lnwire.UpdateFulfillHTLC:
180✔
1002
                s.cfg.HtlcNotifier.NotifySettleEvent(key, htlc.PaymentPreimage,
180✔
1003
                        eventType)
180✔
1004

1005
        case *lnwire.UpdateFailHTLC:
124✔
1006
                s.cfg.HtlcNotifier.NotifyForwardingFailEvent(key, eventType)
124✔
1007
        }
1008
}
1009

1010
// extractResult uses the given deobfuscator to extract the payment result from
1011
// the given network message.
1012
func (s *Switch) extractResult(deobfuscator ErrorDecrypter, n *networkResult,
1013
        attemptID uint64, paymentHash lntypes.Hash) (*PaymentResult, error) {
301✔
1014

301✔
1015
        switch htlc := n.msg.(type) {
301✔
1016

1017
        // We've received a settle update which means we can finalize the user
1018
        // payment and return successful response.
1019
        case *lnwire.UpdateFulfillHTLC:
177✔
1020
                return &PaymentResult{
177✔
1021
                        Preimage: htlc.PaymentPreimage,
177✔
1022
                }, nil
177✔
1023

1024
        // We've received a fail update which means we can finalize the
1025
        // user payment and return fail response.
1026
        case *lnwire.UpdateFailHTLC:
124✔
1027
                // TODO(yy): construct deobfuscator here to avoid creating it
124✔
1028
                // in paymentLifecycle even for settled HTLCs.
124✔
1029
                paymentErr := s.parseFailedPayment(
124✔
1030
                        deobfuscator, attemptID, paymentHash, n.unencrypted,
124✔
1031
                        n.isResolution, htlc,
124✔
1032
                )
124✔
1033

124✔
1034
                return &PaymentResult{
124✔
1035
                        Error: paymentErr,
124✔
1036
                }, nil
124✔
1037

1038
        default:
×
1039
                return nil, fmt.Errorf("received unknown response type: %T",
×
1040
                        htlc)
×
1041
        }
1042
}
1043

1044
// parseFailedPayment determines the appropriate failure message to return to
1045
// a user initiated payment. The three cases handled are:
1046
//  1. An unencrypted failure, which should already plaintext.
1047
//  2. A resolution from the chain arbitrator, which possibly has no failure
1048
//     reason attached.
1049
//  3. A failure from the remote party, which will need to be decrypted using
1050
//     the payment deobfuscator.
1051
func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter,
1052
        attemptID uint64, paymentHash lntypes.Hash, unencrypted,
1053
        isResolution bool, htlc *lnwire.UpdateFailHTLC) error {
124✔
1054

124✔
1055
        switch {
124✔
1056

1057
        // The payment never cleared the link, so we don't need to
1058
        // decrypt the error, simply decode it them report back to the
1059
        // user.
1060
        case unencrypted:
5✔
1061
                r := bytes.NewReader(htlc.Reason)
5✔
1062
                failureMsg, err := lnwire.DecodeFailure(r, 0)
5✔
1063
                if err != nil {
5✔
1064
                        // If we could not decode the failure reason, return a link
×
1065
                        // error indicating that we failed to decode the onion.
×
1066
                        linkError := NewDetailedLinkError(
×
1067
                                // As this didn't even clear the link, we don't
×
1068
                                // need to apply an update here since it goes
×
1069
                                // directly to the router.
×
1070
                                lnwire.NewTemporaryChannelFailure(nil),
×
1071
                                OutgoingFailureDecodeError,
×
1072
                        )
×
1073

×
1074
                        log.Errorf("%v: (hash=%v, pid=%d): %v",
×
1075
                                linkError.FailureDetail.FailureString(),
×
1076
                                paymentHash, attemptID, err)
×
1077

×
1078
                        return linkError
×
1079
                }
×
1080

1081
                // If we successfully decoded the failure reason, return it.
1082
                return NewLinkError(failureMsg)
5✔
1083

1084
        // A payment had to be timed out on chain before it got past
1085
        // the first hop. In this case, we'll report a permanent
1086
        // channel failure as this means us, or the remote party had to
1087
        // go on chain.
UNCOV
1088
        case isResolution && htlc.Reason == nil:
×
UNCOV
1089
                linkError := NewDetailedLinkError(
×
UNCOV
1090
                        &lnwire.FailPermanentChannelFailure{},
×
UNCOV
1091
                        OutgoingFailureOnChainTimeout,
×
UNCOV
1092
                )
×
UNCOV
1093

×
UNCOV
1094
                log.Infof("%v: hash=%v, pid=%d",
×
UNCOV
1095
                        linkError.FailureDetail.FailureString(),
×
UNCOV
1096
                        paymentHash, attemptID)
×
UNCOV
1097

×
UNCOV
1098
                return linkError
×
1099

1100
        // A regular multi-hop payment error that we'll need to
1101
        // decrypt.
1102
        default:
119✔
1103
                // We'll attempt to fully decrypt the onion encrypted
119✔
1104
                // error. If we're unable to then we'll bail early.
119✔
1105
                failure, err := deobfuscator.DecryptError(htlc.Reason)
119✔
1106
                if err != nil {
120✔
1107
                        log.Errorf("unable to de-obfuscate onion failure "+
1✔
1108
                                "(hash=%v, pid=%d): %v",
1✔
1109
                                paymentHash, attemptID, err)
1✔
1110

1✔
1111
                        return ErrUnreadableFailureMessage
1✔
1112
                }
1✔
1113

1114
                return failure
118✔
1115
        }
1116
}
1117

1118
// handlePacketForward is used in cases when we need forward the htlc update
1119
// from one channel link to another and be able to propagate the settle/fail
1120
// updates back. This behaviour is achieved by creation of payment circuits.
1121
func (s *Switch) handlePacketForward(packet *htlcPacket) error {
619✔
1122
        switch htlc := packet.htlc.(type) {
619✔
1123
        // Channel link forwarded us a new htlc, therefore we initiate the
1124
        // payment circuit within our internal state so we can properly forward
1125
        // the ultimate settle message back latter.
1126
        case *lnwire.UpdateAddHTLC:
82✔
1127
                return s.handlePacketAdd(packet, htlc)
82✔
1128

1129
        case *lnwire.UpdateFulfillHTLC:
399✔
1130
                return s.handlePacketSettle(packet)
399✔
1131

1132
        // Channel link forwarded us an update_fail_htlc message.
1133
        //
1134
        // NOTE: when the channel link receives an update_fail_malformed_htlc
1135
        // from upstream, it will convert the message into update_fail_htlc and
1136
        // forward it. Thus there's no need to catch `UpdateFailMalformedHTLC`
1137
        // here.
1138
        case *lnwire.UpdateFailHTLC:
138✔
1139
                return s.handlePacketFail(packet, htlc)
138✔
1140

1141
        default:
×
1142
                return fmt.Errorf("wrong update type: %T", htlc)
×
1143
        }
1144
}
1145

1146
// checkCircularForward checks whether a forward is circular (arrives and
1147
// departs on the same link) and returns a link error if the switch is
1148
// configured to disallow this behaviour.
1149
func (s *Switch) checkCircularForward(incoming, outgoing lnwire.ShortChannelID,
1150
        allowCircular bool, paymentHash lntypes.Hash) *LinkError {
90✔
1151

90✔
1152
        // If they are equal, we can skip the alias mapping checks.
90✔
1153
        if incoming == outgoing {
94✔
1154
                // The switch may be configured to allow circular routes, so
4✔
1155
                // just log and return nil.
4✔
1156
                if allowCircular {
6✔
1157
                        log.Debugf("allowing circular route over link: %v "+
2✔
1158
                                "(payment hash: %x)", incoming, paymentHash)
2✔
1159
                        return nil
2✔
1160
                }
2✔
1161

1162
                // Otherwise, we'll return a temporary channel failure.
1163
                return NewDetailedLinkError(
2✔
1164
                        lnwire.NewTemporaryChannelFailure(nil),
2✔
1165
                        OutgoingFailureCircularRoute,
2✔
1166
                )
2✔
1167
        }
1168

1169
        // We'll fetch the "base" SCID from the baseIndex for the incoming and
1170
        // outgoing SCIDs. If either one does not have a base SCID, then the
1171
        // two channels are not equal since one will be a channel that does not
1172
        // need a mapping and SCID equality was checked above. If the "base"
1173
        // SCIDs are equal, then this is a circular route. Otherwise, it isn't.
1174
        s.indexMtx.RLock()
86✔
1175
        incomingBaseScid, ok := s.baseIndex[incoming]
86✔
1176
        if !ok {
166✔
1177
                // This channel does not use baseIndex, bail out.
80✔
1178
                s.indexMtx.RUnlock()
80✔
1179
                return nil
80✔
1180
        }
80✔
1181

1182
        outgoingBaseScid, ok := s.baseIndex[outgoing]
6✔
1183
        if !ok {
8✔
1184
                // This channel does not use baseIndex, bail out.
2✔
1185
                s.indexMtx.RUnlock()
2✔
1186
                return nil
2✔
1187
        }
2✔
1188
        s.indexMtx.RUnlock()
4✔
1189

4✔
1190
        // Check base SCID equality.
4✔
1191
        if incomingBaseScid != outgoingBaseScid {
4✔
UNCOV
1192
                // The base SCIDs are not equal so these are not the same
×
UNCOV
1193
                // channel.
×
UNCOV
1194
                return nil
×
UNCOV
1195
        }
×
1196

1197
        // If the incoming and outgoing link are equal, the htlc is part of a
1198
        // circular route which may be used to lock up our liquidity. If the
1199
        // switch is configured to allow circular routes, log that we are
1200
        // allowing the route then return nil.
1201
        if allowCircular {
6✔
1202
                log.Debugf("allowing circular route over link: %v "+
2✔
1203
                        "(payment hash: %x)", incoming, paymentHash)
2✔
1204
                return nil
2✔
1205
        }
2✔
1206

1207
        // If our node disallows circular routes, return a temporary channel
1208
        // failure. There is nothing wrong with the policy used by the remote
1209
        // node, so we do not include a channel update.
1210
        return NewDetailedLinkError(
2✔
1211
                lnwire.NewTemporaryChannelFailure(nil),
2✔
1212
                OutgoingFailureCircularRoute,
2✔
1213
        )
2✔
1214
}
1215

1216
// failAddPacket encrypts a fail packet back to an add packet's source.
1217
// The ciphertext will be derived from the failure message proivded by context.
1218
// This method returns the failErr if all other steps complete successfully.
1219
func (s *Switch) failAddPacket(packet *htlcPacket, failure *LinkError) error {
26✔
1220
        // Encrypt the failure so that the sender will be able to read the error
26✔
1221
        // message. Since we failed this packet, we use EncryptFirstHop to
26✔
1222
        // obfuscate the failure for their eyes only.
26✔
1223
        reason, err := packet.obfuscator.EncryptFirstHop(failure.WireMessage())
26✔
1224
        if err != nil {
26✔
1225
                err := fmt.Errorf("unable to obfuscate "+
×
1226
                        "error: %v", err)
×
1227
                log.Error(err)
×
1228
                return err
×
1229
        }
×
1230

1231
        log.Error(failure.Error())
26✔
1232

26✔
1233
        // Create a failure packet for this htlc. The full set of
26✔
1234
        // information about the htlc failure is included so that they can
26✔
1235
        // be included in link failure notifications.
26✔
1236
        failPkt := &htlcPacket{
26✔
1237
                sourceRef:       packet.sourceRef,
26✔
1238
                incomingChanID:  packet.incomingChanID,
26✔
1239
                incomingHTLCID:  packet.incomingHTLCID,
26✔
1240
                outgoingChanID:  packet.outgoingChanID,
26✔
1241
                outgoingHTLCID:  packet.outgoingHTLCID,
26✔
1242
                incomingAmount:  packet.incomingAmount,
26✔
1243
                amount:          packet.amount,
26✔
1244
                incomingTimeout: packet.incomingTimeout,
26✔
1245
                outgoingTimeout: packet.outgoingTimeout,
26✔
1246
                circuit:         packet.circuit,
26✔
1247
                obfuscator:      packet.obfuscator,
26✔
1248
                linkFailure:     failure,
26✔
1249
                htlc: &lnwire.UpdateFailHTLC{
26✔
1250
                        Reason: reason,
26✔
1251
                },
26✔
1252
        }
26✔
1253

26✔
1254
        // Route a fail packet back to the source link.
26✔
1255
        err = s.mailOrchestrator.Deliver(failPkt.incomingChanID, failPkt)
26✔
1256
        if err != nil {
26✔
1257
                err = fmt.Errorf("source chanid=%v unable to "+
×
1258
                        "handle switch packet: %v",
×
1259
                        packet.incomingChanID, err)
×
1260
                log.Error(err)
×
1261
                return err
×
1262
        }
×
1263

1264
        return failure
26✔
1265
}
1266

1267
// closeCircuit accepts a settle or fail htlc and the associated htlc packet and
1268
// attempts to determine the source that forwarded this htlc. This method will
1269
// set the incoming chan and htlc ID of the given packet if the source was
1270
// found, and will properly [re]encrypt any failure messages.
1271
func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
537✔
1272
        // If the packet has its source, that means it was failed locally by
537✔
1273
        // the outgoing link. We fail it here to make sure only one response
537✔
1274
        // makes it through the switch.
537✔
1275
        if pkt.hasSource {
548✔
1276
                circuit, err := s.circuits.FailCircuit(pkt.inKey())
11✔
1277
                switch err {
11✔
1278

1279
                // Circuit successfully closed.
1280
                case nil:
11✔
1281
                        return circuit, nil
11✔
1282

1283
                // Circuit was previously closed, but has not been deleted.
1284
                // We'll just drop this response until the circuit has been
1285
                // fully removed.
1286
                case ErrCircuitClosing:
×
1287
                        return nil, err
×
1288

1289
                // Failed to close circuit because it does not exist. This is
1290
                // likely because the circuit was already successfully closed.
1291
                // Since this packet failed locally, there is no forwarding
1292
                // package entry to acknowledge.
1293
                case ErrUnknownCircuit:
×
1294
                        return nil, err
×
1295

1296
                // Unexpected error.
1297
                default:
×
1298
                        return nil, err
×
1299
                }
1300
        }
1301

1302
        // Otherwise, this is packet was received from the remote party.  Use
1303
        // circuit map to find the incoming link to receive the settle/fail.
1304
        circuit, err := s.circuits.CloseCircuit(pkt.outKey())
526✔
1305
        switch err {
526✔
1306

1307
        // Open circuit successfully closed.
1308
        case nil:
336✔
1309
                pkt.incomingChanID = circuit.Incoming.ChanID
336✔
1310
                pkt.incomingHTLCID = circuit.Incoming.HtlcID
336✔
1311
                pkt.circuit = circuit
336✔
1312
                pkt.sourceRef = &circuit.AddRef
336✔
1313

336✔
1314
                pktType := "SETTLE"
336✔
1315
                if _, ok := pkt.htlc.(*lnwire.UpdateFailHTLC); ok {
463✔
1316
                        pktType = "FAIL"
127✔
1317
                }
127✔
1318

1319
                log.Debugf("Closed completed %s circuit for %x: "+
336✔
1320
                        "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash,
336✔
1321
                        pkt.incomingChanID, pkt.incomingHTLCID,
336✔
1322
                        pkt.outgoingChanID, pkt.outgoingHTLCID)
336✔
1323

336✔
1324
                return circuit, nil
336✔
1325

1326
        // Circuit was previously closed, but has not been deleted. We'll just
1327
        // drop this response until the circuit has been removed.
UNCOV
1328
        case ErrCircuitClosing:
×
UNCOV
1329
                return nil, err
×
1330

1331
        // Failed to close circuit because it does not exist. This is likely
1332
        // because the circuit was already successfully closed.
1333
        case ErrUnknownCircuit:
190✔
1334
                if pkt.destRef != nil {
379✔
1335
                        // Add this SettleFailRef to the set of pending settle/fail entries
189✔
1336
                        // awaiting acknowledgement.
189✔
1337
                        s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef)
189✔
1338
                }
189✔
1339

1340
                // If this is a settle, we will not log an error message as settles
1341
                // are expected to hit the ErrUnknownCircuit case. The only way fails
1342
                // can hit this case if the link restarts after having just sent a fail
1343
                // to the switch.
1344
                _, isSettle := pkt.htlc.(*lnwire.UpdateFulfillHTLC)
190✔
1345
                if !isSettle {
190✔
1346
                        err := fmt.Errorf("unable to find target channel "+
×
1347
                                "for HTLC fail: channel ID = %s, "+
×
1348
                                "HTLC ID = %d", pkt.outgoingChanID,
×
1349
                                pkt.outgoingHTLCID)
×
1350
                        log.Error(err)
×
1351

×
1352
                        return nil, err
×
1353
                }
×
1354

1355
                return nil, nil
190✔
1356

1357
        // Unexpected error.
1358
        default:
×
1359
                return nil, err
×
1360
        }
1361
}
1362

1363
// ackSettleFail is used by the switch to ACK any settle/fail entries in the
1364
// forwarding package of the outgoing link for a payment circuit. We do this if
1365
// we're the originator of the payment, so the link stops attempting to
1366
// re-broadcast.
1367
func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error {
118✔
1368
        return kvdb.Batch(s.cfg.DB, func(tx kvdb.RwTx) error {
236✔
1369
                return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...)
118✔
1370
        })
118✔
1371
}
1372

1373
// teardownCircuit removes a pending or open circuit from the switch's circuit
1374
// map and prints useful logging statements regarding the outcome.
1375
func (s *Switch) teardownCircuit(pkt *htlcPacket) error {
314✔
1376
        var pktType string
314✔
1377
        switch htlc := pkt.htlc.(type) {
314✔
1378
        case *lnwire.UpdateFulfillHTLC:
186✔
1379
                pktType = "SETTLE"
186✔
1380
        case *lnwire.UpdateFailHTLC:
128✔
1381
                pktType = "FAIL"
128✔
1382
        default:
×
1383
                return fmt.Errorf("cannot tear down packet of type: %T", htlc)
×
1384
        }
1385

1386
        var paymentHash lntypes.Hash
314✔
1387

314✔
1388
        // Perform a defensive check to make sure we don't try to access a nil
314✔
1389
        // circuit.
314✔
1390
        circuit := pkt.circuit
314✔
1391
        if circuit != nil {
628✔
1392
                copy(paymentHash[:], circuit.PaymentHash[:])
314✔
1393
        }
314✔
1394

1395
        log.Debugf("Tearing down circuit with %s pkt, removing circuit=%v "+
314✔
1396
                "with keystone=%v", pktType, pkt.inKey(), pkt.outKey())
314✔
1397

314✔
1398
        err := s.circuits.DeleteCircuits(pkt.inKey())
314✔
1399
        if err != nil {
314✔
1400
                log.Warnf("Failed to tear down circuit (%s, %d) <-> (%s, %d) "+
×
1401
                        "with payment_hash=%v using %s pkt", pkt.incomingChanID,
×
1402
                        pkt.incomingHTLCID, pkt.outgoingChanID,
×
1403
                        pkt.outgoingHTLCID, pkt.circuit.PaymentHash, pktType)
×
1404

×
1405
                return err
×
1406
        }
×
1407

1408
        log.Debugf("Closed %s circuit for %v: (%s, %d) <-> (%s, %d)", pktType,
314✔
1409
                paymentHash, pkt.incomingChanID, pkt.incomingHTLCID,
314✔
1410
                pkt.outgoingChanID, pkt.outgoingHTLCID)
314✔
1411

314✔
1412
        return nil
314✔
1413
}
1414

1415
// CloseLink creates and sends the close channel command to the target link
1416
// directing the specified closure type. If the closure type is CloseRegular,
1417
// targetFeePerKw parameter should be the ideal fee-per-kw that will be used as
1418
// a starting point for close negotiation. The deliveryScript parameter is an
1419
// optional parameter which sets a user specified script to close out to.
1420
func (s *Switch) CloseLink(ctx context.Context, chanPoint *wire.OutPoint,
1421
        closeType contractcourt.ChannelCloseType,
1422
        targetFeePerKw, maxFee chainfee.SatPerKWeight,
UNCOV
1423
        deliveryScript lnwire.DeliveryAddress) (chan interface{}, chan error) {
×
UNCOV
1424

×
UNCOV
1425
        // TODO(roasbeef) abstract out the close updates.
×
UNCOV
1426
        updateChan := make(chan interface{}, 2)
×
UNCOV
1427
        errChan := make(chan error, 1)
×
UNCOV
1428

×
UNCOV
1429
        command := &ChanClose{
×
UNCOV
1430
                CloseType:      closeType,
×
UNCOV
1431
                ChanPoint:      chanPoint,
×
UNCOV
1432
                Updates:        updateChan,
×
UNCOV
1433
                TargetFeePerKw: targetFeePerKw,
×
UNCOV
1434
                DeliveryScript: deliveryScript,
×
UNCOV
1435
                Err:            errChan,
×
NEW
1436
                MaxFee:         maxFee,
×
NEW
1437
                Ctx:            ctx,
×
UNCOV
1438
        }
×
UNCOV
1439

×
UNCOV
1440
        select {
×
UNCOV
1441
        case s.chanCloseRequests <- command:
×
UNCOV
1442
                return updateChan, errChan
×
1443

1444
        case <-s.quit:
×
1445
                errChan <- ErrSwitchExiting
×
1446
                close(updateChan)
×
1447
                return updateChan, errChan
×
1448
        }
1449
}
1450

1451
// htlcForwarder is responsible for optimally forwarding (and possibly
1452
// fragmenting) incoming/outgoing HTLCs amongst all active interfaces and their
1453
// links. The duties of the forwarder are similar to that of a network switch,
1454
// in that it facilitates multi-hop payments by acting as a central messaging
1455
// bus. The switch communicates will active links to create, manage, and tear
1456
// down active onion routed payments. Each active channel is modeled as
1457
// networked device with metadata such as the available payment bandwidth, and
1458
// total link capacity.
1459
//
1460
// NOTE: This MUST be run as a goroutine.
1461
func (s *Switch) htlcForwarder() {
207✔
1462
        defer s.wg.Done()
207✔
1463

207✔
1464
        defer func() {
414✔
1465
                s.blockEpochStream.Cancel()
207✔
1466

207✔
1467
                // Remove all links once we've been signalled for shutdown.
207✔
1468
                var linksToStop []ChannelLink
207✔
1469
                s.indexMtx.Lock()
207✔
1470
                for _, link := range s.linkIndex {
499✔
1471
                        activeLink := s.removeLink(link.ChanID())
292✔
1472
                        if activeLink == nil {
292✔
1473
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1474
                                        "on stop", link.ChanID())
×
1475
                                continue
×
1476
                        }
1477
                        linksToStop = append(linksToStop, activeLink)
292✔
1478
                }
1479
                for _, link := range s.pendingLinkIndex {
208✔
1480
                        pendingLink := s.removeLink(link.ChanID())
1✔
1481
                        if pendingLink == nil {
1✔
1482
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1483
                                        "on stop", link.ChanID())
×
1484
                                continue
×
1485
                        }
1486
                        linksToStop = append(linksToStop, pendingLink)
1✔
1487
                }
1488
                s.indexMtx.Unlock()
207✔
1489

207✔
1490
                // Now that all pending and live links have been removed from
207✔
1491
                // the forwarding indexes, stop each one before shutting down.
207✔
1492
                // We'll shut them down in parallel to make exiting as fast as
207✔
1493
                // possible.
207✔
1494
                var wg sync.WaitGroup
207✔
1495
                for _, link := range linksToStop {
500✔
1496
                        wg.Add(1)
293✔
1497
                        go func(l ChannelLink) {
586✔
1498
                                defer wg.Done()
293✔
1499

293✔
1500
                                l.Stop()
293✔
1501
                        }(link)
293✔
1502
                }
1503
                wg.Wait()
207✔
1504

207✔
1505
                // Before we exit fully, we'll attempt to flush out any
207✔
1506
                // forwarding events that may still be lingering since the last
207✔
1507
                // batch flush.
207✔
1508
                if err := s.FlushForwardingEvents(); err != nil {
207✔
1509
                        log.Errorf("unable to flush forwarding events: %v", err)
×
1510
                }
×
1511
        }()
1512

1513
        // TODO(roasbeef): cleared vs settled distinction
1514
        var (
207✔
1515
                totalNumUpdates uint64
207✔
1516
                totalSatSent    btcutil.Amount
207✔
1517
                totalSatRecv    btcutil.Amount
207✔
1518
        )
207✔
1519
        s.cfg.LogEventTicker.Resume()
207✔
1520
        defer s.cfg.LogEventTicker.Stop()
207✔
1521

207✔
1522
        // Every 15 seconds, we'll flush out the forwarding events that
207✔
1523
        // occurred during that period.
207✔
1524
        s.cfg.FwdEventTicker.Resume()
207✔
1525
        defer s.cfg.FwdEventTicker.Stop()
207✔
1526

207✔
1527
        defer s.cfg.AckEventTicker.Stop()
207✔
1528

207✔
1529
out:
207✔
1530
        for {
1,039✔
1531

832✔
1532
                // If the set of pending settle/fail entries is non-zero,
832✔
1533
                // reinstate the ack ticker so we can batch ack them.
832✔
1534
                if len(s.pendingSettleFails) > 0 {
1,209✔
1535
                        s.cfg.AckEventTicker.Resume()
377✔
1536
                }
377✔
1537

1538
                select {
832✔
UNCOV
1539
                case blockEpoch, ok := <-s.blockEpochStream.Epochs:
×
UNCOV
1540
                        if !ok {
×
1541
                                break out
×
1542
                        }
1543

UNCOV
1544
                        atomic.StoreUint32(&s.bestHeight, uint32(blockEpoch.Height))
×
1545

1546
                // A local close request has arrived, we'll forward this to the
1547
                // relevant link (if it exists) so the channel can be
1548
                // cooperatively closed (if possible).
UNCOV
1549
                case req := <-s.chanCloseRequests:
×
UNCOV
1550
                        chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)
×
UNCOV
1551

×
UNCOV
1552
                        s.indexMtx.RLock()
×
UNCOV
1553
                        link, ok := s.linkIndex[chanID]
×
UNCOV
1554
                        if !ok {
×
UNCOV
1555
                                s.indexMtx.RUnlock()
×
UNCOV
1556

×
UNCOV
1557
                                req.Err <- fmt.Errorf("no peer for channel with "+
×
UNCOV
1558
                                        "chan_id=%x", chanID[:])
×
UNCOV
1559
                                continue
×
1560
                        }
UNCOV
1561
                        s.indexMtx.RUnlock()
×
UNCOV
1562

×
UNCOV
1563
                        peerPub := link.PeerPubKey()
×
UNCOV
1564
                        log.Debugf("Requesting local channel close: peer=%x, "+
×
UNCOV
1565
                                "chan_id=%x", link.PeerPubKey(), chanID[:])
×
UNCOV
1566

×
UNCOV
1567
                        go s.cfg.LocalChannelClose(peerPub[:], req)
×
1568

1569
                case resolutionMsg := <-s.resolutionMsgs:
1✔
1570
                        // We'll persist the resolution message to the Switch's
1✔
1571
                        // resolution store.
1✔
1572
                        resMsg := resolutionMsg.ResolutionMsg
1✔
1573
                        err := s.resMsgStore.addResolutionMsg(&resMsg)
1✔
1574
                        if err != nil {
1✔
1575
                                // This will only fail if there is a database
×
1576
                                // error or a serialization error. Sending the
×
1577
                                // error prevents the contractcourt from being
×
1578
                                // in a state where it believes the send was
×
1579
                                // successful, when it wasn't.
×
1580
                                log.Errorf("unable to add resolution msg: %v",
×
1581
                                        err)
×
1582
                                resolutionMsg.errChan <- err
×
1583
                                continue
×
1584
                        }
1585

1586
                        // At this point, the resolution message has been
1587
                        // persisted. It is safe to signal success by sending
1588
                        // a nil error since the Switch will re-deliver the
1589
                        // resolution message on restart.
1590
                        resolutionMsg.errChan <- nil
1✔
1591

1✔
1592
                        // Create a htlc packet for this resolution. We do
1✔
1593
                        // not have some of the information that we'll need
1✔
1594
                        // for blinded error handling here , so we'll rely on
1✔
1595
                        // our forwarding logic to fill it in later.
1✔
1596
                        pkt := &htlcPacket{
1✔
1597
                                outgoingChanID: resolutionMsg.SourceChan,
1✔
1598
                                outgoingHTLCID: resolutionMsg.HtlcIndex,
1✔
1599
                                isResolution:   true,
1✔
1600
                        }
1✔
1601

1✔
1602
                        // Resolution messages will either be cancelling
1✔
1603
                        // backwards an existing HTLC, or settling a previously
1✔
1604
                        // outgoing HTLC. Based on this, we'll map the message
1✔
1605
                        // to the proper htlcPacket.
1✔
1606
                        if resolutionMsg.Failure != nil {
1✔
UNCOV
1607
                                pkt.htlc = &lnwire.UpdateFailHTLC{}
×
1608
                        } else {
1✔
1609
                                pkt.htlc = &lnwire.UpdateFulfillHTLC{
1✔
1610
                                        PaymentPreimage: *resolutionMsg.PreImage,
1✔
1611
                                }
1✔
1612
                        }
1✔
1613

1614
                        log.Debugf("Received outside contract resolution, "+
1✔
1615
                                "mapping to: %v", spew.Sdump(pkt))
1✔
1616

1✔
1617
                        // We don't check the error, as the only failure we can
1✔
1618
                        // encounter is due to the circuit already being
1✔
1619
                        // closed. This is fine, as processing this message is
1✔
1620
                        // meant to be idempotent.
1✔
1621
                        err = s.handlePacketForward(pkt)
1✔
1622
                        if err != nil {
1✔
1623
                                log.Errorf("Unable to forward resolution msg: %v", err)
×
1624
                        }
×
1625

1626
                // A new packet has arrived for forwarding, we'll interpret the
1627
                // packet concretely, then either forward it along, or
1628
                // interpret a return packet to a locally initialized one.
1629
                case cmd := <-s.htlcPlex:
618✔
1630
                        cmd.err <- s.handlePacketForward(cmd.pkt)
618✔
1631

1632
                // When this time ticks, then it indicates that we should
1633
                // collect all the forwarding events since the last internal,
1634
                // and write them out to our log.
1635
                case <-s.cfg.FwdEventTicker.Ticks():
2✔
1636
                        s.wg.Add(1)
2✔
1637
                        go func() {
4✔
1638
                                defer s.wg.Done()
2✔
1639

2✔
1640
                                if err := s.FlushForwardingEvents(); err != nil {
2✔
1641
                                        log.Errorf("Unable to flush "+
×
1642
                                                "forwarding events: %v", err)
×
1643
                                }
×
1644
                        }()
1645

1646
                // The log ticker has fired, so we'll calculate some forwarding
1647
                // stats for the last 10 seconds to display within the logs to
1648
                // users.
1649
                case <-s.cfg.LogEventTicker.Ticks():
4✔
1650
                        // First, we'll collate the current running tally of
4✔
1651
                        // our forwarding stats.
4✔
1652
                        prevSatSent := totalSatSent
4✔
1653
                        prevSatRecv := totalSatRecv
4✔
1654
                        prevNumUpdates := totalNumUpdates
4✔
1655

4✔
1656
                        var (
4✔
1657
                                newNumUpdates uint64
4✔
1658
                                newSatSent    btcutil.Amount
4✔
1659
                                newSatRecv    btcutil.Amount
4✔
1660
                        )
4✔
1661

4✔
1662
                        // Next, we'll run through all the registered links and
4✔
1663
                        // compute their up-to-date forwarding stats.
4✔
1664
                        s.indexMtx.RLock()
4✔
1665
                        for _, link := range s.linkIndex {
10✔
1666
                                // TODO(roasbeef): when links first registered
6✔
1667
                                // stats printed.
6✔
1668
                                updates, sent, recv := link.Stats()
6✔
1669
                                newNumUpdates += updates
6✔
1670
                                newSatSent += sent.ToSatoshis()
6✔
1671
                                newSatRecv += recv.ToSatoshis()
6✔
1672
                        }
6✔
1673
                        s.indexMtx.RUnlock()
4✔
1674

4✔
1675
                        var (
4✔
1676
                                diffNumUpdates uint64
4✔
1677
                                diffSatSent    btcutil.Amount
4✔
1678
                                diffSatRecv    btcutil.Amount
4✔
1679
                        )
4✔
1680

4✔
1681
                        // If this is the first time we're computing these
4✔
1682
                        // stats, then the diff is just the new value. We do
4✔
1683
                        // this in order to avoid integer underflow issues.
4✔
1684
                        if prevNumUpdates == 0 {
8✔
1685
                                diffNumUpdates = newNumUpdates
4✔
1686
                                diffSatSent = newSatSent
4✔
1687
                                diffSatRecv = newSatRecv
4✔
1688
                        } else {
4✔
UNCOV
1689
                                diffNumUpdates = newNumUpdates - prevNumUpdates
×
UNCOV
1690
                                diffSatSent = newSatSent - prevSatSent
×
UNCOV
1691
                                diffSatRecv = newSatRecv - prevSatRecv
×
UNCOV
1692
                        }
×
1693

1694
                        // If the diff of num updates is zero, then we haven't
1695
                        // forwarded anything in the last 10 seconds, so we can
1696
                        // skip this update.
1697
                        if diffNumUpdates == 0 {
6✔
1698
                                continue
2✔
1699
                        }
1700

1701
                        // If the diff of num updates is negative, then some
1702
                        // links may have been unregistered from the switch, so
1703
                        // we'll update our stats to only include our registered
1704
                        // links.
1705
                        if int64(diffNumUpdates) < 0 {
2✔
UNCOV
1706
                                totalNumUpdates = newNumUpdates
×
UNCOV
1707
                                totalSatSent = newSatSent
×
UNCOV
1708
                                totalSatRecv = newSatRecv
×
UNCOV
1709
                                continue
×
1710
                        }
1711

1712
                        // Otherwise, we'll log this diff, then accumulate the
1713
                        // new stats into the running total.
1714
                        log.Debugf("Sent %d satoshis and received %d satoshis "+
2✔
1715
                                "in the last 10 seconds (%f tx/sec)",
2✔
1716
                                diffSatSent, diffSatRecv,
2✔
1717
                                float64(diffNumUpdates)/10)
2✔
1718

2✔
1719
                        totalNumUpdates += diffNumUpdates
2✔
1720
                        totalSatSent += diffSatSent
2✔
1721
                        totalSatRecv += diffSatRecv
2✔
1722

1723
                // The ack ticker has fired so if we have any settle/fail entries
1724
                // for a forwarding package to ack, we will do so here in a batch
1725
                // db call.
UNCOV
1726
                case <-s.cfg.AckEventTicker.Ticks():
×
UNCOV
1727
                        // If the current set is empty, pause the ticker.
×
UNCOV
1728
                        if len(s.pendingSettleFails) == 0 {
×
UNCOV
1729
                                s.cfg.AckEventTicker.Pause()
×
UNCOV
1730
                                continue
×
1731
                        }
1732

1733
                        // Batch ack the settle/fail entries.
UNCOV
1734
                        if err := s.ackSettleFail(s.pendingSettleFails...); err != nil {
×
1735
                                log.Errorf("Unable to ack batch of settle/fails: %v", err)
×
1736
                                continue
×
1737
                        }
1738

UNCOV
1739
                        log.Tracef("Acked %d settle fails: %v",
×
UNCOV
1740
                                len(s.pendingSettleFails),
×
UNCOV
1741
                                lnutils.SpewLogClosure(s.pendingSettleFails))
×
UNCOV
1742

×
UNCOV
1743
                        // Reset the pendingSettleFails buffer while keeping acquired
×
UNCOV
1744
                        // memory.
×
UNCOV
1745
                        s.pendingSettleFails = s.pendingSettleFails[:0]
×
1746

1747
                case <-s.quit:
207✔
1748
                        return
207✔
1749
                }
1750
        }
1751
}
1752

1753
// Start starts all helper goroutines required for the operation of the switch.
1754
func (s *Switch) Start() error {
207✔
1755
        if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
207✔
1756
                log.Warn("Htlc Switch already started")
×
1757
                return errors.New("htlc switch already started")
×
1758
        }
×
1759

1760
        log.Infof("HTLC Switch starting")
207✔
1761

207✔
1762
        blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
207✔
1763
        if err != nil {
207✔
1764
                return err
×
1765
        }
×
1766
        s.blockEpochStream = blockEpochStream
207✔
1767

207✔
1768
        s.wg.Add(1)
207✔
1769
        go s.htlcForwarder()
207✔
1770

207✔
1771
        if err := s.reforwardResponses(); err != nil {
207✔
1772
                s.Stop()
×
1773
                log.Errorf("unable to reforward responses: %v", err)
×
1774
                return err
×
1775
        }
×
1776

1777
        if err := s.reforwardResolutions(); err != nil {
207✔
1778
                // We are already stopping so we can ignore the error.
×
1779
                _ = s.Stop()
×
1780
                log.Errorf("unable to reforward resolutions: %v", err)
×
1781
                return err
×
1782
        }
×
1783

1784
        return nil
207✔
1785
}
1786

1787
// reforwardResolutions fetches the set of resolution messages stored on-disk
1788
// and reforwards them if their circuits are still open. If the circuits have
1789
// been deleted, then we will delete the resolution message from the database.
1790
func (s *Switch) reforwardResolutions() error {
207✔
1791
        // Fetch all stored resolution messages, deleting the ones that are
207✔
1792
        // resolved.
207✔
1793
        resMsgs, err := s.resMsgStore.fetchAllResolutionMsg()
207✔
1794
        if err != nil {
207✔
1795
                return err
×
1796
        }
×
1797

1798
        switchPackets := make([]*htlcPacket, 0, len(resMsgs))
207✔
1799
        for _, resMsg := range resMsgs {
208✔
1800
                // If the open circuit no longer exists, then we can remove the
1✔
1801
                // message from the store.
1✔
1802
                outKey := CircuitKey{
1✔
1803
                        ChanID: resMsg.SourceChan,
1✔
1804
                        HtlcID: resMsg.HtlcIndex,
1✔
1805
                }
1✔
1806

1✔
1807
                if s.circuits.LookupOpenCircuit(outKey) == nil {
2✔
1808
                        // The open circuit doesn't exist.
1✔
1809
                        err := s.resMsgStore.deleteResolutionMsg(&outKey)
1✔
1810
                        if err != nil {
1✔
1811
                                return err
×
1812
                        }
×
1813

1814
                        continue
1✔
1815
                }
1816

1817
                // The circuit is still open, so we can assume that the link or
1818
                // switch (if we are the source) hasn't cleaned it up yet.
1819
                // We rely on our forwarding logic to fill in details that
1820
                // are not currently available to us.
UNCOV
1821
                resPkt := &htlcPacket{
×
UNCOV
1822
                        outgoingChanID: resMsg.SourceChan,
×
UNCOV
1823
                        outgoingHTLCID: resMsg.HtlcIndex,
×
UNCOV
1824
                        isResolution:   true,
×
UNCOV
1825
                }
×
UNCOV
1826

×
UNCOV
1827
                if resMsg.Failure != nil {
×
UNCOV
1828
                        resPkt.htlc = &lnwire.UpdateFailHTLC{}
×
UNCOV
1829
                } else {
×
1830
                        resPkt.htlc = &lnwire.UpdateFulfillHTLC{
×
1831
                                PaymentPreimage: *resMsg.PreImage,
×
1832
                        }
×
1833
                }
×
1834

UNCOV
1835
                switchPackets = append(switchPackets, resPkt)
×
1836
        }
1837

1838
        // We'll now dispatch the set of resolution messages to the proper
1839
        // destination. An error is only encountered here if the switch is
1840
        // shutting down.
1841
        if err := s.ForwardPackets(nil, switchPackets...); err != nil {
207✔
1842
                return err
×
1843
        }
×
1844

1845
        return nil
207✔
1846
}
1847

1848
// reforwardResponses for every known, non-pending channel, loads all associated
1849
// forwarding packages and reforwards any Settle or Fail HTLCs found. This is
1850
// used to resurrect the switch's mailboxes after a restart. This also runs for
1851
// waiting close channels since there may be settles or fails that need to be
1852
// reforwarded before they completely close.
1853
func (s *Switch) reforwardResponses() error {
207✔
1854
        openChannels, err := s.cfg.FetchAllChannels()
207✔
1855
        if err != nil {
207✔
1856
                return err
×
1857
        }
×
1858

1859
        for _, openChannel := range openChannels {
336✔
1860
                shortChanID := openChannel.ShortChanID()
129✔
1861

129✔
1862
                // Locally-initiated payments never need reforwarding.
129✔
1863
                if shortChanID == hop.Source {
129✔
UNCOV
1864
                        continue
×
1865
                }
1866

1867
                // If the channel is pending, it should have no forwarding
1868
                // packages, and nothing to reforward.
1869
                if openChannel.IsPending {
129✔
1870
                        continue
×
1871
                }
1872

1873
                // Channels in open or waiting-close may still have responses in
1874
                // their forwarding packages. We will continue to reattempt
1875
                // forwarding on startup until the channel is fully-closed.
1876
                //
1877
                // Load this channel's forwarding packages, and deliver them to
1878
                // the switch.
1879
                fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID)
129✔
1880
                if err != nil {
129✔
1881
                        log.Errorf("unable to load forwarding "+
×
1882
                                "packages for %v: %v", shortChanID, err)
×
1883
                        return err
×
1884
                }
×
1885

1886
                s.reforwardSettleFails(fwdPkgs)
129✔
1887
        }
1888

1889
        return nil
207✔
1890
}
1891

1892
// loadChannelFwdPkgs loads all forwarding packages owned by the `source` short
1893
// channel identifier.
1894
func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) {
129✔
1895

129✔
1896
        var fwdPkgs []*channeldb.FwdPkg
129✔
1897
        if err := kvdb.View(s.cfg.DB, func(tx kvdb.RTx) error {
258✔
1898
                var err error
129✔
1899
                fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgs(
129✔
1900
                        tx, source,
129✔
1901
                )
129✔
1902
                return err
129✔
1903
        }, func() {
258✔
1904
                fwdPkgs = nil
129✔
1905
        }); err != nil {
129✔
1906
                return nil, err
×
1907
        }
×
1908

1909
        return fwdPkgs, nil
129✔
1910
}
1911

1912
// reforwardSettleFails parses the Settle and Fail HTLCs from the list of
1913
// forwarding packages, and reforwards those that have not been acknowledged.
1914
// This is intended to occur on startup, in order to recover the switch's
1915
// mailboxes, and to ensure that responses can be propagated in case the
1916
// outgoing link never comes back online.
1917
//
1918
// NOTE: This should mimic the behavior processRemoteSettleFails.
1919
func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) {
129✔
1920
        for _, fwdPkg := range fwdPkgs {
130✔
1921
                switchPackets := make([]*htlcPacket, 0, len(fwdPkg.SettleFails))
1✔
1922
                for i, update := range fwdPkg.SettleFails {
1✔
UNCOV
1923
                        // Skip any settles or fails that have already been
×
UNCOV
1924
                        // acknowledged by the incoming link that originated the
×
UNCOV
1925
                        // forwarded Add.
×
UNCOV
1926
                        if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
×
UNCOV
1927
                                continue
×
1928
                        }
1929

UNCOV
1930
                        switch msg := update.UpdateMsg.(type) {
×
1931
                        // A settle for an HTLC we previously forwarded HTLC has
1932
                        // been received. So we'll forward the HTLC to the
1933
                        // switch which will handle propagating the settle to
1934
                        // the prior hop.
UNCOV
1935
                        case *lnwire.UpdateFulfillHTLC:
×
UNCOV
1936
                                destRef := fwdPkg.DestRef(uint16(i))
×
UNCOV
1937
                                settlePacket := &htlcPacket{
×
UNCOV
1938
                                        outgoingChanID: fwdPkg.Source,
×
UNCOV
1939
                                        outgoingHTLCID: msg.ID,
×
UNCOV
1940
                                        destRef:        &destRef,
×
UNCOV
1941
                                        htlc:           msg,
×
UNCOV
1942
                                }
×
UNCOV
1943

×
UNCOV
1944
                                // Add the packet to the batch to be forwarded, and
×
UNCOV
1945
                                // notify the overflow queue that a spare spot has been
×
UNCOV
1946
                                // freed up within the commitment state.
×
UNCOV
1947
                                switchPackets = append(switchPackets, settlePacket)
×
1948

1949
                        // A failureCode message for a previously forwarded HTLC has been
1950
                        // received. As a result a new slot will be freed up in our
1951
                        // commitment state, so we'll forward this to the switch so the
1952
                        // backwards undo can continue.
1953
                        case *lnwire.UpdateFailHTLC:
×
1954
                                // Fetch the reason the HTLC was canceled so
×
1955
                                // we can continue to propagate it. This
×
1956
                                // failure originated from another node, so
×
1957
                                // the linkFailure field is not set on this
×
1958
                                // packet. We rely on the link to fill in
×
1959
                                // additional circuit information for us.
×
1960
                                failPacket := &htlcPacket{
×
1961
                                        outgoingChanID: fwdPkg.Source,
×
1962
                                        outgoingHTLCID: msg.ID,
×
1963
                                        destRef: &channeldb.SettleFailRef{
×
1964
                                                Source: fwdPkg.Source,
×
1965
                                                Height: fwdPkg.Height,
×
1966
                                                Index:  uint16(i),
×
1967
                                        },
×
1968
                                        htlc: msg,
×
1969
                                }
×
1970

×
1971
                                // Add the packet to the batch to be forwarded, and
×
1972
                                // notify the overflow queue that a spare spot has been
×
1973
                                // freed up within the commitment state.
×
1974
                                switchPackets = append(switchPackets, failPacket)
×
1975
                        }
1976
                }
1977

1978
                // Since this send isn't tied to a specific link, we pass a nil
1979
                // link quit channel, meaning the send will fail only if the
1980
                // switch receives a shutdown request.
1981
                if err := s.ForwardPackets(nil, switchPackets...); err != nil {
1✔
1982
                        log.Errorf("Unhandled error while reforwarding packets "+
×
1983
                                "settle/fail over htlcswitch: %v", err)
×
1984
                }
×
1985
        }
1986
}
1987

1988
// Stop gracefully stops all active helper goroutines, then waits until they've
1989
// exited.
1990
func (s *Switch) Stop() error {
439✔
1991
        if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) {
569✔
1992
                log.Warn("Htlc Switch already stopped")
130✔
1993
                return errors.New("htlc switch already shutdown")
130✔
1994
        }
130✔
1995

1996
        log.Info("HTLC Switch shutting down...")
309✔
1997
        defer log.Debug("HTLC Switch shutdown complete")
309✔
1998

309✔
1999
        close(s.quit)
309✔
2000

309✔
2001
        s.wg.Wait()
309✔
2002

309✔
2003
        // Wait until all active goroutines have finished exiting before
309✔
2004
        // stopping the mailboxes, otherwise the mailbox map could still be
309✔
2005
        // accessed and modified.
309✔
2006
        s.mailOrchestrator.Stop()
309✔
2007

309✔
2008
        return nil
309✔
2009
}
2010

2011
// CreateAndAddLink will create a link and then add it to the internal maps
2012
// when given a ChannelLinkConfig and LightningChannel.
2013
func (s *Switch) CreateAndAddLink(linkCfg ChannelLinkConfig,
UNCOV
2014
        lnChan *lnwallet.LightningChannel) error {
×
UNCOV
2015

×
UNCOV
2016
        link := NewChannelLink(linkCfg, lnChan)
×
UNCOV
2017
        return s.AddLink(link)
×
UNCOV
2018
}
×
2019

2020
// AddLink is used to initiate the handling of the add link command. The
2021
// request will be propagated and handled in the main goroutine.
2022
func (s *Switch) AddLink(link ChannelLink) error {
336✔
2023
        s.indexMtx.Lock()
336✔
2024
        defer s.indexMtx.Unlock()
336✔
2025

336✔
2026
        chanID := link.ChanID()
336✔
2027

336✔
2028
        // First, ensure that this link is not already active in the switch.
336✔
2029
        _, err := s.getLink(chanID)
336✔
2030
        if err == nil {
337✔
2031
                return fmt.Errorf("unable to add ChannelLink(%v), already "+
1✔
2032
                        "active", chanID)
1✔
2033
        }
1✔
2034

2035
        // Get and attach the mailbox for this link, which buffers packets in
2036
        // case there packets that we tried to deliver while this link was
2037
        // offline.
2038
        shortChanID := link.ShortChanID()
335✔
2039
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID)
335✔
2040
        link.AttachMailBox(mailbox)
335✔
2041

335✔
2042
        // Attach the Switch's failAliasUpdate function to the link.
335✔
2043
        link.attachFailAliasUpdate(s.failAliasUpdate)
335✔
2044

335✔
2045
        if err := link.Start(); err != nil {
335✔
2046
                log.Errorf("AddLink failed to start link with chanID=%v: %v",
×
2047
                        chanID, err)
×
2048
                s.removeLink(chanID)
×
2049
                return err
×
2050
        }
×
2051

2052
        if shortChanID == hop.Source {
336✔
2053
                log.Infof("Adding pending link chan_id=%v, short_chan_id=%v",
1✔
2054
                        chanID, shortChanID)
1✔
2055

1✔
2056
                s.pendingLinkIndex[chanID] = link
1✔
2057
        } else {
335✔
2058
                log.Infof("Adding live link chan_id=%v, short_chan_id=%v",
334✔
2059
                        chanID, shortChanID)
334✔
2060

334✔
2061
                s.addLiveLink(link)
334✔
2062
                s.mailOrchestrator.BindLiveShortChanID(
334✔
2063
                        mailbox, chanID, shortChanID,
334✔
2064
                )
334✔
2065
        }
334✔
2066

2067
        return nil
335✔
2068
}
2069

2070
// addLiveLink adds a link to all associated forwarding index, this makes it a
2071
// candidate for forwarding HTLCs.
2072
func (s *Switch) addLiveLink(link ChannelLink) {
334✔
2073
        linkScid := link.ShortChanID()
334✔
2074

334✔
2075
        // We'll add the link to the linkIndex which lets us quickly
334✔
2076
        // look up a channel when we need to close or register it, and
334✔
2077
        // the forwarding index which'll be used when forwarding HTLC's
334✔
2078
        // in the multi-hop setting.
334✔
2079
        s.linkIndex[link.ChanID()] = link
334✔
2080
        s.forwardingIndex[linkScid] = link
334✔
2081

334✔
2082
        // Next we'll add the link to the interface index so we can
334✔
2083
        // quickly look up all the channels for a particular node.
334✔
2084
        peerPub := link.PeerPubKey()
334✔
2085
        if _, ok := s.interfaceIndex[peerPub]; !ok {
663✔
2086
                s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink)
329✔
2087
        }
329✔
2088
        s.interfaceIndex[peerPub][link.ChanID()] = link
334✔
2089

334✔
2090
        s.updateLinkAliases(link)
334✔
2091
}
2092

2093
// UpdateLinkAliases is the externally exposed wrapper for updating link
2094
// aliases. It acquires the indexMtx and calls the internal method.
UNCOV
2095
func (s *Switch) UpdateLinkAliases(link ChannelLink) {
×
UNCOV
2096
        s.indexMtx.Lock()
×
UNCOV
2097
        defer s.indexMtx.Unlock()
×
UNCOV
2098

×
UNCOV
2099
        s.updateLinkAliases(link)
×
UNCOV
2100
}
×
2101

2102
// updateLinkAliases updates the aliases for a given link. This will cause the
2103
// htlcswitch to consult the alias manager on the up to date values of its
2104
// alias maps.
2105
//
2106
// NOTE: this MUST be called with the indexMtx held.
2107
func (s *Switch) updateLinkAliases(link ChannelLink) {
334✔
2108
        linkScid := link.ShortChanID()
334✔
2109

334✔
2110
        aliases := link.getAliases()
334✔
2111
        if link.isZeroConf() {
353✔
2112
                if link.zeroConfConfirmed() {
34✔
2113
                        // Since the zero-conf channel has confirmed, we can
15✔
2114
                        // populate the aliasToReal mapping.
15✔
2115
                        confirmedScid := link.confirmedScid()
15✔
2116

15✔
2117
                        for _, alias := range aliases {
37✔
2118
                                s.aliasToReal[alias] = confirmedScid
22✔
2119
                        }
22✔
2120

2121
                        // Add the confirmed SCID as a key in the baseIndex.
2122
                        s.baseIndex[confirmedScid] = linkScid
15✔
2123
                }
2124

2125
                // Now we populate the baseIndex which will be used to fetch
2126
                // the link given any of the channel's alias SCIDs or the real
2127
                // SCID. The link's SCID is an alias, so we don't need to
2128
                // special-case it like the option-scid-alias feature-bit case
2129
                // further down.
2130
                for _, alias := range aliases {
46✔
2131
                        s.baseIndex[alias] = linkScid
27✔
2132
                }
27✔
2133
        } else if link.negotiatedAliasFeature() {
331✔
2134
                // First, we flush any alias mappings for this link's scid
16✔
2135
                // before we populate the map again, in order to get rid of old
16✔
2136
                // values that no longer exist.
16✔
2137
                for alias, real := range s.aliasToReal {
18✔
2138
                        if real == linkScid {
2✔
UNCOV
2139
                                delete(s.aliasToReal, alias)
×
UNCOV
2140
                        }
×
2141
                }
2142

2143
                for alias, real := range s.baseIndex {
19✔
2144
                        if real == linkScid {
3✔
UNCOV
2145
                                delete(s.baseIndex, alias)
×
UNCOV
2146
                        }
×
2147
                }
2148

2149
                // The link's SCID is the confirmed SCID for non-zero-conf
2150
                // option-scid-alias feature bit channels.
2151
                for _, alias := range aliases {
39✔
2152
                        s.aliasToReal[alias] = linkScid
23✔
2153
                        s.baseIndex[alias] = linkScid
23✔
2154
                }
23✔
2155

2156
                // Since the link's SCID is confirmed, it was not included in
2157
                // the baseIndex above as a key. Add it now.
2158
                s.baseIndex[linkScid] = linkScid
16✔
2159
        }
2160
}
2161

2162
// GetLink is used to initiate the handling of the get link command. The
2163
// request will be propagated/handled to/in the main goroutine.
2164
func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelUpdateHandler,
2165
        error) {
3,199✔
2166

3,199✔
2167
        s.indexMtx.RLock()
3,199✔
2168
        defer s.indexMtx.RUnlock()
3,199✔
2169

3,199✔
2170
        return s.getLink(chanID)
3,199✔
2171
}
3,199✔
2172

2173
// getLink returns the link stored in either the pending index or the live
2174
// lindex.
2175
func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) {
3,864✔
2176
        link, ok := s.linkIndex[chanID]
3,864✔
2177
        if !ok {
4,200✔
2178
                link, ok = s.pendingLinkIndex[chanID]
336✔
2179
                if !ok {
671✔
2180
                        return nil, ErrChannelLinkNotFound
335✔
2181
                }
335✔
2182
        }
2183

2184
        return link, nil
3,529✔
2185
}
2186

2187
// GetLinkByShortID attempts to return the link which possesses the target short
2188
// channel ID.
2189
func (s *Switch) GetLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink,
UNCOV
2190
        error) {
×
UNCOV
2191

×
UNCOV
2192
        s.indexMtx.RLock()
×
UNCOV
2193
        defer s.indexMtx.RUnlock()
×
UNCOV
2194

×
UNCOV
2195
        link, err := s.getLinkByShortID(chanID)
×
UNCOV
2196
        if err != nil {
×
UNCOV
2197
                // If we failed to find the link under the passed-in SCID, we
×
UNCOV
2198
                // consult the Switch's baseIndex map to see if the confirmed
×
UNCOV
2199
                // SCID was used for a zero-conf channel.
×
UNCOV
2200
                aliasID, ok := s.baseIndex[chanID]
×
UNCOV
2201
                if !ok {
×
UNCOV
2202
                        return nil, err
×
UNCOV
2203
                }
×
2204

2205
                // An alias was found, use it to lookup if a link exists.
UNCOV
2206
                return s.getLinkByShortID(aliasID)
×
2207
        }
2208

UNCOV
2209
        return link, nil
×
2210
}
2211

2212
// getLinkByShortID attempts to return the link which possesses the target
2213
// short channel ID.
2214
//
2215
// NOTE: This MUST be called with the indexMtx held.
2216
func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) {
477✔
2217
        link, ok := s.forwardingIndex[chanID]
477✔
2218
        if !ok {
478✔
2219
                return nil, ErrChannelLinkNotFound
1✔
2220
        }
1✔
2221

2222
        return link, nil
476✔
2223
}
2224

2225
// getLinkByMapping attempts to fetch the link via the htlcPacket's
2226
// outgoingChanID, possibly using a mapping. If it finds the link via mapping,
2227
// the outgoingChanID will be changed so that an error can be properly
2228
// attributed when looping over linkErrs in handlePacketForward.
2229
//
2230
// * If the outgoingChanID is an alias, we'll fetch the link regardless if it's
2231
// public or not.
2232
//
2233
// * If the outgoingChanID is a confirmed SCID, we'll need to do more checks.
2234
//   - If there is no entry found in baseIndex, fetch the link. This channel
2235
//     did not have the option-scid-alias feature negotiated (which includes
2236
//     zero-conf and option-scid-alias channel-types).
2237
//   - If there is an entry found, fetch the link from forwardingIndex and
2238
//     fail if this is a private link.
2239
//
2240
// NOTE: This MUST be called with the indexMtx read lock held.
2241
func (s *Switch) getLinkByMapping(pkt *htlcPacket) (ChannelLink, error) {
80✔
2242
        // Determine if this ShortChannelID is an alias or a confirmed SCID.
80✔
2243
        chanID := pkt.outgoingChanID
80✔
2244
        aliasID := s.cfg.IsAlias(chanID)
80✔
2245

80✔
2246
        // Set the originalOutgoingChanID so the proper channel_update can be
80✔
2247
        // sent back if the option-scid-alias feature bit was negotiated.
80✔
2248
        pkt.originalOutgoingChanID = chanID
80✔
2249

80✔
2250
        if aliasID {
95✔
2251
                // Since outgoingChanID is an alias, we'll fetch the link via
15✔
2252
                // baseIndex.
15✔
2253
                baseScid, ok := s.baseIndex[chanID]
15✔
2254
                if !ok {
15✔
2255
                        // No mapping exists, bail.
×
2256
                        return nil, ErrChannelLinkNotFound
×
2257
                }
×
2258

2259
                // A mapping exists, so use baseScid to find the link in the
2260
                // forwardingIndex.
2261
                link, ok := s.forwardingIndex[baseScid]
15✔
2262
                if !ok {
15✔
2263
                        // Link not found, bail.
×
2264
                        return nil, ErrChannelLinkNotFound
×
2265
                }
×
2266

2267
                // Change the packet's outgoingChanID field so that errors are
2268
                // properly attributed.
2269
                pkt.outgoingChanID = baseScid
15✔
2270

15✔
2271
                // Return the link without checking if it's private or not.
15✔
2272
                return link, nil
15✔
2273
        }
2274

2275
        // The outgoingChanID is a confirmed SCID. Attempt to fetch the base
2276
        // SCID from baseIndex.
2277
        baseScid, ok := s.baseIndex[chanID]
65✔
2278
        if !ok {
123✔
2279
                // outgoingChanID is not a key in base index meaning this
58✔
2280
                // channel did not have the option-scid-alias feature bit
58✔
2281
                // negotiated. We'll fetch the link and return it.
58✔
2282
                link, ok := s.forwardingIndex[chanID]
58✔
2283
                if !ok {
60✔
2284
                        // The link wasn't found, bail out.
2✔
2285
                        return nil, ErrChannelLinkNotFound
2✔
2286
                }
2✔
2287

2288
                return link, nil
56✔
2289
        }
2290

2291
        // Fetch the link whose internal SCID is baseScid.
2292
        link, ok := s.forwardingIndex[baseScid]
7✔
2293
        if !ok {
7✔
2294
                // Link wasn't found, bail out.
×
2295
                return nil, ErrChannelLinkNotFound
×
2296
        }
×
2297

2298
        // If the link is unadvertised, we fail since the real SCID was used to
2299
        // forward over it and this is a channel where the option-scid-alias
2300
        // feature bit was negotiated.
2301
        if link.IsUnadvertised() {
9✔
2302
                return nil, ErrChannelLinkNotFound
2✔
2303
        }
2✔
2304

2305
        // The link is public so the confirmed SCID can be used to forward over
2306
        // it. We'll also replace pkt's outgoingChanID field so errors can
2307
        // properly be attributed in the calling function.
2308
        pkt.outgoingChanID = baseScid
5✔
2309
        return link, nil
5✔
2310
}
2311

2312
// HasActiveLink returns true if the given channel ID has a link in the link
2313
// index AND the link is eligible to forward.
2314
func (s *Switch) HasActiveLink(chanID lnwire.ChannelID) bool {
2✔
2315
        s.indexMtx.RLock()
2✔
2316
        defer s.indexMtx.RUnlock()
2✔
2317

2✔
2318
        if link, ok := s.linkIndex[chanID]; ok {
4✔
2319
                return link.EligibleToForward()
2✔
2320
        }
2✔
2321

UNCOV
2322
        return false
×
2323
}
2324

2325
// RemoveLink purges the switch of any link associated with chanID. If a pending
2326
// or active link is not found, this method does nothing. Otherwise, the method
2327
// returns after the link has been completely shutdown.
2328
func (s *Switch) RemoveLink(chanID lnwire.ChannelID) {
18✔
2329
        s.indexMtx.Lock()
18✔
2330
        link, err := s.getLink(chanID)
18✔
2331
        if err != nil {
18✔
UNCOV
2332
                // If err is non-nil, this means that link is also nil. The
×
UNCOV
2333
                // link variable cannot be nil without err being non-nil.
×
UNCOV
2334
                s.indexMtx.Unlock()
×
UNCOV
2335
                log.Tracef("Unable to remove link for ChannelID(%v): %v",
×
UNCOV
2336
                        chanID, err)
×
UNCOV
2337
                return
×
UNCOV
2338
        }
×
2339

2340
        // Check if the link is already stopping and grab the stop chan if it
2341
        // is.
2342
        stopChan, ok := s.linkStopIndex[chanID]
18✔
2343
        if !ok {
36✔
2344
                // If the link is non-nil, it is not currently stopping, so
18✔
2345
                // we'll add a stop chan to the linkStopIndex.
18✔
2346
                stopChan = make(chan struct{})
18✔
2347
                s.linkStopIndex[chanID] = stopChan
18✔
2348
        }
18✔
2349
        s.indexMtx.Unlock()
18✔
2350

18✔
2351
        if ok {
18✔
2352
                // If the stop chan exists, we will wait for it to be closed.
×
2353
                // Once it is closed, we will exit.
×
2354
                select {
×
2355
                case <-stopChan:
×
2356
                        return
×
2357
                case <-s.quit:
×
2358
                        return
×
2359
                }
2360
        }
2361

2362
        // Stop the link before removing it from the maps.
2363
        link.Stop()
18✔
2364

18✔
2365
        s.indexMtx.Lock()
18✔
2366
        _ = s.removeLink(chanID)
18✔
2367

18✔
2368
        // Close stopChan and remove this link from the linkStopIndex.
18✔
2369
        // Deleting from the index and removing from the link must be done
18✔
2370
        // in the same block while the mutex is held.
18✔
2371
        close(stopChan)
18✔
2372
        delete(s.linkStopIndex, chanID)
18✔
2373
        s.indexMtx.Unlock()
18✔
2374
}
2375

2376
// removeLink is used to remove and stop the channel link.
2377
//
2378
// NOTE: This MUST be called with the indexMtx held.
2379
func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink {
311✔
2380
        log.Infof("Removing channel link with ChannelID(%v)", chanID)
311✔
2381

311✔
2382
        link, err := s.getLink(chanID)
311✔
2383
        if err != nil {
311✔
2384
                return nil
×
2385
        }
×
2386

2387
        // Remove the channel from live link indexes.
2388
        delete(s.pendingLinkIndex, link.ChanID())
311✔
2389
        delete(s.linkIndex, link.ChanID())
311✔
2390
        delete(s.forwardingIndex, link.ShortChanID())
311✔
2391

311✔
2392
        // If the link has been added to the peer index, then we'll move to
311✔
2393
        // delete the entry within the index.
311✔
2394
        peerPub := link.PeerPubKey()
311✔
2395
        if peerIndex, ok := s.interfaceIndex[peerPub]; ok {
621✔
2396
                delete(peerIndex, link.ChanID())
310✔
2397

310✔
2398
                // If after deletion, there are no longer any links, then we'll
310✔
2399
                // remove the interface map all together.
310✔
2400
                if len(peerIndex) == 0 {
615✔
2401
                        delete(s.interfaceIndex, peerPub)
305✔
2402
                }
305✔
2403
        }
2404

2405
        return link
311✔
2406
}
2407

2408
// UpdateShortChanID locates the link with the passed-in chanID and updates the
2409
// underlying channel state. This is only used in zero-conf channels to allow
2410
// the confirmed SCID to be updated.
2411
func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error {
1✔
2412
        s.indexMtx.Lock()
1✔
2413
        defer s.indexMtx.Unlock()
1✔
2414

1✔
2415
        // Locate the target link in the link index. If no such link exists,
1✔
2416
        // then we will ignore the request.
1✔
2417
        link, ok := s.linkIndex[chanID]
1✔
2418
        if !ok {
1✔
2419
                return fmt.Errorf("link %v not found", chanID)
×
2420
        }
×
2421

2422
        // Try to update the link's underlying channel state, returning early
2423
        // if this update failed.
2424
        _, err := link.UpdateShortChanID()
1✔
2425
        if err != nil {
1✔
2426
                return err
×
2427
        }
×
2428

2429
        // Since the zero-conf channel is confirmed, we should populate the
2430
        // aliasToReal map and update the baseIndex.
2431
        aliases := link.getAliases()
1✔
2432

1✔
2433
        confirmedScid := link.confirmedScid()
1✔
2434

1✔
2435
        for _, alias := range aliases {
3✔
2436
                s.aliasToReal[alias] = confirmedScid
2✔
2437
        }
2✔
2438

2439
        s.baseIndex[confirmedScid] = link.ShortChanID()
1✔
2440

1✔
2441
        return nil
1✔
2442
}
2443

2444
// GetLinksByInterface fetches all the links connected to a particular node
2445
// identified by the serialized compressed form of its public key.
2446
func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelUpdateHandler,
UNCOV
2447
        error) {
×
UNCOV
2448

×
UNCOV
2449
        s.indexMtx.RLock()
×
UNCOV
2450
        defer s.indexMtx.RUnlock()
×
UNCOV
2451

×
UNCOV
2452
        var handlers []ChannelUpdateHandler
×
UNCOV
2453

×
UNCOV
2454
        links, err := s.getLinks(hop)
×
UNCOV
2455
        if err != nil {
×
UNCOV
2456
                return nil, err
×
UNCOV
2457
        }
×
2458

2459
        // Range over the returned []ChannelLink to convert them into
2460
        // []ChannelUpdateHandler.
UNCOV
2461
        for _, link := range links {
×
UNCOV
2462
                handlers = append(handlers, link)
×
UNCOV
2463
        }
×
2464

UNCOV
2465
        return handlers, nil
×
2466
}
2467

2468
// getLinks is function which returns the channel links of the peer by hop
2469
// destination id.
2470
//
2471
// NOTE: This MUST be called with the indexMtx held.
2472
func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
76✔
2473
        links, ok := s.interfaceIndex[destination]
76✔
2474
        if !ok {
76✔
UNCOV
2475
                return nil, ErrNoLinksFound
×
UNCOV
2476
        }
×
2477

2478
        channelLinks := make([]ChannelLink, 0, len(links))
76✔
2479
        for _, link := range links {
156✔
2480
                channelLinks = append(channelLinks, link)
80✔
2481
        }
80✔
2482

2483
        return channelLinks, nil
76✔
2484
}
2485

2486
// CircuitModifier returns a reference to subset of the interfaces provided by
2487
// the circuit map, to allow links to open and close circuits.
2488
func (s *Switch) CircuitModifier() CircuitModifier {
215✔
2489
        return s.circuits
215✔
2490
}
215✔
2491

2492
// CircuitLookup returns a reference to subset of the interfaces provided by the
2493
// circuit map, to allow looking up circuits.
UNCOV
2494
func (s *Switch) CircuitLookup() CircuitLookup {
×
UNCOV
2495
        return s.circuits
×
UNCOV
2496
}
×
2497

2498
// commitCircuits persistently adds a circuit to the switch's circuit map.
2499
func (s *Switch) commitCircuits(circuits ...*PaymentCircuit) (
2500
        *CircuitFwdActions, error) {
17✔
2501

17✔
2502
        return s.circuits.CommitCircuits(circuits...)
17✔
2503
}
17✔
2504

2505
// FlushForwardingEvents flushes out the set of pending forwarding events to
2506
// the persistent log. This will be used by the switch to periodically flush
2507
// out the set of forwarding events to disk. External callers can also use this
2508
// method to ensure all data is flushed to dis before querying the log.
2509
func (s *Switch) FlushForwardingEvents() error {
209✔
2510
        // First, we'll obtain a copy of the current set of pending forwarding
209✔
2511
        // events.
209✔
2512
        s.fwdEventMtx.Lock()
209✔
2513

209✔
2514
        // If we won't have any forwarding events, then we can exit early.
209✔
2515
        if len(s.pendingFwdingEvents) == 0 {
401✔
2516
                s.fwdEventMtx.Unlock()
192✔
2517
                return nil
192✔
2518
        }
192✔
2519

2520
        events := make([]channeldb.ForwardingEvent, len(s.pendingFwdingEvents))
17✔
2521
        copy(events[:], s.pendingFwdingEvents[:])
17✔
2522

17✔
2523
        // With the copy obtained, we can now clear out the header pointer of
17✔
2524
        // the current slice. This way, we can re-use the underlying storage
17✔
2525
        // allocated for the slice.
17✔
2526
        s.pendingFwdingEvents = s.pendingFwdingEvents[:0]
17✔
2527
        s.fwdEventMtx.Unlock()
17✔
2528

17✔
2529
        // Finally, we'll write out the copied events to the persistent
17✔
2530
        // forwarding log.
17✔
2531
        return s.cfg.FwdingLog.AddForwardingEvents(events)
17✔
2532
}
2533

2534
// BestHeight returns the best height known to the switch.
2535
func (s *Switch) BestHeight() uint32 {
447✔
2536
        return atomic.LoadUint32(&s.bestHeight)
447✔
2537
}
447✔
2538

2539
// dustExceedsFeeThreshold takes in a ChannelLink, HTLC amount, and a boolean
2540
// to determine whether the default fee threshold has been exceeded. This
2541
// heuristic takes into account the trimmed-to-dust mechanism. The sum of the
2542
// commitment's dust with the mailbox's dust with the amount is checked against
2543
// the fee exposure threshold. If incoming is true, then the amount is not
2544
// included in the sum as it was already included in the commitment's dust. A
2545
// boolean is returned telling the caller whether the HTLC should be failed
2546
// back.
2547
func (s *Switch) dustExceedsFeeThreshold(link ChannelLink,
2548
        amount lnwire.MilliSatoshi, incoming bool) bool {
532✔
2549

532✔
2550
        // Retrieve the link's current commitment feerate and dustClosure.
532✔
2551
        feeRate := link.getFeeRate()
532✔
2552
        isDust := link.getDustClosure()
532✔
2553

532✔
2554
        // Evaluate if the HTLC is dust on either sides' commitment.
532✔
2555
        isLocalDust := isDust(
532✔
2556
                feeRate, incoming, lntypes.Local, amount.ToSatoshis(),
532✔
2557
        )
532✔
2558
        isRemoteDust := isDust(
532✔
2559
                feeRate, incoming, lntypes.Remote, amount.ToSatoshis(),
532✔
2560
        )
532✔
2561

532✔
2562
        if !(isLocalDust || isRemoteDust) {
664✔
2563
                // If the HTLC is not dust on either commitment, it's fine to
132✔
2564
                // forward.
132✔
2565
                return false
132✔
2566
        }
132✔
2567

2568
        // Fetch the dust sums currently in the mailbox for this link.
2569
        cid := link.ChanID()
400✔
2570
        sid := link.ShortChanID()
400✔
2571
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(cid, sid)
400✔
2572
        localMailDust, remoteMailDust := mailbox.DustPackets()
400✔
2573

400✔
2574
        // If the htlc is dust on the local commitment, we'll obtain the dust
400✔
2575
        // sum for it.
400✔
2576
        if isLocalDust {
800✔
2577
                localSum := link.getDustSum(
400✔
2578
                        lntypes.Local, fn.None[chainfee.SatPerKWeight](),
400✔
2579
                )
400✔
2580
                localSum += localMailDust
400✔
2581

400✔
2582
                // Optionally include the HTLC amount only for outgoing
400✔
2583
                // HTLCs.
400✔
2584
                if !incoming {
760✔
2585
                        localSum += amount
360✔
2586
                }
360✔
2587

2588
                // Finally check against the defined fee threshold.
2589
                if localSum > s.cfg.MaxFeeExposure {
402✔
2590
                        return true
2✔
2591
                }
2✔
2592
        }
2593

2594
        // Also check if the htlc is dust on the remote commitment, if we've
2595
        // reached this point.
2596
        if isRemoteDust {
796✔
2597
                remoteSum := link.getDustSum(
398✔
2598
                        lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
398✔
2599
                )
398✔
2600
                remoteSum += remoteMailDust
398✔
2601

398✔
2602
                // Optionally include the HTLC amount only for outgoing
398✔
2603
                // HTLCs.
398✔
2604
                if !incoming {
756✔
2605
                        remoteSum += amount
358✔
2606
                }
358✔
2607

2608
                // Finally check against the defined fee threshold.
2609
                if remoteSum > s.cfg.MaxFeeExposure {
398✔
2610
                        return true
×
2611
                }
×
2612
        }
2613

2614
        // If we reached this point, this HTLC is fine to forward.
2615
        return false
398✔
2616
}
2617

2618
// failMailboxUpdate is passed to the mailbox orchestrator which in turn passes
2619
// it to individual mailboxes. It allows the mailboxes to construct a
2620
// FailureMessage when failing back HTLC's due to expiry and may include an
2621
// alias in the ShortChannelID field. The outgoingScid is the SCID originally
2622
// used in the onion. The mailboxScid is the SCID that the mailbox and link
2623
// use. The mailboxScid is only used in the non-alias case, so it is always
2624
// the confirmed SCID.
2625
func (s *Switch) failMailboxUpdate(outgoingScid,
2626
        mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage {
11✔
2627

11✔
2628
        // Try to use the failAliasUpdate function in case this is a channel
11✔
2629
        // that uses aliases. If it returns nil, we'll fallback to the original
11✔
2630
        // pre-alias behavior.
11✔
2631
        update := s.failAliasUpdate(outgoingScid, false)
11✔
2632
        if update == nil {
16✔
2633
                // Execute the fallback behavior.
5✔
2634
                var err error
5✔
2635
                update, err = s.cfg.FetchLastChannelUpdate(mailboxScid)
5✔
2636
                if err != nil {
5✔
2637
                        return &lnwire.FailTemporaryNodeFailure{}
×
2638
                }
×
2639
        }
2640

2641
        return lnwire.NewTemporaryChannelFailure(update)
11✔
2642
}
2643

2644
// failAliasUpdate prepares a ChannelUpdate for a failed incoming or outgoing
2645
// HTLC on a channel where the option-scid-alias feature bit was negotiated. If
2646
// the associated channel is not one of these, this function will return nil
2647
// and the caller is expected to handle this properly. In this case, a return
2648
// to the original non-alias behavior is expected.
2649
func (s *Switch) failAliasUpdate(scid lnwire.ShortChannelID,
2650
        incoming bool) *lnwire.ChannelUpdate1 {
34✔
2651

34✔
2652
        // This function does not defer the unlocking because of the database
34✔
2653
        // lookups for ChannelUpdate.
34✔
2654
        s.indexMtx.RLock()
34✔
2655

34✔
2656
        if s.cfg.IsAlias(scid) {
45✔
2657
                // The alias SCID was used. In the incoming case this means
11✔
2658
                // the channel is zero-conf as the link sets the scid. In the
11✔
2659
                // outgoing case, the sender set the scid to use and may be
11✔
2660
                // either the alias or the confirmed one, if it exists.
11✔
2661
                realScid, ok := s.aliasToReal[scid]
11✔
2662
                if !ok {
11✔
2663
                        // The real, confirmed SCID does not exist yet. Find
×
2664
                        // the "base" SCID that the link uses via the
×
2665
                        // baseIndex. If we can't find it, return nil. This
×
2666
                        // means the channel is zero-conf.
×
2667
                        baseScid, ok := s.baseIndex[scid]
×
2668
                        s.indexMtx.RUnlock()
×
2669
                        if !ok {
×
2670
                                return nil
×
2671
                        }
×
2672

2673
                        update, err := s.cfg.FetchLastChannelUpdate(baseScid)
×
2674
                        if err != nil {
×
2675
                                return nil
×
2676
                        }
×
2677

2678
                        // Replace the baseScid with the passed-in alias.
2679
                        update.ShortChannelID = scid
×
2680
                        sig, err := s.cfg.SignAliasUpdate(update)
×
2681
                        if err != nil {
×
2682
                                return nil
×
2683
                        }
×
2684

2685
                        update.Signature, err = lnwire.NewSigFromSignature(sig)
×
2686
                        if err != nil {
×
2687
                                return nil
×
2688
                        }
×
2689

2690
                        return update
×
2691
                }
2692

2693
                s.indexMtx.RUnlock()
11✔
2694

11✔
2695
                // Fetch the SCID via the confirmed SCID and replace it with
11✔
2696
                // the alias.
11✔
2697
                update, err := s.cfg.FetchLastChannelUpdate(realScid)
11✔
2698
                if err != nil {
11✔
UNCOV
2699
                        return nil
×
UNCOV
2700
                }
×
2701

2702
                // In the incoming case, we want to ensure that we don't leak
2703
                // the UTXO in case the channel is private. In the outgoing
2704
                // case, since the alias was used, we do the same thing.
2705
                update.ShortChannelID = scid
11✔
2706
                sig, err := s.cfg.SignAliasUpdate(update)
11✔
2707
                if err != nil {
11✔
2708
                        return nil
×
2709
                }
×
2710

2711
                update.Signature, err = lnwire.NewSigFromSignature(sig)
11✔
2712
                if err != nil {
11✔
2713
                        return nil
×
2714
                }
×
2715

2716
                return update
11✔
2717
        }
2718

2719
        // If the confirmed SCID is not in baseIndex, this is not an
2720
        // option-scid-alias or zero-conf channel.
2721
        baseScid, ok := s.baseIndex[scid]
23✔
2722
        if !ok {
41✔
2723
                s.indexMtx.RUnlock()
18✔
2724
                return nil
18✔
2725
        }
18✔
2726

2727
        // Fetch the link so we can get an alias to use in the ShortChannelID
2728
        // of the ChannelUpdate.
2729
        link, ok := s.forwardingIndex[baseScid]
5✔
2730
        s.indexMtx.RUnlock()
5✔
2731
        if !ok {
5✔
2732
                // This should never happen, but if it does for some reason,
×
2733
                // fallback to the old behavior.
×
2734
                return nil
×
2735
        }
×
2736

2737
        aliases := link.getAliases()
5✔
2738
        if len(aliases) == 0 {
5✔
2739
                // This should never happen, but if it does, fallback.
×
2740
                return nil
×
2741
        }
×
2742

2743
        // Fetch the ChannelUpdate via the real, confirmed SCID.
2744
        update, err := s.cfg.FetchLastChannelUpdate(scid)
5✔
2745
        if err != nil {
5✔
2746
                return nil
×
2747
        }
×
2748

2749
        // The incoming case will replace the ShortChannelID in the retrieved
2750
        // ChannelUpdate with the alias to ensure no privacy leak occurs. This
2751
        // would happen if a private non-zero-conf option-scid-alias
2752
        // feature-bit channel leaked its UTXO here rather than supplying an
2753
        // alias. In the outgoing case, the confirmed SCID was actually used
2754
        // for forwarding in the onion, so no replacement is necessary as the
2755
        // sender knows the scid.
2756
        if incoming {
7✔
2757
                // We will replace and sign the update with the first alias.
2✔
2758
                // Since this happens on the incoming side, it's not actually
2✔
2759
                // possible to know what the sender used in the onion.
2✔
2760
                update.ShortChannelID = aliases[0]
2✔
2761
                sig, err := s.cfg.SignAliasUpdate(update)
2✔
2762
                if err != nil {
2✔
2763
                        return nil
×
2764
                }
×
2765

2766
                update.Signature, err = lnwire.NewSigFromSignature(sig)
2✔
2767
                if err != nil {
2✔
2768
                        return nil
×
2769
                }
×
2770
        }
2771

2772
        return update
5✔
2773
}
2774

2775
// AddAliasForLink instructs the Switch to update its in-memory maps to reflect
2776
// that a link has a new alias.
2777
func (s *Switch) AddAliasForLink(chanID lnwire.ChannelID,
2778
        alias lnwire.ShortChannelID) error {
×
2779

×
2780
        // Fetch the link so that we can update the underlying channel's set of
×
2781
        // aliases.
×
2782
        s.indexMtx.RLock()
×
2783
        link, err := s.getLink(chanID)
×
2784
        s.indexMtx.RUnlock()
×
2785
        if err != nil {
×
2786
                return err
×
2787
        }
×
2788

2789
        // If the link is a channel where the option-scid-alias feature bit was
2790
        // not negotiated, we'll return an error.
2791
        if !link.negotiatedAliasFeature() {
×
2792
                return fmt.Errorf("attempted to update non-alias channel")
×
2793
        }
×
2794

2795
        linkScid := link.ShortChanID()
×
2796

×
2797
        // We'll update the maps so the Switch includes this alias in its
×
2798
        // forwarding decisions.
×
2799
        if link.isZeroConf() {
×
2800
                if link.zeroConfConfirmed() {
×
2801
                        // If the channel has confirmed on-chain, we'll
×
2802
                        // add this alias to the aliasToReal map.
×
2803
                        confirmedScid := link.confirmedScid()
×
2804

×
2805
                        s.aliasToReal[alias] = confirmedScid
×
2806
                }
×
2807

2808
                // Add this alias to the baseIndex mapping.
2809
                s.baseIndex[alias] = linkScid
×
2810
        } else if link.negotiatedAliasFeature() {
×
2811
                // The channel is confirmed, so we'll populate the aliasToReal
×
2812
                // and baseIndex maps.
×
2813
                s.aliasToReal[alias] = linkScid
×
2814
                s.baseIndex[alias] = linkScid
×
2815
        }
×
2816

2817
        return nil
×
2818
}
2819

2820
// handlePacketAdd handles forwarding an Add packet.
2821
func (s *Switch) handlePacketAdd(packet *htlcPacket,
2822
        htlc *lnwire.UpdateAddHTLC) error {
82✔
2823

82✔
2824
        // Check if the node is set to reject all onward HTLCs and also make
82✔
2825
        // sure that HTLC is not from the source node.
82✔
2826
        if s.cfg.RejectHTLC {
83✔
2827
                failure := NewDetailedLinkError(
1✔
2828
                        &lnwire.FailChannelDisabled{},
1✔
2829
                        OutgoingFailureForwardsDisabled,
1✔
2830
                )
1✔
2831

1✔
2832
                return s.failAddPacket(packet, failure)
1✔
2833
        }
1✔
2834

2835
        // Before we attempt to find a non-strict forwarding path for this
2836
        // htlc, check whether the htlc is being routed over the same incoming
2837
        // and outgoing channel. If our node does not allow forwards of this
2838
        // nature, we fail the htlc early. This check is in place to disallow
2839
        // inefficiently routed htlcs from locking up our balance. With
2840
        // channels where the option-scid-alias feature was negotiated, we also
2841
        // have to be sure that the IDs aren't the same since one or both could
2842
        // be an alias.
2843
        linkErr := s.checkCircularForward(
81✔
2844
                packet.incomingChanID, packet.outgoingChanID,
81✔
2845
                s.cfg.AllowCircularRoute, htlc.PaymentHash,
81✔
2846
        )
81✔
2847
        if linkErr != nil {
82✔
2848
                return s.failAddPacket(packet, linkErr)
1✔
2849
        }
1✔
2850

2851
        s.indexMtx.RLock()
80✔
2852
        targetLink, err := s.getLinkByMapping(packet)
80✔
2853
        if err != nil {
84✔
2854
                s.indexMtx.RUnlock()
4✔
2855

4✔
2856
                log.Debugf("unable to find link with "+
4✔
2857
                        "destination %v", packet.outgoingChanID)
4✔
2858

4✔
2859
                // If packet was forwarded from another channel link than we
4✔
2860
                // should notify this link that some error occurred.
4✔
2861
                linkError := NewLinkError(
4✔
2862
                        &lnwire.FailUnknownNextPeer{},
4✔
2863
                )
4✔
2864

4✔
2865
                return s.failAddPacket(packet, linkError)
4✔
2866
        }
4✔
2867
        targetPeerKey := targetLink.PeerPubKey()
76✔
2868
        interfaceLinks, _ := s.getLinks(targetPeerKey)
76✔
2869
        s.indexMtx.RUnlock()
76✔
2870

76✔
2871
        // We'll keep track of any HTLC failures during the link selection
76✔
2872
        // process. This way we can return the error for precise link that the
76✔
2873
        // sender selected, while optimistically trying all links to utilize
76✔
2874
        // our available bandwidth.
76✔
2875
        linkErrs := make(map[lnwire.ShortChannelID]*LinkError)
76✔
2876

76✔
2877
        // Find all destination channel links with appropriate bandwidth.
76✔
2878
        var destinations []ChannelLink
76✔
2879
        for _, link := range interfaceLinks {
156✔
2880
                var failure *LinkError
80✔
2881

80✔
2882
                // We'll skip any links that aren't yet eligible for
80✔
2883
                // forwarding.
80✔
2884
                if !link.EligibleToForward() {
84✔
2885
                        failure = NewDetailedLinkError(
4✔
2886
                                &lnwire.FailUnknownNextPeer{},
4✔
2887
                                OutgoingFailureLinkNotEligible,
4✔
2888
                        )
4✔
2889
                } else {
80✔
2890
                        // We'll ensure that the HTLC satisfies the current
76✔
2891
                        // forwarding conditions of this target link.
76✔
2892
                        currentHeight := atomic.LoadUint32(&s.bestHeight)
76✔
2893
                        failure = link.CheckHtlcForward(
76✔
2894
                                htlc.PaymentHash, packet.incomingAmount,
76✔
2895
                                packet.amount, packet.incomingTimeout,
76✔
2896
                                packet.outgoingTimeout, packet.inboundFee,
76✔
2897
                                currentHeight, packet.originalOutgoingChanID,
76✔
2898
                                htlc.CustomRecords,
76✔
2899
                        )
76✔
2900
                }
76✔
2901

2902
                // If this link can forward the htlc, add it to the set of
2903
                // destinations.
2904
                if failure == nil {
140✔
2905
                        destinations = append(destinations, link)
60✔
2906
                        continue
60✔
2907
                }
2908

2909
                linkErrs[link.ShortChanID()] = failure
20✔
2910
        }
2911

2912
        // If we had a forwarding failure due to the HTLC not satisfying the
2913
        // current policy, then we'll send back an error, but ensure we send
2914
        // back the error sourced at the *target* link.
2915
        if len(destinations) == 0 {
92✔
2916
                // At this point, some or all of the links rejected the HTLC so
16✔
2917
                // we couldn't forward it. So we'll try to look up the error
16✔
2918
                // that came from the source.
16✔
2919
                linkErr, ok := linkErrs[packet.outgoingChanID]
16✔
2920
                if !ok {
16✔
2921
                        // If we can't find the error of the source, then we'll
×
2922
                        // return an unknown next peer, though this should
×
2923
                        // never happen.
×
2924
                        linkErr = NewLinkError(
×
2925
                                &lnwire.FailUnknownNextPeer{},
×
2926
                        )
×
2927
                        log.Warnf("unable to find err source for "+
×
2928
                                "outgoing_link=%v, errors=%v",
×
2929
                                packet.outgoingChanID,
×
2930
                                lnutils.SpewLogClosure(linkErrs))
×
2931
                }
×
2932

2933
                log.Tracef("incoming HTLC(%x) violated "+
16✔
2934
                        "target outgoing link (id=%v) policy: %v",
16✔
2935
                        htlc.PaymentHash[:], packet.outgoingChanID,
16✔
2936
                        linkErr)
16✔
2937

16✔
2938
                return s.failAddPacket(packet, linkErr)
16✔
2939
        }
2940

2941
        // Choose a random link out of the set of links that can forward this
2942
        // htlc. The reason for randomization is to evenly distribute the htlc
2943
        // load without making assumptions about what the best channel is.
2944
        //nolint:gosec
2945
        destination := destinations[rand.Intn(len(destinations))]
60✔
2946

60✔
2947
        // Retrieve the incoming link by its ShortChannelID. Note that the
60✔
2948
        // incomingChanID is never set to hop.Source here.
60✔
2949
        s.indexMtx.RLock()
60✔
2950
        incomingLink, err := s.getLinkByShortID(packet.incomingChanID)
60✔
2951
        s.indexMtx.RUnlock()
60✔
2952
        if err != nil {
60✔
2953
                // If we couldn't find the incoming link, we can't evaluate the
×
2954
                // incoming's exposure to dust, so we just fail the HTLC back.
×
2955
                linkErr := NewLinkError(
×
2956
                        &lnwire.FailTemporaryChannelFailure{},
×
2957
                )
×
2958

×
2959
                return s.failAddPacket(packet, linkErr)
×
2960
        }
×
2961

2962
        // Evaluate whether this HTLC would increase our fee exposure over the
2963
        // threshold on the incoming link. If it does, fail it backwards.
2964
        if s.dustExceedsFeeThreshold(
60✔
2965
                incomingLink, packet.incomingAmount, true,
60✔
2966
        ) {
60✔
2967
                // The incoming dust exceeds the threshold, so we fail the add
×
2968
                // back.
×
2969
                linkErr := NewLinkError(
×
2970
                        &lnwire.FailTemporaryChannelFailure{},
×
2971
                )
×
2972

×
2973
                return s.failAddPacket(packet, linkErr)
×
2974
        }
×
2975

2976
        // Also evaluate whether this HTLC would increase our fee exposure over
2977
        // the threshold on the destination link. If it does, fail it back.
2978
        if s.dustExceedsFeeThreshold(
60✔
2979
                destination, packet.amount, false,
60✔
2980
        ) {
61✔
2981
                // The outgoing dust exceeds the threshold, so we fail the add
1✔
2982
                // back.
1✔
2983
                linkErr := NewLinkError(
1✔
2984
                        &lnwire.FailTemporaryChannelFailure{},
1✔
2985
                )
1✔
2986

1✔
2987
                return s.failAddPacket(packet, linkErr)
1✔
2988
        }
1✔
2989

2990
        // Send the packet to the destination channel link which manages the
2991
        // channel.
2992
        packet.outgoingChanID = destination.ShortChanID()
59✔
2993

59✔
2994
        return destination.handleSwitchPacket(packet)
59✔
2995
}
2996

2997
// handlePacketSettle handles forwarding a settle packet.
2998
func (s *Switch) handlePacketSettle(packet *htlcPacket) error {
399✔
2999
        // If the source of this packet has not been set, use the circuit map
399✔
3000
        // to lookup the origin.
399✔
3001
        circuit, err := s.closeCircuit(packet)
399✔
3002

399✔
3003
        // If the circuit is in the process of closing, we will return a nil as
399✔
3004
        // there's another packet handling undergoing.
399✔
3005
        if errors.Is(err, ErrCircuitClosing) {
399✔
UNCOV
3006
                log.Debugf("Circuit is closing for packet=%v", packet)
×
UNCOV
3007
                return nil
×
UNCOV
3008
        }
×
3009

3010
        // Exit early if there's another error.
3011
        if err != nil {
399✔
3012
                return err
×
3013
        }
×
3014

3015
        // closeCircuit returns a nil circuit when a settle packet returns an
3016
        // ErrUnknownCircuit error upon the inner call to CloseCircuit.
3017
        //
3018
        // NOTE: We can only get a nil circuit when it has already been deleted
3019
        // and when `UpdateFulfillHTLC` is received. After which `RevokeAndAck`
3020
        // is received, which invokes `processRemoteSettleFails` in its link.
3021
        if circuit == nil {
589✔
3022
                log.Debugf("Circuit already closed for packet=%v", packet)
190✔
3023
                return nil
190✔
3024
        }
190✔
3025

3026
        localHTLC := packet.incomingChanID == hop.Source
209✔
3027

209✔
3028
        // If this is a locally initiated HTLC, we need to handle the packet by
209✔
3029
        // storing the network result.
209✔
3030
        //
209✔
3031
        // A blank IncomingChanID in a circuit indicates that it is a pending
209✔
3032
        // user-initiated payment.
209✔
3033
        //
209✔
3034
        // NOTE: `closeCircuit` modifies the state of `packet`.
209✔
3035
        if localHTLC {
389✔
3036
                // TODO(yy): remove the goroutine and send back the error here.
180✔
3037
                s.wg.Add(1)
180✔
3038
                go s.handleLocalResponse(packet)
180✔
3039

180✔
3040
                // If this is a locally initiated HTLC, there's no need to
180✔
3041
                // forward it so we exit.
180✔
3042
                return nil
180✔
3043
        }
180✔
3044

3045
        // If this is an HTLC settle, and it wasn't from a locally initiated
3046
        // HTLC, then we'll log a forwarding event so we can flush it to disk
3047
        // later.
3048
        if circuit.Outgoing != nil {
58✔
3049
                log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+
29✔
3050
                        "from IncomingChanID(%v) to OutgoingChanID(%v)",
29✔
3051
                        circuit.PaymentHash[:], circuit.OutgoingAmount,
29✔
3052
                        circuit.IncomingAmount-circuit.OutgoingAmount,
29✔
3053
                        circuit.Incoming.ChanID, circuit.Outgoing.ChanID)
29✔
3054

29✔
3055
                s.fwdEventMtx.Lock()
29✔
3056
                s.pendingFwdingEvents = append(
29✔
3057
                        s.pendingFwdingEvents,
29✔
3058
                        channeldb.ForwardingEvent{
29✔
3059
                                Timestamp:      time.Now(),
29✔
3060
                                IncomingChanID: circuit.Incoming.ChanID,
29✔
3061
                                OutgoingChanID: circuit.Outgoing.ChanID,
29✔
3062
                                AmtIn:          circuit.IncomingAmount,
29✔
3063
                                AmtOut:         circuit.OutgoingAmount,
29✔
3064
                        },
29✔
3065
                )
29✔
3066
                s.fwdEventMtx.Unlock()
29✔
3067
        }
29✔
3068

3069
        // Deliver this packet.
3070
        return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
29✔
3071
}
3072

3073
// handlePacketFail handles forwarding a fail packet.
3074
func (s *Switch) handlePacketFail(packet *htlcPacket,
3075
        htlc *lnwire.UpdateFailHTLC) error {
138✔
3076

138✔
3077
        // If the source of this packet has not been set, use the circuit map
138✔
3078
        // to lookup the origin.
138✔
3079
        circuit, err := s.closeCircuit(packet)
138✔
3080
        if err != nil {
138✔
3081
                return err
×
3082
        }
×
3083

3084
        // If this is a locally initiated HTLC, we need to handle the packet by
3085
        // storing the network result.
3086
        //
3087
        // A blank IncomingChanID in a circuit indicates that it is a pending
3088
        // user-initiated payment.
3089
        //
3090
        // NOTE: `closeCircuit` modifies the state of `packet`.
3091
        if packet.incomingChanID == hop.Source {
262✔
3092
                // TODO(yy): remove the goroutine and send back the error here.
124✔
3093
                s.wg.Add(1)
124✔
3094
                go s.handleLocalResponse(packet)
124✔
3095

124✔
3096
                // If this is a locally initiated HTLC, there's no need to
124✔
3097
                // forward it so we exit.
124✔
3098
                return nil
124✔
3099
        }
124✔
3100

3101
        // Exit early if this hasSource is true. This flag is only set via
3102
        // mailbox's `FailAdd`. This method has two callsites,
3103
        // - the packet has timed out after `MailboxDeliveryTimeout`, defaults
3104
        //   to 1 min.
3105
        // - the HTLC fails the validation in `channel.AddHTLC`.
3106
        // In either case, the `Reason` field is populated. Thus there's no
3107
        // need to proceed and extract the failure reason below.
3108
        if packet.hasSource {
21✔
3109
                // Deliver this packet.
7✔
3110
                return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
7✔
3111
        }
7✔
3112

3113
        // HTLC resolutions and messages restored from disk don't have the
3114
        // obfuscator set from the original htlc add packet - set it here for
3115
        // use in blinded errors.
3116
        packet.obfuscator = circuit.ErrorEncrypter
7✔
3117

7✔
3118
        switch {
7✔
3119
        // No message to encrypt, locally sourced payment.
3120
        case circuit.ErrorEncrypter == nil:
×
3121
                // TODO(yy) further check this case as we shouldn't end up here
3122
                // as `isLocal` is already false.
3123

3124
        // If this is a resolution message, then we'll need to encrypt it as
3125
        // it's actually internally sourced.
UNCOV
3126
        case packet.isResolution:
×
UNCOV
3127
                var err error
×
UNCOV
3128
                // TODO(roasbeef): don't need to pass actually?
×
UNCOV
3129
                failure := &lnwire.FailPermanentChannelFailure{}
×
UNCOV
3130
                htlc.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop(
×
UNCOV
3131
                        failure,
×
UNCOV
3132
                )
×
UNCOV
3133
                if err != nil {
×
3134
                        err = fmt.Errorf("unable to obfuscate error: %w", err)
×
3135
                        log.Error(err)
×
3136
                }
×
3137

3138
        // Alternatively, if the remote party sends us an
3139
        // UpdateFailMalformedHTLC, then we'll need to convert this into a
3140
        // proper well formatted onion error as there's no HMAC currently.
3141
        case packet.convertedError:
2✔
3142
                log.Infof("Converting malformed HTLC error for circuit for "+
2✔
3143
                        "Circuit(%x: (%s, %d) <-> (%s, %d))",
2✔
3144
                        packet.circuit.PaymentHash,
2✔
3145
                        packet.incomingChanID, packet.incomingHTLCID,
2✔
3146
                        packet.outgoingChanID, packet.outgoingHTLCID)
2✔
3147

2✔
3148
                htlc.Reason = circuit.ErrorEncrypter.EncryptMalformedError(
2✔
3149
                        htlc.Reason,
2✔
3150
                )
2✔
3151

3152
        default:
5✔
3153
                // Otherwise, it's a forwarded error, so we'll perform a
5✔
3154
                // wrapper encryption as normal.
5✔
3155
                htlc.Reason = circuit.ErrorEncrypter.IntermediateEncrypt(
5✔
3156
                        htlc.Reason,
5✔
3157
                )
5✔
3158
        }
3159

3160
        // Deliver this packet.
3161
        return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
7✔
3162
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc