• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12312390362

13 Dec 2024 08:44AM UTC coverage: 57.458% (+8.5%) from 48.92%
12312390362

Pull #9343

github

ellemouton
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is re-worked such that the
context that its new main WithCtx method provides is cancelled in sync
with a parent context being cancelled or with its quit channel being
cancelled. Tests are added to assert the behaviour. In order for the
close of the quit channel to be consistent with the cancelling of the
derived context, the quit channel _must_ be contained internal to the
ContextGuard so that callers are only able to close the channel via the
exposed Quit method which will then take care to first cancel any
derived contexts that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests

101853 of 177264 relevant lines covered (57.46%)

24972.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.88
/htlcswitch/switch.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "math/rand"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/davecgh/go-spew/spew"
16
        "github.com/lightningnetwork/lnd/chainntnfs"
17
        "github.com/lightningnetwork/lnd/channeldb"
18
        "github.com/lightningnetwork/lnd/clock"
19
        "github.com/lightningnetwork/lnd/contractcourt"
20
        "github.com/lightningnetwork/lnd/fn/v2"
21
        "github.com/lightningnetwork/lnd/graph/db/models"
22
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
23
        "github.com/lightningnetwork/lnd/kvdb"
24
        "github.com/lightningnetwork/lnd/lntypes"
25
        "github.com/lightningnetwork/lnd/lnutils"
26
        "github.com/lightningnetwork/lnd/lnwallet"
27
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
28
        "github.com/lightningnetwork/lnd/lnwire"
29
        "github.com/lightningnetwork/lnd/ticker"
30
)
31

32
const (
	// DefaultFwdEventInterval is how long to wait between attempts to
	// flush pending forwarding events to disk.
	DefaultFwdEventInterval = 15 * time.Second

	// DefaultLogInterval is how long to wait between attempts to log
	// statistics about forwarding events.
	DefaultLogInterval = 10 * time.Second

	// DefaultAckInterval is how long to wait between attempts to ack any
	// settle fails in a forwarding package.
	DefaultAckInterval = 15 * time.Second

	// DefaultMailboxDeliveryTimeout is how long an Add may wait to be
	// added to an outgoing commitment before it is cancelled.
	DefaultMailboxDeliveryTimeout = time.Minute
)
49

50
var (
51
        // ErrChannelLinkNotFound is used when channel link hasn't been found.
52
        ErrChannelLinkNotFound = errors.New("channel link not found")
53

54
        // ErrDuplicateAdd signals that the ADD htlc was already forwarded
55
        // through the switch and is locked into another commitment txn.
56
        ErrDuplicateAdd = errors.New("duplicate add HTLC detected")
57

58
        // ErrUnknownErrorDecryptor signals that we were unable to locate the
59
        // error decryptor for this payment. This is likely due to restarting
60
        // the daemon.
61
        ErrUnknownErrorDecryptor = errors.New("unknown error decryptor")
62

63
        // ErrSwitchExiting signaled when the switch has received a shutdown
64
        // request.
65
        ErrSwitchExiting = errors.New("htlcswitch shutting down")
66

67
        // ErrNoLinksFound is an error returned when we attempt to retrieve the
68
        // active links in the switch for a specific destination.
69
        ErrNoLinksFound = errors.New("no channel links found")
70

71
        // ErrUnreadableFailureMessage is returned when the failure message
72
        // cannot be decrypted.
73
        ErrUnreadableFailureMessage = errors.New("unreadable failure message")
74

75
        // ErrLocalAddFailed signals that the ADD htlc for a local payment
76
        // failed to be processed.
77
        ErrLocalAddFailed = errors.New("local add HTLC failed")
78

79
        // errFeeExposureExceeded is only surfaced to callers of SendHTLC and
80
        // signals that sending the HTLC would exceed the outgoing link's fee
81
        // exposure threshold.
82
        errFeeExposureExceeded = errors.New("fee exposure exceeded")
83

84
        // DefaultMaxFeeExposure is the default threshold after which we'll
85
        // fail payments if they increase our fee exposure. This is currently
86
        // set to 500m msats.
87
        DefaultMaxFeeExposure = lnwire.MilliSatoshi(500_000_000)
88
)
89

90
// plexPacket encapsulates switch packet and adds error channel to receive
91
// error from request handler.
92
type plexPacket struct {
93
        pkt *htlcPacket
94
        err chan error
95
}
96

97
// ChanClose represents a request which close a particular channel specified by
98
// its id.
99
type ChanClose struct {
100
        // CloseType is a variable which signals the type of channel closure the
101
        // peer should execute.
102
        CloseType contractcourt.ChannelCloseType
103

104
        // ChanPoint represent the id of the channel which should be closed.
105
        ChanPoint *wire.OutPoint
106

107
        // TargetFeePerKw is the ideal fee that was specified by the caller.
108
        // This value is only utilized if the closure type is CloseRegular.
109
        // This will be the starting offered fee when the fee negotiation
110
        // process for the cooperative closure transaction kicks off.
111
        TargetFeePerKw chainfee.SatPerKWeight
112

113
        // MaxFee is the highest fee the caller is willing to pay.
114
        //
115
        // NOTE: This field is only respected if the caller is the initiator of
116
        // the channel.
117
        MaxFee chainfee.SatPerKWeight
118

119
        // DeliveryScript is an optional delivery script to pay funds out to.
120
        DeliveryScript lnwire.DeliveryAddress
121

122
        // Updates is used by request creator to receive the notifications about
123
        // execution of the close channel request.
124
        Updates chan interface{}
125

126
        // Err is used by request creator to receive request execution error.
127
        Err chan error
128
}
129

130
// Config defines the configuration for the service. ALL elements within the
131
// configuration MUST be non-nil for the service to carry out its duties.
132
type Config struct {
133
        // FwdingLog is an interface that will be used by the switch to log
134
        // forwarding events. A forwarding event happens each time a payment
135
        // circuit is successfully completed. So when we forward an HTLC, and a
136
        // settle is eventually received.
137
        FwdingLog ForwardingLog
138

139
        // LocalChannelClose kicks-off the workflow to execute a cooperative or
140
        // forced unilateral closure of the channel initiated by a local
141
        // subsystem.
142
        LocalChannelClose func(pubKey []byte, request *ChanClose)
143

144
        // DB is the database backend that will be used to back the switch's
145
        // persistent circuit map.
146
        DB kvdb.Backend
147

148
        // FetchAllOpenChannels is a function that fetches all currently open
149
        // channels from the channel database.
150
        FetchAllOpenChannels func() ([]*channeldb.OpenChannel, error)
151

152
        // FetchAllChannels is a function that fetches all pending open, open,
153
        // and waiting close channels from the database.
154
        FetchAllChannels func() ([]*channeldb.OpenChannel, error)
155

156
        // FetchClosedChannels is a function that fetches all closed channels
157
        // from the channel database.
158
        FetchClosedChannels func(
159
                pendingOnly bool) ([]*channeldb.ChannelCloseSummary, error)
160

161
        // SwitchPackager provides access to the forwarding packages of all
162
        // active channels. This gives the switch the ability to read arbitrary
163
        // forwarding packages, and ack settles and fails contained within them.
164
        SwitchPackager channeldb.FwdOperator
165

166
        // ExtractErrorEncrypter is an interface allowing switch to reextract
167
        // error encrypters stored in the circuit map on restarts, since they
168
        // are not stored directly within the database.
169
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter
170

171
        // FetchLastChannelUpdate retrieves the latest routing policy for a
172
        // target channel. This channel will typically be the outgoing channel
173
        // specified when we receive an incoming HTLC.  This will be used to
174
        // provide payment senders our latest policy when sending encrypted
175
        // error messages.
176
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
177
                *lnwire.ChannelUpdate1, error)
178

179
        // Notifier is an instance of a chain notifier that we'll use to signal
180
        // the switch when a new block has arrived.
181
        Notifier chainntnfs.ChainNotifier
182

183
        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
184
        // events through.
185
        HtlcNotifier htlcNotifier
186

187
        // FwdEventTicker is a signal that instructs the htlcswitch to flush any
188
        // pending forwarding events.
189
        FwdEventTicker ticker.Ticker
190

191
        // LogEventTicker is a signal instructing the htlcswitch to log
192
        // aggregate stats about it's forwarding during the last interval.
193
        LogEventTicker ticker.Ticker
194

195
        // AckEventTicker is a signal instructing the htlcswitch to ack any settle
196
        // fails in forwarding packages.
197
        AckEventTicker ticker.Ticker
198

199
        // AllowCircularRoute is true if the user has configured their node to
200
        // allow forwards that arrive and depart our node over the same channel.
201
        AllowCircularRoute bool
202

203
        // RejectHTLC is a flag that instructs the htlcswitch to reject any
204
        // HTLCs that are not from the source hop.
205
        RejectHTLC bool
206

207
        // Clock is a time source for the switch.
208
        Clock clock.Clock
209

210
        // MailboxDeliveryTimeout is the interval after which Adds will be
211
        // cancelled if they have not been yet been delivered to a link. The
212
        // computed deadline will expiry this long after the Adds are added to
213
        // a mailbox via AddPacket.
214
        MailboxDeliveryTimeout time.Duration
215

216
        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
217
        // fail incoming or outgoing payments for a particular channel.
218
        MaxFeeExposure lnwire.MilliSatoshi
219

220
        // SignAliasUpdate is used when sending FailureMessages backwards for
221
        // option_scid_alias channels. This avoids a potential privacy leak by
222
        // replacing the public, confirmed SCID with the alias in the
223
        // ChannelUpdate.
224
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
225
                error)
226

227
        // IsAlias returns whether or not a given SCID is an alias.
228
        IsAlias func(scid lnwire.ShortChannelID) bool
229
}
230

231
// Switch is the central messaging bus for all incoming/outgoing HTLCs.
232
// Connected peers with active channels are treated as named interfaces which
233
// refer to active channels as links. A link is the switch's message
234
// communication point with the goroutine that manages an active channel. New
235
// links are registered each time a channel is created, and unregistered once
236
// the channel is closed. The switch manages the hand-off process for multi-hop
237
// HTLCs, forwarding HTLCs initiated from within the daemon, and finally
238
// notifies users local-systems concerning their outstanding payment requests.
239
type Switch struct {
240
        started  int32 // To be used atomically.
241
        shutdown int32 // To be used atomically.
242

243
        // bestHeight is the best known height of the main chain. The links will
244
        // be used this information to govern decisions based on HTLC timeouts.
245
        // This will be retrieved by the registered links atomically.
246
        bestHeight uint32
247

248
        wg   sync.WaitGroup
249
        quit chan struct{}
250

251
        // cfg is a copy of the configuration struct that the htlc switch
252
        // service was initialized with.
253
        cfg *Config
254

255
        // networkResults stores the results of payments initiated by the user.
256
        // The store is used to later look up the payments and notify the
257
        // user of the result when they are complete. Each payment attempt
258
        // should be given a unique integer ID when it is created, otherwise
259
        // results might be overwritten.
260
        networkResults *networkResultStore
261

262
        // circuits is storage for payment circuits which are used to
263
        // forward the settle/fail htlc updates back to the add htlc initiator.
264
        circuits CircuitMap
265

266
        // mailOrchestrator manages the lifecycle of mailboxes used throughout
267
        // the switch, and facilitates delayed delivery of packets to links that
268
        // later come online.
269
        mailOrchestrator *mailOrchestrator
270

271
        // indexMtx is a read/write mutex that protects the set of indexes
272
        // below.
273
        indexMtx sync.RWMutex
274

275
        // pendingLinkIndex holds links that have not had their final, live
276
        // short_chan_id assigned.
277
        pendingLinkIndex map[lnwire.ChannelID]ChannelLink
278

279
        // links is a map of channel id and channel link which manages
280
        // this channel.
281
        linkIndex map[lnwire.ChannelID]ChannelLink
282

283
        // forwardingIndex is an index which is consulted by the switch when it
284
        // needs to locate the next hop to forward an incoming/outgoing HTLC
285
        // update to/from.
286
        //
287
        // TODO(roasbeef): eventually add a NetworkHop mapping before the
288
        // ChannelLink
289
        forwardingIndex map[lnwire.ShortChannelID]ChannelLink
290

291
        // interfaceIndex maps the compressed public key of a peer to all the
292
        // channels that the switch maintains with that peer.
293
        interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink
294

295
        // linkStopIndex stores the currently stopping ChannelLinks,
296
        // represented by their ChannelID. The key is the link's ChannelID and
297
        // the value is a chan that is closed when the link has fully stopped.
298
        // This map is only added to if RemoveLink is called and is not added
299
        // to when the Switch is shutting down and calls Stop() on each link.
300
        //
301
        // MUST be used with the indexMtx.
302
        linkStopIndex map[lnwire.ChannelID]chan struct{}
303

304
        // htlcPlex is the channel which all connected links use to coordinate
305
        // the setup/teardown of Sphinx (onion routing) payment circuits.
306
        // Active links forward any add/settle messages over this channel each
307
        // state transition, sending new adds/settles which are fully locked
308
        // in.
309
        htlcPlex chan *plexPacket
310

311
        // chanCloseRequests is used to transfer the channel close request to
312
        // the channel close handler.
313
        chanCloseRequests chan *ChanClose
314

315
        // resolutionMsgs is the channel that all external contract resolution
316
        // messages will be sent over.
317
        resolutionMsgs chan *resolutionMsg
318

319
        // pendingFwdingEvents is the set of forwarding events which have been
320
        // collected during the current interval, but hasn't yet been written
321
        // to the forwarding log.
322
        fwdEventMtx         sync.Mutex
323
        pendingFwdingEvents []channeldb.ForwardingEvent
324

325
        // blockEpochStream is an active block epoch event stream backed by an
326
        // active ChainNotifier instance. This will be used to retrieve the
327
        // latest height of the chain.
328
        blockEpochStream *chainntnfs.BlockEpochEvent
329

330
        // pendingSettleFails is the set of settle/fail entries that we need to
331
        // ack in the forwarding package of the outgoing link. This was added to
332
        // make pipelining settles more efficient.
333
        pendingSettleFails []channeldb.SettleFailRef
334

335
        // resMsgStore is used to store the set of ResolutionMsg that come from
336
        // contractcourt. This is used so the Switch can properly forward them,
337
        // even on restarts.
338
        resMsgStore *resolutionStore
339

340
        // aliasToReal is a map used for option-scid-alias feature-bit links.
341
        // The alias SCID is the key and the real, confirmed SCID is the value.
342
        // If the channel is unconfirmed, there will not be a mapping for it.
343
        // Since channels can have multiple aliases, this map is essentially a
344
        // N->1 mapping for a channel. This MUST be accessed with the indexMtx.
345
        aliasToReal map[lnwire.ShortChannelID]lnwire.ShortChannelID
346

347
        // baseIndex is a map used for option-scid-alias feature-bit links.
348
        // The value is the SCID of the link's ShortChannelID. This value may
349
        // be an alias for zero-conf channels or a confirmed SCID for
350
        // non-zero-conf channels with the option-scid-alias feature-bit. The
351
        // key includes the value itself and also any other aliases. This MUST
352
        // be accessed with the indexMtx.
353
        baseIndex map[lnwire.ShortChannelID]lnwire.ShortChannelID
354
}
355

356
// New creates the new instance of htlc switch.
357
func New(cfg Config, currentHeight uint32) (*Switch, error) {
341✔
358
        resStore := newResolutionStore(cfg.DB)
341✔
359

341✔
360
        circuitMap, err := NewCircuitMap(&CircuitMapConfig{
341✔
361
                DB:                    cfg.DB,
341✔
362
                FetchAllOpenChannels:  cfg.FetchAllOpenChannels,
341✔
363
                FetchClosedChannels:   cfg.FetchClosedChannels,
341✔
364
                ExtractErrorEncrypter: cfg.ExtractErrorEncrypter,
341✔
365
                CheckResolutionMsg:    resStore.checkResolutionMsg,
341✔
366
        })
341✔
367
        if err != nil {
341✔
368
                return nil, err
×
369
        }
×
370

371
        s := &Switch{
341✔
372
                bestHeight:        currentHeight,
341✔
373
                cfg:               &cfg,
341✔
374
                circuits:          circuitMap,
341✔
375
                linkIndex:         make(map[lnwire.ChannelID]ChannelLink),
341✔
376
                forwardingIndex:   make(map[lnwire.ShortChannelID]ChannelLink),
341✔
377
                interfaceIndex:    make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
341✔
378
                pendingLinkIndex:  make(map[lnwire.ChannelID]ChannelLink),
341✔
379
                linkStopIndex:     make(map[lnwire.ChannelID]chan struct{}),
341✔
380
                networkResults:    newNetworkResultStore(cfg.DB),
341✔
381
                htlcPlex:          make(chan *plexPacket),
341✔
382
                chanCloseRequests: make(chan *ChanClose),
341✔
383
                resolutionMsgs:    make(chan *resolutionMsg),
341✔
384
                resMsgStore:       resStore,
341✔
385
                quit:              make(chan struct{}),
341✔
386
        }
341✔
387

341✔
388
        s.aliasToReal = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
341✔
389
        s.baseIndex = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
341✔
390

341✔
391
        s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{
341✔
392
                forwardPackets:    s.ForwardPackets,
341✔
393
                clock:             s.cfg.Clock,
341✔
394
                expiry:            s.cfg.MailboxDeliveryTimeout,
341✔
395
                failMailboxUpdate: s.failMailboxUpdate,
341✔
396
        })
341✔
397

341✔
398
        return s, nil
341✔
399
}
400

401
// resolutionMsg is a struct that wraps an existing ResolutionMsg with a done
402
// channel. We'll use this channel to synchronize delivery of the message with
403
// the caller.
404
type resolutionMsg struct {
405
        contractcourt.ResolutionMsg
406

407
        errChan chan error
408
}
409

410
// ProcessContractResolution is called by active contract resolvers once a
411
// contract they are watching over has been fully resolved. The message carries
412
// an external signal that *would* have been sent if the outgoing channel
413
// didn't need to go to the chain in order to fulfill a contract. We'll process
414
// this message just as if it came from an active outgoing channel.
415
func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) error {
1✔
416
        errChan := make(chan error, 1)
1✔
417

1✔
418
        select {
1✔
419
        case s.resolutionMsgs <- &resolutionMsg{
420
                ResolutionMsg: msg,
421
                errChan:       errChan,
422
        }:
1✔
423
        case <-s.quit:
×
424
                return ErrSwitchExiting
×
425
        }
426

427
        select {
1✔
428
        case err := <-errChan:
1✔
429
                return err
1✔
430
        case <-s.quit:
×
431
                return ErrSwitchExiting
×
432
        }
433
}
434

435
// HasAttemptResult reads the network result store to fetch the specified
436
// attempt. Returns true if the attempt result exists.
437
func (s *Switch) HasAttemptResult(attemptID uint64) (bool, error) {
×
438
        _, err := s.networkResults.getResult(attemptID)
×
439
        if err == nil {
×
440
                return true, nil
×
441
        }
×
442

443
        if !errors.Is(err, ErrPaymentIDNotFound) {
×
444
                return false, err
×
445
        }
×
446

447
        return false, nil
×
448
}
449

450
// GetAttemptResult returns the result of the HTLC attempt with the given
451
// attemptID. The paymentHash should be set to the payment's overall hash, or
452
// in case of AMP payments the payment's unique identifier.
453
//
454
// The method returns a channel where the HTLC attempt result will be sent when
455
// available, or an error is encountered during forwarding. When a result is
456
// received on the channel, the HTLC is guaranteed to no longer be in flight.
457
// The switch shutting down is signaled by closing the channel. If the
458
// attemptID is unknown, ErrPaymentIDNotFound will be returned.
459
func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
460
        deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) {
307✔
461

307✔
462
        var (
307✔
463
                nChan <-chan *networkResult
307✔
464
                err   error
307✔
465
                inKey = CircuitKey{
307✔
466
                        ChanID: hop.Source,
307✔
467
                        HtlcID: attemptID,
307✔
468
                }
307✔
469
        )
307✔
470

307✔
471
        // If the HTLC is not found in the circuit map, check whether a result
307✔
472
        // is already available.
307✔
473
        // Assumption: no one will add this attempt ID other than the caller.
307✔
474
        if s.circuits.LookupCircuit(inKey) == nil {
312✔
475
                res, err := s.networkResults.getResult(attemptID)
5✔
476
                if err != nil {
7✔
477
                        return nil, err
2✔
478
                }
2✔
479
                c := make(chan *networkResult, 1)
3✔
480
                c <- res
3✔
481
                nChan = c
3✔
482
        } else {
302✔
483
                // The HTLC was committed to the circuits, subscribe for a
302✔
484
                // result.
302✔
485
                nChan, err = s.networkResults.subscribeResult(attemptID)
302✔
486
                if err != nil {
302✔
487
                        return nil, err
×
488
                }
×
489
        }
490

491
        resultChan := make(chan *PaymentResult, 1)
305✔
492

305✔
493
        // Since the attempt was known, we can start a goroutine that can
305✔
494
        // extract the result when it is available, and pass it on to the
305✔
495
        // caller.
305✔
496
        s.wg.Add(1)
305✔
497
        go func() {
610✔
498
                defer s.wg.Done()
305✔
499

305✔
500
                var n *networkResult
305✔
501
                select {
305✔
502
                case n = <-nChan:
301✔
503
                case <-s.quit:
4✔
504
                        // We close the result channel to signal a shutdown. We
4✔
505
                        // don't send any result in this case since the HTLC is
4✔
506
                        // still in flight.
4✔
507
                        close(resultChan)
4✔
508
                        return
4✔
509
                }
510

511
                log.Debugf("Received network result %T for attemptID=%v", n.msg,
301✔
512
                        attemptID)
301✔
513

301✔
514
                // Extract the result and pass it to the result channel.
301✔
515
                result, err := s.extractResult(
301✔
516
                        deobfuscator, n, attemptID, paymentHash,
301✔
517
                )
301✔
518
                if err != nil {
301✔
519
                        e := fmt.Errorf("unable to extract result: %w", err)
×
520
                        log.Error(e)
×
521
                        resultChan <- &PaymentResult{
×
522
                                Error: e,
×
523
                        }
×
524
                        return
×
525
                }
×
526
                resultChan <- result
301✔
527
        }()
528

529
        return resultChan, nil
305✔
530
}
531

532
// CleanStore calls the underlying result store, telling it is safe to delete
533
// all entries except the ones in the keepPids map. This should be called
534
// preiodically to let the switch clean up payment results that we have
535
// handled.
536
func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error {
×
537
        return s.networkResults.cleanStore(keepPids)
×
538
}
×
539

540
// SendHTLC is used by other subsystems which aren't belong to htlc switch
541
// package in order to send the htlc update. The attemptID used MUST be unique
542
// for this HTLC, and MUST be used only once, otherwise the switch might reject
543
// it.
544
func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64,
545
        htlc *lnwire.UpdateAddHTLC) error {
416✔
546

416✔
547
        // Generate and send new update packet, if error will be received on
416✔
548
        // this stage it means that packet haven't left boundaries of our
416✔
549
        // system and something wrong happened.
416✔
550
        packet := &htlcPacket{
416✔
551
                incomingChanID: hop.Source,
416✔
552
                incomingHTLCID: attemptID,
416✔
553
                outgoingChanID: firstHop,
416✔
554
                htlc:           htlc,
416✔
555
                amount:         htlc.Amount,
416✔
556
        }
416✔
557

416✔
558
        // Attempt to fetch the target link before creating a circuit so that
416✔
559
        // we don't leave dangling circuits. The getLocalLink method does not
416✔
560
        // require the circuit variable to be set on the *htlcPacket.
416✔
561
        link, linkErr := s.getLocalLink(packet, htlc)
416✔
562
        if linkErr != nil {
420✔
563
                // Notify the htlc notifier of a link failure on our outgoing
4✔
564
                // link. Incoming timelock/amount values are not set because
4✔
565
                // they are not present for local sends.
4✔
566
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
4✔
567
                        newHtlcKey(packet),
4✔
568
                        HtlcInfo{
4✔
569
                                OutgoingTimeLock: htlc.Expiry,
4✔
570
                                OutgoingAmt:      htlc.Amount,
4✔
571
                        },
4✔
572
                        HtlcEventTypeSend,
4✔
573
                        linkErr,
4✔
574
                        false,
4✔
575
                )
4✔
576

4✔
577
                return linkErr
4✔
578
        }
4✔
579

580
        // Evaluate whether this HTLC would bypass our fee exposure. If it
581
        // does, don't send it out and instead return an error.
582
        if s.dustExceedsFeeThreshold(link, htlc.Amount, false) {
413✔
583
                // Notify the htlc notifier of a link failure on our outgoing
1✔
584
                // link. We use the FailTemporaryChannelFailure in place of a
1✔
585
                // more descriptive error message.
1✔
586
                linkErr := NewLinkError(
1✔
587
                        &lnwire.FailTemporaryChannelFailure{},
1✔
588
                )
1✔
589
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
1✔
590
                        newHtlcKey(packet),
1✔
591
                        HtlcInfo{
1✔
592
                                OutgoingTimeLock: htlc.Expiry,
1✔
593
                                OutgoingAmt:      htlc.Amount,
1✔
594
                        },
1✔
595
                        HtlcEventTypeSend,
1✔
596
                        linkErr,
1✔
597
                        false,
1✔
598
                )
1✔
599

1✔
600
                return errFeeExposureExceeded
1✔
601
        }
1✔
602

603
        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
411✔
604
        actions, err := s.circuits.CommitCircuits(circuit)
411✔
605
        if err != nil {
411✔
606
                log.Errorf("unable to commit circuit in switch: %v", err)
×
607
                return err
×
608
        }
×
609

610
        // Drop duplicate packet if it has already been seen.
611
        switch {
411✔
612
        case len(actions.Drops) == 1:
1✔
613
                return ErrDuplicateAdd
1✔
614

615
        case len(actions.Fails) == 1:
×
616
                return ErrLocalAddFailed
×
617
        }
618

619
        // Give the packet to the link's mailbox so that HTLC's are properly
620
        // canceled back if the mailbox timeout elapses.
621
        packet.circuit = circuit
410✔
622

410✔
623
        return link.handleSwitchPacket(packet)
410✔
624
}
625

626
// UpdateForwardingPolicies sends a message to the switch to update the
627
// forwarding policies for the set of target channels, keyed in chanPolicies.
628
//
629
// NOTE: This function is synchronous and will block until either the
630
// forwarding policies for all links have been updated, or the switch shuts
631
// down.
632
func (s *Switch) UpdateForwardingPolicies(
633
        chanPolicies map[wire.OutPoint]models.ForwardingPolicy) {
×
634

×
635
        log.Tracef("Updating link policies: %v", lnutils.SpewLogClosure(
×
636
                chanPolicies))
×
637

×
638
        s.indexMtx.RLock()
×
639

×
640
        // Update each link in chanPolicies.
×
641
        for targetLink, policy := range chanPolicies {
×
642
                cid := lnwire.NewChanIDFromOutPoint(targetLink)
×
643

×
644
                link, ok := s.linkIndex[cid]
×
645
                if !ok {
×
646
                        log.Debugf("Unable to find ChannelPoint(%v) to update "+
×
647
                                "link policy", targetLink)
×
648
                        continue
×
649
                }
650

651
                link.UpdateForwardingPolicy(policy)
×
652
        }
653

654
        s.indexMtx.RUnlock()
×
655
}
656

657
// IsForwardedHTLC checks for a given channel and htlc index if it is related
658
// to an opened circuit that represents a forwarded payment.
659
func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID,
660
        htlcIndex uint64) bool {
2✔
661

2✔
662
        circuit := s.circuits.LookupOpenCircuit(models.CircuitKey{
2✔
663
                ChanID: chanID,
2✔
664
                HtlcID: htlcIndex,
2✔
665
        })
2✔
666
        return circuit != nil && circuit.Incoming.ChanID != hop.Source
2✔
667
}
2✔
668

669
// ForwardPackets adds a list of packets to the switch for processing. Fails
670
// and settles are added on a first past, simultaneously constructing circuits
671
// for any adds. After persisting the circuits, another pass of the adds is
672
// given to forward them through the router. The sending link's quit channel is
673
// used to prevent deadlocks when the switch stops a link in the midst of
674
// forwarding.
675
func (s *Switch) ForwardPackets(linkQuit <-chan struct{},
676
        packets ...*htlcPacket) error {
869✔
677

869✔
678
        var (
869✔
679
                // fwdChan is a buffered channel used to receive err msgs from
869✔
680
                // the htlcPlex when forwarding this batch.
869✔
681
                fwdChan = make(chan error, len(packets))
869✔
682

869✔
683
                // numSent keeps a running count of how many packets are
869✔
684
                // forwarded to the switch, which determines how many responses
869✔
685
                // we will wait for on the fwdChan..
869✔
686
                numSent int
869✔
687
        )
869✔
688

869✔
689
        // No packets, nothing to do.
869✔
690
        if len(packets) == 0 {
1,089✔
691
                return nil
220✔
692
        }
220✔
693

694
        // Setup a barrier to prevent the background tasks from processing
695
        // responses until this function returns to the user.
696
        var wg sync.WaitGroup
649✔
697
        wg.Add(1)
649✔
698
        defer wg.Done()
649✔
699

649✔
700
        // Before spawning the following goroutine to proxy our error responses,
649✔
701
        // check to see if we have already been issued a shutdown request. If
649✔
702
        // so, we exit early to avoid incrementing the switch's waitgroup while
649✔
703
        // it is already in the process of shutting down.
649✔
704
        select {
649✔
705
        case <-linkQuit:
×
706
                return nil
×
707
        case <-s.quit:
1✔
708
                return nil
1✔
709
        default:
648✔
710
                // Spawn a goroutine to log the errors returned from failed packets.
648✔
711
                s.wg.Add(1)
648✔
712
                go s.logFwdErrs(&numSent, &wg, fwdChan)
648✔
713
        }
714

715
        // Make a first pass over the packets, forwarding any settles or fails.
716
        // As adds are found, we create a circuit and append it to our set of
717
        // circuits to be written to disk.
718
        var circuits []*PaymentCircuit
648✔
719
        var addBatch []*htlcPacket
648✔
720
        for _, packet := range packets {
1,296✔
721
                switch htlc := packet.htlc.(type) {
648✔
722
                case *lnwire.UpdateAddHTLC:
87✔
723
                        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
87✔
724
                        packet.circuit = circuit
87✔
725
                        circuits = append(circuits, circuit)
87✔
726
                        addBatch = append(addBatch, packet)
87✔
727
                default:
561✔
728
                        err := s.routeAsync(packet, fwdChan, linkQuit)
561✔
729
                        if err != nil {
571✔
730
                                return fmt.Errorf("failed to forward packet %w",
10✔
731
                                        err)
10✔
732
                        }
10✔
733
                        numSent++
536✔
734
                }
735
        }
736

737
        // If this batch did not contain any circuits to commit, we can return
738
        // early.
739
        if len(circuits) == 0 {
1,159✔
740
                return nil
536✔
741
        }
536✔
742

743
        // Write any circuits that we found to disk.
744
        actions, err := s.circuits.CommitCircuits(circuits...)
87✔
745
        if err != nil {
87✔
746
                log.Errorf("unable to commit circuits in switch: %v", err)
×
747
        }
×
748

749
        // Split the htlc packets by comparing an in-order seek to the head of
750
        // the added, dropped, or failed circuits.
751
        //
752
        // NOTE: This assumes each list is guaranteed to be a subsequence of the
753
        // circuits, and that the union of the sets results in the original set
754
        // of circuits.
755
        var addedPackets, failedPackets []*htlcPacket
87✔
756
        for _, packet := range addBatch {
174✔
757
                switch {
87✔
758
                case len(actions.Adds) > 0 && packet.circuit == actions.Adds[0]:
83✔
759
                        addedPackets = append(addedPackets, packet)
83✔
760
                        actions.Adds = actions.Adds[1:]
83✔
761

762
                case len(actions.Drops) > 0 && packet.circuit == actions.Drops[0]:
1✔
763
                        actions.Drops = actions.Drops[1:]
1✔
764

765
                case len(actions.Fails) > 0 && packet.circuit == actions.Fails[0]:
3✔
766
                        failedPackets = append(failedPackets, packet)
3✔
767
                        actions.Fails = actions.Fails[1:]
3✔
768
                }
769
        }
770

771
        // Now, forward any packets for circuits that were successfully added to
772
        // the switch's circuit map.
773
        for _, packet := range addedPackets {
170✔
774
                err := s.routeAsync(packet, fwdChan, linkQuit)
83✔
775
                if err != nil {
84✔
776
                        return fmt.Errorf("failed to forward packet %w", err)
1✔
777
                }
1✔
778
                numSent++
82✔
779
        }
780

781
        // Lastly, for any packets that failed, this implies that they were
782
        // left in a half added state, which can happen when recovering from
783
        // failures.
784
        if len(failedPackets) > 0 {
89✔
785
                var failure lnwire.FailureMessage
3✔
786
                incomingID := failedPackets[0].incomingChanID
3✔
787

3✔
788
                // If the incoming channel is an option_scid_alias channel,
3✔
789
                // then we'll need to replace the SCID in the ChannelUpdate.
3✔
790
                update := s.failAliasUpdate(incomingID, true)
3✔
791
                if update == nil {
4✔
792
                        // Fallback to the original non-option behavior.
1✔
793
                        update, err := s.cfg.FetchLastChannelUpdate(
1✔
794
                                incomingID,
1✔
795
                        )
1✔
796
                        if err != nil {
1✔
797
                                failure = &lnwire.FailTemporaryNodeFailure{}
×
798
                        } else {
1✔
799
                                failure = lnwire.NewTemporaryChannelFailure(
1✔
800
                                        update,
1✔
801
                                )
1✔
802
                        }
1✔
803
                } else {
2✔
804
                        // This is an option_scid_alias channel.
2✔
805
                        failure = lnwire.NewTemporaryChannelFailure(update)
2✔
806
                }
2✔
807

808
                linkError := NewDetailedLinkError(
3✔
809
                        failure, OutgoingFailureIncompleteForward,
3✔
810
                )
3✔
811

3✔
812
                for _, packet := range failedPackets {
6✔
813
                        // We don't handle the error here since this method
3✔
814
                        // always returns an error.
3✔
815
                        _ = s.failAddPacket(packet, linkError)
3✔
816
                }
3✔
817
        }
818

819
        return nil
86✔
820
}
821

822
// logFwdErrs logs any errors received on `fwdChan`.
823
func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) {
648✔
824
        defer s.wg.Done()
648✔
825

648✔
826
        // Wait here until the outer function has finished persisting
648✔
827
        // and routing the packets. This guarantees we don't read from num until
648✔
828
        // the value is accurate.
648✔
829
        wg.Wait()
648✔
830

648✔
831
        numSent := *num
648✔
832
        for i := 0; i < numSent; i++ {
1,266✔
833
                select {
618✔
834
                case err := <-fwdChan:
617✔
835
                        if err != nil {
640✔
836
                                log.Errorf("Unhandled error while reforwarding htlc "+
23✔
837
                                        "settle/fail over htlcswitch: %v", err)
23✔
838
                        }
23✔
839
                case <-s.quit:
1✔
840
                        log.Errorf("unable to forward htlc packet " +
1✔
841
                                "htlc switch was stopped")
1✔
842
                        return
1✔
843
                }
844
        }
845
}
846

847
// routeAsync sends a packet through the htlc switch, using the provided err
848
// chan to propagate errors back to the caller. The link's quit channel is
849
// provided so that the send can be canceled if either the link or the switch
850
// receive a shutdown requuest. This method does not wait for a response from
851
// the htlcForwarder before returning.
852
func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error,
853
        linkQuit <-chan struct{}) error {
644✔
854

644✔
855
        command := &plexPacket{
644✔
856
                pkt: packet,
644✔
857
                err: errChan,
644✔
858
        }
644✔
859

644✔
860
        select {
644✔
861
        case s.htlcPlex <- command:
618✔
862
                return nil
618✔
863
        case <-linkQuit:
11✔
864
                return ErrLinkShuttingDown
11✔
865
        case <-s.quit:
×
866
                return errors.New("htlc switch was stopped")
×
867
        }
868
}
869

870
// getLocalLink handles the addition of a htlc for a send that originates from
871
// our node. It returns the link that the htlc should be forwarded outwards on,
872
// and a link error if the htlc cannot be forwarded.
873
func (s *Switch) getLocalLink(pkt *htlcPacket, htlc *lnwire.UpdateAddHTLC) (
874
        ChannelLink, *LinkError) {
416✔
875

416✔
876
        // Try to find links by node destination.
416✔
877
        s.indexMtx.RLock()
416✔
878
        link, err := s.getLinkByShortID(pkt.outgoingChanID)
416✔
879
        defer s.indexMtx.RUnlock()
416✔
880
        if err != nil {
417✔
881
                // If the link was not found for the outgoingChanID, an outside
1✔
882
                // subsystem may be using the confirmed SCID of a zero-conf
1✔
883
                // channel. In this case, we'll consult the Switch maps to see
1✔
884
                // if an alias exists and use the alias to lookup the link.
1✔
885
                // This extra step is a consequence of not updating the Switch
1✔
886
                // forwardingIndex when a zero-conf channel is confirmed. We
1✔
887
                // don't need to change the outgoingChanID since the link will
1✔
888
                // do that upon receiving the packet.
1✔
889
                baseScid, ok := s.baseIndex[pkt.outgoingChanID]
1✔
890
                if !ok {
1✔
891
                        log.Errorf("Link %v not found", pkt.outgoingChanID)
×
892
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
×
893
                }
×
894

895
                // The base SCID was found, so we'll use that to fetch the
896
                // link.
897
                link, err = s.getLinkByShortID(baseScid)
1✔
898
                if err != nil {
1✔
899
                        log.Errorf("Link %v not found", baseScid)
×
900
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
×
901
                }
×
902
        }
903

904
        if !link.EligibleToForward() {
417✔
905
                log.Errorf("Link %v is not available to forward",
1✔
906
                        pkt.outgoingChanID)
1✔
907

1✔
908
                // The update does not need to be populated as the error
1✔
909
                // will be returned back to the router.
1✔
910
                return nil, NewDetailedLinkError(
1✔
911
                        lnwire.NewTemporaryChannelFailure(nil),
1✔
912
                        OutgoingFailureLinkNotEligible,
1✔
913
                )
1✔
914
        }
1✔
915

916
        // Ensure that the htlc satisfies the outgoing channel policy.
917
        currentHeight := atomic.LoadUint32(&s.bestHeight)
415✔
918
        htlcErr := link.CheckHtlcTransit(
415✔
919
                htlc.PaymentHash, htlc.Amount, htlc.Expiry, currentHeight,
415✔
920
                htlc.CustomRecords,
415✔
921
        )
415✔
922
        if htlcErr != nil {
418✔
923
                log.Errorf("Link %v policy for local forward not "+
3✔
924
                        "satisfied", pkt.outgoingChanID)
3✔
925
                return nil, htlcErr
3✔
926
        }
3✔
927
        return link, nil
412✔
928
}
929

930
// handleLocalResponse processes a Settle or Fail responding to a
931
// locally-initiated payment. This is handled asynchronously to avoid blocking
932
// the main event loop within the switch, as these operations can require
933
// multiple db transactions. The guarantees of the circuit map are stringent
934
// enough such that we are able to tolerate reordering of these operations
935
// without side effects. The primary operations handled are:
936
//  1. Save the payment result to the pending payment store.
937
//  2. Notify subscribers about the payment result.
938
//  3. Ack settle/fail references, to avoid resending this response internally
939
//  4. Teardown the closing circuit in the circuit map
940
//
941
// NOTE: This method MUST be spawned as a goroutine.
942
func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
304✔
943
        defer s.wg.Done()
304✔
944

304✔
945
        attemptID := pkt.incomingHTLCID
304✔
946

304✔
947
        // The error reason will be unencypted in case this a local
304✔
948
        // failure or a converted error.
304✔
949
        unencrypted := pkt.localFailure || pkt.convertedError
304✔
950
        n := &networkResult{
304✔
951
                msg:          pkt.htlc,
304✔
952
                unencrypted:  unencrypted,
304✔
953
                isResolution: pkt.isResolution,
304✔
954
        }
304✔
955

304✔
956
        // Store the result to the db. This will also notify subscribers about
304✔
957
        // the result.
304✔
958
        if err := s.networkResults.storeResult(attemptID, n); err != nil {
304✔
959
                log.Errorf("Unable to store attempt result for pid=%v: %v",
×
960
                        attemptID, err)
×
961
                return
×
962
        }
×
963

964
        // First, we'll clean up any fwdpkg references, circuit entries, and
965
        // mark in our db that the payment for this payment hash has either
966
        // succeeded or failed.
967
        //
968
        // If this response is contained in a forwarding package, we'll start by
969
        // acking the settle/fail so that we don't continue to retransmit the
970
        // HTLC internally.
971
        if pkt.destRef != nil {
422✔
972
                if err := s.ackSettleFail(*pkt.destRef); err != nil {
118✔
973
                        log.Warnf("Unable to ack settle/fail reference: %s: %v",
×
974
                                *pkt.destRef, err)
×
975
                        return
×
976
                }
×
977
        }
978

979
        // Next, we'll remove the circuit since we are about to complete an
980
        // fulfill/fail of this HTLC. Since we've already removed the
981
        // settle/fail fwdpkg reference, the response from the peer cannot be
982
        // replayed internally if this step fails. If this happens, this logic
983
        // will be executed when a provided resolution message comes through.
984
        // This can only happen if the circuit is still open, which is why this
985
        // ordering is chosen.
986
        if err := s.teardownCircuit(pkt); err != nil {
304✔
987
                log.Errorf("Unable to teardown circuit %s: %v",
×
988
                        pkt.inKey(), err)
×
989
                return
×
990
        }
×
991

992
        // Finally, notify on the htlc failure or success that has been handled.
993
        key := newHtlcKey(pkt)
304✔
994
        eventType := getEventType(pkt)
304✔
995

304✔
996
        switch htlc := pkt.htlc.(type) {
304✔
997
        case *lnwire.UpdateFulfillHTLC:
180✔
998
                s.cfg.HtlcNotifier.NotifySettleEvent(key, htlc.PaymentPreimage,
180✔
999
                        eventType)
180✔
1000

1001
        case *lnwire.UpdateFailHTLC:
124✔
1002
                s.cfg.HtlcNotifier.NotifyForwardingFailEvent(key, eventType)
124✔
1003
        }
1004
}
1005

1006
// extractResult uses the given deobfuscator to extract the payment result from
1007
// the given network message.
1008
func (s *Switch) extractResult(deobfuscator ErrorDecrypter, n *networkResult,
1009
        attemptID uint64, paymentHash lntypes.Hash) (*PaymentResult, error) {
301✔
1010

301✔
1011
        switch htlc := n.msg.(type) {
301✔
1012

1013
        // We've received a settle update which means we can finalize the user
1014
        // payment and return successful response.
1015
        case *lnwire.UpdateFulfillHTLC:
177✔
1016
                return &PaymentResult{
177✔
1017
                        Preimage: htlc.PaymentPreimage,
177✔
1018
                }, nil
177✔
1019

1020
        // We've received a fail update which means we can finalize the
1021
        // user payment and return fail response.
1022
        case *lnwire.UpdateFailHTLC:
124✔
1023
                // TODO(yy): construct deobfuscator here to avoid creating it
124✔
1024
                // in paymentLifecycle even for settled HTLCs.
124✔
1025
                paymentErr := s.parseFailedPayment(
124✔
1026
                        deobfuscator, attemptID, paymentHash, n.unencrypted,
124✔
1027
                        n.isResolution, htlc,
124✔
1028
                )
124✔
1029

124✔
1030
                return &PaymentResult{
124✔
1031
                        Error: paymentErr,
124✔
1032
                }, nil
124✔
1033

1034
        default:
×
1035
                return nil, fmt.Errorf("received unknown response type: %T",
×
1036
                        htlc)
×
1037
        }
1038
}
1039

1040
// parseFailedPayment determines the appropriate failure message to return to
1041
// a user initiated payment. The three cases handled are:
1042
//  1. An unencrypted failure, which should already plaintext.
1043
//  2. A resolution from the chain arbitrator, which possibly has no failure
1044
//     reason attached.
1045
//  3. A failure from the remote party, which will need to be decrypted using
1046
//     the payment deobfuscator.
1047
func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter,
1048
        attemptID uint64, paymentHash lntypes.Hash, unencrypted,
1049
        isResolution bool, htlc *lnwire.UpdateFailHTLC) error {
124✔
1050

124✔
1051
        switch {
124✔
1052

1053
        // The payment never cleared the link, so we don't need to
1054
        // decrypt the error, simply decode it them report back to the
1055
        // user.
1056
        case unencrypted:
5✔
1057
                r := bytes.NewReader(htlc.Reason)
5✔
1058
                failureMsg, err := lnwire.DecodeFailure(r, 0)
5✔
1059
                if err != nil {
5✔
1060
                        // If we could not decode the failure reason, return a link
×
1061
                        // error indicating that we failed to decode the onion.
×
1062
                        linkError := NewDetailedLinkError(
×
1063
                                // As this didn't even clear the link, we don't
×
1064
                                // need to apply an update here since it goes
×
1065
                                // directly to the router.
×
1066
                                lnwire.NewTemporaryChannelFailure(nil),
×
1067
                                OutgoingFailureDecodeError,
×
1068
                        )
×
1069

×
1070
                        log.Errorf("%v: (hash=%v, pid=%d): %v",
×
1071
                                linkError.FailureDetail.FailureString(),
×
1072
                                paymentHash, attemptID, err)
×
1073

×
1074
                        return linkError
×
1075
                }
×
1076

1077
                // If we successfully decoded the failure reason, return it.
1078
                return NewLinkError(failureMsg)
5✔
1079

1080
        // A payment had to be timed out on chain before it got past
1081
        // the first hop. In this case, we'll report a permanent
1082
        // channel failure as this means us, or the remote party had to
1083
        // go on chain.
1084
        case isResolution && htlc.Reason == nil:
×
1085
                linkError := NewDetailedLinkError(
×
1086
                        &lnwire.FailPermanentChannelFailure{},
×
1087
                        OutgoingFailureOnChainTimeout,
×
1088
                )
×
1089

×
1090
                log.Infof("%v: hash=%v, pid=%d",
×
1091
                        linkError.FailureDetail.FailureString(),
×
1092
                        paymentHash, attemptID)
×
1093

×
1094
                return linkError
×
1095

1096
        // A regular multi-hop payment error that we'll need to
1097
        // decrypt.
1098
        default:
119✔
1099
                // We'll attempt to fully decrypt the onion encrypted
119✔
1100
                // error. If we're unable to then we'll bail early.
119✔
1101
                failure, err := deobfuscator.DecryptError(htlc.Reason)
119✔
1102
                if err != nil {
120✔
1103
                        log.Errorf("unable to de-obfuscate onion failure "+
1✔
1104
                                "(hash=%v, pid=%d): %v",
1✔
1105
                                paymentHash, attemptID, err)
1✔
1106

1✔
1107
                        return ErrUnreadableFailureMessage
1✔
1108
                }
1✔
1109

1110
                return failure
118✔
1111
        }
1112
}
1113

1114
// handlePacketForward is used in cases when we need forward the htlc update
1115
// from one channel link to another and be able to propagate the settle/fail
1116
// updates back. This behaviour is achieved by creation of payment circuits.
1117
func (s *Switch) handlePacketForward(packet *htlcPacket) error {
619✔
1118
        switch htlc := packet.htlc.(type) {
619✔
1119
        // Channel link forwarded us a new htlc, therefore we initiate the
1120
        // payment circuit within our internal state so we can properly forward
1121
        // the ultimate settle message back latter.
1122
        case *lnwire.UpdateAddHTLC:
82✔
1123
                return s.handlePacketAdd(packet, htlc)
82✔
1124

1125
        case *lnwire.UpdateFulfillHTLC:
399✔
1126
                return s.handlePacketSettle(packet)
399✔
1127

1128
        // Channel link forwarded us an update_fail_htlc message.
1129
        //
1130
        // NOTE: when the channel link receives an update_fail_malformed_htlc
1131
        // from upstream, it will convert the message into update_fail_htlc and
1132
        // forward it. Thus there's no need to catch `UpdateFailMalformedHTLC`
1133
        // here.
1134
        case *lnwire.UpdateFailHTLC:
138✔
1135
                return s.handlePacketFail(packet, htlc)
138✔
1136

1137
        default:
×
1138
                return fmt.Errorf("wrong update type: %T", htlc)
×
1139
        }
1140
}
1141

1142
// checkCircularForward checks whether a forward is circular (arrives and
1143
// departs on the same link) and returns a link error if the switch is
1144
// configured to disallow this behaviour.
1145
func (s *Switch) checkCircularForward(incoming, outgoing lnwire.ShortChannelID,
1146
        allowCircular bool, paymentHash lntypes.Hash) *LinkError {
90✔
1147

90✔
1148
        // If they are equal, we can skip the alias mapping checks.
90✔
1149
        if incoming == outgoing {
94✔
1150
                // The switch may be configured to allow circular routes, so
4✔
1151
                // just log and return nil.
4✔
1152
                if allowCircular {
6✔
1153
                        log.Debugf("allowing circular route over link: %v "+
2✔
1154
                                "(payment hash: %x)", incoming, paymentHash)
2✔
1155
                        return nil
2✔
1156
                }
2✔
1157

1158
                // Otherwise, we'll return a temporary channel failure.
1159
                return NewDetailedLinkError(
2✔
1160
                        lnwire.NewTemporaryChannelFailure(nil),
2✔
1161
                        OutgoingFailureCircularRoute,
2✔
1162
                )
2✔
1163
        }
1164

1165
        // We'll fetch the "base" SCID from the baseIndex for the incoming and
1166
        // outgoing SCIDs. If either one does not have a base SCID, then the
1167
        // two channels are not equal since one will be a channel that does not
1168
        // need a mapping and SCID equality was checked above. If the "base"
1169
        // SCIDs are equal, then this is a circular route. Otherwise, it isn't.
1170
        s.indexMtx.RLock()
86✔
1171
        incomingBaseScid, ok := s.baseIndex[incoming]
86✔
1172
        if !ok {
166✔
1173
                // This channel does not use baseIndex, bail out.
80✔
1174
                s.indexMtx.RUnlock()
80✔
1175
                return nil
80✔
1176
        }
80✔
1177

1178
        outgoingBaseScid, ok := s.baseIndex[outgoing]
6✔
1179
        if !ok {
8✔
1180
                // This channel does not use baseIndex, bail out.
2✔
1181
                s.indexMtx.RUnlock()
2✔
1182
                return nil
2✔
1183
        }
2✔
1184
        s.indexMtx.RUnlock()
4✔
1185

4✔
1186
        // Check base SCID equality.
4✔
1187
        if incomingBaseScid != outgoingBaseScid {
4✔
1188
                // The base SCIDs are not equal so these are not the same
×
1189
                // channel.
×
1190
                return nil
×
1191
        }
×
1192

1193
        // If the incoming and outgoing link are equal, the htlc is part of a
1194
        // circular route which may be used to lock up our liquidity. If the
1195
        // switch is configured to allow circular routes, log that we are
1196
        // allowing the route then return nil.
1197
        if allowCircular {
6✔
1198
                log.Debugf("allowing circular route over link: %v "+
2✔
1199
                        "(payment hash: %x)", incoming, paymentHash)
2✔
1200
                return nil
2✔
1201
        }
2✔
1202

1203
        // If our node disallows circular routes, return a temporary channel
1204
        // failure. There is nothing wrong with the policy used by the remote
1205
        // node, so we do not include a channel update.
1206
        return NewDetailedLinkError(
2✔
1207
                lnwire.NewTemporaryChannelFailure(nil),
2✔
1208
                OutgoingFailureCircularRoute,
2✔
1209
        )
2✔
1210
}
1211

1212
// failAddPacket encrypts a fail packet back to an add packet's source.
1213
// The ciphertext will be derived from the failure message proivded by context.
1214
// This method returns the failErr if all other steps complete successfully.
1215
func (s *Switch) failAddPacket(packet *htlcPacket, failure *LinkError) error {
26✔
1216
        // Encrypt the failure so that the sender will be able to read the error
26✔
1217
        // message. Since we failed this packet, we use EncryptFirstHop to
26✔
1218
        // obfuscate the failure for their eyes only.
26✔
1219
        reason, err := packet.obfuscator.EncryptFirstHop(failure.WireMessage())
26✔
1220
        if err != nil {
26✔
1221
                err := fmt.Errorf("unable to obfuscate "+
×
1222
                        "error: %v", err)
×
1223
                log.Error(err)
×
1224
                return err
×
1225
        }
×
1226

1227
        log.Error(failure.Error())
26✔
1228

26✔
1229
        // Create a failure packet for this htlc. The full set of
26✔
1230
        // information about the htlc failure is included so that they can
26✔
1231
        // be included in link failure notifications.
26✔
1232
        failPkt := &htlcPacket{
26✔
1233
                sourceRef:       packet.sourceRef,
26✔
1234
                incomingChanID:  packet.incomingChanID,
26✔
1235
                incomingHTLCID:  packet.incomingHTLCID,
26✔
1236
                outgoingChanID:  packet.outgoingChanID,
26✔
1237
                outgoingHTLCID:  packet.outgoingHTLCID,
26✔
1238
                incomingAmount:  packet.incomingAmount,
26✔
1239
                amount:          packet.amount,
26✔
1240
                incomingTimeout: packet.incomingTimeout,
26✔
1241
                outgoingTimeout: packet.outgoingTimeout,
26✔
1242
                circuit:         packet.circuit,
26✔
1243
                obfuscator:      packet.obfuscator,
26✔
1244
                linkFailure:     failure,
26✔
1245
                htlc: &lnwire.UpdateFailHTLC{
26✔
1246
                        Reason: reason,
26✔
1247
                },
26✔
1248
        }
26✔
1249

26✔
1250
        // Route a fail packet back to the source link.
26✔
1251
        err = s.mailOrchestrator.Deliver(failPkt.incomingChanID, failPkt)
26✔
1252
        if err != nil {
26✔
1253
                err = fmt.Errorf("source chanid=%v unable to "+
×
1254
                        "handle switch packet: %v",
×
1255
                        packet.incomingChanID, err)
×
1256
                log.Error(err)
×
1257
                return err
×
1258
        }
×
1259

1260
        return failure
26✔
1261
}
1262

1263
// closeCircuit accepts a settle or fail htlc and the associated htlc packet and
1264
// attempts to determine the source that forwarded this htlc. This method will
1265
// set the incoming chan and htlc ID of the given packet if the source was
1266
// found, and will properly [re]encrypt any failure messages.
1267
func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
537✔
1268
        // If the packet has its source, that means it was failed locally by
537✔
1269
        // the outgoing link. We fail it here to make sure only one response
537✔
1270
        // makes it through the switch.
537✔
1271
        if pkt.hasSource {
548✔
1272
                circuit, err := s.circuits.FailCircuit(pkt.inKey())
11✔
1273
                switch err {
11✔
1274

1275
                // Circuit successfully closed.
1276
                case nil:
11✔
1277
                        return circuit, nil
11✔
1278

1279
                // Circuit was previously closed, but has not been deleted.
1280
                // We'll just drop this response until the circuit has been
1281
                // fully removed.
1282
                case ErrCircuitClosing:
×
1283
                        return nil, err
×
1284

1285
                // Failed to close circuit because it does not exist. This is
1286
                // likely because the circuit was already successfully closed.
1287
                // Since this packet failed locally, there is no forwarding
1288
                // package entry to acknowledge.
1289
                case ErrUnknownCircuit:
×
1290
                        return nil, err
×
1291

1292
                // Unexpected error.
1293
                default:
×
1294
                        return nil, err
×
1295
                }
1296
        }
1297

1298
        // Otherwise, this is packet was received from the remote party.  Use
1299
        // circuit map to find the incoming link to receive the settle/fail.
1300
        circuit, err := s.circuits.CloseCircuit(pkt.outKey())
526✔
1301
        switch err {
526✔
1302

1303
        // Open circuit successfully closed.
1304
        case nil:
336✔
1305
                pkt.incomingChanID = circuit.Incoming.ChanID
336✔
1306
                pkt.incomingHTLCID = circuit.Incoming.HtlcID
336✔
1307
                pkt.circuit = circuit
336✔
1308
                pkt.sourceRef = &circuit.AddRef
336✔
1309

336✔
1310
                pktType := "SETTLE"
336✔
1311
                if _, ok := pkt.htlc.(*lnwire.UpdateFailHTLC); ok {
463✔
1312
                        pktType = "FAIL"
127✔
1313
                }
127✔
1314

1315
                log.Debugf("Closed completed %s circuit for %x: "+
336✔
1316
                        "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash,
336✔
1317
                        pkt.incomingChanID, pkt.incomingHTLCID,
336✔
1318
                        pkt.outgoingChanID, pkt.outgoingHTLCID)
336✔
1319

336✔
1320
                return circuit, nil
336✔
1321

1322
        // Circuit was previously closed, but has not been deleted. We'll just
1323
        // drop this response until the circuit has been removed.
1324
        case ErrCircuitClosing:
×
1325
                return nil, err
×
1326

1327
        // Failed to close circuit because it does not exist. This is likely
1328
        // because the circuit was already successfully closed.
1329
        case ErrUnknownCircuit:
190✔
1330
                if pkt.destRef != nil {
379✔
1331
                        // Add this SettleFailRef to the set of pending settle/fail entries
189✔
1332
                        // awaiting acknowledgement.
189✔
1333
                        s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef)
189✔
1334
                }
189✔
1335

1336
                // If this is a settle, we will not log an error message as settles
1337
                // are expected to hit the ErrUnknownCircuit case. The only way fails
1338
                // can hit this case if the link restarts after having just sent a fail
1339
                // to the switch.
1340
                _, isSettle := pkt.htlc.(*lnwire.UpdateFulfillHTLC)
190✔
1341
                if !isSettle {
190✔
1342
                        err := fmt.Errorf("unable to find target channel "+
×
1343
                                "for HTLC fail: channel ID = %s, "+
×
1344
                                "HTLC ID = %d", pkt.outgoingChanID,
×
1345
                                pkt.outgoingHTLCID)
×
1346
                        log.Error(err)
×
1347

×
1348
                        return nil, err
×
1349
                }
×
1350

1351
                return nil, nil
190✔
1352

1353
        // Unexpected error.
1354
        default:
×
1355
                return nil, err
×
1356
        }
1357
}
1358

1359
// ackSettleFail is used by the switch to ACK any settle/fail entries in the
1360
// forwarding package of the outgoing link for a payment circuit. We do this if
1361
// we're the originator of the payment, so the link stops attempting to
1362
// re-broadcast.
1363
func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error {
118✔
1364
        return kvdb.Batch(s.cfg.DB, func(tx kvdb.RwTx) error {
236✔
1365
                return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...)
118✔
1366
        })
118✔
1367
}
1368

1369
// teardownCircuit removes a pending or open circuit from the switch's circuit
1370
// map and prints useful logging statements regarding the outcome.
1371
func (s *Switch) teardownCircuit(pkt *htlcPacket) error {
314✔
1372
        var pktType string
314✔
1373
        switch htlc := pkt.htlc.(type) {
314✔
1374
        case *lnwire.UpdateFulfillHTLC:
186✔
1375
                pktType = "SETTLE"
186✔
1376
        case *lnwire.UpdateFailHTLC:
128✔
1377
                pktType = "FAIL"
128✔
1378
        default:
×
1379
                return fmt.Errorf("cannot tear down packet of type: %T", htlc)
×
1380
        }
1381

1382
        var paymentHash lntypes.Hash
314✔
1383

314✔
1384
        // Perform a defensive check to make sure we don't try to access a nil
314✔
1385
        // circuit.
314✔
1386
        circuit := pkt.circuit
314✔
1387
        if circuit != nil {
628✔
1388
                copy(paymentHash[:], circuit.PaymentHash[:])
314✔
1389
        }
314✔
1390

1391
        log.Debugf("Tearing down circuit with %s pkt, removing circuit=%v "+
314✔
1392
                "with keystone=%v", pktType, pkt.inKey(), pkt.outKey())
314✔
1393

314✔
1394
        err := s.circuits.DeleteCircuits(pkt.inKey())
314✔
1395
        if err != nil {
314✔
1396
                log.Warnf("Failed to tear down circuit (%s, %d) <-> (%s, %d) "+
×
1397
                        "with payment_hash=%v using %s pkt", pkt.incomingChanID,
×
1398
                        pkt.incomingHTLCID, pkt.outgoingChanID,
×
1399
                        pkt.outgoingHTLCID, pkt.circuit.PaymentHash, pktType)
×
1400

×
1401
                return err
×
1402
        }
×
1403

1404
        log.Debugf("Closed %s circuit for %v: (%s, %d) <-> (%s, %d)", pktType,
314✔
1405
                paymentHash, pkt.incomingChanID, pkt.incomingHTLCID,
314✔
1406
                pkt.outgoingChanID, pkt.outgoingHTLCID)
314✔
1407

314✔
1408
        return nil
314✔
1409
}
1410

1411
// CloseLink creates and sends the close channel command to the target link
1412
// directing the specified closure type. If the closure type is CloseRegular,
1413
// targetFeePerKw parameter should be the ideal fee-per-kw that will be used as
1414
// a starting point for close negotiation. The deliveryScript parameter is an
1415
// optional parameter which sets a user specified script to close out to.
1416
func (s *Switch) CloseLink(chanPoint *wire.OutPoint,
1417
        closeType contractcourt.ChannelCloseType,
1418
        targetFeePerKw, maxFee chainfee.SatPerKWeight,
1419
        deliveryScript lnwire.DeliveryAddress) (chan interface{}, chan error) {
×
1420

×
1421
        // TODO(roasbeef) abstract out the close updates.
×
1422
        updateChan := make(chan interface{}, 2)
×
1423
        errChan := make(chan error, 1)
×
1424

×
1425
        command := &ChanClose{
×
1426
                CloseType:      closeType,
×
1427
                ChanPoint:      chanPoint,
×
1428
                Updates:        updateChan,
×
1429
                TargetFeePerKw: targetFeePerKw,
×
1430
                MaxFee:         maxFee,
×
1431
                DeliveryScript: deliveryScript,
×
1432
                Err:            errChan,
×
1433
        }
×
1434

×
1435
        select {
×
1436
        case s.chanCloseRequests <- command:
×
1437
                return updateChan, errChan
×
1438

1439
        case <-s.quit:
×
1440
                errChan <- ErrSwitchExiting
×
1441
                close(updateChan)
×
1442
                return updateChan, errChan
×
1443
        }
1444
}
1445

1446
// htlcForwarder is responsible for optimally forwarding (and possibly
1447
// fragmenting) incoming/outgoing HTLCs amongst all active interfaces and their
1448
// links. The duties of the forwarder are similar to that of a network switch,
1449
// in that it facilitates multi-hop payments by acting as a central messaging
1450
// bus. The switch communicates will active links to create, manage, and tear
1451
// down active onion routed payments. Each active channel is modeled as
1452
// networked device with metadata such as the available payment bandwidth, and
1453
// total link capacity.
1454
//
1455
// NOTE: This MUST be run as a goroutine.
1456
func (s *Switch) htlcForwarder() {
207✔
1457
        defer s.wg.Done()
207✔
1458

207✔
1459
        defer func() {
414✔
1460
                s.blockEpochStream.Cancel()
207✔
1461

207✔
1462
                // Remove all links once we've been signalled for shutdown.
207✔
1463
                var linksToStop []ChannelLink
207✔
1464
                s.indexMtx.Lock()
207✔
1465
                for _, link := range s.linkIndex {
499✔
1466
                        activeLink := s.removeLink(link.ChanID())
292✔
1467
                        if activeLink == nil {
292✔
1468
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1469
                                        "on stop", link.ChanID())
×
1470
                                continue
×
1471
                        }
1472
                        linksToStop = append(linksToStop, activeLink)
292✔
1473
                }
1474
                for _, link := range s.pendingLinkIndex {
208✔
1475
                        pendingLink := s.removeLink(link.ChanID())
1✔
1476
                        if pendingLink == nil {
1✔
1477
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1478
                                        "on stop", link.ChanID())
×
1479
                                continue
×
1480
                        }
1481
                        linksToStop = append(linksToStop, pendingLink)
1✔
1482
                }
1483
                s.indexMtx.Unlock()
207✔
1484

207✔
1485
                // Now that all pending and live links have been removed from
207✔
1486
                // the forwarding indexes, stop each one before shutting down.
207✔
1487
                // We'll shut them down in parallel to make exiting as fast as
207✔
1488
                // possible.
207✔
1489
                var wg sync.WaitGroup
207✔
1490
                for _, link := range linksToStop {
500✔
1491
                        wg.Add(1)
293✔
1492
                        go func(l ChannelLink) {
586✔
1493
                                defer wg.Done()
293✔
1494

293✔
1495
                                l.Stop()
293✔
1496
                        }(link)
293✔
1497
                }
1498
                wg.Wait()
207✔
1499

207✔
1500
                // Before we exit fully, we'll attempt to flush out any
207✔
1501
                // forwarding events that may still be lingering since the last
207✔
1502
                // batch flush.
207✔
1503
                if err := s.FlushForwardingEvents(); err != nil {
207✔
1504
                        log.Errorf("unable to flush forwarding events: %v", err)
×
1505
                }
×
1506
        }()
1507

1508
        // TODO(roasbeef): cleared vs settled distinction
1509
        var (
207✔
1510
                totalNumUpdates uint64
207✔
1511
                totalSatSent    btcutil.Amount
207✔
1512
                totalSatRecv    btcutil.Amount
207✔
1513
        )
207✔
1514
        s.cfg.LogEventTicker.Resume()
207✔
1515
        defer s.cfg.LogEventTicker.Stop()
207✔
1516

207✔
1517
        // Every 15 seconds, we'll flush out the forwarding events that
207✔
1518
        // occurred during that period.
207✔
1519
        s.cfg.FwdEventTicker.Resume()
207✔
1520
        defer s.cfg.FwdEventTicker.Stop()
207✔
1521

207✔
1522
        defer s.cfg.AckEventTicker.Stop()
207✔
1523

207✔
1524
out:
207✔
1525
        for {
1,039✔
1526

832✔
1527
                // If the set of pending settle/fail entries is non-zero,
832✔
1528
                // reinstate the ack ticker so we can batch ack them.
832✔
1529
                if len(s.pendingSettleFails) > 0 {
1,209✔
1530
                        s.cfg.AckEventTicker.Resume()
377✔
1531
                }
377✔
1532

1533
                select {
832✔
1534
                case blockEpoch, ok := <-s.blockEpochStream.Epochs:
×
1535
                        if !ok {
×
1536
                                break out
×
1537
                        }
1538

1539
                        atomic.StoreUint32(&s.bestHeight, uint32(blockEpoch.Height))
×
1540

1541
                // A local close request has arrived, we'll forward this to the
1542
                // relevant link (if it exists) so the channel can be
1543
                // cooperatively closed (if possible).
1544
                case req := <-s.chanCloseRequests:
×
1545
                        chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)
×
1546

×
1547
                        s.indexMtx.RLock()
×
1548
                        link, ok := s.linkIndex[chanID]
×
1549
                        if !ok {
×
1550
                                s.indexMtx.RUnlock()
×
1551

×
1552
                                req.Err <- fmt.Errorf("no peer for channel with "+
×
1553
                                        "chan_id=%x", chanID[:])
×
1554
                                continue
×
1555
                        }
1556
                        s.indexMtx.RUnlock()
×
1557

×
1558
                        peerPub := link.PeerPubKey()
×
1559
                        log.Debugf("Requesting local channel close: peer=%v, "+
×
1560
                                "chan_id=%x", link.PeerPubKey(), chanID[:])
×
1561

×
1562
                        go s.cfg.LocalChannelClose(peerPub[:], req)
×
1563

1564
                case resolutionMsg := <-s.resolutionMsgs:
1✔
1565
                        // We'll persist the resolution message to the Switch's
1✔
1566
                        // resolution store.
1✔
1567
                        resMsg := resolutionMsg.ResolutionMsg
1✔
1568
                        err := s.resMsgStore.addResolutionMsg(&resMsg)
1✔
1569
                        if err != nil {
1✔
1570
                                // This will only fail if there is a database
×
1571
                                // error or a serialization error. Sending the
×
1572
                                // error prevents the contractcourt from being
×
1573
                                // in a state where it believes the send was
×
1574
                                // successful, when it wasn't.
×
1575
                                log.Errorf("unable to add resolution msg: %v",
×
1576
                                        err)
×
1577
                                resolutionMsg.errChan <- err
×
1578
                                continue
×
1579
                        }
1580

1581
                        // At this point, the resolution message has been
1582
                        // persisted. It is safe to signal success by sending
1583
                        // a nil error since the Switch will re-deliver the
1584
                        // resolution message on restart.
1585
                        resolutionMsg.errChan <- nil
1✔
1586

1✔
1587
                        // Create a htlc packet for this resolution. We do
1✔
1588
                        // not have some of the information that we'll need
1✔
1589
                        // for blinded error handling here , so we'll rely on
1✔
1590
                        // our forwarding logic to fill it in later.
1✔
1591
                        pkt := &htlcPacket{
1✔
1592
                                outgoingChanID: resolutionMsg.SourceChan,
1✔
1593
                                outgoingHTLCID: resolutionMsg.HtlcIndex,
1✔
1594
                                isResolution:   true,
1✔
1595
                        }
1✔
1596

1✔
1597
                        // Resolution messages will either be cancelling
1✔
1598
                        // backwards an existing HTLC, or settling a previously
1✔
1599
                        // outgoing HTLC. Based on this, we'll map the message
1✔
1600
                        // to the proper htlcPacket.
1✔
1601
                        if resolutionMsg.Failure != nil {
1✔
1602
                                pkt.htlc = &lnwire.UpdateFailHTLC{}
×
1603
                        } else {
1✔
1604
                                pkt.htlc = &lnwire.UpdateFulfillHTLC{
1✔
1605
                                        PaymentPreimage: *resolutionMsg.PreImage,
1✔
1606
                                }
1✔
1607
                        }
1✔
1608

1609
                        log.Infof("Received outside contract resolution, "+
1✔
1610
                                "mapping to: %v", spew.Sdump(pkt))
1✔
1611

1✔
1612
                        // We don't check the error, as the only failure we can
1✔
1613
                        // encounter is due to the circuit already being
1✔
1614
                        // closed. This is fine, as processing this message is
1✔
1615
                        // meant to be idempotent.
1✔
1616
                        err = s.handlePacketForward(pkt)
1✔
1617
                        if err != nil {
1✔
1618
                                log.Errorf("Unable to forward resolution msg: %v", err)
×
1619
                        }
×
1620

1621
                // A new packet has arrived for forwarding, we'll interpret the
1622
                // packet concretely, then either forward it along, or
1623
                // interpret a return packet to a locally initialized one.
1624
                case cmd := <-s.htlcPlex:
618✔
1625
                        cmd.err <- s.handlePacketForward(cmd.pkt)
618✔
1626

1627
                // When this time ticks, then it indicates that we should
1628
                // collect all the forwarding events since the last internal,
1629
                // and write them out to our log.
1630
                case <-s.cfg.FwdEventTicker.Ticks():
2✔
1631
                        s.wg.Add(1)
2✔
1632
                        go func() {
4✔
1633
                                defer s.wg.Done()
2✔
1634

2✔
1635
                                if err := s.FlushForwardingEvents(); err != nil {
2✔
1636
                                        log.Errorf("Unable to flush "+
×
1637
                                                "forwarding events: %v", err)
×
1638
                                }
×
1639
                        }()
1640

1641
                // The log ticker has fired, so we'll calculate some forwarding
1642
                // stats for the last 10 seconds to display within the logs to
1643
                // users.
1644
                case <-s.cfg.LogEventTicker.Ticks():
4✔
1645
                        // First, we'll collate the current running tally of
4✔
1646
                        // our forwarding stats.
4✔
1647
                        prevSatSent := totalSatSent
4✔
1648
                        prevSatRecv := totalSatRecv
4✔
1649
                        prevNumUpdates := totalNumUpdates
4✔
1650

4✔
1651
                        var (
4✔
1652
                                newNumUpdates uint64
4✔
1653
                                newSatSent    btcutil.Amount
4✔
1654
                                newSatRecv    btcutil.Amount
4✔
1655
                        )
4✔
1656

4✔
1657
                        // Next, we'll run through all the registered links and
4✔
1658
                        // compute their up-to-date forwarding stats.
4✔
1659
                        s.indexMtx.RLock()
4✔
1660
                        for _, link := range s.linkIndex {
10✔
1661
                                // TODO(roasbeef): when links first registered
6✔
1662
                                // stats printed.
6✔
1663
                                updates, sent, recv := link.Stats()
6✔
1664
                                newNumUpdates += updates
6✔
1665
                                newSatSent += sent.ToSatoshis()
6✔
1666
                                newSatRecv += recv.ToSatoshis()
6✔
1667
                        }
6✔
1668
                        s.indexMtx.RUnlock()
4✔
1669

4✔
1670
                        var (
4✔
1671
                                diffNumUpdates uint64
4✔
1672
                                diffSatSent    btcutil.Amount
4✔
1673
                                diffSatRecv    btcutil.Amount
4✔
1674
                        )
4✔
1675

4✔
1676
                        // If this is the first time we're computing these
4✔
1677
                        // stats, then the diff is just the new value. We do
4✔
1678
                        // this in order to avoid integer underflow issues.
4✔
1679
                        if prevNumUpdates == 0 {
8✔
1680
                                diffNumUpdates = newNumUpdates
4✔
1681
                                diffSatSent = newSatSent
4✔
1682
                                diffSatRecv = newSatRecv
4✔
1683
                        } else {
4✔
1684
                                diffNumUpdates = newNumUpdates - prevNumUpdates
×
1685
                                diffSatSent = newSatSent - prevSatSent
×
1686
                                diffSatRecv = newSatRecv - prevSatRecv
×
1687
                        }
×
1688

1689
                        // If the diff of num updates is zero, then we haven't
1690
                        // forwarded anything in the last 10 seconds, so we can
1691
                        // skip this update.
1692
                        if diffNumUpdates == 0 {
6✔
1693
                                continue
2✔
1694
                        }
1695

1696
                        // If the diff of num updates is negative, then some
1697
                        // links may have been unregistered from the switch, so
1698
                        // we'll update our stats to only include our registered
1699
                        // links.
1700
                        if int64(diffNumUpdates) < 0 {
2✔
1701
                                totalNumUpdates = newNumUpdates
×
1702
                                totalSatSent = newSatSent
×
1703
                                totalSatRecv = newSatRecv
×
1704
                                continue
×
1705
                        }
1706

1707
                        // Otherwise, we'll log this diff, then accumulate the
1708
                        // new stats into the running total.
1709
                        log.Debugf("Sent %d satoshis and received %d satoshis "+
2✔
1710
                                "in the last 10 seconds (%f tx/sec)",
2✔
1711
                                diffSatSent, diffSatRecv,
2✔
1712
                                float64(diffNumUpdates)/10)
2✔
1713

2✔
1714
                        totalNumUpdates += diffNumUpdates
2✔
1715
                        totalSatSent += diffSatSent
2✔
1716
                        totalSatRecv += diffSatRecv
2✔
1717

1718
                // The ack ticker has fired so if we have any settle/fail entries
1719
                // for a forwarding package to ack, we will do so here in a batch
1720
                // db call.
1721
                case <-s.cfg.AckEventTicker.Ticks():
×
1722
                        // If the current set is empty, pause the ticker.
×
1723
                        if len(s.pendingSettleFails) == 0 {
×
1724
                                s.cfg.AckEventTicker.Pause()
×
1725
                                continue
×
1726
                        }
1727

1728
                        // Batch ack the settle/fail entries.
1729
                        if err := s.ackSettleFail(s.pendingSettleFails...); err != nil {
×
1730
                                log.Errorf("Unable to ack batch of settle/fails: %v", err)
×
1731
                                continue
×
1732
                        }
1733

1734
                        log.Tracef("Acked %d settle fails: %v",
×
1735
                                len(s.pendingSettleFails),
×
1736
                                lnutils.SpewLogClosure(s.pendingSettleFails))
×
1737

×
1738
                        // Reset the pendingSettleFails buffer while keeping acquired
×
1739
                        // memory.
×
1740
                        s.pendingSettleFails = s.pendingSettleFails[:0]
×
1741

1742
                case <-s.quit:
207✔
1743
                        return
207✔
1744
                }
1745
        }
1746
}
1747

1748
// Start starts all helper goroutines required for the operation of the switch.
1749
func (s *Switch) Start() error {
207✔
1750
        if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
207✔
1751
                log.Warn("Htlc Switch already started")
×
1752
                return errors.New("htlc switch already started")
×
1753
        }
×
1754

1755
        log.Infof("HTLC Switch starting")
207✔
1756

207✔
1757
        blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
207✔
1758
        if err != nil {
207✔
1759
                return err
×
1760
        }
×
1761
        s.blockEpochStream = blockEpochStream
207✔
1762

207✔
1763
        s.wg.Add(1)
207✔
1764
        go s.htlcForwarder()
207✔
1765

207✔
1766
        if err := s.reforwardResponses(); err != nil {
207✔
1767
                s.Stop()
×
1768
                log.Errorf("unable to reforward responses: %v", err)
×
1769
                return err
×
1770
        }
×
1771

1772
        if err := s.reforwardResolutions(); err != nil {
207✔
1773
                // We are already stopping so we can ignore the error.
×
1774
                _ = s.Stop()
×
1775
                log.Errorf("unable to reforward resolutions: %v", err)
×
1776
                return err
×
1777
        }
×
1778

1779
        return nil
207✔
1780
}
1781

1782
// reforwardResolutions fetches the set of resolution messages stored on-disk
1783
// and reforwards them if their circuits are still open. If the circuits have
1784
// been deleted, then we will delete the resolution message from the database.
1785
func (s *Switch) reforwardResolutions() error {
207✔
1786
        // Fetch all stored resolution messages, deleting the ones that are
207✔
1787
        // resolved.
207✔
1788
        resMsgs, err := s.resMsgStore.fetchAllResolutionMsg()
207✔
1789
        if err != nil {
207✔
1790
                return err
×
1791
        }
×
1792

1793
        switchPackets := make([]*htlcPacket, 0, len(resMsgs))
207✔
1794
        for _, resMsg := range resMsgs {
208✔
1795
                // If the open circuit no longer exists, then we can remove the
1✔
1796
                // message from the store.
1✔
1797
                outKey := CircuitKey{
1✔
1798
                        ChanID: resMsg.SourceChan,
1✔
1799
                        HtlcID: resMsg.HtlcIndex,
1✔
1800
                }
1✔
1801

1✔
1802
                if s.circuits.LookupOpenCircuit(outKey) == nil {
2✔
1803
                        // The open circuit doesn't exist.
1✔
1804
                        err := s.resMsgStore.deleteResolutionMsg(&outKey)
1✔
1805
                        if err != nil {
1✔
1806
                                return err
×
1807
                        }
×
1808

1809
                        continue
1✔
1810
                }
1811

1812
                // The circuit is still open, so we can assume that the link or
1813
                // switch (if we are the source) hasn't cleaned it up yet.
1814
                // We rely on our forwarding logic to fill in details that
1815
                // are not currently available to us.
1816
                resPkt := &htlcPacket{
×
1817
                        outgoingChanID: resMsg.SourceChan,
×
1818
                        outgoingHTLCID: resMsg.HtlcIndex,
×
1819
                        isResolution:   true,
×
1820
                }
×
1821

×
1822
                if resMsg.Failure != nil {
×
1823
                        resPkt.htlc = &lnwire.UpdateFailHTLC{}
×
1824
                } else {
×
1825
                        resPkt.htlc = &lnwire.UpdateFulfillHTLC{
×
1826
                                PaymentPreimage: *resMsg.PreImage,
×
1827
                        }
×
1828
                }
×
1829

1830
                switchPackets = append(switchPackets, resPkt)
×
1831
        }
1832

1833
        // We'll now dispatch the set of resolution messages to the proper
1834
        // destination. An error is only encountered here if the switch is
1835
        // shutting down.
1836
        if err := s.ForwardPackets(nil, switchPackets...); err != nil {
207✔
1837
                return err
×
1838
        }
×
1839

1840
        return nil
207✔
1841
}
1842

1843
// reforwardResponses for every known, non-pending channel, loads all associated
1844
// forwarding packages and reforwards any Settle or Fail HTLCs found. This is
1845
// used to resurrect the switch's mailboxes after a restart. This also runs for
1846
// waiting close channels since there may be settles or fails that need to be
1847
// reforwarded before they completely close.
1848
func (s *Switch) reforwardResponses() error {
207✔
1849
        openChannels, err := s.cfg.FetchAllChannels()
207✔
1850
        if err != nil {
207✔
1851
                return err
×
1852
        }
×
1853

1854
        for _, openChannel := range openChannels {
336✔
1855
                shortChanID := openChannel.ShortChanID()
129✔
1856

129✔
1857
                // Locally-initiated payments never need reforwarding.
129✔
1858
                if shortChanID == hop.Source {
129✔
1859
                        continue
×
1860
                }
1861

1862
                // If the channel is pending, it should have no forwarding
1863
                // packages, and nothing to reforward.
1864
                if openChannel.IsPending {
129✔
1865
                        continue
×
1866
                }
1867

1868
                // Channels in open or waiting-close may still have responses in
1869
                // their forwarding packages. We will continue to reattempt
1870
                // forwarding on startup until the channel is fully-closed.
1871
                //
1872
                // Load this channel's forwarding packages, and deliver them to
1873
                // the switch.
1874
                fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID)
129✔
1875
                if err != nil {
129✔
1876
                        log.Errorf("unable to load forwarding "+
×
1877
                                "packages for %v: %v", shortChanID, err)
×
1878
                        return err
×
1879
                }
×
1880

1881
                s.reforwardSettleFails(fwdPkgs)
129✔
1882
        }
1883

1884
        return nil
207✔
1885
}
1886

1887
// loadChannelFwdPkgs loads all forwarding packages owned by the `source` short
1888
// channel identifier.
1889
func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) {
129✔
1890

129✔
1891
        var fwdPkgs []*channeldb.FwdPkg
129✔
1892
        if err := kvdb.View(s.cfg.DB, func(tx kvdb.RTx) error {
258✔
1893
                var err error
129✔
1894
                fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgs(
129✔
1895
                        tx, source,
129✔
1896
                )
129✔
1897
                return err
129✔
1898
        }, func() {
258✔
1899
                fwdPkgs = nil
129✔
1900
        }); err != nil {
129✔
1901
                return nil, err
×
1902
        }
×
1903

1904
        return fwdPkgs, nil
129✔
1905
}
1906

1907
// reforwardSettleFails parses the Settle and Fail HTLCs from the list of
1908
// forwarding packages, and reforwards those that have not been acknowledged.
1909
// This is intended to occur on startup, in order to recover the switch's
1910
// mailboxes, and to ensure that responses can be propagated in case the
1911
// outgoing link never comes back online.
1912
//
1913
// NOTE: This should mimic the behavior processRemoteSettleFails.
1914
func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) {
129✔
1915
        for _, fwdPkg := range fwdPkgs {
130✔
1916
                switchPackets := make([]*htlcPacket, 0, len(fwdPkg.SettleFails))
1✔
1917
                for i, update := range fwdPkg.SettleFails {
1✔
1918
                        // Skip any settles or fails that have already been
×
1919
                        // acknowledged by the incoming link that originated the
×
1920
                        // forwarded Add.
×
1921
                        if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
×
1922
                                continue
×
1923
                        }
1924

1925
                        switch msg := update.UpdateMsg.(type) {
×
1926
                        // A settle for an HTLC we previously forwarded HTLC has
1927
                        // been received. So we'll forward the HTLC to the
1928
                        // switch which will handle propagating the settle to
1929
                        // the prior hop.
1930
                        case *lnwire.UpdateFulfillHTLC:
×
1931
                                destRef := fwdPkg.DestRef(uint16(i))
×
1932
                                settlePacket := &htlcPacket{
×
1933
                                        outgoingChanID: fwdPkg.Source,
×
1934
                                        outgoingHTLCID: msg.ID,
×
1935
                                        destRef:        &destRef,
×
1936
                                        htlc:           msg,
×
1937
                                }
×
1938

×
1939
                                // Add the packet to the batch to be forwarded, and
×
1940
                                // notify the overflow queue that a spare spot has been
×
1941
                                // freed up within the commitment state.
×
1942
                                switchPackets = append(switchPackets, settlePacket)
×
1943

1944
                        // A failureCode message for a previously forwarded HTLC has been
1945
                        // received. As a result a new slot will be freed up in our
1946
                        // commitment state, so we'll forward this to the switch so the
1947
                        // backwards undo can continue.
1948
                        case *lnwire.UpdateFailHTLC:
×
1949
                                // Fetch the reason the HTLC was canceled so
×
1950
                                // we can continue to propagate it. This
×
1951
                                // failure originated from another node, so
×
1952
                                // the linkFailure field is not set on this
×
1953
                                // packet. We rely on the link to fill in
×
1954
                                // additional circuit information for us.
×
1955
                                failPacket := &htlcPacket{
×
1956
                                        outgoingChanID: fwdPkg.Source,
×
1957
                                        outgoingHTLCID: msg.ID,
×
1958
                                        destRef: &channeldb.SettleFailRef{
×
1959
                                                Source: fwdPkg.Source,
×
1960
                                                Height: fwdPkg.Height,
×
1961
                                                Index:  uint16(i),
×
1962
                                        },
×
1963
                                        htlc: msg,
×
1964
                                }
×
1965

×
1966
                                // Add the packet to the batch to be forwarded, and
×
1967
                                // notify the overflow queue that a spare spot has been
×
1968
                                // freed up within the commitment state.
×
1969
                                switchPackets = append(switchPackets, failPacket)
×
1970
                        }
1971
                }
1972

1973
                // Since this send isn't tied to a specific link, we pass a nil
1974
                // link quit channel, meaning the send will fail only if the
1975
                // switch receives a shutdown request.
1976
                if err := s.ForwardPackets(nil, switchPackets...); err != nil {
1✔
1977
                        log.Errorf("Unhandled error while reforwarding packets "+
×
1978
                                "settle/fail over htlcswitch: %v", err)
×
1979
                }
×
1980
        }
1981
}
1982

1983
// Stop gracefully stops all active helper goroutines, then waits until they've
1984
// exited.
1985
func (s *Switch) Stop() error {
439✔
1986
        if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) {
569✔
1987
                log.Warn("Htlc Switch already stopped")
130✔
1988
                return errors.New("htlc switch already shutdown")
130✔
1989
        }
130✔
1990

1991
        log.Info("HTLC Switch shutting down...")
309✔
1992
        defer log.Debug("HTLC Switch shutdown complete")
309✔
1993

309✔
1994
        close(s.quit)
309✔
1995

309✔
1996
        s.wg.Wait()
309✔
1997

309✔
1998
        // Wait until all active goroutines have finished exiting before
309✔
1999
        // stopping the mailboxes, otherwise the mailbox map could still be
309✔
2000
        // accessed and modified.
309✔
2001
        s.mailOrchestrator.Stop()
309✔
2002

309✔
2003
        return nil
309✔
2004
}
2005

2006
// CreateAndAddLink will create a link and then add it to the internal maps
2007
// when given a ChannelLinkConfig and LightningChannel.
2008
func (s *Switch) CreateAndAddLink(linkCfg ChannelLinkConfig,
2009
        lnChan *lnwallet.LightningChannel) error {
×
2010

×
2011
        link := NewChannelLink(linkCfg, lnChan)
×
2012
        return s.AddLink(link)
×
2013
}
×
2014

2015
// AddLink is used to initiate the handling of the add link command. The
2016
// request will be propagated and handled in the main goroutine.
2017
func (s *Switch) AddLink(link ChannelLink) error {
336✔
2018
        s.indexMtx.Lock()
336✔
2019
        defer s.indexMtx.Unlock()
336✔
2020

336✔
2021
        chanID := link.ChanID()
336✔
2022

336✔
2023
        // First, ensure that this link is not already active in the switch.
336✔
2024
        _, err := s.getLink(chanID)
336✔
2025
        if err == nil {
337✔
2026
                return fmt.Errorf("unable to add ChannelLink(%v), already "+
1✔
2027
                        "active", chanID)
1✔
2028
        }
1✔
2029

2030
        // Get and attach the mailbox for this link, which buffers packets in
2031
        // case there packets that we tried to deliver while this link was
2032
        // offline.
2033
        shortChanID := link.ShortChanID()
335✔
2034
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID)
335✔
2035
        link.AttachMailBox(mailbox)
335✔
2036

335✔
2037
        // Attach the Switch's failAliasUpdate function to the link.
335✔
2038
        link.attachFailAliasUpdate(s.failAliasUpdate)
335✔
2039

335✔
2040
        if err := link.Start(); err != nil {
335✔
2041
                log.Errorf("AddLink failed to start link with chanID=%v: %v",
×
2042
                        chanID, err)
×
2043
                s.removeLink(chanID)
×
2044
                return err
×
2045
        }
×
2046

2047
        if shortChanID == hop.Source {
336✔
2048
                log.Infof("Adding pending link chan_id=%v, short_chan_id=%v",
1✔
2049
                        chanID, shortChanID)
1✔
2050

1✔
2051
                s.pendingLinkIndex[chanID] = link
1✔
2052
        } else {
335✔
2053
                log.Infof("Adding live link chan_id=%v, short_chan_id=%v",
334✔
2054
                        chanID, shortChanID)
334✔
2055

334✔
2056
                s.addLiveLink(link)
334✔
2057
                s.mailOrchestrator.BindLiveShortChanID(
334✔
2058
                        mailbox, chanID, shortChanID,
334✔
2059
                )
334✔
2060
        }
334✔
2061

2062
        return nil
335✔
2063
}
2064

2065
// addLiveLink adds a link to all associated forwarding index, this makes it a
2066
// candidate for forwarding HTLCs.
2067
func (s *Switch) addLiveLink(link ChannelLink) {
334✔
2068
        linkScid := link.ShortChanID()
334✔
2069

334✔
2070
        // We'll add the link to the linkIndex which lets us quickly
334✔
2071
        // look up a channel when we need to close or register it, and
334✔
2072
        // the forwarding index which'll be used when forwarding HTLC's
334✔
2073
        // in the multi-hop setting.
334✔
2074
        s.linkIndex[link.ChanID()] = link
334✔
2075
        s.forwardingIndex[linkScid] = link
334✔
2076

334✔
2077
        // Next we'll add the link to the interface index so we can
334✔
2078
        // quickly look up all the channels for a particular node.
334✔
2079
        peerPub := link.PeerPubKey()
334✔
2080
        if _, ok := s.interfaceIndex[peerPub]; !ok {
663✔
2081
                s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink)
329✔
2082
        }
329✔
2083
        s.interfaceIndex[peerPub][link.ChanID()] = link
334✔
2084

334✔
2085
        s.updateLinkAliases(link)
334✔
2086
}
2087

2088
// UpdateLinkAliases is the externally exposed wrapper for updating link
2089
// aliases. It acquires the indexMtx and calls the internal method.
2090
func (s *Switch) UpdateLinkAliases(link ChannelLink) {
×
2091
        s.indexMtx.Lock()
×
2092
        defer s.indexMtx.Unlock()
×
2093

×
2094
        s.updateLinkAliases(link)
×
2095
}
×
2096

2097
// updateLinkAliases updates the aliases for a given link. This will cause the
2098
// htlcswitch to consult the alias manager on the up to date values of its
2099
// alias maps.
2100
//
2101
// NOTE: this MUST be called with the indexMtx held.
2102
func (s *Switch) updateLinkAliases(link ChannelLink) {
334✔
2103
        linkScid := link.ShortChanID()
334✔
2104

334✔
2105
        aliases := link.getAliases()
334✔
2106
        if link.isZeroConf() {
353✔
2107
                if link.zeroConfConfirmed() {
34✔
2108
                        // Since the zero-conf channel has confirmed, we can
15✔
2109
                        // populate the aliasToReal mapping.
15✔
2110
                        confirmedScid := link.confirmedScid()
15✔
2111

15✔
2112
                        for _, alias := range aliases {
37✔
2113
                                s.aliasToReal[alias] = confirmedScid
22✔
2114
                        }
22✔
2115

2116
                        // Add the confirmed SCID as a key in the baseIndex.
2117
                        s.baseIndex[confirmedScid] = linkScid
15✔
2118
                }
2119

2120
                // Now we populate the baseIndex which will be used to fetch
2121
                // the link given any of the channel's alias SCIDs or the real
2122
                // SCID. The link's SCID is an alias, so we don't need to
2123
                // special-case it like the option-scid-alias feature-bit case
2124
                // further down.
2125
                for _, alias := range aliases {
46✔
2126
                        s.baseIndex[alias] = linkScid
27✔
2127
                }
27✔
2128
        } else if link.negotiatedAliasFeature() {
331✔
2129
                // First, we flush any alias mappings for this link's scid
16✔
2130
                // before we populate the map again, in order to get rid of old
16✔
2131
                // values that no longer exist.
16✔
2132
                for alias, real := range s.aliasToReal {
18✔
2133
                        if real == linkScid {
2✔
2134
                                delete(s.aliasToReal, alias)
×
2135
                        }
×
2136
                }
2137

2138
                for alias, real := range s.baseIndex {
19✔
2139
                        if real == linkScid {
3✔
2140
                                delete(s.baseIndex, alias)
×
2141
                        }
×
2142
                }
2143

2144
                // The link's SCID is the confirmed SCID for non-zero-conf
2145
                // option-scid-alias feature bit channels.
2146
                for _, alias := range aliases {
39✔
2147
                        s.aliasToReal[alias] = linkScid
23✔
2148
                        s.baseIndex[alias] = linkScid
23✔
2149
                }
23✔
2150

2151
                // Since the link's SCID is confirmed, it was not included in
2152
                // the baseIndex above as a key. Add it now.
2153
                s.baseIndex[linkScid] = linkScid
16✔
2154
        }
2155
}
2156

2157
// GetLink is used to initiate the handling of the get link command. The
2158
// request will be propagated/handled to/in the main goroutine.
2159
func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelUpdateHandler,
2160
        error) {
3,199✔
2161

3,199✔
2162
        s.indexMtx.RLock()
3,199✔
2163
        defer s.indexMtx.RUnlock()
3,199✔
2164

3,199✔
2165
        return s.getLink(chanID)
3,199✔
2166
}
3,199✔
2167

2168
// getLink returns the link stored in either the pending index or the live
2169
// lindex.
2170
func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) {
3,864✔
2171
        link, ok := s.linkIndex[chanID]
3,864✔
2172
        if !ok {
4,200✔
2173
                link, ok = s.pendingLinkIndex[chanID]
336✔
2174
                if !ok {
671✔
2175
                        return nil, ErrChannelLinkNotFound
335✔
2176
                }
335✔
2177
        }
2178

2179
        return link, nil
3,529✔
2180
}
2181

2182
// GetLinkByShortID attempts to return the link which possesses the target short
2183
// channel ID.
2184
func (s *Switch) GetLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink,
2185
        error) {
×
2186

×
2187
        s.indexMtx.RLock()
×
2188
        defer s.indexMtx.RUnlock()
×
2189

×
2190
        link, err := s.getLinkByShortID(chanID)
×
2191
        if err != nil {
×
2192
                // If we failed to find the link under the passed-in SCID, we
×
2193
                // consult the Switch's baseIndex map to see if the confirmed
×
2194
                // SCID was used for a zero-conf channel.
×
2195
                aliasID, ok := s.baseIndex[chanID]
×
2196
                if !ok {
×
2197
                        return nil, err
×
2198
                }
×
2199

2200
                // An alias was found, use it to lookup if a link exists.
2201
                return s.getLinkByShortID(aliasID)
×
2202
        }
2203

2204
        return link, nil
×
2205
}
2206

2207
// getLinkByShortID attempts to return the link which possesses the target
2208
// short channel ID.
2209
//
2210
// NOTE: This MUST be called with the indexMtx held.
2211
func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) {
477✔
2212
        link, ok := s.forwardingIndex[chanID]
477✔
2213
        if !ok {
478✔
2214
                return nil, ErrChannelLinkNotFound
1✔
2215
        }
1✔
2216

2217
        return link, nil
476✔
2218
}
2219

2220
// getLinkByMapping attempts to fetch the link via the htlcPacket's
2221
// outgoingChanID, possibly using a mapping. If it finds the link via mapping,
2222
// the outgoingChanID will be changed so that an error can be properly
2223
// attributed when looping over linkErrs in handlePacketForward.
2224
//
2225
// * If the outgoingChanID is an alias, we'll fetch the link regardless if it's
2226
// public or not.
2227
//
2228
// * If the outgoingChanID is a confirmed SCID, we'll need to do more checks.
2229
//   - If there is no entry found in baseIndex, fetch the link. This channel
2230
//     did not have the option-scid-alias feature negotiated (which includes
2231
//     zero-conf and option-scid-alias channel-types).
2232
//   - If there is an entry found, fetch the link from forwardingIndex and
2233
//     fail if this is a private link.
2234
//
2235
// NOTE: This MUST be called with the indexMtx read lock held.
2236
func (s *Switch) getLinkByMapping(pkt *htlcPacket) (ChannelLink, error) {
80✔
2237
        // Determine if this ShortChannelID is an alias or a confirmed SCID.
80✔
2238
        chanID := pkt.outgoingChanID
80✔
2239
        aliasID := s.cfg.IsAlias(chanID)
80✔
2240

80✔
2241
        // Set the originalOutgoingChanID so the proper channel_update can be
80✔
2242
        // sent back if the option-scid-alias feature bit was negotiated.
80✔
2243
        pkt.originalOutgoingChanID = chanID
80✔
2244

80✔
2245
        if aliasID {
95✔
2246
                // Since outgoingChanID is an alias, we'll fetch the link via
15✔
2247
                // baseIndex.
15✔
2248
                baseScid, ok := s.baseIndex[chanID]
15✔
2249
                if !ok {
15✔
2250
                        // No mapping exists, bail.
×
2251
                        return nil, ErrChannelLinkNotFound
×
2252
                }
×
2253

2254
                // A mapping exists, so use baseScid to find the link in the
2255
                // forwardingIndex.
2256
                link, ok := s.forwardingIndex[baseScid]
15✔
2257
                if !ok {
15✔
2258
                        // Link not found, bail.
×
2259
                        return nil, ErrChannelLinkNotFound
×
2260
                }
×
2261

2262
                // Change the packet's outgoingChanID field so that errors are
2263
                // properly attributed.
2264
                pkt.outgoingChanID = baseScid
15✔
2265

15✔
2266
                // Return the link without checking if it's private or not.
15✔
2267
                return link, nil
15✔
2268
        }
2269

2270
        // The outgoingChanID is a confirmed SCID. Attempt to fetch the base
2271
        // SCID from baseIndex.
2272
        baseScid, ok := s.baseIndex[chanID]
65✔
2273
        if !ok {
123✔
2274
                // outgoingChanID is not a key in base index meaning this
58✔
2275
                // channel did not have the option-scid-alias feature bit
58✔
2276
                // negotiated. We'll fetch the link and return it.
58✔
2277
                link, ok := s.forwardingIndex[chanID]
58✔
2278
                if !ok {
60✔
2279
                        // The link wasn't found, bail out.
2✔
2280
                        return nil, ErrChannelLinkNotFound
2✔
2281
                }
2✔
2282

2283
                return link, nil
56✔
2284
        }
2285

2286
        // Fetch the link whose internal SCID is baseScid.
2287
        link, ok := s.forwardingIndex[baseScid]
7✔
2288
        if !ok {
7✔
2289
                // Link wasn't found, bail out.
×
2290
                return nil, ErrChannelLinkNotFound
×
2291
        }
×
2292

2293
        // If the link is unadvertised, we fail since the real SCID was used to
2294
        // forward over it and this is a channel where the option-scid-alias
2295
        // feature bit was negotiated.
2296
        if link.IsUnadvertised() {
9✔
2297
                return nil, ErrChannelLinkNotFound
2✔
2298
        }
2✔
2299

2300
        // The link is public so the confirmed SCID can be used to forward over
2301
        // it. We'll also replace pkt's outgoingChanID field so errors can
2302
        // properly be attributed in the calling function.
2303
        pkt.outgoingChanID = baseScid
5✔
2304
        return link, nil
5✔
2305
}
2306

2307
// HasActiveLink returns true if the given channel ID has a link in the link
2308
// index AND the link is eligible to forward.
2309
func (s *Switch) HasActiveLink(chanID lnwire.ChannelID) bool {
2✔
2310
        s.indexMtx.RLock()
2✔
2311
        defer s.indexMtx.RUnlock()
2✔
2312

2✔
2313
        if link, ok := s.linkIndex[chanID]; ok {
4✔
2314
                return link.EligibleToForward()
2✔
2315
        }
2✔
2316

2317
        return false
×
2318
}
2319

2320
// RemoveLink purges the switch of any link associated with chanID. If a pending
2321
// or active link is not found, this method does nothing. Otherwise, the method
2322
// returns after the link has been completely shutdown.
2323
func (s *Switch) RemoveLink(chanID lnwire.ChannelID) {
18✔
2324
        s.indexMtx.Lock()
18✔
2325
        link, err := s.getLink(chanID)
18✔
2326
        if err != nil {
18✔
2327
                // If err is non-nil, this means that link is also nil. The
×
2328
                // link variable cannot be nil without err being non-nil.
×
2329
                s.indexMtx.Unlock()
×
2330
                log.Tracef("Unable to remove link for ChannelID(%v): %v",
×
2331
                        chanID, err)
×
2332
                return
×
2333
        }
×
2334

2335
        // Check if the link is already stopping and grab the stop chan if it
2336
        // is.
2337
        stopChan, ok := s.linkStopIndex[chanID]
18✔
2338
        if !ok {
36✔
2339
                // If the link is non-nil, it is not currently stopping, so
18✔
2340
                // we'll add a stop chan to the linkStopIndex.
18✔
2341
                stopChan = make(chan struct{})
18✔
2342
                s.linkStopIndex[chanID] = stopChan
18✔
2343
        }
18✔
2344
        s.indexMtx.Unlock()
18✔
2345

18✔
2346
        if ok {
18✔
2347
                // If the stop chan exists, we will wait for it to be closed.
×
2348
                // Once it is closed, we will exit.
×
2349
                select {
×
2350
                case <-stopChan:
×
2351
                        return
×
2352
                case <-s.quit:
×
2353
                        return
×
2354
                }
2355
        }
2356

2357
        // Stop the link before removing it from the maps.
2358
        link.Stop()
18✔
2359

18✔
2360
        s.indexMtx.Lock()
18✔
2361
        _ = s.removeLink(chanID)
18✔
2362

18✔
2363
        // Close stopChan and remove this link from the linkStopIndex.
18✔
2364
        // Deleting from the index and removing from the link must be done
18✔
2365
        // in the same block while the mutex is held.
18✔
2366
        close(stopChan)
18✔
2367
        delete(s.linkStopIndex, chanID)
18✔
2368
        s.indexMtx.Unlock()
18✔
2369
}
2370

2371
// removeLink is used to remove and stop the channel link.
2372
//
2373
// NOTE: This MUST be called with the indexMtx held.
2374
func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink {
311✔
2375
        log.Infof("Removing channel link with ChannelID(%v)", chanID)
311✔
2376

311✔
2377
        link, err := s.getLink(chanID)
311✔
2378
        if err != nil {
311✔
2379
                return nil
×
2380
        }
×
2381

2382
        // Remove the channel from live link indexes.
2383
        delete(s.pendingLinkIndex, link.ChanID())
311✔
2384
        delete(s.linkIndex, link.ChanID())
311✔
2385
        delete(s.forwardingIndex, link.ShortChanID())
311✔
2386

311✔
2387
        // If the link has been added to the peer index, then we'll move to
311✔
2388
        // delete the entry within the index.
311✔
2389
        peerPub := link.PeerPubKey()
311✔
2390
        if peerIndex, ok := s.interfaceIndex[peerPub]; ok {
621✔
2391
                delete(peerIndex, link.ChanID())
310✔
2392

310✔
2393
                // If after deletion, there are no longer any links, then we'll
310✔
2394
                // remove the interface map all together.
310✔
2395
                if len(peerIndex) == 0 {
615✔
2396
                        delete(s.interfaceIndex, peerPub)
305✔
2397
                }
305✔
2398
        }
2399

2400
        return link
311✔
2401
}
2402

2403
// UpdateShortChanID locates the link with the passed-in chanID and updates the
2404
// underlying channel state. This is only used in zero-conf channels to allow
2405
// the confirmed SCID to be updated.
2406
func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error {
1✔
2407
        s.indexMtx.Lock()
1✔
2408
        defer s.indexMtx.Unlock()
1✔
2409

1✔
2410
        // Locate the target link in the link index. If no such link exists,
1✔
2411
        // then we will ignore the request.
1✔
2412
        link, ok := s.linkIndex[chanID]
1✔
2413
        if !ok {
1✔
2414
                return fmt.Errorf("link %v not found", chanID)
×
2415
        }
×
2416

2417
        // Try to update the link's underlying channel state, returning early
2418
        // if this update failed.
2419
        _, err := link.UpdateShortChanID()
1✔
2420
        if err != nil {
1✔
2421
                return err
×
2422
        }
×
2423

2424
        // Since the zero-conf channel is confirmed, we should populate the
2425
        // aliasToReal map and update the baseIndex.
2426
        aliases := link.getAliases()
1✔
2427

1✔
2428
        confirmedScid := link.confirmedScid()
1✔
2429

1✔
2430
        for _, alias := range aliases {
3✔
2431
                s.aliasToReal[alias] = confirmedScid
2✔
2432
        }
2✔
2433

2434
        s.baseIndex[confirmedScid] = link.ShortChanID()
1✔
2435

1✔
2436
        return nil
1✔
2437
}
2438

2439
// GetLinksByInterface fetches all the links connected to a particular node
2440
// identified by the serialized compressed form of its public key.
2441
func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelUpdateHandler,
2442
        error) {
×
2443

×
2444
        s.indexMtx.RLock()
×
2445
        defer s.indexMtx.RUnlock()
×
2446

×
2447
        var handlers []ChannelUpdateHandler
×
2448

×
2449
        links, err := s.getLinks(hop)
×
2450
        if err != nil {
×
2451
                return nil, err
×
2452
        }
×
2453

2454
        // Range over the returned []ChannelLink to convert them into
2455
        // []ChannelUpdateHandler.
2456
        for _, link := range links {
×
2457
                handlers = append(handlers, link)
×
2458
        }
×
2459

2460
        return handlers, nil
×
2461
}
2462

2463
// getLinks is function which returns the channel links of the peer by hop
2464
// destination id.
2465
//
2466
// NOTE: This MUST be called with the indexMtx held.
2467
func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
76✔
2468
        links, ok := s.interfaceIndex[destination]
76✔
2469
        if !ok {
76✔
2470
                return nil, ErrNoLinksFound
×
2471
        }
×
2472

2473
        channelLinks := make([]ChannelLink, 0, len(links))
76✔
2474
        for _, link := range links {
156✔
2475
                channelLinks = append(channelLinks, link)
80✔
2476
        }
80✔
2477

2478
        return channelLinks, nil
76✔
2479
}
2480

2481
// CircuitModifier returns a reference to subset of the interfaces provided by
2482
// the circuit map, to allow links to open and close circuits.
2483
func (s *Switch) CircuitModifier() CircuitModifier {
215✔
2484
        return s.circuits
215✔
2485
}
215✔
2486

2487
// CircuitLookup returns a reference to subset of the interfaces provided by the
2488
// circuit map, to allow looking up circuits.
2489
func (s *Switch) CircuitLookup() CircuitLookup {
×
2490
        return s.circuits
×
2491
}
×
2492

2493
// commitCircuits persistently adds a circuit to the switch's circuit map.
2494
func (s *Switch) commitCircuits(circuits ...*PaymentCircuit) (
2495
        *CircuitFwdActions, error) {
17✔
2496

17✔
2497
        return s.circuits.CommitCircuits(circuits...)
17✔
2498
}
17✔
2499

2500
// FlushForwardingEvents flushes out the set of pending forwarding events to
2501
// the persistent log. This will be used by the switch to periodically flush
2502
// out the set of forwarding events to disk. External callers can also use this
2503
// method to ensure all data is flushed to dis before querying the log.
2504
func (s *Switch) FlushForwardingEvents() error {
209✔
2505
        // First, we'll obtain a copy of the current set of pending forwarding
209✔
2506
        // events.
209✔
2507
        s.fwdEventMtx.Lock()
209✔
2508

209✔
2509
        // If we won't have any forwarding events, then we can exit early.
209✔
2510
        if len(s.pendingFwdingEvents) == 0 {
401✔
2511
                s.fwdEventMtx.Unlock()
192✔
2512
                return nil
192✔
2513
        }
192✔
2514

2515
        events := make([]channeldb.ForwardingEvent, len(s.pendingFwdingEvents))
17✔
2516
        copy(events[:], s.pendingFwdingEvents[:])
17✔
2517

17✔
2518
        // With the copy obtained, we can now clear out the header pointer of
17✔
2519
        // the current slice. This way, we can re-use the underlying storage
17✔
2520
        // allocated for the slice.
17✔
2521
        s.pendingFwdingEvents = s.pendingFwdingEvents[:0]
17✔
2522
        s.fwdEventMtx.Unlock()
17✔
2523

17✔
2524
        // Finally, we'll write out the copied events to the persistent
17✔
2525
        // forwarding log.
17✔
2526
        return s.cfg.FwdingLog.AddForwardingEvents(events)
17✔
2527
}
2528

2529
// BestHeight returns the best height known to the switch.
2530
func (s *Switch) BestHeight() uint32 {
447✔
2531
        return atomic.LoadUint32(&s.bestHeight)
447✔
2532
}
447✔
2533

2534
// dustExceedsFeeThreshold takes in a ChannelLink, HTLC amount, and a boolean
2535
// to determine whether the default fee threshold has been exceeded. This
2536
// heuristic takes into account the trimmed-to-dust mechanism. The sum of the
2537
// commitment's dust with the mailbox's dust with the amount is checked against
2538
// the fee exposure threshold. If incoming is true, then the amount is not
2539
// included in the sum as it was already included in the commitment's dust. A
2540
// boolean is returned telling the caller whether the HTLC should be failed
2541
// back.
2542
func (s *Switch) dustExceedsFeeThreshold(link ChannelLink,
2543
        amount lnwire.MilliSatoshi, incoming bool) bool {
532✔
2544

532✔
2545
        // Retrieve the link's current commitment feerate and dustClosure.
532✔
2546
        feeRate := link.getFeeRate()
532✔
2547
        isDust := link.getDustClosure()
532✔
2548

532✔
2549
        // Evaluate if the HTLC is dust on either sides' commitment.
532✔
2550
        isLocalDust := isDust(
532✔
2551
                feeRate, incoming, lntypes.Local, amount.ToSatoshis(),
532✔
2552
        )
532✔
2553
        isRemoteDust := isDust(
532✔
2554
                feeRate, incoming, lntypes.Remote, amount.ToSatoshis(),
532✔
2555
        )
532✔
2556

532✔
2557
        if !(isLocalDust || isRemoteDust) {
664✔
2558
                // If the HTLC is not dust on either commitment, it's fine to
132✔
2559
                // forward.
132✔
2560
                return false
132✔
2561
        }
132✔
2562

2563
        // Fetch the dust sums currently in the mailbox for this link.
2564
        cid := link.ChanID()
400✔
2565
        sid := link.ShortChanID()
400✔
2566
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(cid, sid)
400✔
2567
        localMailDust, remoteMailDust := mailbox.DustPackets()
400✔
2568

400✔
2569
        // If the htlc is dust on the local commitment, we'll obtain the dust
400✔
2570
        // sum for it.
400✔
2571
        if isLocalDust {
800✔
2572
                localSum := link.getDustSum(
400✔
2573
                        lntypes.Local, fn.None[chainfee.SatPerKWeight](),
400✔
2574
                )
400✔
2575
                localSum += localMailDust
400✔
2576

400✔
2577
                // Optionally include the HTLC amount only for outgoing
400✔
2578
                // HTLCs.
400✔
2579
                if !incoming {
760✔
2580
                        localSum += amount
360✔
2581
                }
360✔
2582

2583
                // Finally check against the defined fee threshold.
2584
                if localSum > s.cfg.MaxFeeExposure {
402✔
2585
                        return true
2✔
2586
                }
2✔
2587
        }
2588

2589
        // Also check if the htlc is dust on the remote commitment, if we've
2590
        // reached this point.
2591
        if isRemoteDust {
796✔
2592
                remoteSum := link.getDustSum(
398✔
2593
                        lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
398✔
2594
                )
398✔
2595
                remoteSum += remoteMailDust
398✔
2596

398✔
2597
                // Optionally include the HTLC amount only for outgoing
398✔
2598
                // HTLCs.
398✔
2599
                if !incoming {
756✔
2600
                        remoteSum += amount
358✔
2601
                }
358✔
2602

2603
                // Finally check against the defined fee threshold.
2604
                if remoteSum > s.cfg.MaxFeeExposure {
398✔
2605
                        return true
×
2606
                }
×
2607
        }
2608

2609
        // If we reached this point, this HTLC is fine to forward.
2610
        return false
398✔
2611
}
2612

2613
// failMailboxUpdate is passed to the mailbox orchestrator which in turn passes
2614
// it to individual mailboxes. It allows the mailboxes to construct a
2615
// FailureMessage when failing back HTLC's due to expiry and may include an
2616
// alias in the ShortChannelID field. The outgoingScid is the SCID originally
2617
// used in the onion. The mailboxScid is the SCID that the mailbox and link
2618
// use. The mailboxScid is only used in the non-alias case, so it is always
2619
// the confirmed SCID.
2620
func (s *Switch) failMailboxUpdate(outgoingScid,
2621
        mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage {
11✔
2622

11✔
2623
        // Try to use the failAliasUpdate function in case this is a channel
11✔
2624
        // that uses aliases. If it returns nil, we'll fallback to the original
11✔
2625
        // pre-alias behavior.
11✔
2626
        update := s.failAliasUpdate(outgoingScid, false)
11✔
2627
        if update == nil {
16✔
2628
                // Execute the fallback behavior.
5✔
2629
                var err error
5✔
2630
                update, err = s.cfg.FetchLastChannelUpdate(mailboxScid)
5✔
2631
                if err != nil {
5✔
2632
                        return &lnwire.FailTemporaryNodeFailure{}
×
2633
                }
×
2634
        }
2635

2636
        return lnwire.NewTemporaryChannelFailure(update)
11✔
2637
}
2638

2639
// failAliasUpdate prepares a ChannelUpdate for a failed incoming or outgoing
2640
// HTLC on a channel where the option-scid-alias feature bit was negotiated. If
2641
// the associated channel is not one of these, this function will return nil
2642
// and the caller is expected to handle this properly. In this case, a return
2643
// to the original non-alias behavior is expected.
2644
func (s *Switch) failAliasUpdate(scid lnwire.ShortChannelID,
2645
        incoming bool) *lnwire.ChannelUpdate1 {
34✔
2646

34✔
2647
        // This function does not defer the unlocking because of the database
34✔
2648
        // lookups for ChannelUpdate.
34✔
2649
        s.indexMtx.RLock()
34✔
2650

34✔
2651
        if s.cfg.IsAlias(scid) {
45✔
2652
                // The alias SCID was used. In the incoming case this means
11✔
2653
                // the channel is zero-conf as the link sets the scid. In the
11✔
2654
                // outgoing case, the sender set the scid to use and may be
11✔
2655
                // either the alias or the confirmed one, if it exists.
11✔
2656
                realScid, ok := s.aliasToReal[scid]
11✔
2657
                if !ok {
11✔
2658
                        // The real, confirmed SCID does not exist yet. Find
×
2659
                        // the "base" SCID that the link uses via the
×
2660
                        // baseIndex. If we can't find it, return nil. This
×
2661
                        // means the channel is zero-conf.
×
2662
                        baseScid, ok := s.baseIndex[scid]
×
2663
                        s.indexMtx.RUnlock()
×
2664
                        if !ok {
×
2665
                                return nil
×
2666
                        }
×
2667

2668
                        update, err := s.cfg.FetchLastChannelUpdate(baseScid)
×
2669
                        if err != nil {
×
2670
                                return nil
×
2671
                        }
×
2672

2673
                        // Replace the baseScid with the passed-in alias.
2674
                        update.ShortChannelID = scid
×
2675
                        sig, err := s.cfg.SignAliasUpdate(update)
×
2676
                        if err != nil {
×
2677
                                return nil
×
2678
                        }
×
2679

2680
                        update.Signature, err = lnwire.NewSigFromSignature(sig)
×
2681
                        if err != nil {
×
2682
                                return nil
×
2683
                        }
×
2684

2685
                        return update
×
2686
                }
2687

2688
                s.indexMtx.RUnlock()
11✔
2689

11✔
2690
                // Fetch the SCID via the confirmed SCID and replace it with
11✔
2691
                // the alias.
11✔
2692
                update, err := s.cfg.FetchLastChannelUpdate(realScid)
11✔
2693
                if err != nil {
11✔
2694
                        return nil
×
2695
                }
×
2696

2697
                // In the incoming case, we want to ensure that we don't leak
2698
                // the UTXO in case the channel is private. In the outgoing
2699
                // case, since the alias was used, we do the same thing.
2700
                update.ShortChannelID = scid
11✔
2701
                sig, err := s.cfg.SignAliasUpdate(update)
11✔
2702
                if err != nil {
11✔
2703
                        return nil
×
2704
                }
×
2705

2706
                update.Signature, err = lnwire.NewSigFromSignature(sig)
11✔
2707
                if err != nil {
11✔
2708
                        return nil
×
2709
                }
×
2710

2711
                return update
11✔
2712
        }
2713

2714
        // If the confirmed SCID is not in baseIndex, this is not an
2715
        // option-scid-alias or zero-conf channel.
2716
        baseScid, ok := s.baseIndex[scid]
23✔
2717
        if !ok {
41✔
2718
                s.indexMtx.RUnlock()
18✔
2719
                return nil
18✔
2720
        }
18✔
2721

2722
        // Fetch the link so we can get an alias to use in the ShortChannelID
2723
        // of the ChannelUpdate.
2724
        link, ok := s.forwardingIndex[baseScid]
5✔
2725
        s.indexMtx.RUnlock()
5✔
2726
        if !ok {
5✔
2727
                // This should never happen, but if it does for some reason,
×
2728
                // fallback to the old behavior.
×
2729
                return nil
×
2730
        }
×
2731

2732
        aliases := link.getAliases()
5✔
2733
        if len(aliases) == 0 {
5✔
2734
                // This should never happen, but if it does, fallback.
×
2735
                return nil
×
2736
        }
×
2737

2738
        // Fetch the ChannelUpdate via the real, confirmed SCID.
2739
        update, err := s.cfg.FetchLastChannelUpdate(scid)
5✔
2740
        if err != nil {
5✔
2741
                return nil
×
2742
        }
×
2743

2744
        // The incoming case will replace the ShortChannelID in the retrieved
2745
        // ChannelUpdate with the alias to ensure no privacy leak occurs. This
2746
        // would happen if a private non-zero-conf option-scid-alias
2747
        // feature-bit channel leaked its UTXO here rather than supplying an
2748
        // alias. In the outgoing case, the confirmed SCID was actually used
2749
        // for forwarding in the onion, so no replacement is necessary as the
2750
        // sender knows the scid.
2751
        if incoming {
7✔
2752
                // We will replace and sign the update with the first alias.
2✔
2753
                // Since this happens on the incoming side, it's not actually
2✔
2754
                // possible to know what the sender used in the onion.
2✔
2755
                update.ShortChannelID = aliases[0]
2✔
2756
                sig, err := s.cfg.SignAliasUpdate(update)
2✔
2757
                if err != nil {
2✔
2758
                        return nil
×
2759
                }
×
2760

2761
                update.Signature, err = lnwire.NewSigFromSignature(sig)
2✔
2762
                if err != nil {
2✔
2763
                        return nil
×
2764
                }
×
2765
        }
2766

2767
        return update
5✔
2768
}
2769

2770
// AddAliasForLink instructs the Switch to update its in-memory maps to reflect
2771
// that a link has a new alias.
2772
func (s *Switch) AddAliasForLink(chanID lnwire.ChannelID,
2773
        alias lnwire.ShortChannelID) error {
×
2774

×
2775
        // Fetch the link so that we can update the underlying channel's set of
×
2776
        // aliases.
×
2777
        s.indexMtx.RLock()
×
2778
        link, err := s.getLink(chanID)
×
2779
        s.indexMtx.RUnlock()
×
2780
        if err != nil {
×
2781
                return err
×
2782
        }
×
2783

2784
        // If the link is a channel where the option-scid-alias feature bit was
2785
        // not negotiated, we'll return an error.
2786
        if !link.negotiatedAliasFeature() {
×
2787
                return fmt.Errorf("attempted to update non-alias channel")
×
2788
        }
×
2789

2790
        linkScid := link.ShortChanID()
×
2791

×
2792
        // We'll update the maps so the Switch includes this alias in its
×
2793
        // forwarding decisions.
×
2794
        if link.isZeroConf() {
×
2795
                if link.zeroConfConfirmed() {
×
2796
                        // If the channel has confirmed on-chain, we'll
×
2797
                        // add this alias to the aliasToReal map.
×
2798
                        confirmedScid := link.confirmedScid()
×
2799

×
2800
                        s.aliasToReal[alias] = confirmedScid
×
2801
                }
×
2802

2803
                // Add this alias to the baseIndex mapping.
2804
                s.baseIndex[alias] = linkScid
×
2805
        } else if link.negotiatedAliasFeature() {
×
2806
                // The channel is confirmed, so we'll populate the aliasToReal
×
2807
                // and baseIndex maps.
×
2808
                s.aliasToReal[alias] = linkScid
×
2809
                s.baseIndex[alias] = linkScid
×
2810
        }
×
2811

2812
        return nil
×
2813
}
2814

2815
// handlePacketAdd handles forwarding an Add packet.
2816
func (s *Switch) handlePacketAdd(packet *htlcPacket,
2817
        htlc *lnwire.UpdateAddHTLC) error {
82✔
2818

82✔
2819
        // Check if the node is set to reject all onward HTLCs and also make
82✔
2820
        // sure that HTLC is not from the source node.
82✔
2821
        if s.cfg.RejectHTLC {
83✔
2822
                failure := NewDetailedLinkError(
1✔
2823
                        &lnwire.FailChannelDisabled{},
1✔
2824
                        OutgoingFailureForwardsDisabled,
1✔
2825
                )
1✔
2826

1✔
2827
                return s.failAddPacket(packet, failure)
1✔
2828
        }
1✔
2829

2830
        // Before we attempt to find a non-strict forwarding path for this
2831
        // htlc, check whether the htlc is being routed over the same incoming
2832
        // and outgoing channel. If our node does not allow forwards of this
2833
        // nature, we fail the htlc early. This check is in place to disallow
2834
        // inefficiently routed htlcs from locking up our balance. With
2835
        // channels where the option-scid-alias feature was negotiated, we also
2836
        // have to be sure that the IDs aren't the same since one or both could
2837
        // be an alias.
2838
        linkErr := s.checkCircularForward(
81✔
2839
                packet.incomingChanID, packet.outgoingChanID,
81✔
2840
                s.cfg.AllowCircularRoute, htlc.PaymentHash,
81✔
2841
        )
81✔
2842
        if linkErr != nil {
82✔
2843
                return s.failAddPacket(packet, linkErr)
1✔
2844
        }
1✔
2845

2846
        s.indexMtx.RLock()
80✔
2847
        targetLink, err := s.getLinkByMapping(packet)
80✔
2848
        if err != nil {
84✔
2849
                s.indexMtx.RUnlock()
4✔
2850

4✔
2851
                log.Debugf("unable to find link with "+
4✔
2852
                        "destination %v", packet.outgoingChanID)
4✔
2853

4✔
2854
                // If packet was forwarded from another channel link than we
4✔
2855
                // should notify this link that some error occurred.
4✔
2856
                linkError := NewLinkError(
4✔
2857
                        &lnwire.FailUnknownNextPeer{},
4✔
2858
                )
4✔
2859

4✔
2860
                return s.failAddPacket(packet, linkError)
4✔
2861
        }
4✔
2862
        targetPeerKey := targetLink.PeerPubKey()
76✔
2863
        interfaceLinks, _ := s.getLinks(targetPeerKey)
76✔
2864
        s.indexMtx.RUnlock()
76✔
2865

76✔
2866
        // We'll keep track of any HTLC failures during the link selection
76✔
2867
        // process. This way we can return the error for precise link that the
76✔
2868
        // sender selected, while optimistically trying all links to utilize
76✔
2869
        // our available bandwidth.
76✔
2870
        linkErrs := make(map[lnwire.ShortChannelID]*LinkError)
76✔
2871

76✔
2872
        // Find all destination channel links with appropriate bandwidth.
76✔
2873
        var destinations []ChannelLink
76✔
2874
        for _, link := range interfaceLinks {
156✔
2875
                var failure *LinkError
80✔
2876

80✔
2877
                // We'll skip any links that aren't yet eligible for
80✔
2878
                // forwarding.
80✔
2879
                if !link.EligibleToForward() {
84✔
2880
                        failure = NewDetailedLinkError(
4✔
2881
                                &lnwire.FailUnknownNextPeer{},
4✔
2882
                                OutgoingFailureLinkNotEligible,
4✔
2883
                        )
4✔
2884
                } else {
80✔
2885
                        // We'll ensure that the HTLC satisfies the current
76✔
2886
                        // forwarding conditions of this target link.
76✔
2887
                        currentHeight := atomic.LoadUint32(&s.bestHeight)
76✔
2888
                        failure = link.CheckHtlcForward(
76✔
2889
                                htlc.PaymentHash, packet.incomingAmount,
76✔
2890
                                packet.amount, packet.incomingTimeout,
76✔
2891
                                packet.outgoingTimeout, packet.inboundFee,
76✔
2892
                                currentHeight, packet.originalOutgoingChanID,
76✔
2893
                                htlc.CustomRecords,
76✔
2894
                        )
76✔
2895
                }
76✔
2896

2897
                // If this link can forward the htlc, add it to the set of
2898
                // destinations.
2899
                if failure == nil {
140✔
2900
                        destinations = append(destinations, link)
60✔
2901
                        continue
60✔
2902
                }
2903

2904
                linkErrs[link.ShortChanID()] = failure
20✔
2905
        }
2906

2907
        // If we had a forwarding failure due to the HTLC not satisfying the
2908
        // current policy, then we'll send back an error, but ensure we send
2909
        // back the error sourced at the *target* link.
2910
        if len(destinations) == 0 {
92✔
2911
                // At this point, some or all of the links rejected the HTLC so
16✔
2912
                // we couldn't forward it. So we'll try to look up the error
16✔
2913
                // that came from the source.
16✔
2914
                linkErr, ok := linkErrs[packet.outgoingChanID]
16✔
2915
                if !ok {
16✔
2916
                        // If we can't find the error of the source, then we'll
×
2917
                        // return an unknown next peer, though this should
×
2918
                        // never happen.
×
2919
                        linkErr = NewLinkError(
×
2920
                                &lnwire.FailUnknownNextPeer{},
×
2921
                        )
×
2922
                        log.Warnf("unable to find err source for "+
×
2923
                                "outgoing_link=%v, errors=%v",
×
2924
                                packet.outgoingChanID,
×
2925
                                lnutils.SpewLogClosure(linkErrs))
×
2926
                }
×
2927

2928
                log.Tracef("incoming HTLC(%x) violated "+
16✔
2929
                        "target outgoing link (id=%v) policy: %v",
16✔
2930
                        htlc.PaymentHash[:], packet.outgoingChanID,
16✔
2931
                        linkErr)
16✔
2932

16✔
2933
                return s.failAddPacket(packet, linkErr)
16✔
2934
        }
2935

2936
        // Choose a random link out of the set of links that can forward this
2937
        // htlc. The reason for randomization is to evenly distribute the htlc
2938
        // load without making assumptions about what the best channel is.
2939
        //nolint:gosec
2940
        destination := destinations[rand.Intn(len(destinations))]
60✔
2941

60✔
2942
        // Retrieve the incoming link by its ShortChannelID. Note that the
60✔
2943
        // incomingChanID is never set to hop.Source here.
60✔
2944
        s.indexMtx.RLock()
60✔
2945
        incomingLink, err := s.getLinkByShortID(packet.incomingChanID)
60✔
2946
        s.indexMtx.RUnlock()
60✔
2947
        if err != nil {
60✔
2948
                // If we couldn't find the incoming link, we can't evaluate the
×
2949
                // incoming's exposure to dust, so we just fail the HTLC back.
×
2950
                linkErr := NewLinkError(
×
2951
                        &lnwire.FailTemporaryChannelFailure{},
×
2952
                )
×
2953

×
2954
                return s.failAddPacket(packet, linkErr)
×
2955
        }
×
2956

2957
        // Evaluate whether this HTLC would increase our fee exposure over the
2958
        // threshold on the incoming link. If it does, fail it backwards.
2959
        if s.dustExceedsFeeThreshold(
60✔
2960
                incomingLink, packet.incomingAmount, true,
60✔
2961
        ) {
60✔
2962
                // The incoming dust exceeds the threshold, so we fail the add
×
2963
                // back.
×
2964
                linkErr := NewLinkError(
×
2965
                        &lnwire.FailTemporaryChannelFailure{},
×
2966
                )
×
2967

×
2968
                return s.failAddPacket(packet, linkErr)
×
2969
        }
×
2970

2971
        // Also evaluate whether this HTLC would increase our fee exposure over
2972
        // the threshold on the destination link. If it does, fail it back.
2973
        if s.dustExceedsFeeThreshold(
60✔
2974
                destination, packet.amount, false,
60✔
2975
        ) {
61✔
2976
                // The outgoing dust exceeds the threshold, so we fail the add
1✔
2977
                // back.
1✔
2978
                linkErr := NewLinkError(
1✔
2979
                        &lnwire.FailTemporaryChannelFailure{},
1✔
2980
                )
1✔
2981

1✔
2982
                return s.failAddPacket(packet, linkErr)
1✔
2983
        }
1✔
2984

2985
        // Send the packet to the destination channel link which manages the
2986
        // channel.
2987
        packet.outgoingChanID = destination.ShortChanID()
59✔
2988

59✔
2989
        return destination.handleSwitchPacket(packet)
59✔
2990
}
2991

2992
// handlePacketSettle handles forwarding a settle packet.
2993
func (s *Switch) handlePacketSettle(packet *htlcPacket) error {
399✔
2994
        // If the source of this packet has not been set, use the circuit map
399✔
2995
        // to lookup the origin.
399✔
2996
        circuit, err := s.closeCircuit(packet)
399✔
2997
        if err != nil {
399✔
2998
                return err
×
2999
        }
×
3000

3001
        // closeCircuit returns a nil circuit when a settle packet returns an
3002
        // ErrUnknownCircuit error upon the inner call to CloseCircuit.
3003
        //
3004
        // NOTE: We can only get a nil circuit when it has already been deleted
3005
        // and when `UpdateFulfillHTLC` is received. After which `RevokeAndAck`
3006
        // is received, which invokes `processRemoteSettleFails` in its link.
3007
        if circuit == nil {
589✔
3008
                log.Debugf("Found nil circuit: packet=%v", spew.Sdump(packet))
190✔
3009
                return nil
190✔
3010
        }
190✔
3011

3012
        localHTLC := packet.incomingChanID == hop.Source
209✔
3013

209✔
3014
        // If this is a locally initiated HTLC, we need to handle the packet by
209✔
3015
        // storing the network result.
209✔
3016
        //
209✔
3017
        // A blank IncomingChanID in a circuit indicates that it is a pending
209✔
3018
        // user-initiated payment.
209✔
3019
        //
209✔
3020
        // NOTE: `closeCircuit` modifies the state of `packet`.
209✔
3021
        if localHTLC {
389✔
3022
                // TODO(yy): remove the goroutine and send back the error here.
180✔
3023
                s.wg.Add(1)
180✔
3024
                go s.handleLocalResponse(packet)
180✔
3025

180✔
3026
                // If this is a locally initiated HTLC, there's no need to
180✔
3027
                // forward it so we exit.
180✔
3028
                return nil
180✔
3029
        }
180✔
3030

3031
        // If this is an HTLC settle, and it wasn't from a locally initiated
3032
        // HTLC, then we'll log a forwarding event so we can flush it to disk
3033
        // later.
3034
        if circuit.Outgoing != nil {
58✔
3035
                log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+
29✔
3036
                        "from IncomingChanID(%v) to OutgoingChanID(%v)",
29✔
3037
                        circuit.PaymentHash[:], circuit.OutgoingAmount,
29✔
3038
                        circuit.IncomingAmount-circuit.OutgoingAmount,
29✔
3039
                        circuit.Incoming.ChanID, circuit.Outgoing.ChanID)
29✔
3040

29✔
3041
                s.fwdEventMtx.Lock()
29✔
3042
                s.pendingFwdingEvents = append(
29✔
3043
                        s.pendingFwdingEvents,
29✔
3044
                        channeldb.ForwardingEvent{
29✔
3045
                                Timestamp:      time.Now(),
29✔
3046
                                IncomingChanID: circuit.Incoming.ChanID,
29✔
3047
                                OutgoingChanID: circuit.Outgoing.ChanID,
29✔
3048
                                AmtIn:          circuit.IncomingAmount,
29✔
3049
                                AmtOut:         circuit.OutgoingAmount,
29✔
3050
                        },
29✔
3051
                )
29✔
3052
                s.fwdEventMtx.Unlock()
29✔
3053
        }
29✔
3054

3055
        // Deliver this packet.
3056
        return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
29✔
3057
}
3058

3059
// handlePacketFail handles forwarding a fail packet.
3060
func (s *Switch) handlePacketFail(packet *htlcPacket,
3061
        htlc *lnwire.UpdateFailHTLC) error {
138✔
3062

138✔
3063
        // If the source of this packet has not been set, use the circuit map
138✔
3064
        // to lookup the origin.
138✔
3065
        circuit, err := s.closeCircuit(packet)
138✔
3066
        if err != nil {
138✔
3067
                return err
×
3068
        }
×
3069

3070
        // If this is a locally initiated HTLC, we need to handle the packet by
3071
        // storing the network result.
3072
        //
3073
        // A blank IncomingChanID in a circuit indicates that it is a pending
3074
        // user-initiated payment.
3075
        //
3076
        // NOTE: `closeCircuit` modifies the state of `packet`.
3077
        if packet.incomingChanID == hop.Source {
262✔
3078
                // TODO(yy): remove the goroutine and send back the error here.
124✔
3079
                s.wg.Add(1)
124✔
3080
                go s.handleLocalResponse(packet)
124✔
3081

124✔
3082
                // If this is a locally initiated HTLC, there's no need to
124✔
3083
                // forward it so we exit.
124✔
3084
                return nil
124✔
3085
        }
124✔
3086

3087
        // Exit early if this hasSource is true. This flag is only set via
3088
        // mailbox's `FailAdd`. This method has two callsites,
3089
        // - the packet has timed out after `MailboxDeliveryTimeout`, defaults
3090
        //   to 1 min.
3091
        // - the HTLC fails the validation in `channel.AddHTLC`.
3092
        // In either case, the `Reason` field is populated. Thus there's no
3093
        // need to proceed and extract the failure reason below.
3094
        if packet.hasSource {
21✔
3095
                // Deliver this packet.
7✔
3096
                return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
7✔
3097
        }
7✔
3098

3099
        // HTLC resolutions and messages restored from disk don't have the
3100
        // obfuscator set from the original htlc add packet - set it here for
3101
        // use in blinded errors.
3102
        packet.obfuscator = circuit.ErrorEncrypter
7✔
3103

7✔
3104
        switch {
7✔
3105
        // No message to encrypt, locally sourced payment.
3106
        case circuit.ErrorEncrypter == nil:
×
3107
                // TODO(yy) further check this case as we shouldn't end up here
3108
                // as `isLocal` is already false.
3109

3110
        // If this is a resolution message, then we'll need to encrypt it as
3111
        // it's actually internally sourced.
3112
        case packet.isResolution:
×
3113
                var err error
×
3114
                // TODO(roasbeef): don't need to pass actually?
×
3115
                failure := &lnwire.FailPermanentChannelFailure{}
×
3116
                htlc.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop(
×
3117
                        failure,
×
3118
                )
×
3119
                if err != nil {
×
3120
                        err = fmt.Errorf("unable to obfuscate error: %w", err)
×
3121
                        log.Error(err)
×
3122
                }
×
3123

3124
        // Alternatively, if the remote party sends us an
3125
        // UpdateFailMalformedHTLC, then we'll need to convert this into a
3126
        // proper well formatted onion error as there's no HMAC currently.
3127
        case packet.convertedError:
2✔
3128
                log.Infof("Converting malformed HTLC error for circuit for "+
2✔
3129
                        "Circuit(%x: (%s, %d) <-> (%s, %d))",
2✔
3130
                        packet.circuit.PaymentHash,
2✔
3131
                        packet.incomingChanID, packet.incomingHTLCID,
2✔
3132
                        packet.outgoingChanID, packet.outgoingHTLCID)
2✔
3133

2✔
3134
                htlc.Reason = circuit.ErrorEncrypter.EncryptMalformedError(
2✔
3135
                        htlc.Reason,
2✔
3136
                )
2✔
3137

3138
        default:
5✔
3139
                // Otherwise, it's a forwarded error, so we'll perform a
5✔
3140
                // wrapper encryption as normal.
5✔
3141
                htlc.Reason = circuit.ErrorEncrypter.IntermediateEncrypt(
5✔
3142
                        htlc.Reason,
5✔
3143
                )
5✔
3144
        }
3145

3146
        // Deliver this packet.
3147
        return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
7✔
3148
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc