• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 10189747109

01 Aug 2024 12:31AM UTC coverage: 58.641% (+0.2%) from 58.459%
10189747109

push

github

web-flow
Merge pull request #8949 from ProofOfKeags/fn/req

[MICRO]: fn: Add new Req type to abstract the pattern of remote processing.

125217 of 213532 relevant lines covered (58.64%)

29382.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.6
/htlcswitch/switch.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "math/rand"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/davecgh/go-spew/spew"
16
        "github.com/lightningnetwork/lnd/chainntnfs"
17
        "github.com/lightningnetwork/lnd/channeldb"
18
        "github.com/lightningnetwork/lnd/channeldb/models"
19
        "github.com/lightningnetwork/lnd/clock"
20
        "github.com/lightningnetwork/lnd/contractcourt"
21
        "github.com/lightningnetwork/lnd/fn"
22
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
23
        "github.com/lightningnetwork/lnd/kvdb"
24
        "github.com/lightningnetwork/lnd/lntypes"
25
        "github.com/lightningnetwork/lnd/lnutils"
26
        "github.com/lightningnetwork/lnd/lnwallet"
27
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
28
        "github.com/lightningnetwork/lnd/lnwire"
29
        "github.com/lightningnetwork/lnd/ticker"
30
)
31

32
const (
33
        // DefaultFwdEventInterval is the duration between attempts to flush
34
        // pending forwarding events to disk.
35
        DefaultFwdEventInterval = 15 * time.Second
36

37
        // DefaultLogInterval is the duration between attempts to log statistics
38
        // about forwarding events.
39
        DefaultLogInterval = 10 * time.Second
40

41
        // DefaultAckInterval is the duration between attempts to ack any settle
42
        // fails in a forwarding package.
43
        DefaultAckInterval = 15 * time.Second
44

45
        // DefaultMailboxDeliveryTimeout is the duration after which Adds will
46
        // be cancelled if they could not get added to an outgoing commitment.
47
        DefaultMailboxDeliveryTimeout = time.Minute
48
)
49

50
var (
51
        // ErrChannelLinkNotFound is used when channel link hasn't been found.
52
        ErrChannelLinkNotFound = errors.New("channel link not found")
53

54
        // ErrDuplicateAdd signals that the ADD htlc was already forwarded
55
        // through the switch and is locked into another commitment txn.
56
        ErrDuplicateAdd = errors.New("duplicate add HTLC detected")
57

58
        // ErrUnknownErrorDecryptor signals that we were unable to locate the
59
        // error decryptor for this payment. This is likely due to restarting
60
        // the daemon.
61
        ErrUnknownErrorDecryptor = errors.New("unknown error decryptor")
62

63
        // ErrSwitchExiting signaled when the switch has received a shutdown
64
        // request.
65
        ErrSwitchExiting = errors.New("htlcswitch shutting down")
66

67
        // ErrNoLinksFound is an error returned when we attempt to retrieve the
68
        // active links in the switch for a specific destination.
69
        ErrNoLinksFound = errors.New("no channel links found")
70

71
        // ErrUnreadableFailureMessage is returned when the failure message
72
        // cannot be decrypted.
73
        ErrUnreadableFailureMessage = errors.New("unreadable failure message")
74

75
        // ErrLocalAddFailed signals that the ADD htlc for a local payment
76
        // failed to be processed.
77
        ErrLocalAddFailed = errors.New("local add HTLC failed")
78

79
        // errFeeExposureExceeded is only surfaced to callers of SendHTLC and
80
        // signals that sending the HTLC would exceed the outgoing link's fee
81
        // exposure threshold.
82
        errFeeExposureExceeded = errors.New("fee exposure exceeded")
83

84
        // DefaultMaxFeeExposure is the default threshold after which we'll
85
        // fail payments if they increase our fee exposure. This is currently
86
        // set to 500m msats.
87
        DefaultMaxFeeExposure = lnwire.MilliSatoshi(500_000_000)
88
)
89

90
// plexPacket encapsulates switch packet and adds error channel to receive
91
// error from request handler.
92
type plexPacket struct {
93
        pkt *htlcPacket
94
        err chan error
95
}
96

97
// ChanClose represents a request which close a particular channel specified by
98
// its id.
99
type ChanClose struct {
100
        // CloseType is a variable which signals the type of channel closure the
101
        // peer should execute.
102
        CloseType contractcourt.ChannelCloseType
103

104
        // ChanPoint represent the id of the channel which should be closed.
105
        ChanPoint *wire.OutPoint
106

107
        // TargetFeePerKw is the ideal fee that was specified by the caller.
108
        // This value is only utilized if the closure type is CloseRegular.
109
        // This will be the starting offered fee when the fee negotiation
110
        // process for the cooperative closure transaction kicks off.
111
        TargetFeePerKw chainfee.SatPerKWeight
112

113
        // MaxFee is the highest fee the caller is willing to pay.
114
        //
115
        // NOTE: This field is only respected if the caller is the initiator of
116
        // the channel.
117
        MaxFee chainfee.SatPerKWeight
118

119
        // DeliveryScript is an optional delivery script to pay funds out to.
120
        DeliveryScript lnwire.DeliveryAddress
121

122
        // Updates is used by request creator to receive the notifications about
123
        // execution of the close channel request.
124
        Updates chan interface{}
125

126
        // Err is used by request creator to receive request execution error.
127
        Err chan error
128
}
129

130
// Config defines the configuration for the service. ALL elements within the
131
// configuration MUST be non-nil for the service to carry out its duties.
132
type Config struct {
133
        // FwdingLog is an interface that will be used by the switch to log
134
        // forwarding events. A forwarding event happens each time a payment
135
        // circuit is successfully completed. So when we forward an HTLC, and a
136
        // settle is eventually received.
137
        FwdingLog ForwardingLog
138

139
        // LocalChannelClose kicks-off the workflow to execute a cooperative or
140
        // forced unilateral closure of the channel initiated by a local
141
        // subsystem.
142
        LocalChannelClose func(pubKey []byte, request *ChanClose)
143

144
        // DB is the database backend that will be used to back the switch's
145
        // persistent circuit map.
146
        DB kvdb.Backend
147

148
        // FetchAllOpenChannels is a function that fetches all currently open
149
        // channels from the channel database.
150
        FetchAllOpenChannels func() ([]*channeldb.OpenChannel, error)
151

152
        // FetchAllChannels is a function that fetches all pending open, open,
153
        // and waiting close channels from the database.
154
        FetchAllChannels func() ([]*channeldb.OpenChannel, error)
155

156
        // FetchClosedChannels is a function that fetches all closed channels
157
        // from the channel database.
158
        FetchClosedChannels func(
159
                pendingOnly bool) ([]*channeldb.ChannelCloseSummary, error)
160

161
        // SwitchPackager provides access to the forwarding packages of all
162
        // active channels. This gives the switch the ability to read arbitrary
163
        // forwarding packages, and ack settles and fails contained within them.
164
        SwitchPackager channeldb.FwdOperator
165

166
        // ExtractErrorEncrypter is an interface allowing switch to reextract
167
        // error encrypters stored in the circuit map on restarts, since they
168
        // are not stored directly within the database.
169
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter
170

171
        // FetchLastChannelUpdate retrieves the latest routing policy for a
172
        // target channel. This channel will typically be the outgoing channel
173
        // specified when we receive an incoming HTLC.  This will be used to
174
        // provide payment senders our latest policy when sending encrypted
175
        // error messages.
176
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
177

178
        // Notifier is an instance of a chain notifier that we'll use to signal
179
        // the switch when a new block has arrived.
180
        Notifier chainntnfs.ChainNotifier
181

182
        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
183
        // events through.
184
        HtlcNotifier htlcNotifier
185

186
        // FwdEventTicker is a signal that instructs the htlcswitch to flush any
187
        // pending forwarding events.
188
        FwdEventTicker ticker.Ticker
189

190
        // LogEventTicker is a signal instructing the htlcswitch to log
191
        // aggregate stats about it's forwarding during the last interval.
192
        LogEventTicker ticker.Ticker
193

194
        // AckEventTicker is a signal instructing the htlcswitch to ack any settle
195
        // fails in forwarding packages.
196
        AckEventTicker ticker.Ticker
197

198
        // AllowCircularRoute is true if the user has configured their node to
199
        // allow forwards that arrive and depart our node over the same channel.
200
        AllowCircularRoute bool
201

202
        // RejectHTLC is a flag that instructs the htlcswitch to reject any
203
        // HTLCs that are not from the source hop.
204
        RejectHTLC bool
205

206
        // Clock is a time source for the switch.
207
        Clock clock.Clock
208

209
        // MailboxDeliveryTimeout is the interval after which Adds will be
210
        // cancelled if they have not been yet been delivered to a link. The
211
        // computed deadline will expiry this long after the Adds are added to
212
        // a mailbox via AddPacket.
213
        MailboxDeliveryTimeout time.Duration
214

215
        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
216
        // fail incoming or outgoing payments for a particular channel.
217
        MaxFeeExposure lnwire.MilliSatoshi
218

219
        // SignAliasUpdate is used when sending FailureMessages backwards for
220
        // option_scid_alias channels. This avoids a potential privacy leak by
221
        // replacing the public, confirmed SCID with the alias in the
222
        // ChannelUpdate.
223
        SignAliasUpdate func(u *lnwire.ChannelUpdate) (*ecdsa.Signature,
224
                error)
225

226
        // IsAlias returns whether or not a given SCID is an alias.
227
        IsAlias func(scid lnwire.ShortChannelID) bool
228
}
229

230
// Switch is the central messaging bus for all incoming/outgoing HTLCs.
231
// Connected peers with active channels are treated as named interfaces which
232
// refer to active channels as links. A link is the switch's message
233
// communication point with the goroutine that manages an active channel. New
234
// links are registered each time a channel is created, and unregistered once
235
// the channel is closed. The switch manages the hand-off process for multi-hop
236
// HTLCs, forwarding HTLCs initiated from within the daemon, and finally
237
// notifies users local-systems concerning their outstanding payment requests.
238
type Switch struct {
239
        started  int32 // To be used atomically.
240
        shutdown int32 // To be used atomically.
241

242
        // bestHeight is the best known height of the main chain. The links will
243
        // be used this information to govern decisions based on HTLC timeouts.
244
        // This will be retrieved by the registered links atomically.
245
        bestHeight uint32
246

247
        wg   sync.WaitGroup
248
        quit chan struct{}
249

250
        // cfg is a copy of the configuration struct that the htlc switch
251
        // service was initialized with.
252
        cfg *Config
253

254
        // networkResults stores the results of payments initiated by the user.
255
        // The store is used to later look up the payments and notify the
256
        // user of the result when they are complete. Each payment attempt
257
        // should be given a unique integer ID when it is created, otherwise
258
        // results might be overwritten.
259
        networkResults *networkResultStore
260

261
        // circuits is storage for payment circuits which are used to
262
        // forward the settle/fail htlc updates back to the add htlc initiator.
263
        circuits CircuitMap
264

265
        // mailOrchestrator manages the lifecycle of mailboxes used throughout
266
        // the switch, and facilitates delayed delivery of packets to links that
267
        // later come online.
268
        mailOrchestrator *mailOrchestrator
269

270
        // indexMtx is a read/write mutex that protects the set of indexes
271
        // below.
272
        indexMtx sync.RWMutex
273

274
        // pendingLinkIndex holds links that have not had their final, live
275
        // short_chan_id assigned.
276
        pendingLinkIndex map[lnwire.ChannelID]ChannelLink
277

278
        // links is a map of channel id and channel link which manages
279
        // this channel.
280
        linkIndex map[lnwire.ChannelID]ChannelLink
281

282
        // forwardingIndex is an index which is consulted by the switch when it
283
        // needs to locate the next hop to forward an incoming/outgoing HTLC
284
        // update to/from.
285
        //
286
        // TODO(roasbeef): eventually add a NetworkHop mapping before the
287
        // ChannelLink
288
        forwardingIndex map[lnwire.ShortChannelID]ChannelLink
289

290
        // interfaceIndex maps the compressed public key of a peer to all the
291
        // channels that the switch maintains with that peer.
292
        interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink
293

294
        // linkStopIndex stores the currently stopping ChannelLinks,
295
        // represented by their ChannelID. The key is the link's ChannelID and
296
        // the value is a chan that is closed when the link has fully stopped.
297
        // This map is only added to if RemoveLink is called and is not added
298
        // to when the Switch is shutting down and calls Stop() on each link.
299
        //
300
        // MUST be used with the indexMtx.
301
        linkStopIndex map[lnwire.ChannelID]chan struct{}
302

303
        // htlcPlex is the channel which all connected links use to coordinate
304
        // the setup/teardown of Sphinx (onion routing) payment circuits.
305
        // Active links forward any add/settle messages over this channel each
306
        // state transition, sending new adds/settles which are fully locked
307
        // in.
308
        htlcPlex chan *plexPacket
309

310
        // chanCloseRequests is used to transfer the channel close request to
311
        // the channel close handler.
312
        chanCloseRequests chan *ChanClose
313

314
        // resolutionMsgs is the channel that all external contract resolution
315
        // messages will be sent over.
316
        resolutionMsgs chan *resolutionMsg
317

318
        // pendingFwdingEvents is the set of forwarding events which have been
319
        // collected during the current interval, but hasn't yet been written
320
        // to the forwarding log.
321
        fwdEventMtx         sync.Mutex
322
        pendingFwdingEvents []channeldb.ForwardingEvent
323

324
        // blockEpochStream is an active block epoch event stream backed by an
325
        // active ChainNotifier instance. This will be used to retrieve the
326
        // latest height of the chain.
327
        blockEpochStream *chainntnfs.BlockEpochEvent
328

329
        // pendingSettleFails is the set of settle/fail entries that we need to
330
        // ack in the forwarding package of the outgoing link. This was added to
331
        // make pipelining settles more efficient.
332
        pendingSettleFails []channeldb.SettleFailRef
333

334
        // resMsgStore is used to store the set of ResolutionMsg that come from
335
        // contractcourt. This is used so the Switch can properly forward them,
336
        // even on restarts.
337
        resMsgStore *resolutionStore
338

339
        // aliasToReal is a map used for option-scid-alias feature-bit links.
340
        // The alias SCID is the key and the real, confirmed SCID is the value.
341
        // If the channel is unconfirmed, there will not be a mapping for it.
342
        // Since channels can have multiple aliases, this map is essentially a
343
        // N->1 mapping for a channel. This MUST be accessed with the indexMtx.
344
        aliasToReal map[lnwire.ShortChannelID]lnwire.ShortChannelID
345

346
        // baseIndex is a map used for option-scid-alias feature-bit links.
347
        // The value is the SCID of the link's ShortChannelID. This value may
348
        // be an alias for zero-conf channels or a confirmed SCID for
349
        // non-zero-conf channels with the option-scid-alias feature-bit. The
350
        // key includes the value itself and also any other aliases. This MUST
351
        // be accessed with the indexMtx.
352
        baseIndex map[lnwire.ShortChannelID]lnwire.ShortChannelID
353
}
354

355
// New creates the new instance of htlc switch.
356
func New(cfg Config, currentHeight uint32) (*Switch, error) {
343✔
357
        resStore := newResolutionStore(cfg.DB)
343✔
358

343✔
359
        circuitMap, err := NewCircuitMap(&CircuitMapConfig{
343✔
360
                DB:                    cfg.DB,
343✔
361
                FetchAllOpenChannels:  cfg.FetchAllOpenChannels,
343✔
362
                FetchClosedChannels:   cfg.FetchClosedChannels,
343✔
363
                ExtractErrorEncrypter: cfg.ExtractErrorEncrypter,
343✔
364
                CheckResolutionMsg:    resStore.checkResolutionMsg,
343✔
365
        })
343✔
366
        if err != nil {
343✔
367
                return nil, err
×
368
        }
×
369

370
        s := &Switch{
343✔
371
                bestHeight:        currentHeight,
343✔
372
                cfg:               &cfg,
343✔
373
                circuits:          circuitMap,
343✔
374
                linkIndex:         make(map[lnwire.ChannelID]ChannelLink),
343✔
375
                forwardingIndex:   make(map[lnwire.ShortChannelID]ChannelLink),
343✔
376
                interfaceIndex:    make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
343✔
377
                pendingLinkIndex:  make(map[lnwire.ChannelID]ChannelLink),
343✔
378
                linkStopIndex:     make(map[lnwire.ChannelID]chan struct{}),
343✔
379
                networkResults:    newNetworkResultStore(cfg.DB),
343✔
380
                htlcPlex:          make(chan *plexPacket),
343✔
381
                chanCloseRequests: make(chan *ChanClose),
343✔
382
                resolutionMsgs:    make(chan *resolutionMsg),
343✔
383
                resMsgStore:       resStore,
343✔
384
                quit:              make(chan struct{}),
343✔
385
        }
343✔
386

343✔
387
        s.aliasToReal = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
343✔
388
        s.baseIndex = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
343✔
389

343✔
390
        s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{
343✔
391
                forwardPackets:    s.ForwardPackets,
343✔
392
                clock:             s.cfg.Clock,
343✔
393
                expiry:            s.cfg.MailboxDeliveryTimeout,
343✔
394
                failMailboxUpdate: s.failMailboxUpdate,
343✔
395
        })
343✔
396

343✔
397
        return s, nil
343✔
398
}
399

400
// resolutionMsg is a struct that wraps an existing ResolutionMsg with a done
401
// channel. We'll use this channel to synchronize delivery of the message with
402
// the caller.
403
type resolutionMsg struct {
404
        contractcourt.ResolutionMsg
405

406
        errChan chan error
407
}
408

409
// ProcessContractResolution is called by active contract resolvers once a
410
// contract they are watching over has been fully resolved. The message carries
411
// an external signal that *would* have been sent if the outgoing channel
412
// didn't need to go to the chain in order to fulfill a contract. We'll process
413
// this message just as if it came from an active outgoing channel.
414
func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) error {
5✔
415
        errChan := make(chan error, 1)
5✔
416

5✔
417
        select {
5✔
418
        case s.resolutionMsgs <- &resolutionMsg{
419
                ResolutionMsg: msg,
420
                errChan:       errChan,
421
        }:
5✔
422
        case <-s.quit:
×
423
                return ErrSwitchExiting
×
424
        }
425

426
        select {
5✔
427
        case err := <-errChan:
5✔
428
                return err
5✔
429
        case <-s.quit:
×
430
                return ErrSwitchExiting
×
431
        }
432
}
433

434
// GetAttemptResult returns the result of the payment attempt with the given
435
// attemptID. The paymentHash should be set to the payment's overall hash, or
436
// in case of AMP payments the payment's unique identifier.
437
//
438
// The method returns a channel where the payment result will be sent when
439
// available, or an error is encountered during forwarding. When a result is
440
// received on the channel, the HTLC is guaranteed to no longer be in flight.
441
// The switch shutting down is signaled by closing the channel. If the
442
// attemptID is unknown, ErrPaymentIDNotFound will be returned.
443
func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
444
        deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) {
744✔
445

744✔
446
        var (
744✔
447
                nChan <-chan *networkResult
744✔
448
                err   error
744✔
449
                inKey = CircuitKey{
744✔
450
                        ChanID: hop.Source,
744✔
451
                        HtlcID: attemptID,
744✔
452
                }
744✔
453
        )
744✔
454

744✔
455
        // If the payment is not found in the circuit map, check whether a
744✔
456
        // result is already available.
744✔
457
        // Assumption: no one will add this payment ID other than the caller.
744✔
458
        if s.circuits.LookupCircuit(inKey) == nil {
753✔
459
                res, err := s.networkResults.getResult(attemptID)
9✔
460
                if err != nil {
11✔
461
                        return nil, err
2✔
462
                }
2✔
463
                c := make(chan *networkResult, 1)
7✔
464
                c <- res
7✔
465
                nChan = c
7✔
466
        } else {
739✔
467
                // The payment was committed to the circuits, subscribe for a
739✔
468
                // result.
739✔
469
                nChan, err = s.networkResults.subscribeResult(attemptID)
739✔
470
                if err != nil {
739✔
471
                        return nil, err
×
472
                }
×
473
        }
474

475
        resultChan := make(chan *PaymentResult, 1)
742✔
476

742✔
477
        // Since the payment was known, we can start a goroutine that can
742✔
478
        // extract the result when it is available, and pass it on to the
742✔
479
        // caller.
742✔
480
        s.wg.Add(1)
742✔
481
        go func() {
1,484✔
482
                defer s.wg.Done()
742✔
483

742✔
484
                var n *networkResult
742✔
485
                select {
742✔
486
                case n = <-nChan:
738✔
487
                case <-s.quit:
8✔
488
                        // We close the result channel to signal a shutdown. We
8✔
489
                        // don't send any result in this case since the HTLC is
8✔
490
                        // still in flight.
8✔
491
                        close(resultChan)
8✔
492
                        return
8✔
493
                }
494

495
                log.Debugf("Received network result %T for attemptID=%v", n.msg,
738✔
496
                        attemptID)
738✔
497

738✔
498
                // Extract the result and pass it to the result channel.
738✔
499
                result, err := s.extractResult(
738✔
500
                        deobfuscator, n, attemptID, paymentHash,
738✔
501
                )
738✔
502
                if err != nil {
738✔
503
                        e := fmt.Errorf("unable to extract result: %w", err)
×
504
                        log.Error(e)
×
505
                        resultChan <- &PaymentResult{
×
506
                                Error: e,
×
507
                        }
×
508
                        return
×
509
                }
×
510
                resultChan <- result
738✔
511
        }()
512

513
        return resultChan, nil
742✔
514
}
515

516
// CleanStore calls the underlying result store, telling it is safe to delete
517
// all entries except the ones in the keepPids map. This should be called
518
// preiodically to let the switch clean up payment results that we have
519
// handled.
520
func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error {
4✔
521
        return s.networkResults.cleanStore(keepPids)
4✔
522
}
4✔
523

524
// SendHTLC is used by other subsystems which aren't belong to htlc switch
525
// package in order to send the htlc update. The attemptID used MUST be unique
526
// for this HTLC, and MUST be used only once, otherwise the switch might reject
527
// it.
528
func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64,
529
        htlc *lnwire.UpdateAddHTLC) error {
1,461✔
530

1,461✔
531
        // Generate and send new update packet, if error will be received on
1,461✔
532
        // this stage it means that packet haven't left boundaries of our
1,461✔
533
        // system and something wrong happened.
1,461✔
534
        packet := &htlcPacket{
1,461✔
535
                incomingChanID: hop.Source,
1,461✔
536
                incomingHTLCID: attemptID,
1,461✔
537
                outgoingChanID: firstHop,
1,461✔
538
                htlc:           htlc,
1,461✔
539
                amount:         htlc.Amount,
1,461✔
540
        }
1,461✔
541

1,461✔
542
        // Attempt to fetch the target link before creating a circuit so that
1,461✔
543
        // we don't leave dangling circuits. The getLocalLink method does not
1,461✔
544
        // require the circuit variable to be set on the *htlcPacket.
1,461✔
545
        link, linkErr := s.getLocalLink(packet, htlc)
1,461✔
546
        if linkErr != nil {
1,469✔
547
                // Notify the htlc notifier of a link failure on our outgoing
8✔
548
                // link. Incoming timelock/amount values are not set because
8✔
549
                // they are not present for local sends.
8✔
550
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
8✔
551
                        newHtlcKey(packet),
8✔
552
                        HtlcInfo{
8✔
553
                                OutgoingTimeLock: htlc.Expiry,
8✔
554
                                OutgoingAmt:      htlc.Amount,
8✔
555
                        },
8✔
556
                        HtlcEventTypeSend,
8✔
557
                        linkErr,
8✔
558
                        false,
8✔
559
                )
8✔
560

8✔
561
                return linkErr
8✔
562
        }
8✔
563

564
        // Evaluate whether this HTLC would bypass our fee exposure. If it
565
        // does, don't send it out and instead return an error.
566
        if s.dustExceedsFeeThreshold(link, htlc.Amount, false) {
1,458✔
567
                // Notify the htlc notifier of a link failure on our outgoing
1✔
568
                // link. We use the FailTemporaryChannelFailure in place of a
1✔
569
                // more descriptive error message.
1✔
570
                linkErr := NewLinkError(
1✔
571
                        &lnwire.FailTemporaryChannelFailure{},
1✔
572
                )
1✔
573
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
1✔
574
                        newHtlcKey(packet),
1✔
575
                        HtlcInfo{
1✔
576
                                OutgoingTimeLock: htlc.Expiry,
1✔
577
                                OutgoingAmt:      htlc.Amount,
1✔
578
                        },
1✔
579
                        HtlcEventTypeSend,
1✔
580
                        linkErr,
1✔
581
                        false,
1✔
582
                )
1✔
583

1✔
584
                return errFeeExposureExceeded
1✔
585
        }
1✔
586

587
        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
1,456✔
588
        actions, err := s.circuits.CommitCircuits(circuit)
1,456✔
589
        if err != nil {
1,456✔
590
                log.Errorf("unable to commit circuit in switch: %v", err)
×
591
                return err
×
592
        }
×
593

594
        // Drop duplicate packet if it has already been seen.
595
        switch {
1,456✔
596
        case len(actions.Drops) == 1:
1✔
597
                return ErrDuplicateAdd
1✔
598

599
        case len(actions.Fails) == 1:
×
600
                return ErrLocalAddFailed
×
601
        }
602

603
        // Give the packet to the link's mailbox so that HTLC's are properly
604
        // canceled back if the mailbox timeout elapses.
605
        packet.circuit = circuit
1,455✔
606

1,455✔
607
        return link.handleSwitchPacket(packet)
1,455✔
608
}
609

610
// UpdateForwardingPolicies sends a message to the switch to update the
611
// forwarding policies for the set of target channels, keyed in chanPolicies.
612
//
613
// NOTE: This function is synchronous and will block until either the
614
// forwarding policies for all links have been updated, or the switch shuts
615
// down.
616
func (s *Switch) UpdateForwardingPolicies(
617
        chanPolicies map[wire.OutPoint]models.ForwardingPolicy) {
4✔
618

4✔
619
        log.Tracef("Updating link policies: %v", lnutils.SpewLogClosure(
4✔
620
                chanPolicies))
4✔
621

4✔
622
        s.indexMtx.RLock()
4✔
623

4✔
624
        // Update each link in chanPolicies.
4✔
625
        for targetLink, policy := range chanPolicies {
8✔
626
                cid := lnwire.NewChanIDFromOutPoint(targetLink)
4✔
627

4✔
628
                link, ok := s.linkIndex[cid]
4✔
629
                if !ok {
4✔
630
                        log.Debugf("Unable to find ChannelPoint(%v) to update "+
×
631
                                "link policy", targetLink)
×
632
                        continue
×
633
                }
634

635
                link.UpdateForwardingPolicy(policy)
4✔
636
        }
637

638
        s.indexMtx.RUnlock()
4✔
639
}
640

641
// IsForwardedHTLC checks for a given channel and htlc index if it is related
642
// to an opened circuit that represents a forwarded payment.
643
func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID,
644
        htlcIndex uint64) bool {
6✔
645

6✔
646
        circuit := s.circuits.LookupOpenCircuit(models.CircuitKey{
6✔
647
                ChanID: chanID,
6✔
648
                HtlcID: htlcIndex,
6✔
649
        })
6✔
650
        return circuit != nil && circuit.Incoming.ChanID != hop.Source
6✔
651
}
6✔
652

653
// ForwardPackets adds a list of packets to the switch for processing. Fails
654
// and settles are added on a first past, simultaneously constructing circuits
655
// for any adds. After persisting the circuits, another pass of the adds is
656
// given to forward them through the router. The sending link's quit channel is
657
// used to prevent deadlocks when the switch stops a link in the midst of
658
// forwarding.
659
func (s *Switch) ForwardPackets(linkQuit chan struct{},
660
        packets ...*htlcPacket) error {
1,737✔
661

1,737✔
662
        var (
1,737✔
663
                // fwdChan is a buffered channel used to receive err msgs from
1,737✔
664
                // the htlcPlex when forwarding this batch.
1,737✔
665
                fwdChan = make(chan error, len(packets))
1,737✔
666

1,737✔
667
                // numSent keeps a running count of how many packets are
1,737✔
668
                // forwarded to the switch, which determines how many responses
1,737✔
669
                // we will wait for on the fwdChan..
1,737✔
670
                numSent int
1,737✔
671
        )
1,737✔
672

1,737✔
673
        // No packets, nothing to do.
1,737✔
674
        if len(packets) == 0 {
1,959✔
675
                return nil
222✔
676
        }
222✔
677

678
        // Setup a barrier to prevent the background tasks from processing
679
        // responses until this function returns to the user.
680
        var wg sync.WaitGroup
1,519✔
681
        wg.Add(1)
1,519✔
682
        defer wg.Done()
1,519✔
683

1,519✔
684
        // Before spawning the following goroutine to proxy our error responses,
1,519✔
685
        // check to see if we have already been issued a shutdown request. If
1,519✔
686
        // so, we exit early to avoid incrementing the switch's waitgroup while
1,519✔
687
        // it is already in the process of shutting down.
1,519✔
688
        select {
1,519✔
689
        case <-linkQuit:
×
690
                return nil
×
691
        case <-s.quit:
1✔
692
                return nil
1✔
693
        default:
1,518✔
694
                // Spawn a goroutine to log the errors returned from failed packets.
1,518✔
695
                s.wg.Add(1)
1,518✔
696
                go s.logFwdErrs(&numSent, &wg, fwdChan)
1,518✔
697
        }
698

699
        // Make a first pass over the packets, forwarding any settles or fails.
700
        // As adds are found, we create a circuit and append it to our set of
701
        // circuits to be written to disk.
702
        var circuits []*PaymentCircuit
1,518✔
703
        var addBatch []*htlcPacket
1,518✔
704
        for _, packet := range packets {
3,036✔
705
                switch htlc := packet.htlc.(type) {
1,518✔
706
                case *lnwire.UpdateAddHTLC:
91✔
707
                        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
91✔
708
                        packet.circuit = circuit
91✔
709
                        circuits = append(circuits, circuit)
91✔
710
                        addBatch = append(addBatch, packet)
91✔
711
                default:
1,431✔
712
                        err := s.routeAsync(packet, fwdChan, linkQuit)
1,431✔
713
                        if err != nil {
1,441✔
714
                                return fmt.Errorf("failed to forward packet %w",
10✔
715
                                        err)
10✔
716
                        }
10✔
717
                        numSent++
1,406✔
718
                }
719
        }
720

721
        // If this batch did not contain any circuits to commit, we can return
722
        // early.
723
        if len(circuits) == 0 {
2,899✔
724
                return nil
1,406✔
725
        }
1,406✔
726

727
        // Write any circuits that we found to disk.
728
        actions, err := s.circuits.CommitCircuits(circuits...)
91✔
729
        if err != nil {
91✔
730
                log.Errorf("unable to commit circuits in switch: %v", err)
×
731
        }
×
732

733
        // Split the htlc packets by comparing an in-order seek to the head of
734
        // the added, dropped, or failed circuits.
735
        //
736
        // NOTE: This assumes each list is guaranteed to be a subsequence of the
737
        // circuits, and that the union of the sets results in the original set
738
        // of circuits.
739
        var addedPackets, failedPackets []*htlcPacket
91✔
740
        for _, packet := range addBatch {
182✔
741
                switch {
91✔
742
                case len(actions.Adds) > 0 && packet.circuit == actions.Adds[0]:
87✔
743
                        addedPackets = append(addedPackets, packet)
87✔
744
                        actions.Adds = actions.Adds[1:]
87✔
745

746
                case len(actions.Drops) > 0 && packet.circuit == actions.Drops[0]:
5✔
747
                        actions.Drops = actions.Drops[1:]
5✔
748

749
                case len(actions.Fails) > 0 && packet.circuit == actions.Fails[0]:
3✔
750
                        failedPackets = append(failedPackets, packet)
3✔
751
                        actions.Fails = actions.Fails[1:]
3✔
752
                }
753
        }
754

755
        // Now, forward any packets for circuits that were successfully added to
756
        // the switch's circuit map.
757
        for _, packet := range addedPackets {
178✔
758
                err := s.routeAsync(packet, fwdChan, linkQuit)
87✔
759
                if err != nil {
88✔
760
                        return fmt.Errorf("failed to forward packet %w", err)
1✔
761
                }
1✔
762
                numSent++
86✔
763
        }
764

765
        // Lastly, for any packets that failed, this implies that they were
766
        // left in a half added state, which can happen when recovering from
767
        // failures.
768
        if len(failedPackets) > 0 {
93✔
769
                var failure lnwire.FailureMessage
3✔
770
                incomingID := failedPackets[0].incomingChanID
3✔
771

3✔
772
                // If the incoming channel is an option_scid_alias channel,
3✔
773
                // then we'll need to replace the SCID in the ChannelUpdate.
3✔
774
                update := s.failAliasUpdate(incomingID, true)
3✔
775
                if update == nil {
4✔
776
                        // Fallback to the original non-option behavior.
1✔
777
                        update, err := s.cfg.FetchLastChannelUpdate(
1✔
778
                                incomingID,
1✔
779
                        )
1✔
780
                        if err != nil {
1✔
781
                                failure = &lnwire.FailTemporaryNodeFailure{}
×
782
                        } else {
1✔
783
                                failure = lnwire.NewTemporaryChannelFailure(
1✔
784
                                        update,
1✔
785
                                )
1✔
786
                        }
1✔
787
                } else {
2✔
788
                        // This is an option_scid_alias channel.
2✔
789
                        failure = lnwire.NewTemporaryChannelFailure(update)
2✔
790
                }
2✔
791

792
                linkError := NewDetailedLinkError(
3✔
793
                        failure, OutgoingFailureIncompleteForward,
3✔
794
                )
3✔
795

3✔
796
                for _, packet := range failedPackets {
6✔
797
                        // We don't handle the error here since this method
3✔
798
                        // always returns an error.
3✔
799
                        _ = s.failAddPacket(packet, linkError)
3✔
800
                }
3✔
801
        }
802

803
        return nil
90✔
804
}
805

806
// logFwdErrs logs any errors received on `fwdChan`.
807
func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) {
1,518✔
808
        defer s.wg.Done()
1,518✔
809

1,518✔
810
        // Wait here until the outer function has finished persisting
1,518✔
811
        // and routing the packets. This guarantees we don't read from num until
1,518✔
812
        // the value is accurate.
1,518✔
813
        wg.Wait()
1,518✔
814

1,518✔
815
        numSent := *num
1,518✔
816
        for i := 0; i < numSent; i++ {
3,006✔
817
                select {
1,488✔
818
                case err := <-fwdChan:
1,486✔
819
                        if err != nil {
1,513✔
820
                                log.Errorf("Unhandled error while reforwarding htlc "+
27✔
821
                                        "settle/fail over htlcswitch: %v", err)
27✔
822
                        }
27✔
823
                case <-s.quit:
2✔
824
                        log.Errorf("unable to forward htlc packet " +
2✔
825
                                "htlc switch was stopped")
2✔
826
                        return
2✔
827
                }
828
        }
829
}
830

831
// routeAsync sends a packet through the htlc switch, using the provided err
832
// chan to propagate errors back to the caller. The link's quit channel is
833
// provided so that the send can be canceled if either the link or the switch
834
// receive a shutdown requuest. This method does not wait for a response from
835
// the htlcForwarder before returning.
836
func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error,
837
        linkQuit chan struct{}) error {
1,514✔
838

1,514✔
839
        command := &plexPacket{
1,514✔
840
                pkt: packet,
1,514✔
841
                err: errChan,
1,514✔
842
        }
1,514✔
843

1,514✔
844
        select {
1,514✔
845
        case s.htlcPlex <- command:
1,488✔
846
                return nil
1,488✔
847
        case <-linkQuit:
11✔
848
                return ErrLinkShuttingDown
11✔
849
        case <-s.quit:
×
850
                return errors.New("htlc switch was stopped")
×
851
        }
852
}
853

854
// getLocalLink handles the addition of a htlc for a send that originates from
855
// our node. It returns the link that the htlc should be forwarded outwards on,
856
// and a link error if the htlc cannot be forwarded.
857
func (s *Switch) getLocalLink(pkt *htlcPacket, htlc *lnwire.UpdateAddHTLC) (
858
        ChannelLink, *LinkError) {
1,461✔
859

1,461✔
860
        // Try to find links by node destination.
1,461✔
861
        s.indexMtx.RLock()
1,461✔
862
        link, err := s.getLinkByShortID(pkt.outgoingChanID)
1,461✔
863
        defer s.indexMtx.RUnlock()
1,461✔
864
        if err != nil {
1,466✔
865
                // If the link was not found for the outgoingChanID, an outside
5✔
866
                // subsystem may be using the confirmed SCID of a zero-conf
5✔
867
                // channel. In this case, we'll consult the Switch maps to see
5✔
868
                // if an alias exists and use the alias to lookup the link.
5✔
869
                // This extra step is a consequence of not updating the Switch
5✔
870
                // forwardingIndex when a zero-conf channel is confirmed. We
5✔
871
                // don't need to change the outgoingChanID since the link will
5✔
872
                // do that upon receiving the packet.
5✔
873
                baseScid, ok := s.baseIndex[pkt.outgoingChanID]
5✔
874
                if !ok {
9✔
875
                        log.Errorf("Link %v not found", pkt.outgoingChanID)
4✔
876
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
4✔
877
                }
4✔
878

879
                // The base SCID was found, so we'll use that to fetch the
880
                // link.
881
                link, err = s.getLinkByShortID(baseScid)
5✔
882
                if err != nil {
5✔
883
                        log.Errorf("Link %v not found", baseScid)
×
884
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
×
885
                }
×
886
        }
887

888
        if !link.EligibleToForward() {
1,462✔
889
                log.Errorf("Link %v is not available to forward",
1✔
890
                        pkt.outgoingChanID)
1✔
891

1✔
892
                // The update does not need to be populated as the error
1✔
893
                // will be returned back to the router.
1✔
894
                return nil, NewDetailedLinkError(
1✔
895
                        lnwire.NewTemporaryChannelFailure(nil),
1✔
896
                        OutgoingFailureLinkNotEligible,
1✔
897
                )
1✔
898
        }
1✔
899

900
        // Ensure that the htlc satisfies the outgoing channel policy.
901
        currentHeight := atomic.LoadUint32(&s.bestHeight)
1,460✔
902
        htlcErr := link.CheckHtlcTransit(
1,460✔
903
                htlc.PaymentHash, htlc.Amount, htlc.Expiry, currentHeight,
1,460✔
904
        )
1,460✔
905
        if htlcErr != nil {
1,465✔
906
                log.Errorf("Link %v policy for local forward not "+
5✔
907
                        "satisfied", pkt.outgoingChanID)
5✔
908
                return nil, htlcErr
5✔
909
        }
5✔
910
        return link, nil
1,457✔
911
}
912

913
// handleLocalResponse processes a Settle or Fail responding to a
914
// locally-initiated payment. This is handled asynchronously to avoid blocking
915
// the main event loop within the switch, as these operations can require
916
// multiple db transactions. The guarantees of the circuit map are stringent
917
// enough such that we are able to tolerate reordering of these operations
918
// without side effects. The primary operations handled are:
919
//  1. Save the payment result to the pending payment store.
920
//  2. Notify subscribers about the payment result.
921
//  3. Ack settle/fail references, to avoid resending this response internally
922
//  4. Teardown the closing circuit in the circuit map
923
//
924
// NOTE: This method MUST be spawned as a goroutine.
925
func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
741✔
926
        defer s.wg.Done()
741✔
927

741✔
928
        attemptID := pkt.incomingHTLCID
741✔
929

741✔
930
        // The error reason will be unencypted in case this a local
741✔
931
        // failure or a converted error.
741✔
932
        unencrypted := pkt.localFailure || pkt.convertedError
741✔
933
        n := &networkResult{
741✔
934
                msg:          pkt.htlc,
741✔
935
                unencrypted:  unencrypted,
741✔
936
                isResolution: pkt.isResolution,
741✔
937
        }
741✔
938

741✔
939
        // Store the result to the db. This will also notify subscribers about
741✔
940
        // the result.
741✔
941
        if err := s.networkResults.storeResult(attemptID, n); err != nil {
741✔
942
                log.Errorf("Unable to complete payment for pid=%v: %v",
×
943
                        attemptID, err)
×
944
                return
×
945
        }
×
946

947
        // First, we'll clean up any fwdpkg references, circuit entries, and
948
        // mark in our db that the payment for this payment hash has either
949
        // succeeded or failed.
950
        //
951
        // If this response is contained in a forwarding package, we'll start by
952
        // acking the settle/fail so that we don't continue to retransmit the
953
        // HTLC internally.
954
        if pkt.destRef != nil {
863✔
955
                if err := s.ackSettleFail(*pkt.destRef); err != nil {
122✔
956
                        log.Warnf("Unable to ack settle/fail reference: %s: %v",
×
957
                                *pkt.destRef, err)
×
958
                        return
×
959
                }
×
960
        }
961

962
        // Next, we'll remove the circuit since we are about to complete an
963
        // fulfill/fail of this HTLC. Since we've already removed the
964
        // settle/fail fwdpkg reference, the response from the peer cannot be
965
        // replayed internally if this step fails. If this happens, this logic
966
        // will be executed when a provided resolution message comes through.
967
        // This can only happen if the circuit is still open, which is why this
968
        // ordering is chosen.
969
        if err := s.teardownCircuit(pkt); err != nil {
741✔
970
                log.Warnf("Unable to teardown circuit %s: %v",
×
971
                        pkt.inKey(), err)
×
972
                return
×
973
        }
×
974

975
        // Finally, notify on the htlc failure or success that has been handled.
976
        key := newHtlcKey(pkt)
741✔
977
        eventType := getEventType(pkt)
741✔
978

741✔
979
        switch htlc := pkt.htlc.(type) {
741✔
980
        case *lnwire.UpdateFulfillHTLC:
617✔
981
                s.cfg.HtlcNotifier.NotifySettleEvent(key, htlc.PaymentPreimage,
617✔
982
                        eventType)
617✔
983

984
        case *lnwire.UpdateFailHTLC:
128✔
985
                s.cfg.HtlcNotifier.NotifyForwardingFailEvent(key, eventType)
128✔
986
        }
987
}
988

989
// extractResult uses the given deobfuscator to extract the payment result from
990
// the given network message.
991
func (s *Switch) extractResult(deobfuscator ErrorDecrypter, n *networkResult,
992
        attemptID uint64, paymentHash lntypes.Hash) (*PaymentResult, error) {
738✔
993

738✔
994
        switch htlc := n.msg.(type) {
738✔
995

996
        // We've received a settle update which means we can finalize the user
997
        // payment and return successful response.
998
        case *lnwire.UpdateFulfillHTLC:
614✔
999
                return &PaymentResult{
614✔
1000
                        Preimage: htlc.PaymentPreimage,
614✔
1001
                }, nil
614✔
1002

1003
        // We've received a fail update which means we can finalize the
1004
        // user payment and return fail response.
1005
        case *lnwire.UpdateFailHTLC:
128✔
1006
                // TODO(yy): construct deobfuscator here to avoid creating it
128✔
1007
                // in paymentLifecycle even for settled HTLCs.
128✔
1008
                paymentErr := s.parseFailedPayment(
128✔
1009
                        deobfuscator, attemptID, paymentHash, n.unencrypted,
128✔
1010
                        n.isResolution, htlc,
128✔
1011
                )
128✔
1012

128✔
1013
                return &PaymentResult{
128✔
1014
                        Error: paymentErr,
128✔
1015
                }, nil
128✔
1016

1017
        default:
×
1018
                return nil, fmt.Errorf("received unknown response type: %T",
×
1019
                        htlc)
×
1020
        }
1021
}
1022

1023
// parseFailedPayment determines the appropriate failure message to return to
1024
// a user initiated payment. The three cases handled are:
1025
//  1. An unencrypted failure, which should already plaintext.
1026
//  2. A resolution from the chain arbitrator, which possibly has no failure
1027
//     reason attached.
1028
//  3. A failure from the remote party, which will need to be decrypted using
1029
//     the payment deobfuscator.
1030
func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter,
1031
        attemptID uint64, paymentHash lntypes.Hash, unencrypted,
1032
        isResolution bool, htlc *lnwire.UpdateFailHTLC) error {
128✔
1033

128✔
1034
        switch {
128✔
1035

1036
        // The payment never cleared the link, so we don't need to
1037
        // decrypt the error, simply decode it them report back to the
1038
        // user.
1039
        case unencrypted:
9✔
1040
                r := bytes.NewReader(htlc.Reason)
9✔
1041
                failureMsg, err := lnwire.DecodeFailure(r, 0)
9✔
1042
                if err != nil {
9✔
1043
                        // If we could not decode the failure reason, return a link
×
1044
                        // error indicating that we failed to decode the onion.
×
1045
                        linkError := NewDetailedLinkError(
×
1046
                                // As this didn't even clear the link, we don't
×
1047
                                // need to apply an update here since it goes
×
1048
                                // directly to the router.
×
1049
                                lnwire.NewTemporaryChannelFailure(nil),
×
1050
                                OutgoingFailureDecodeError,
×
1051
                        )
×
1052

×
1053
                        log.Errorf("%v: (hash=%v, pid=%d): %v",
×
1054
                                linkError.FailureDetail.FailureString(),
×
1055
                                paymentHash, attemptID, err)
×
1056

×
1057
                        return linkError
×
1058
                }
×
1059

1060
                // If we successfully decoded the failure reason, return it.
1061
                return NewLinkError(failureMsg)
9✔
1062

1063
        // A payment had to be timed out on chain before it got past
1064
        // the first hop. In this case, we'll report a permanent
1065
        // channel failure as this means us, or the remote party had to
1066
        // go on chain.
1067
        case isResolution && htlc.Reason == nil:
4✔
1068
                linkError := NewDetailedLinkError(
4✔
1069
                        &lnwire.FailPermanentChannelFailure{},
4✔
1070
                        OutgoingFailureOnChainTimeout,
4✔
1071
                )
4✔
1072

4✔
1073
                log.Infof("%v: hash=%v, pid=%d",
4✔
1074
                        linkError.FailureDetail.FailureString(),
4✔
1075
                        paymentHash, attemptID)
4✔
1076

4✔
1077
                return linkError
4✔
1078

1079
        // A regular multi-hop payment error that we'll need to
1080
        // decrypt.
1081
        default:
123✔
1082
                // We'll attempt to fully decrypt the onion encrypted
123✔
1083
                // error. If we're unable to then we'll bail early.
123✔
1084
                failure, err := deobfuscator.DecryptError(htlc.Reason)
123✔
1085
                if err != nil {
124✔
1086
                        log.Errorf("unable to de-obfuscate onion failure "+
1✔
1087
                                "(hash=%v, pid=%d): %v",
1✔
1088
                                paymentHash, attemptID, err)
1✔
1089

1✔
1090
                        return ErrUnreadableFailureMessage
1✔
1091
                }
1✔
1092

1093
                return failure
122✔
1094
        }
1095
}
1096

1097
// handlePacketForward is used in cases when we need forward the htlc update
1098
// from one channel link to another and be able to propagate the settle/fail
1099
// updates back. This behaviour is achieved by creation of payment circuits.
1100
func (s *Switch) handlePacketForward(packet *htlcPacket) error {
1,489✔
1101
        switch htlc := packet.htlc.(type) {
1,489✔
1102

1103
        // Channel link forwarded us a new htlc, therefore we initiate the
1104
        // payment circuit within our internal state so we can properly forward
1105
        // the ultimate settle message back latter.
1106
        case *lnwire.UpdateAddHTLC:
86✔
1107
                // Check if the node is set to reject all onward HTLCs and also make
86✔
1108
                // sure that HTLC is not from the source node.
86✔
1109
                if s.cfg.RejectHTLC {
91✔
1110
                        failure := NewDetailedLinkError(
5✔
1111
                                &lnwire.FailChannelDisabled{},
5✔
1112
                                OutgoingFailureForwardsDisabled,
5✔
1113
                        )
5✔
1114

5✔
1115
                        return s.failAddPacket(packet, failure)
5✔
1116
                }
5✔
1117

1118
                // Before we attempt to find a non-strict forwarding path for
1119
                // this htlc, check whether the htlc is being routed over the
1120
                // same incoming and outgoing channel. If our node does not
1121
                // allow forwards of this nature, we fail the htlc early. This
1122
                // check is in place to disallow inefficiently routed htlcs from
1123
                // locking up our balance. With channels where the
1124
                // option-scid-alias feature was negotiated, we also have to be
1125
                // sure that the IDs aren't the same since one or both could be
1126
                // an alias.
1127
                linkErr := s.checkCircularForward(
85✔
1128
                        packet.incomingChanID, packet.outgoingChanID,
85✔
1129
                        s.cfg.AllowCircularRoute, htlc.PaymentHash,
85✔
1130
                )
85✔
1131
                if linkErr != nil {
86✔
1132
                        return s.failAddPacket(packet, linkErr)
1✔
1133
                }
1✔
1134

1135
                s.indexMtx.RLock()
84✔
1136
                targetLink, err := s.getLinkByMapping(packet)
84✔
1137
                if err != nil {
92✔
1138
                        s.indexMtx.RUnlock()
8✔
1139

8✔
1140
                        log.Debugf("unable to find link with "+
8✔
1141
                                "destination %v", packet.outgoingChanID)
8✔
1142

8✔
1143
                        // If packet was forwarded from another channel link
8✔
1144
                        // than we should notify this link that some error
8✔
1145
                        // occurred.
8✔
1146
                        linkError := NewLinkError(
8✔
1147
                                &lnwire.FailUnknownNextPeer{},
8✔
1148
                        )
8✔
1149

8✔
1150
                        return s.failAddPacket(packet, linkError)
8✔
1151
                }
8✔
1152
                targetPeerKey := targetLink.PeerPubKey()
80✔
1153
                interfaceLinks, _ := s.getLinks(targetPeerKey)
80✔
1154
                s.indexMtx.RUnlock()
80✔
1155

80✔
1156
                // We'll keep track of any HTLC failures during the link
80✔
1157
                // selection process. This way we can return the error for
80✔
1158
                // precise link that the sender selected, while optimistically
80✔
1159
                // trying all links to utilize our available bandwidth.
80✔
1160
                linkErrs := make(map[lnwire.ShortChannelID]*LinkError)
80✔
1161

80✔
1162
                // Find all destination channel links with appropriate
80✔
1163
                // bandwidth.
80✔
1164
                var destinations []ChannelLink
80✔
1165
                for _, link := range interfaceLinks {
164✔
1166
                        var failure *LinkError
84✔
1167

84✔
1168
                        // We'll skip any links that aren't yet eligible for
84✔
1169
                        // forwarding.
84✔
1170
                        if !link.EligibleToForward() {
88✔
1171
                                failure = NewDetailedLinkError(
4✔
1172
                                        &lnwire.FailUnknownNextPeer{},
4✔
1173
                                        OutgoingFailureLinkNotEligible,
4✔
1174
                                )
4✔
1175
                        } else {
84✔
1176
                                // We'll ensure that the HTLC satisfies the
80✔
1177
                                // current forwarding conditions of this target
80✔
1178
                                // link.
80✔
1179
                                currentHeight := atomic.LoadUint32(&s.bestHeight)
80✔
1180
                                failure = link.CheckHtlcForward(
80✔
1181
                                        htlc.PaymentHash, packet.incomingAmount,
80✔
1182
                                        packet.amount, packet.incomingTimeout,
80✔
1183
                                        packet.outgoingTimeout,
80✔
1184
                                        packet.inboundFee,
80✔
1185
                                        currentHeight,
80✔
1186
                                        packet.originalOutgoingChanID,
80✔
1187
                                )
80✔
1188
                        }
80✔
1189

1190
                        // If this link can forward the htlc, add it to the set
1191
                        // of destinations.
1192
                        if failure == nil {
148✔
1193
                                destinations = append(destinations, link)
64✔
1194
                                continue
64✔
1195
                        }
1196

1197
                        linkErrs[link.ShortChanID()] = failure
24✔
1198
                }
1199

1200
                // If we had a forwarding failure due to the HTLC not
1201
                // satisfying the current policy, then we'll send back an
1202
                // error, but ensure we send back the error sourced at the
1203
                // *target* link.
1204
                if len(destinations) == 0 {
100✔
1205
                        // At this point, some or all of the links rejected the
20✔
1206
                        // HTLC so we couldn't forward it. So we'll try to look
20✔
1207
                        // up the error that came from the source.
20✔
1208
                        linkErr, ok := linkErrs[packet.outgoingChanID]
20✔
1209
                        if !ok {
20✔
1210
                                // If we can't find the error of the source,
×
1211
                                // then we'll return an unknown next peer,
×
1212
                                // though this should never happen.
×
1213
                                linkErr = NewLinkError(
×
1214
                                        &lnwire.FailUnknownNextPeer{},
×
1215
                                )
×
1216
                                log.Warnf("unable to find err source for "+
×
1217
                                        "outgoing_link=%v, errors=%v",
×
1218
                                        packet.outgoingChanID,
×
1219
                                        lnutils.SpewLogClosure(linkErrs))
×
1220
                        }
×
1221

1222
                        log.Tracef("incoming HTLC(%x) violated "+
20✔
1223
                                "target outgoing link (id=%v) policy: %v",
20✔
1224
                                htlc.PaymentHash[:], packet.outgoingChanID,
20✔
1225
                                linkErr)
20✔
1226

20✔
1227
                        return s.failAddPacket(packet, linkErr)
20✔
1228
                }
1229

1230
                // Choose a random link out of the set of links that can forward
1231
                // this htlc. The reason for randomization is to evenly
1232
                // distribute the htlc load without making assumptions about
1233
                // what the best channel is.
1234
                destination := destinations[rand.Intn(len(destinations))] // nolint:gosec
64✔
1235

64✔
1236
                // Retrieve the incoming link by its ShortChannelID. Note that
64✔
1237
                // the incomingChanID is never set to hop.Source here.
64✔
1238
                s.indexMtx.RLock()
64✔
1239
                incomingLink, err := s.getLinkByShortID(packet.incomingChanID)
64✔
1240
                s.indexMtx.RUnlock()
64✔
1241
                if err != nil {
64✔
1242
                        // If we couldn't find the incoming link, we can't
×
1243
                        // evaluate the incoming's exposure to dust, so we just
×
1244
                        // fail the HTLC back.
×
1245
                        linkErr := NewLinkError(
×
1246
                                &lnwire.FailTemporaryChannelFailure{},
×
1247
                        )
×
1248

×
1249
                        return s.failAddPacket(packet, linkErr)
×
1250
                }
×
1251

1252
                // Evaluate whether this HTLC would increase our fee exposure
1253
                // over the threshold on the incoming link. If it does, fail it
1254
                // backwards.
1255
                if s.dustExceedsFeeThreshold(
64✔
1256
                        incomingLink, packet.incomingAmount, true,
64✔
1257
                ) {
64✔
1258
                        // The incoming dust exceeds the threshold, so we fail
×
1259
                        // the add back.
×
1260
                        linkErr := NewLinkError(
×
1261
                                &lnwire.FailTemporaryChannelFailure{},
×
1262
                        )
×
1263

×
1264
                        return s.failAddPacket(packet, linkErr)
×
1265
                }
×
1266

1267
                // Also evaluate whether this HTLC would increase our fee
1268
                // exposure over the threshold on the destination link. If it
1269
                // does, fail it back.
1270
                if s.dustExceedsFeeThreshold(
64✔
1271
                        destination, packet.amount, false,
64✔
1272
                ) {
65✔
1273
                        // The outgoing dust exceeds the threshold, so we fail
1✔
1274
                        // the add back.
1✔
1275
                        linkErr := NewLinkError(
1✔
1276
                                &lnwire.FailTemporaryChannelFailure{},
1✔
1277
                        )
1✔
1278

1✔
1279
                        return s.failAddPacket(packet, linkErr)
1✔
1280
                }
1✔
1281

1282
                // Send the packet to the destination channel link which
1283
                // manages the channel.
1284
                packet.outgoingChanID = destination.ShortChanID()
63✔
1285
                return destination.handleSwitchPacket(packet)
63✔
1286

1287
        case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC:
1,407✔
1288
                // If the source of this packet has not been set, use the
1,407✔
1289
                // circuit map to lookup the origin.
1,407✔
1290
                circuit, err := s.closeCircuit(packet)
1,407✔
1291
                if err != nil {
1,411✔
1292
                        return err
4✔
1293
                }
4✔
1294

1295
                // closeCircuit returns a nil circuit when a settle packet returns an
1296
                // ErrUnknownCircuit error upon the inner call to CloseCircuit.
1297
                if circuit == nil {
2,034✔
1298
                        return nil
627✔
1299
                }
627✔
1300

1301
                fail, isFail := htlc.(*lnwire.UpdateFailHTLC)
784✔
1302
                if isFail && !packet.hasSource {
915✔
1303
                        // HTLC resolutions and messages restored from disk
131✔
1304
                        // don't have the obfuscator set from the original htlc
131✔
1305
                        // add packet - set it here for use in blinded errors.
131✔
1306
                        packet.obfuscator = circuit.ErrorEncrypter
131✔
1307

131✔
1308
                        switch {
131✔
1309
                        // No message to encrypt, locally sourced payment.
1310
                        case circuit.ErrorEncrypter == nil:
124✔
1311

1312
                        // If this is a resolution message, then we'll need to
1313
                        // encrypt it as it's actually internally sourced.
1314
                        case packet.isResolution:
4✔
1315
                                var err error
4✔
1316
                                // TODO(roasbeef): don't need to pass actually?
4✔
1317
                                failure := &lnwire.FailPermanentChannelFailure{}
4✔
1318
                                fail.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop(
4✔
1319
                                        failure,
4✔
1320
                                )
4✔
1321
                                if err != nil {
4✔
1322
                                        err = fmt.Errorf("unable to obfuscate "+
×
1323
                                                "error: %v", err)
×
1324
                                        log.Error(err)
×
1325
                                }
×
1326

1327
                        // Alternatively, if the remote party send us an
1328
                        // UpdateFailMalformedHTLC, then we'll need to convert
1329
                        // this into a proper well formatted onion error as
1330
                        // there's no HMAC currently.
1331
                        case packet.convertedError:
6✔
1332
                                log.Infof("Converting malformed HTLC error "+
6✔
1333
                                        "for circuit for Circuit(%x: "+
6✔
1334
                                        "(%s, %d) <-> (%s, %d))", packet.circuit.PaymentHash,
6✔
1335
                                        packet.incomingChanID, packet.incomingHTLCID,
6✔
1336
                                        packet.outgoingChanID, packet.outgoingHTLCID)
6✔
1337

6✔
1338
                                fail.Reason = circuit.ErrorEncrypter.EncryptMalformedError(
6✔
1339
                                        fail.Reason,
6✔
1340
                                )
6✔
1341

1342
                        default:
9✔
1343
                                // Otherwise, it's a forwarded error, so we'll perform a
9✔
1344
                                // wrapper encryption as normal.
9✔
1345
                                fail.Reason = circuit.ErrorEncrypter.IntermediateEncrypt(
9✔
1346
                                        fail.Reason,
9✔
1347
                                )
9✔
1348
                        }
1349
                } else if !isFail && circuit.Outgoing != nil {
1,303✔
1350
                        // If this is an HTLC settle, and it wasn't from a
646✔
1351
                        // locally initiated HTLC, then we'll log a forwarding
646✔
1352
                        // event so we can flush it to disk later.
646✔
1353
                        //
646✔
1354
                        // TODO(roasbeef): only do this once link actually
646✔
1355
                        // fully settles?
646✔
1356
                        localHTLC := packet.incomingChanID == hop.Source
646✔
1357
                        if !localHTLC {
679✔
1358
                                log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+
33✔
1359
                                        "from IncomingChanID(%v) to OutgoingChanID(%v)",
33✔
1360
                                        circuit.PaymentHash[:], circuit.OutgoingAmount,
33✔
1361
                                        circuit.IncomingAmount-circuit.OutgoingAmount,
33✔
1362
                                        circuit.Incoming.ChanID, circuit.Outgoing.ChanID)
33✔
1363
                                s.fwdEventMtx.Lock()
33✔
1364
                                s.pendingFwdingEvents = append(
33✔
1365
                                        s.pendingFwdingEvents,
33✔
1366
                                        channeldb.ForwardingEvent{
33✔
1367
                                                Timestamp:      time.Now(),
33✔
1368
                                                IncomingChanID: circuit.Incoming.ChanID,
33✔
1369
                                                OutgoingChanID: circuit.Outgoing.ChanID,
33✔
1370
                                                AmtIn:          circuit.IncomingAmount,
33✔
1371
                                                AmtOut:         circuit.OutgoingAmount,
33✔
1372
                                        },
33✔
1373
                                )
33✔
1374
                                s.fwdEventMtx.Unlock()
33✔
1375
                        }
33✔
1376
                }
1377

1378
                // A blank IncomingChanID in a circuit indicates that it is a pending
1379
                // user-initiated payment.
1380
                if packet.incomingChanID == hop.Source {
1,525✔
1381
                        s.wg.Add(1)
741✔
1382
                        go s.handleLocalResponse(packet)
741✔
1383
                        return nil
741✔
1384
                }
741✔
1385

1386
                // Check to see that the source link is online before removing
1387
                // the circuit.
1388
                return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
47✔
1389

1390
        default:
×
1391
                return errors.New("wrong update type")
×
1392
        }
1393
}
1394

1395
// checkCircularForward checks whether a forward is circular (arrives and
1396
// departs on the same link) and returns a link error if the switch is
1397
// configured to disallow this behaviour.
1398
func (s *Switch) checkCircularForward(incoming, outgoing lnwire.ShortChannelID,
1399
        allowCircular bool, paymentHash lntypes.Hash) *LinkError {
94✔
1400

94✔
1401
        // If they are equal, we can skip the alias mapping checks.
94✔
1402
        if incoming == outgoing {
98✔
1403
                // The switch may be configured to allow circular routes, so
4✔
1404
                // just log and return nil.
4✔
1405
                if allowCircular {
6✔
1406
                        log.Debugf("allowing circular route over link: %v "+
2✔
1407
                                "(payment hash: %x)", incoming, paymentHash)
2✔
1408
                        return nil
2✔
1409
                }
2✔
1410

1411
                // Otherwise, we'll return a temporary channel failure.
1412
                return NewDetailedLinkError(
2✔
1413
                        lnwire.NewTemporaryChannelFailure(nil),
2✔
1414
                        OutgoingFailureCircularRoute,
2✔
1415
                )
2✔
1416
        }
1417

1418
        // We'll fetch the "base" SCID from the baseIndex for the incoming and
1419
        // outgoing SCIDs. If either one does not have a base SCID, then the
1420
        // two channels are not equal since one will be a channel that does not
1421
        // need a mapping and SCID equality was checked above. If the "base"
1422
        // SCIDs are equal, then this is a circular route. Otherwise, it isn't.
1423
        s.indexMtx.RLock()
90✔
1424
        incomingBaseScid, ok := s.baseIndex[incoming]
90✔
1425
        if !ok {
174✔
1426
                // This channel does not use baseIndex, bail out.
84✔
1427
                s.indexMtx.RUnlock()
84✔
1428
                return nil
84✔
1429
        }
84✔
1430

1431
        outgoingBaseScid, ok := s.baseIndex[outgoing]
10✔
1432
        if !ok {
16✔
1433
                // This channel does not use baseIndex, bail out.
6✔
1434
                s.indexMtx.RUnlock()
6✔
1435
                return nil
6✔
1436
        }
6✔
1437
        s.indexMtx.RUnlock()
8✔
1438

8✔
1439
        // Check base SCID equality.
8✔
1440
        if incomingBaseScid != outgoingBaseScid {
12✔
1441
                // The base SCIDs are not equal so these are not the same
4✔
1442
                // channel.
4✔
1443
                return nil
4✔
1444
        }
4✔
1445

1446
        // If the incoming and outgoing link are equal, the htlc is part of a
1447
        // circular route which may be used to lock up our liquidity. If the
1448
        // switch is configured to allow circular routes, log that we are
1449
        // allowing the route then return nil.
1450
        if allowCircular {
6✔
1451
                log.Debugf("allowing circular route over link: %v "+
2✔
1452
                        "(payment hash: %x)", incoming, paymentHash)
2✔
1453
                return nil
2✔
1454
        }
2✔
1455

1456
        // If our node disallows circular routes, return a temporary channel
1457
        // failure. There is nothing wrong with the policy used by the remote
1458
        // node, so we do not include a channel update.
1459
        return NewDetailedLinkError(
2✔
1460
                lnwire.NewTemporaryChannelFailure(nil),
2✔
1461
                OutgoingFailureCircularRoute,
2✔
1462
        )
2✔
1463
}
1464

1465
// failAddPacket encrypts a fail packet back to an add packet's source.
1466
// The ciphertext will be derived from the failure message proivded by context.
1467
// This method returns the failErr if all other steps complete successfully.
1468
func (s *Switch) failAddPacket(packet *htlcPacket, failure *LinkError) error {
30✔
1469
        // Encrypt the failure so that the sender will be able to read the error
30✔
1470
        // message. Since we failed this packet, we use EncryptFirstHop to
30✔
1471
        // obfuscate the failure for their eyes only.
30✔
1472
        reason, err := packet.obfuscator.EncryptFirstHop(failure.WireMessage())
30✔
1473
        if err != nil {
30✔
1474
                err := fmt.Errorf("unable to obfuscate "+
×
1475
                        "error: %v", err)
×
1476
                log.Error(err)
×
1477
                return err
×
1478
        }
×
1479

1480
        log.Error(failure.Error())
30✔
1481

30✔
1482
        // Create a failure packet for this htlc. The full set of
30✔
1483
        // information about the htlc failure is included so that they can
30✔
1484
        // be included in link failure notifications.
30✔
1485
        failPkt := &htlcPacket{
30✔
1486
                sourceRef:       packet.sourceRef,
30✔
1487
                incomingChanID:  packet.incomingChanID,
30✔
1488
                incomingHTLCID:  packet.incomingHTLCID,
30✔
1489
                outgoingChanID:  packet.outgoingChanID,
30✔
1490
                outgoingHTLCID:  packet.outgoingHTLCID,
30✔
1491
                incomingAmount:  packet.incomingAmount,
30✔
1492
                amount:          packet.amount,
30✔
1493
                incomingTimeout: packet.incomingTimeout,
30✔
1494
                outgoingTimeout: packet.outgoingTimeout,
30✔
1495
                circuit:         packet.circuit,
30✔
1496
                obfuscator:      packet.obfuscator,
30✔
1497
                linkFailure:     failure,
30✔
1498
                htlc: &lnwire.UpdateFailHTLC{
30✔
1499
                        Reason: reason,
30✔
1500
                },
30✔
1501
        }
30✔
1502

30✔
1503
        // Route a fail packet back to the source link.
30✔
1504
        err = s.mailOrchestrator.Deliver(failPkt.incomingChanID, failPkt)
30✔
1505
        if err != nil {
30✔
1506
                err = fmt.Errorf("source chanid=%v unable to "+
×
1507
                        "handle switch packet: %v",
×
1508
                        packet.incomingChanID, err)
×
1509
                log.Error(err)
×
1510
                return err
×
1511
        }
×
1512

1513
        return failure
30✔
1514
}
1515

1516
// closeCircuit accepts a settle or fail htlc and the associated htlc packet and
1517
// attempts to determine the source that forwarded this htlc. This method will
1518
// set the incoming chan and htlc ID of the given packet if the source was
1519
// found, and will properly [re]encrypt any failure messages.
1520
func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
1,407✔
1521
        // If the packet has its source, that means it was failed locally by
1,407✔
1522
        // the outgoing link. We fail it here to make sure only one response
1,407✔
1523
        // makes it through the switch.
1,407✔
1524
        if pkt.hasSource {
1,422✔
1525
                circuit, err := s.circuits.FailCircuit(pkt.inKey())
15✔
1526
                switch err {
15✔
1527

1528
                // Circuit successfully closed.
1529
                case nil:
15✔
1530
                        return circuit, nil
15✔
1531

1532
                // Circuit was previously closed, but has not been deleted.
1533
                // We'll just drop this response until the circuit has been
1534
                // fully removed.
1535
                case ErrCircuitClosing:
×
1536
                        return nil, err
×
1537

1538
                // Failed to close circuit because it does not exist. This is
1539
                // likely because the circuit was already successfully closed.
1540
                // Since this packet failed locally, there is no forwarding
1541
                // package entry to acknowledge.
1542
                case ErrUnknownCircuit:
×
1543
                        return nil, err
×
1544

1545
                // Unexpected error.
1546
                default:
×
1547
                        return nil, err
×
1548
                }
1549
        }
1550

1551
        // Otherwise, this is packet was received from the remote party.  Use
1552
        // circuit map to find the incoming link to receive the settle/fail.
1553
        circuit, err := s.circuits.CloseCircuit(pkt.outKey())
1,396✔
1554
        switch err {
1,396✔
1555

1556
        // Open circuit successfully closed.
1557
        case nil:
773✔
1558
                pkt.incomingChanID = circuit.Incoming.ChanID
773✔
1559
                pkt.incomingHTLCID = circuit.Incoming.HtlcID
773✔
1560
                pkt.circuit = circuit
773✔
1561
                pkt.sourceRef = &circuit.AddRef
773✔
1562

773✔
1563
                pktType := "SETTLE"
773✔
1564
                if _, ok := pkt.htlc.(*lnwire.UpdateFailHTLC); ok {
904✔
1565
                        pktType = "FAIL"
131✔
1566
                }
131✔
1567

1568
                log.Debugf("Closed completed %s circuit for %x: "+
773✔
1569
                        "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash,
773✔
1570
                        pkt.incomingChanID, pkt.incomingHTLCID,
773✔
1571
                        pkt.outgoingChanID, pkt.outgoingHTLCID)
773✔
1572

773✔
1573
                return circuit, nil
773✔
1574

1575
        // Circuit was previously closed, but has not been deleted. We'll just
1576
        // drop this response until the circuit has been removed.
1577
        case ErrCircuitClosing:
4✔
1578
                return nil, err
4✔
1579

1580
        // Failed to close circuit because it does not exist. This is likely
1581
        // because the circuit was already successfully closed.
1582
        case ErrUnknownCircuit:
627✔
1583
                if pkt.destRef != nil {
1,253✔
1584
                        // Add this SettleFailRef to the set of pending settle/fail entries
626✔
1585
                        // awaiting acknowledgement.
626✔
1586
                        s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef)
626✔
1587
                }
626✔
1588

1589
                // If this is a settle, we will not log an error message as settles
1590
                // are expected to hit the ErrUnknownCircuit case. The only way fails
1591
                // can hit this case if the link restarts after having just sent a fail
1592
                // to the switch.
1593
                _, isSettle := pkt.htlc.(*lnwire.UpdateFulfillHTLC)
627✔
1594
                if !isSettle {
631✔
1595
                        err := fmt.Errorf("unable to find target channel "+
4✔
1596
                                "for HTLC fail: channel ID = %s, "+
4✔
1597
                                "HTLC ID = %d", pkt.outgoingChanID,
4✔
1598
                                pkt.outgoingHTLCID)
4✔
1599
                        log.Error(err)
4✔
1600

4✔
1601
                        return nil, err
4✔
1602
                }
4✔
1603

1604
                return nil, nil
627✔
1605

1606
        // Unexpected error.
1607
        default:
×
1608
                return nil, err
×
1609
        }
1610
}
1611

1612
// ackSettleFail is used by the switch to ACK any settle/fail entries in the
1613
// forwarding package of the outgoing link for a payment circuit. We do this if
1614
// we're the originator of the payment, so the link stops attempting to
1615
// re-broadcast.
1616
func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error {
128✔
1617
        return kvdb.Batch(s.cfg.DB, func(tx kvdb.RwTx) error {
256✔
1618
                return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...)
128✔
1619
        })
128✔
1620
}
1621

1622
// teardownCircuit removes a pending or open circuit from the switch's circuit
1623
// map and prints useful logging statements regarding the outcome.
1624
func (s *Switch) teardownCircuit(pkt *htlcPacket) error {
751✔
1625
        var pktType string
751✔
1626
        switch htlc := pkt.htlc.(type) {
751✔
1627
        case *lnwire.UpdateFulfillHTLC:
623✔
1628
                pktType = "SETTLE"
623✔
1629
        case *lnwire.UpdateFailHTLC:
132✔
1630
                pktType = "FAIL"
132✔
1631
        default:
×
1632
                err := fmt.Errorf("cannot tear down packet of type: %T", htlc)
×
1633
                log.Errorf(err.Error())
×
1634
                return err
×
1635
        }
1636

1637
        switch {
751✔
1638
        case pkt.circuit.HasKeystone():
747✔
1639
                log.Debugf("Tearing down open circuit with %s pkt, removing circuit=%v "+
747✔
1640
                        "with keystone=%v", pktType, pkt.inKey(), pkt.outKey())
747✔
1641

747✔
1642
                err := s.circuits.DeleteCircuits(pkt.inKey())
747✔
1643
                if err != nil {
747✔
1644
                        log.Warnf("Failed to tear down open circuit (%s, %d) <-> (%s, %d) "+
×
1645
                                "with payment_hash-%v using %s pkt",
×
1646
                                pkt.incomingChanID, pkt.incomingHTLCID,
×
1647
                                pkt.outgoingChanID, pkt.outgoingHTLCID,
×
1648
                                pkt.circuit.PaymentHash, pktType)
×
1649
                        return err
×
1650
                }
×
1651

1652
                log.Debugf("Closed completed %s circuit for %x: "+
747✔
1653
                        "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash,
747✔
1654
                        pkt.incomingChanID, pkt.incomingHTLCID,
747✔
1655
                        pkt.outgoingChanID, pkt.outgoingHTLCID)
747✔
1656

1657
        default:
8✔
1658
                log.Debugf("Tearing down incomplete circuit with %s for inkey=%v",
8✔
1659
                        pktType, pkt.inKey())
8✔
1660

8✔
1661
                err := s.circuits.DeleteCircuits(pkt.inKey())
8✔
1662
                if err != nil {
8✔
1663
                        log.Warnf("Failed to tear down pending %s circuit for %x: "+
×
1664
                                "(%s, %d)", pktType, pkt.circuit.PaymentHash,
×
1665
                                pkt.incomingChanID, pkt.incomingHTLCID)
×
1666
                        return err
×
1667
                }
×
1668

1669
                log.Debugf("Removed pending onion circuit for %x: "+
8✔
1670
                        "(%s, %d)", pkt.circuit.PaymentHash,
8✔
1671
                        pkt.incomingChanID, pkt.incomingHTLCID)
8✔
1672
        }
1673

1674
        return nil
751✔
1675
}
1676

1677
// CloseLink creates and sends the close channel command to the target link
1678
// directing the specified closure type. If the closure type is CloseRegular,
1679
// targetFeePerKw parameter should be the ideal fee-per-kw that will be used as
1680
// a starting point for close negotiation. The deliveryScript parameter is an
1681
// optional parameter which sets a user specified script to close out to.
1682
func (s *Switch) CloseLink(chanPoint *wire.OutPoint,
1683
        closeType contractcourt.ChannelCloseType,
1684
        targetFeePerKw, maxFee chainfee.SatPerKWeight,
1685
        deliveryScript lnwire.DeliveryAddress) (chan interface{}, chan error) {
4✔
1686

4✔
1687
        // TODO(roasbeef) abstract out the close updates.
4✔
1688
        updateChan := make(chan interface{}, 2)
4✔
1689
        errChan := make(chan error, 1)
4✔
1690

4✔
1691
        command := &ChanClose{
4✔
1692
                CloseType:      closeType,
4✔
1693
                ChanPoint:      chanPoint,
4✔
1694
                Updates:        updateChan,
4✔
1695
                TargetFeePerKw: targetFeePerKw,
4✔
1696
                MaxFee:         maxFee,
4✔
1697
                DeliveryScript: deliveryScript,
4✔
1698
                Err:            errChan,
4✔
1699
        }
4✔
1700

4✔
1701
        select {
4✔
1702
        case s.chanCloseRequests <- command:
4✔
1703
                return updateChan, errChan
4✔
1704

1705
        case <-s.quit:
×
1706
                errChan <- ErrSwitchExiting
×
1707
                close(updateChan)
×
1708
                return updateChan, errChan
×
1709
        }
1710
}
1711

1712
// htlcForwarder is responsible for optimally forwarding (and possibly
1713
// fragmenting) incoming/outgoing HTLCs amongst all active interfaces and their
1714
// links. The duties of the forwarder are similar to that of a network switch,
1715
// in that it facilitates multi-hop payments by acting as a central messaging
1716
// bus. The switch communicates will active links to create, manage, and tear
1717
// down active onion routed payments. Each active channel is modeled as
1718
// networked device with metadata such as the available payment bandwidth, and
1719
// total link capacity.
1720
//
1721
// NOTE: This MUST be run as a goroutine.
1722
func (s *Switch) htlcForwarder() {
209✔
1723
        defer s.wg.Done()
209✔
1724

209✔
1725
        defer func() {
418✔
1726
                s.blockEpochStream.Cancel()
209✔
1727

209✔
1728
                // Remove all links once we've been signalled for shutdown.
209✔
1729
                var linksToStop []ChannelLink
209✔
1730
                s.indexMtx.Lock()
209✔
1731
                for _, link := range s.linkIndex {
503✔
1732
                        activeLink := s.removeLink(link.ChanID())
294✔
1733
                        if activeLink == nil {
294✔
1734
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1735
                                        "on stop", link.ChanID())
×
1736
                                continue
×
1737
                        }
1738
                        linksToStop = append(linksToStop, activeLink)
294✔
1739
                }
1740
                for _, link := range s.pendingLinkIndex {
214✔
1741
                        pendingLink := s.removeLink(link.ChanID())
5✔
1742
                        if pendingLink == nil {
5✔
1743
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1744
                                        "on stop", link.ChanID())
×
1745
                                continue
×
1746
                        }
1747
                        linksToStop = append(linksToStop, pendingLink)
5✔
1748
                }
1749
                s.indexMtx.Unlock()
209✔
1750

209✔
1751
                // Now that all pending and live links have been removed from
209✔
1752
                // the forwarding indexes, stop each one before shutting down.
209✔
1753
                // We'll shut them down in parallel to make exiting as fast as
209✔
1754
                // possible.
209✔
1755
                var wg sync.WaitGroup
209✔
1756
                for _, link := range linksToStop {
504✔
1757
                        wg.Add(1)
295✔
1758
                        go func(l ChannelLink) {
590✔
1759
                                defer wg.Done()
295✔
1760

295✔
1761
                                l.Stop()
295✔
1762
                        }(link)
295✔
1763
                }
1764
                wg.Wait()
209✔
1765

209✔
1766
                // Before we exit fully, we'll attempt to flush out any
209✔
1767
                // forwarding events that may still be lingering since the last
209✔
1768
                // batch flush.
209✔
1769
                if err := s.FlushForwardingEvents(); err != nil {
209✔
1770
                        log.Errorf("unable to flush forwarding events: %v", err)
×
1771
                }
×
1772
        }()
1773

1774
        // TODO(roasbeef): cleared vs settled distinction
1775
        var (
209✔
1776
                totalNumUpdates uint64
209✔
1777
                totalSatSent    btcutil.Amount
209✔
1778
                totalSatRecv    btcutil.Amount
209✔
1779
        )
209✔
1780
        s.cfg.LogEventTicker.Resume()
209✔
1781
        defer s.cfg.LogEventTicker.Stop()
209✔
1782

209✔
1783
        // Every 15 seconds, we'll flush out the forwarding events that
209✔
1784
        // occurred during that period.
209✔
1785
        s.cfg.FwdEventTicker.Resume()
209✔
1786
        defer s.cfg.FwdEventTicker.Stop()
209✔
1787

209✔
1788
        defer s.cfg.AckEventTicker.Stop()
209✔
1789

209✔
1790
out:
209✔
1791
        for {
1,943✔
1792

1,734✔
1793
                // If the set of pending settle/fail entries is non-zero,
1,734✔
1794
                // reinstate the ack ticker so we can batch ack them.
1,734✔
1795
                if len(s.pendingSettleFails) > 0 {
2,981✔
1796
                        s.cfg.AckEventTicker.Resume()
1,247✔
1797
                }
1,247✔
1798

1799
                select {
1,734✔
1800
                case blockEpoch, ok := <-s.blockEpochStream.Epochs:
4✔
1801
                        if !ok {
4✔
1802
                                break out
×
1803
                        }
1804

1805
                        atomic.StoreUint32(&s.bestHeight, uint32(blockEpoch.Height))
4✔
1806

1807
                // A local close request has arrived, we'll forward this to the
1808
                // relevant link (if it exists) so the channel can be
1809
                // cooperatively closed (if possible).
1810
                case req := <-s.chanCloseRequests:
4✔
1811
                        chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)
4✔
1812

4✔
1813
                        s.indexMtx.RLock()
4✔
1814
                        link, ok := s.linkIndex[chanID]
4✔
1815
                        if !ok {
8✔
1816
                                s.indexMtx.RUnlock()
4✔
1817

4✔
1818
                                req.Err <- fmt.Errorf("no peer for channel with "+
4✔
1819
                                        "chan_id=%x", chanID[:])
4✔
1820
                                continue
4✔
1821
                        }
1822
                        s.indexMtx.RUnlock()
4✔
1823

4✔
1824
                        peerPub := link.PeerPubKey()
4✔
1825
                        log.Debugf("Requesting local channel close: peer=%v, "+
4✔
1826
                                "chan_id=%x", link.PeerPubKey(), chanID[:])
4✔
1827

4✔
1828
                        go s.cfg.LocalChannelClose(peerPub[:], req)
4✔
1829

1830
                case resolutionMsg := <-s.resolutionMsgs:
5✔
1831
                        // We'll persist the resolution message to the Switch's
5✔
1832
                        // resolution store.
5✔
1833
                        resMsg := resolutionMsg.ResolutionMsg
5✔
1834
                        err := s.resMsgStore.addResolutionMsg(&resMsg)
5✔
1835
                        if err != nil {
5✔
1836
                                // This will only fail if there is a database
×
1837
                                // error or a serialization error. Sending the
×
1838
                                // error prevents the contractcourt from being
×
1839
                                // in a state where it believes the send was
×
1840
                                // successful, when it wasn't.
×
1841
                                log.Errorf("unable to add resolution msg: %v",
×
1842
                                        err)
×
1843
                                resolutionMsg.errChan <- err
×
1844
                                continue
×
1845
                        }
1846

1847
                        // At this point, the resolution message has been
1848
                        // persisted. It is safe to signal success by sending
1849
                        // a nil error since the Switch will re-deliver the
1850
                        // resolution message on restart.
1851
                        resolutionMsg.errChan <- nil
5✔
1852

5✔
1853
                        // Create a htlc packet for this resolution. We do
5✔
1854
                        // not have some of the information that we'll need
5✔
1855
                        // for blinded error handling here , so we'll rely on
5✔
1856
                        // our forwarding logic to fill it in later.
5✔
1857
                        pkt := &htlcPacket{
5✔
1858
                                outgoingChanID: resolutionMsg.SourceChan,
5✔
1859
                                outgoingHTLCID: resolutionMsg.HtlcIndex,
5✔
1860
                                isResolution:   true,
5✔
1861
                        }
5✔
1862

5✔
1863
                        // Resolution messages will either be cancelling
5✔
1864
                        // backwards an existing HTLC, or settling a previously
5✔
1865
                        // outgoing HTLC. Based on this, we'll map the message
5✔
1866
                        // to the proper htlcPacket.
5✔
1867
                        if resolutionMsg.Failure != nil {
9✔
1868
                                pkt.htlc = &lnwire.UpdateFailHTLC{}
4✔
1869
                        } else {
9✔
1870
                                pkt.htlc = &lnwire.UpdateFulfillHTLC{
5✔
1871
                                        PaymentPreimage: *resolutionMsg.PreImage,
5✔
1872
                                }
5✔
1873
                        }
5✔
1874

1875
                        log.Infof("Received outside contract resolution, "+
5✔
1876
                                "mapping to: %v", spew.Sdump(pkt))
5✔
1877

5✔
1878
                        // We don't check the error, as the only failure we can
5✔
1879
                        // encounter is due to the circuit already being
5✔
1880
                        // closed. This is fine, as processing this message is
5✔
1881
                        // meant to be idempotent.
5✔
1882
                        err = s.handlePacketForward(pkt)
5✔
1883
                        if err != nil {
9✔
1884
                                log.Errorf("Unable to forward resolution msg: %v", err)
4✔
1885
                        }
4✔
1886

1887
                // A new packet has arrived for forwarding, we'll interpret the
1888
                // packet concretely, then either forward it along, or
1889
                // interpret a return packet to a locally initialized one.
1890
                case cmd := <-s.htlcPlex:
1,488✔
1891
                        cmd.err <- s.handlePacketForward(cmd.pkt)
1,488✔
1892

1893
                // When this time ticks, then it indicates that we should
1894
                // collect all the forwarding events since the last internal,
1895
                // and write them out to our log.
1896
                case <-s.cfg.FwdEventTicker.Ticks():
18✔
1897
                        s.wg.Add(1)
18✔
1898
                        go func() {
36✔
1899
                                defer s.wg.Done()
18✔
1900

18✔
1901
                                if err := s.FlushForwardingEvents(); err != nil {
18✔
1902
                                        log.Errorf("unable to flush "+
×
1903
                                                "forwarding events: %v", err)
×
1904
                                }
×
1905
                        }()
1906

1907
                // The log ticker has fired, so we'll calculate some forwarding
1908
                // stats for the last 10 seconds to display within the logs to
1909
                // users.
1910
                case <-s.cfg.LogEventTicker.Ticks():
24✔
1911
                        // First, we'll collate the current running tally of
24✔
1912
                        // our forwarding stats.
24✔
1913
                        prevSatSent := totalSatSent
24✔
1914
                        prevSatRecv := totalSatRecv
24✔
1915
                        prevNumUpdates := totalNumUpdates
24✔
1916

24✔
1917
                        var (
24✔
1918
                                newNumUpdates uint64
24✔
1919
                                newSatSent    btcutil.Amount
24✔
1920
                                newSatRecv    btcutil.Amount
24✔
1921
                        )
24✔
1922

24✔
1923
                        // Next, we'll run through all the registered links and
24✔
1924
                        // compute their up-to-date forwarding stats.
24✔
1925
                        s.indexMtx.RLock()
24✔
1926
                        for _, link := range s.linkIndex {
50✔
1927
                                // TODO(roasbeef): when links first registered
26✔
1928
                                // stats printed.
26✔
1929
                                updates, sent, recv := link.Stats()
26✔
1930
                                newNumUpdates += updates
26✔
1931
                                newSatSent += sent.ToSatoshis()
26✔
1932
                                newSatRecv += recv.ToSatoshis()
26✔
1933
                        }
26✔
1934
                        s.indexMtx.RUnlock()
24✔
1935

24✔
1936
                        var (
24✔
1937
                                diffNumUpdates uint64
24✔
1938
                                diffSatSent    btcutil.Amount
24✔
1939
                                diffSatRecv    btcutil.Amount
24✔
1940
                        )
24✔
1941

24✔
1942
                        // If this is the first time we're computing these
24✔
1943
                        // stats, then the diff is just the new value. We do
24✔
1944
                        // this in order to avoid integer underflow issues.
24✔
1945
                        if prevNumUpdates == 0 {
32✔
1946
                                diffNumUpdates = newNumUpdates
8✔
1947
                                diffSatSent = newSatSent
8✔
1948
                                diffSatRecv = newSatRecv
8✔
1949
                        } else {
28✔
1950
                                diffNumUpdates = newNumUpdates - prevNumUpdates
20✔
1951
                                diffSatSent = newSatSent - prevSatSent
20✔
1952
                                diffSatRecv = newSatRecv - prevSatRecv
20✔
1953
                        }
20✔
1954

1955
                        // If the diff of num updates is zero, then we haven't
1956
                        // forwarded anything in the last 10 seconds, so we can
1957
                        // skip this update.
1958
                        if diffNumUpdates == 0 {
30✔
1959
                                continue
6✔
1960
                        }
1961

1962
                        // If the diff of num updates is negative, then some
1963
                        // links may have been unregistered from the switch, so
1964
                        // we'll update our stats to only include our registered
1965
                        // links.
1966
                        if int64(diffNumUpdates) < 0 {
26✔
1967
                                totalNumUpdates = newNumUpdates
4✔
1968
                                totalSatSent = newSatSent
4✔
1969
                                totalSatRecv = newSatRecv
4✔
1970
                                continue
4✔
1971
                        }
1972

1973
                        // Otherwise, we'll log this diff, then accumulate the
1974
                        // new stats into the running total.
1975
                        log.Debugf("Sent %d satoshis and received %d satoshis "+
22✔
1976
                                "in the last 10 seconds (%f tx/sec)",
22✔
1977
                                diffSatSent, diffSatRecv,
22✔
1978
                                float64(diffNumUpdates)/10)
22✔
1979

22✔
1980
                        totalNumUpdates += diffNumUpdates
22✔
1981
                        totalSatSent += diffSatSent
22✔
1982
                        totalSatRecv += diffSatRecv
22✔
1983

1984
                // The ack ticker has fired so if we have any settle/fail entries
1985
                // for a forwarding package to ack, we will do so here in a batch
1986
                // db call.
1987
                case <-s.cfg.AckEventTicker.Ticks():
10✔
1988
                        // If the current set is empty, pause the ticker.
10✔
1989
                        if len(s.pendingSettleFails) == 0 {
14✔
1990
                                s.cfg.AckEventTicker.Pause()
4✔
1991
                                continue
4✔
1992
                        }
1993

1994
                        // Batch ack the settle/fail entries.
1995
                        if err := s.ackSettleFail(s.pendingSettleFails...); err != nil {
10✔
1996
                                log.Errorf("Unable to ack batch of settle/fails: %v", err)
×
1997
                                continue
×
1998
                        }
1999

2000
                        log.Tracef("Acked %d settle fails: %v",
10✔
2001
                                len(s.pendingSettleFails),
10✔
2002
                                lnutils.SpewLogClosure(s.pendingSettleFails))
10✔
2003

10✔
2004
                        // Reset the pendingSettleFails buffer while keeping acquired
10✔
2005
                        // memory.
10✔
2006
                        s.pendingSettleFails = s.pendingSettleFails[:0]
10✔
2007

2008
                case <-s.quit:
209✔
2009
                        return
209✔
2010
                }
2011
        }
2012
}
2013

2014
// Start starts all helper goroutines required for the operation of the switch.
2015
func (s *Switch) Start() error {
209✔
2016
        if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
209✔
2017
                log.Warn("Htlc Switch already started")
×
2018
                return errors.New("htlc switch already started")
×
2019
        }
×
2020

2021
        log.Infof("HTLC Switch starting")
209✔
2022

209✔
2023
        blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
209✔
2024
        if err != nil {
209✔
2025
                return err
×
2026
        }
×
2027
        s.blockEpochStream = blockEpochStream
209✔
2028

209✔
2029
        s.wg.Add(1)
209✔
2030
        go s.htlcForwarder()
209✔
2031

209✔
2032
        if err := s.reforwardResponses(); err != nil {
209✔
2033
                s.Stop()
×
2034
                log.Errorf("unable to reforward responses: %v", err)
×
2035
                return err
×
2036
        }
×
2037

2038
        if err := s.reforwardResolutions(); err != nil {
209✔
2039
                // We are already stopping so we can ignore the error.
×
2040
                _ = s.Stop()
×
2041
                log.Errorf("unable to reforward resolutions: %v", err)
×
2042
                return err
×
2043
        }
×
2044

2045
        return nil
209✔
2046
}
2047

2048
// reforwardResolutions fetches the set of resolution messages stored on-disk
2049
// and reforwards them if their circuits are still open. If the circuits have
2050
// been deleted, then we will delete the resolution message from the database.
2051
func (s *Switch) reforwardResolutions() error {
209✔
2052
        // Fetch all stored resolution messages, deleting the ones that are
209✔
2053
        // resolved.
209✔
2054
        resMsgs, err := s.resMsgStore.fetchAllResolutionMsg()
209✔
2055
        if err != nil {
209✔
2056
                return err
×
2057
        }
×
2058

2059
        switchPackets := make([]*htlcPacket, 0, len(resMsgs))
209✔
2060
        for _, resMsg := range resMsgs {
214✔
2061
                // If the open circuit no longer exists, then we can remove the
5✔
2062
                // message from the store.
5✔
2063
                outKey := CircuitKey{
5✔
2064
                        ChanID: resMsg.SourceChan,
5✔
2065
                        HtlcID: resMsg.HtlcIndex,
5✔
2066
                }
5✔
2067

5✔
2068
                if s.circuits.LookupOpenCircuit(outKey) == nil {
10✔
2069
                        // The open circuit doesn't exist.
5✔
2070
                        err := s.resMsgStore.deleteResolutionMsg(&outKey)
5✔
2071
                        if err != nil {
5✔
2072
                                return err
×
2073
                        }
×
2074

2075
                        continue
5✔
2076
                }
2077

2078
                // The circuit is still open, so we can assume that the link or
2079
                // switch (if we are the source) hasn't cleaned it up yet.
2080
                // We rely on our forwarding logic to fill in details that
2081
                // are not currently available to us.
2082
                resPkt := &htlcPacket{
4✔
2083
                        outgoingChanID: resMsg.SourceChan,
4✔
2084
                        outgoingHTLCID: resMsg.HtlcIndex,
4✔
2085
                        isResolution:   true,
4✔
2086
                }
4✔
2087

4✔
2088
                if resMsg.Failure != nil {
8✔
2089
                        resPkt.htlc = &lnwire.UpdateFailHTLC{}
4✔
2090
                } else {
4✔
2091
                        resPkt.htlc = &lnwire.UpdateFulfillHTLC{
×
2092
                                PaymentPreimage: *resMsg.PreImage,
×
2093
                        }
×
2094
                }
×
2095

2096
                switchPackets = append(switchPackets, resPkt)
4✔
2097
        }
2098

2099
        // We'll now dispatch the set of resolution messages to the proper
2100
        // destination. An error is only encountered here if the switch is
2101
        // shutting down.
2102
        if err := s.ForwardPackets(nil, switchPackets...); err != nil {
209✔
2103
                return err
×
2104
        }
×
2105

2106
        return nil
209✔
2107
}
2108

2109
// reforwardResponses for every known, non-pending channel, loads all associated
2110
// forwarding packages and reforwards any Settle or Fail HTLCs found. This is
2111
// used to resurrect the switch's mailboxes after a restart. This also runs for
2112
// waiting close channels since there may be settles or fails that need to be
2113
// reforwarded before they completely close.
2114
func (s *Switch) reforwardResponses() error {
209✔
2115
        openChannels, err := s.cfg.FetchAllChannels()
209✔
2116
        if err != nil {
209✔
2117
                return err
×
2118
        }
×
2119

2120
        for _, openChannel := range openChannels {
340✔
2121
                shortChanID := openChannel.ShortChanID()
131✔
2122

131✔
2123
                // Locally-initiated payments never need reforwarding.
131✔
2124
                if shortChanID == hop.Source {
135✔
2125
                        continue
4✔
2126
                }
2127

2128
                // If the channel is pending, it should have no forwarding
2129
                // packages, and nothing to reforward.
2130
                if openChannel.IsPending {
131✔
2131
                        continue
×
2132
                }
2133

2134
                // Channels in open or waiting-close may still have responses in
2135
                // their forwarding packages. We will continue to reattempt
2136
                // forwarding on startup until the channel is fully-closed.
2137
                //
2138
                // Load this channel's forwarding packages, and deliver them to
2139
                // the switch.
2140
                fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID)
131✔
2141
                if err != nil {
131✔
2142
                        log.Errorf("unable to load forwarding "+
×
2143
                                "packages for %v: %v", shortChanID, err)
×
2144
                        return err
×
2145
                }
×
2146

2147
                s.reforwardSettleFails(fwdPkgs)
131✔
2148
        }
2149

2150
        return nil
209✔
2151
}
2152

2153
// loadChannelFwdPkgs loads all forwarding packages owned by the `source` short
2154
// channel identifier.
2155
func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) {
131✔
2156

131✔
2157
        var fwdPkgs []*channeldb.FwdPkg
131✔
2158
        if err := kvdb.View(s.cfg.DB, func(tx kvdb.RTx) error {
262✔
2159
                var err error
131✔
2160
                fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgs(
131✔
2161
                        tx, source,
131✔
2162
                )
131✔
2163
                return err
131✔
2164
        }, func() {
262✔
2165
                fwdPkgs = nil
131✔
2166
        }); err != nil {
131✔
2167
                return nil, err
×
2168
        }
×
2169

2170
        return fwdPkgs, nil
131✔
2171
}
2172

2173
// reforwardSettleFails parses the Settle and Fail HTLCs from the list of
2174
// forwarding packages, and reforwards those that have not been acknowledged.
2175
// This is intended to occur on startup, in order to recover the switch's
2176
// mailboxes, and to ensure that responses can be propagated in case the
2177
// outgoing link never comes back online.
2178
//
2179
// NOTE: This should mimic the behavior processRemoteSettleFails.
2180
func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) {
131✔
2181
        for _, fwdPkg := range fwdPkgs {
136✔
2182
                settleFails, err := lnwallet.PayDescsFromRemoteLogUpdates(
5✔
2183
                        fwdPkg.Source, fwdPkg.Height, fwdPkg.SettleFails,
5✔
2184
                )
5✔
2185
                if err != nil {
5✔
2186
                        log.Errorf("Unable to process remote log updates: %v",
×
2187
                                err)
×
2188
                        continue
×
2189
                }
2190

2191
                switchPackets := make([]*htlcPacket, 0, len(settleFails))
5✔
2192
                for i, pd := range settleFails {
9✔
2193

4✔
2194
                        // Skip any settles or fails that have already been
4✔
2195
                        // acknowledged by the incoming link that originated the
4✔
2196
                        // forwarded Add.
4✔
2197
                        if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
8✔
2198
                                continue
4✔
2199
                        }
2200

2201
                        switch pd.EntryType {
4✔
2202

2203
                        // A settle for an HTLC we previously forwarded HTLC has
2204
                        // been received. So we'll forward the HTLC to the
2205
                        // switch which will handle propagating the settle to
2206
                        // the prior hop.
2207
                        case lnwallet.Settle:
4✔
2208
                                settlePacket := &htlcPacket{
4✔
2209
                                        outgoingChanID: fwdPkg.Source,
4✔
2210
                                        outgoingHTLCID: pd.ParentIndex,
4✔
2211
                                        destRef:        pd.DestRef,
4✔
2212
                                        htlc: &lnwire.UpdateFulfillHTLC{
4✔
2213
                                                PaymentPreimage: pd.RPreimage,
4✔
2214
                                        },
4✔
2215
                                }
4✔
2216

4✔
2217
                                // Add the packet to the batch to be forwarded, and
4✔
2218
                                // notify the overflow queue that a spare spot has been
4✔
2219
                                // freed up within the commitment state.
4✔
2220
                                switchPackets = append(switchPackets, settlePacket)
4✔
2221

2222
                        // A failureCode message for a previously forwarded HTLC has been
2223
                        // received. As a result a new slot will be freed up in our
2224
                        // commitment state, so we'll forward this to the switch so the
2225
                        // backwards undo can continue.
2226
                        case lnwallet.Fail:
×
2227
                                // Fetch the reason the HTLC was canceled so
×
2228
                                // we can continue to propagate it. This
×
2229
                                // failure originated from another node, so
×
2230
                                // the linkFailure field is not set on this
×
2231
                                // packet. We rely on the link to fill in
×
2232
                                // additional circuit information for us.
×
2233
                                failPacket := &htlcPacket{
×
2234
                                        outgoingChanID: fwdPkg.Source,
×
2235
                                        outgoingHTLCID: pd.ParentIndex,
×
2236
                                        destRef:        pd.DestRef,
×
2237
                                        htlc: &lnwire.UpdateFailHTLC{
×
2238
                                                Reason: lnwire.OpaqueReason(pd.FailReason),
×
2239
                                        },
×
2240
                                }
×
2241

×
2242
                                // Add the packet to the batch to be forwarded, and
×
2243
                                // notify the overflow queue that a spare spot has been
×
2244
                                // freed up within the commitment state.
×
2245
                                switchPackets = append(switchPackets, failPacket)
×
2246
                        }
2247
                }
2248

2249
                // Since this send isn't tied to a specific link, we pass a nil
2250
                // link quit channel, meaning the send will fail only if the
2251
                // switch receives a shutdown request.
2252
                if err := s.ForwardPackets(nil, switchPackets...); err != nil {
5✔
2253
                        log.Errorf("Unhandled error while reforwarding packets "+
×
2254
                                "settle/fail over htlcswitch: %v", err)
×
2255
                }
×
2256
        }
2257
}
2258

2259
// Stop gracefully stops all active helper goroutines, then waits until they've
2260
// exited.
2261
func (s *Switch) Stop() error {
439✔
2262
        if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) {
567✔
2263
                log.Warn("Htlc Switch already stopped")
128✔
2264
                return errors.New("htlc switch already shutdown")
128✔
2265
        }
128✔
2266

2267
        log.Info("HTLC Switch shutting down...")
311✔
2268
        defer log.Debug("HTLC Switch shutdown complete")
311✔
2269

311✔
2270
        close(s.quit)
311✔
2271

311✔
2272
        s.wg.Wait()
311✔
2273

311✔
2274
        // Wait until all active goroutines have finished exiting before
311✔
2275
        // stopping the mailboxes, otherwise the mailbox map could still be
311✔
2276
        // accessed and modified.
311✔
2277
        s.mailOrchestrator.Stop()
311✔
2278

311✔
2279
        return nil
311✔
2280
}
2281

2282
// CreateAndAddLink will create a link and then add it to the internal maps
2283
// when given a ChannelLinkConfig and LightningChannel.
2284
func (s *Switch) CreateAndAddLink(linkCfg ChannelLinkConfig,
2285
        lnChan *lnwallet.LightningChannel) error {
4✔
2286

4✔
2287
        link := NewChannelLink(linkCfg, lnChan)
4✔
2288
        return s.AddLink(link)
4✔
2289
}
4✔
2290

2291
// AddLink is used to initiate the handling of the add link command. The
2292
// request will be propagated and handled in the main goroutine.
2293
func (s *Switch) AddLink(link ChannelLink) error {
338✔
2294
        s.indexMtx.Lock()
338✔
2295
        defer s.indexMtx.Unlock()
338✔
2296

338✔
2297
        chanID := link.ChanID()
338✔
2298

338✔
2299
        // First, ensure that this link is not already active in the switch.
338✔
2300
        _, err := s.getLink(chanID)
338✔
2301
        if err == nil {
339✔
2302
                return fmt.Errorf("unable to add ChannelLink(%v), already "+
1✔
2303
                        "active", chanID)
1✔
2304
        }
1✔
2305

2306
        // Get and attach the mailbox for this link, which buffers packets in
2307
        // case there packets that we tried to deliver while this link was
2308
        // offline.
2309
        shortChanID := link.ShortChanID()
337✔
2310
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID)
337✔
2311
        link.AttachMailBox(mailbox)
337✔
2312

337✔
2313
        // Attach the Switch's failAliasUpdate function to the link.
337✔
2314
        link.attachFailAliasUpdate(s.failAliasUpdate)
337✔
2315

337✔
2316
        if err := link.Start(); err != nil {
337✔
2317
                log.Errorf("AddLink failed to start link with chanID=%v: %v",
×
2318
                        chanID, err)
×
2319
                s.removeLink(chanID)
×
2320
                return err
×
2321
        }
×
2322

2323
        if shortChanID == hop.Source {
342✔
2324
                log.Infof("Adding pending link chan_id=%v, short_chan_id=%v",
5✔
2325
                        chanID, shortChanID)
5✔
2326

5✔
2327
                s.pendingLinkIndex[chanID] = link
5✔
2328
        } else {
341✔
2329
                log.Infof("Adding live link chan_id=%v, short_chan_id=%v",
336✔
2330
                        chanID, shortChanID)
336✔
2331

336✔
2332
                s.addLiveLink(link)
336✔
2333
                s.mailOrchestrator.BindLiveShortChanID(
336✔
2334
                        mailbox, chanID, shortChanID,
336✔
2335
                )
336✔
2336
        }
336✔
2337

2338
        return nil
337✔
2339
}
2340

2341
// addLiveLink adds a link to all associated forwarding index, this makes it a
2342
// candidate for forwarding HTLCs.
2343
func (s *Switch) addLiveLink(link ChannelLink) {
336✔
2344
        linkScid := link.ShortChanID()
336✔
2345

336✔
2346
        // We'll add the link to the linkIndex which lets us quickly
336✔
2347
        // look up a channel when we need to close or register it, and
336✔
2348
        // the forwarding index which'll be used when forwarding HTLC's
336✔
2349
        // in the multi-hop setting.
336✔
2350
        s.linkIndex[link.ChanID()] = link
336✔
2351
        s.forwardingIndex[linkScid] = link
336✔
2352

336✔
2353
        // Next we'll add the link to the interface index so we can
336✔
2354
        // quickly look up all the channels for a particular node.
336✔
2355
        peerPub := link.PeerPubKey()
336✔
2356
        if _, ok := s.interfaceIndex[peerPub]; !ok {
667✔
2357
                s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink)
331✔
2358
        }
331✔
2359
        s.interfaceIndex[peerPub][link.ChanID()] = link
336✔
2360

336✔
2361
        aliases := link.getAliases()
336✔
2362
        if link.isZeroConf() {
359✔
2363
                if link.zeroConfConfirmed() {
42✔
2364
                        // Since the zero-conf channel has confirmed, we can
19✔
2365
                        // populate the aliasToReal mapping.
19✔
2366
                        confirmedScid := link.confirmedScid()
19✔
2367

19✔
2368
                        for _, alias := range aliases {
45✔
2369
                                s.aliasToReal[alias] = confirmedScid
26✔
2370
                        }
26✔
2371

2372
                        // Add the confirmed SCID as a key in the baseIndex.
2373
                        s.baseIndex[confirmedScid] = linkScid
19✔
2374
                }
2375

2376
                // Now we populate the baseIndex which will be used to fetch
2377
                // the link given any of the channel's alias SCIDs or the real
2378
                // SCID. The link's SCID is an alias, so we don't need to
2379
                // special-case it like the option-scid-alias feature-bit case
2380
                // further down.
2381
                for _, alias := range aliases {
54✔
2382
                        s.baseIndex[alias] = linkScid
31✔
2383
                }
31✔
2384
        } else if link.negotiatedAliasFeature() {
337✔
2385
                // The link's SCID is the confirmed SCID for non-zero-conf
20✔
2386
                // option-scid-alias feature bit channels.
20✔
2387
                for _, alias := range aliases {
47✔
2388
                        s.aliasToReal[alias] = linkScid
27✔
2389
                        s.baseIndex[alias] = linkScid
27✔
2390
                }
27✔
2391

2392
                // Since the link's SCID is confirmed, it was not included in
2393
                // the baseIndex above as a key. Add it now.
2394
                s.baseIndex[linkScid] = linkScid
20✔
2395
        }
2396
}
2397

2398
// GetLink is used to initiate the handling of the get link command. The
2399
// request will be propagated/handled to/in the main goroutine.
2400
func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelUpdateHandler,
2401
        error) {
7,302✔
2402

7,302✔
2403
        s.indexMtx.RLock()
7,302✔
2404
        defer s.indexMtx.RUnlock()
7,302✔
2405

7,302✔
2406
        return s.getLink(chanID)
7,302✔
2407
}
7,302✔
2408

2409
// getLink returns the link stored in either the pending index or the live
2410
// lindex.
2411
func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) {
7,963✔
2412
        link, ok := s.linkIndex[chanID]
7,963✔
2413
        if !ok {
8,301✔
2414
                link, ok = s.pendingLinkIndex[chanID]
338✔
2415
                if !ok {
675✔
2416
                        return nil, ErrChannelLinkNotFound
337✔
2417
                }
337✔
2418
        }
2419

2420
        return link, nil
7,630✔
2421
}
2422

2423
// GetLinkByShortID attempts to return the link which possesses the target short
2424
// channel ID.
2425
func (s *Switch) GetLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink,
2426
        error) {
4✔
2427

4✔
2428
        s.indexMtx.RLock()
4✔
2429
        defer s.indexMtx.RUnlock()
4✔
2430

4✔
2431
        link, err := s.getLinkByShortID(chanID)
4✔
2432
        if err != nil {
8✔
2433
                // If we failed to find the link under the passed-in SCID, we
4✔
2434
                // consult the Switch's baseIndex map to see if the confirmed
4✔
2435
                // SCID was used for a zero-conf channel.
4✔
2436
                aliasID, ok := s.baseIndex[chanID]
4✔
2437
                if !ok {
4✔
2438
                        return nil, err
×
2439
                }
×
2440

2441
                // An alias was found, use it to lookup if a link exists.
2442
                return s.getLinkByShortID(aliasID)
4✔
2443
        }
2444

2445
        return link, nil
4✔
2446
}
2447

2448
// getLinkByShortID attempts to return the link which possesses the target
2449
// short channel ID.
2450
//
2451
// NOTE: This MUST be called with the indexMtx held.
2452
func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) {
1,522✔
2453
        link, ok := s.forwardingIndex[chanID]
1,522✔
2454
        if !ok {
1,527✔
2455
                return nil, ErrChannelLinkNotFound
5✔
2456
        }
5✔
2457

2458
        return link, nil
1,521✔
2459
}
2460

2461
// getLinkByMapping attempts to fetch the link via the htlcPacket's
2462
// outgoingChanID, possibly using a mapping. If it finds the link via mapping,
2463
// the outgoingChanID will be changed so that an error can be properly
2464
// attributed when looping over linkErrs in handlePacketForward.
2465
//
2466
// * If the outgoingChanID is an alias, we'll fetch the link regardless if it's
2467
// public or not.
2468
//
2469
// * If the outgoingChanID is a confirmed SCID, we'll need to do more checks.
2470
//   - If there is no entry found in baseIndex, fetch the link. This channel
2471
//     did not have the option-scid-alias feature negotiated (which includes
2472
//     zero-conf and option-scid-alias channel-types).
2473
//   - If there is an entry found, fetch the link from forwardingIndex and
2474
//     fail if this is a private link.
2475
//
2476
// NOTE: This MUST be called with the indexMtx read lock held.
2477
func (s *Switch) getLinkByMapping(pkt *htlcPacket) (ChannelLink, error) {
84✔
2478
        // Determine if this ShortChannelID is an alias or a confirmed SCID.
84✔
2479
        chanID := pkt.outgoingChanID
84✔
2480
        aliasID := s.cfg.IsAlias(chanID)
84✔
2481

84✔
2482
        // Set the originalOutgoingChanID so the proper channel_update can be
84✔
2483
        // sent back if the option-scid-alias feature bit was negotiated.
84✔
2484
        pkt.originalOutgoingChanID = chanID
84✔
2485

84✔
2486
        if aliasID {
103✔
2487
                // Since outgoingChanID is an alias, we'll fetch the link via
19✔
2488
                // baseIndex.
19✔
2489
                baseScid, ok := s.baseIndex[chanID]
19✔
2490
                if !ok {
19✔
2491
                        // No mapping exists, bail.
×
2492
                        return nil, ErrChannelLinkNotFound
×
2493
                }
×
2494

2495
                // A mapping exists, so use baseScid to find the link in the
2496
                // forwardingIndex.
2497
                link, ok := s.forwardingIndex[baseScid]
19✔
2498
                if !ok {
19✔
2499
                        // Link not found, bail.
×
2500
                        return nil, ErrChannelLinkNotFound
×
2501
                }
×
2502

2503
                // Change the packet's outgoingChanID field so that errors are
2504
                // properly attributed.
2505
                pkt.outgoingChanID = baseScid
19✔
2506

19✔
2507
                // Return the link without checking if it's private or not.
19✔
2508
                return link, nil
19✔
2509
        }
2510

2511
        // The outgoingChanID is a confirmed SCID. Attempt to fetch the base
2512
        // SCID from baseIndex.
2513
        baseScid, ok := s.baseIndex[chanID]
69✔
2514
        if !ok {
131✔
2515
                // outgoingChanID is not a key in base index meaning this
62✔
2516
                // channel did not have the option-scid-alias feature bit
62✔
2517
                // negotiated. We'll fetch the link and return it.
62✔
2518
                link, ok := s.forwardingIndex[chanID]
62✔
2519
                if !ok {
68✔
2520
                        // The link wasn't found, bail out.
6✔
2521
                        return nil, ErrChannelLinkNotFound
6✔
2522
                }
6✔
2523

2524
                return link, nil
60✔
2525
        }
2526

2527
        // Fetch the link whose internal SCID is baseScid.
2528
        link, ok := s.forwardingIndex[baseScid]
11✔
2529
        if !ok {
11✔
2530
                // Link wasn't found, bail out.
×
2531
                return nil, ErrChannelLinkNotFound
×
2532
        }
×
2533

2534
        // If the link is unadvertised, we fail since the real SCID was used to
2535
        // forward over it and this is a channel where the option-scid-alias
2536
        // feature bit was negotiated.
2537
        if link.IsUnadvertised() {
13✔
2538
                return nil, ErrChannelLinkNotFound
2✔
2539
        }
2✔
2540

2541
        // The link is public so the confirmed SCID can be used to forward over
2542
        // it. We'll also replace pkt's outgoingChanID field so errors can
2543
        // properly be attributed in the calling function.
2544
        pkt.outgoingChanID = baseScid
9✔
2545
        return link, nil
9✔
2546
}
2547

2548
// HasActiveLink returns true if the given channel ID has a link in the link
2549
// index AND the link is eligible to forward.
2550
func (s *Switch) HasActiveLink(chanID lnwire.ChannelID) bool {
6✔
2551
        s.indexMtx.RLock()
6✔
2552
        defer s.indexMtx.RUnlock()
6✔
2553

6✔
2554
        if link, ok := s.linkIndex[chanID]; ok {
12✔
2555
                return link.EligibleToForward()
6✔
2556
        }
6✔
2557

2558
        return false
4✔
2559
}
2560

2561
// RemoveLink purges the switch of any link associated with chanID. If a pending
2562
// or active link is not found, this method does nothing. Otherwise, the method
2563
// returns after the link has been completely shutdown.
2564
func (s *Switch) RemoveLink(chanID lnwire.ChannelID) {
22✔
2565
        s.indexMtx.Lock()
22✔
2566
        link, err := s.getLink(chanID)
22✔
2567
        if err != nil {
26✔
2568
                // If err is non-nil, this means that link is also nil. The
4✔
2569
                // link variable cannot be nil without err being non-nil.
4✔
2570
                s.indexMtx.Unlock()
4✔
2571
                log.Tracef("Unable to remove link for ChannelID(%v): %v",
4✔
2572
                        chanID, err)
4✔
2573
                return
4✔
2574
        }
4✔
2575

2576
        // Check if the link is already stopping and grab the stop chan if it
2577
        // is.
2578
        stopChan, ok := s.linkStopIndex[chanID]
22✔
2579
        if !ok {
44✔
2580
                // If the link is non-nil, it is not currently stopping, so
22✔
2581
                // we'll add a stop chan to the linkStopIndex.
22✔
2582
                stopChan = make(chan struct{})
22✔
2583
                s.linkStopIndex[chanID] = stopChan
22✔
2584
        }
22✔
2585
        s.indexMtx.Unlock()
22✔
2586

22✔
2587
        if ok {
22✔
2588
                // If the stop chan exists, we will wait for it to be closed.
×
2589
                // Once it is closed, we will exit.
×
2590
                select {
×
2591
                case <-stopChan:
×
2592
                        return
×
2593
                case <-s.quit:
×
2594
                        return
×
2595
                }
2596
        }
2597

2598
        // Stop the link before removing it from the maps.
2599
        link.Stop()
22✔
2600

22✔
2601
        s.indexMtx.Lock()
22✔
2602
        _ = s.removeLink(chanID)
22✔
2603

22✔
2604
        // Close stopChan and remove this link from the linkStopIndex.
22✔
2605
        // Deleting from the index and removing from the link must be done
22✔
2606
        // in the same block while the mutex is held.
22✔
2607
        close(stopChan)
22✔
2608
        delete(s.linkStopIndex, chanID)
22✔
2609
        s.indexMtx.Unlock()
22✔
2610
}
2611

2612
// removeLink is used to remove and stop the channel link.
2613
//
2614
// NOTE: This MUST be called with the indexMtx held.
2615
func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink {
313✔
2616
        log.Infof("Removing channel link with ChannelID(%v)", chanID)
313✔
2617

313✔
2618
        link, err := s.getLink(chanID)
313✔
2619
        if err != nil {
313✔
2620
                return nil
×
2621
        }
×
2622

2623
        // Remove the channel from live link indexes.
2624
        delete(s.pendingLinkIndex, link.ChanID())
313✔
2625
        delete(s.linkIndex, link.ChanID())
313✔
2626
        delete(s.forwardingIndex, link.ShortChanID())
313✔
2627

313✔
2628
        // If the link has been added to the peer index, then we'll move to
313✔
2629
        // delete the entry within the index.
313✔
2630
        peerPub := link.PeerPubKey()
313✔
2631
        if peerIndex, ok := s.interfaceIndex[peerPub]; ok {
625✔
2632
                delete(peerIndex, link.ChanID())
312✔
2633

312✔
2634
                // If after deletion, there are no longer any links, then we'll
312✔
2635
                // remove the interface map all together.
312✔
2636
                if len(peerIndex) == 0 {
619✔
2637
                        delete(s.interfaceIndex, peerPub)
307✔
2638
                }
307✔
2639
        }
2640

2641
        return link
313✔
2642
}
2643

2644
// UpdateShortChanID locates the link with the passed-in chanID and updates the
2645
// underlying channel state. This is only used in zero-conf channels to allow
2646
// the confirmed SCID to be updated.
2647
func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error {
5✔
2648
        s.indexMtx.Lock()
5✔
2649
        defer s.indexMtx.Unlock()
5✔
2650

5✔
2651
        // Locate the target link in the link index. If no such link exists,
5✔
2652
        // then we will ignore the request.
5✔
2653
        link, ok := s.linkIndex[chanID]
5✔
2654
        if !ok {
5✔
2655
                return fmt.Errorf("link %v not found", chanID)
×
2656
        }
×
2657

2658
        // Try to update the link's underlying channel state, returning early
2659
        // if this update failed.
2660
        _, err := link.UpdateShortChanID()
5✔
2661
        if err != nil {
5✔
2662
                return err
×
2663
        }
×
2664

2665
        // Since the zero-conf channel is confirmed, we should populate the
2666
        // aliasToReal map and update the baseIndex.
2667
        aliases := link.getAliases()
5✔
2668

5✔
2669
        confirmedScid := link.confirmedScid()
5✔
2670

5✔
2671
        for _, alias := range aliases {
11✔
2672
                s.aliasToReal[alias] = confirmedScid
6✔
2673
        }
6✔
2674

2675
        s.baseIndex[confirmedScid] = link.ShortChanID()
5✔
2676

5✔
2677
        return nil
5✔
2678
}
2679

2680
// GetLinksByInterface fetches all the links connected to a particular node
2681
// identified by the serialized compressed form of its public key.
2682
func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelUpdateHandler,
2683
        error) {
4✔
2684

4✔
2685
        s.indexMtx.RLock()
4✔
2686
        defer s.indexMtx.RUnlock()
4✔
2687

4✔
2688
        var handlers []ChannelUpdateHandler
4✔
2689

4✔
2690
        links, err := s.getLinks(hop)
4✔
2691
        if err != nil {
8✔
2692
                return nil, err
4✔
2693
        }
4✔
2694

2695
        // Range over the returned []ChannelLink to convert them into
2696
        // []ChannelUpdateHandler.
2697
        for _, link := range links {
8✔
2698
                handlers = append(handlers, link)
4✔
2699
        }
4✔
2700

2701
        return handlers, nil
4✔
2702
}
2703

2704
// getLinks is function which returns the channel links of the peer by hop
2705
// destination id.
2706
//
2707
// NOTE: This MUST be called with the indexMtx held.
2708
func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
80✔
2709
        links, ok := s.interfaceIndex[destination]
80✔
2710
        if !ok {
84✔
2711
                return nil, ErrNoLinksFound
4✔
2712
        }
4✔
2713

2714
        channelLinks := make([]ChannelLink, 0, len(links))
80✔
2715
        for _, link := range links {
164✔
2716
                channelLinks = append(channelLinks, link)
84✔
2717
        }
84✔
2718

2719
        return channelLinks, nil
80✔
2720
}
2721

2722
// CircuitModifier returns a reference to subset of the interfaces provided by
2723
// the circuit map, to allow links to open and close circuits.
2724
func (s *Switch) CircuitModifier() CircuitModifier {
217✔
2725
        return s.circuits
217✔
2726
}
217✔
2727

2728
// CircuitLookup returns a reference to subset of the interfaces provided by the
2729
// circuit map, to allow looking up circuits.
2730
func (s *Switch) CircuitLookup() CircuitLookup {
4✔
2731
        return s.circuits
4✔
2732
}
4✔
2733

2734
// commitCircuits persistently adds a circuit to the switch's circuit map.
2735
func (s *Switch) commitCircuits(circuits ...*PaymentCircuit) (
2736
        *CircuitFwdActions, error) {
17✔
2737

17✔
2738
        return s.circuits.CommitCircuits(circuits...)
17✔
2739
}
17✔
2740

2741
// FlushForwardingEvents flushes out the set of pending forwarding events to
2742
// the persistent log. This will be used by the switch to periodically flush
2743
// out the set of forwarding events to disk. External callers can also use this
2744
// method to ensure all data is flushed to dis before querying the log.
2745
func (s *Switch) FlushForwardingEvents() error {
223✔
2746
        // First, we'll obtain a copy of the current set of pending forwarding
223✔
2747
        // events.
223✔
2748
        s.fwdEventMtx.Lock()
223✔
2749

223✔
2750
        // If we won't have any forwarding events, then we can exit early.
223✔
2751
        if len(s.pendingFwdingEvents) == 0 {
429✔
2752
                s.fwdEventMtx.Unlock()
206✔
2753
                return nil
206✔
2754
        }
206✔
2755

2756
        events := make([]channeldb.ForwardingEvent, len(s.pendingFwdingEvents))
21✔
2757
        copy(events[:], s.pendingFwdingEvents[:])
21✔
2758

21✔
2759
        // With the copy obtained, we can now clear out the header pointer of
21✔
2760
        // the current slice. This way, we can re-use the underlying storage
21✔
2761
        // allocated for the slice.
21✔
2762
        s.pendingFwdingEvents = s.pendingFwdingEvents[:0]
21✔
2763
        s.fwdEventMtx.Unlock()
21✔
2764

21✔
2765
        // Finally, we'll write out the copied events to the persistent
21✔
2766
        // forwarding log.
21✔
2767
        return s.cfg.FwdingLog.AddForwardingEvents(events)
21✔
2768
}
2769

2770
// BestHeight returns the best height known to the switch.
2771
func (s *Switch) BestHeight() uint32 {
1,492✔
2772
        return atomic.LoadUint32(&s.bestHeight)
1,492✔
2773
}
1,492✔
2774

2775
// dustExceedsFeeThreshold takes in a ChannelLink, HTLC amount, and a boolean
2776
// to determine whether the default fee threshold has been exceeded. This
2777
// heuristic takes into account the trimmed-to-dust mechanism. The sum of the
2778
// commitment's dust with the mailbox's dust with the amount is checked against
2779
// the fee exposure threshold. If incoming is true, then the amount is not
2780
// included in the sum as it was already included in the commitment's dust. A
2781
// boolean is returned telling the caller whether the HTLC should be failed
2782
// back.
2783
func (s *Switch) dustExceedsFeeThreshold(link ChannelLink,
2784
        amount lnwire.MilliSatoshi, incoming bool) bool {
1,577✔
2785

1,577✔
2786
        // Retrieve the link's current commitment feerate and dustClosure.
1,577✔
2787
        feeRate := link.getFeeRate()
1,577✔
2788
        isDust := link.getDustClosure()
1,577✔
2789

1,577✔
2790
        // Evaluate if the HTLC is dust on either sides' commitment.
1,577✔
2791
        isLocalDust := isDust(
1,577✔
2792
                feeRate, incoming, lntypes.Local, amount.ToSatoshis(),
1,577✔
2793
        )
1,577✔
2794
        isRemoteDust := isDust(
1,577✔
2795
                feeRate, incoming, lntypes.Remote, amount.ToSatoshis(),
1,577✔
2796
        )
1,577✔
2797

1,577✔
2798
        if !(isLocalDust || isRemoteDust) {
2,146✔
2799
                // If the HTLC is not dust on either commitment, it's fine to
569✔
2800
                // forward.
569✔
2801
                return false
569✔
2802
        }
569✔
2803

2804
        // Fetch the dust sums currently in the mailbox for this link.
2805
        cid := link.ChanID()
1,012✔
2806
        sid := link.ShortChanID()
1,012✔
2807
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(cid, sid)
1,012✔
2808
        localMailDust, remoteMailDust := mailbox.DustPackets()
1,012✔
2809

1,012✔
2810
        // If the htlc is dust on the local commitment, we'll obtain the dust
1,012✔
2811
        // sum for it.
1,012✔
2812
        if isLocalDust {
2,024✔
2813
                localSum := link.getDustSum(
1,012✔
2814
                        lntypes.Local, fn.None[chainfee.SatPerKWeight](),
1,012✔
2815
                )
1,012✔
2816
                localSum += localMailDust
1,012✔
2817

1,012✔
2818
                // Optionally include the HTLC amount only for outgoing
1,012✔
2819
                // HTLCs.
1,012✔
2820
                if !incoming {
1,984✔
2821
                        localSum += amount
972✔
2822
                }
972✔
2823

2824
                // Finally check against the defined fee threshold.
2825
                if localSum > s.cfg.MaxFeeExposure {
1,014✔
2826
                        return true
2✔
2827
                }
2✔
2828
        }
2829

2830
        // Also check if the htlc is dust on the remote commitment, if we've
2831
        // reached this point.
2832
        if isRemoteDust {
2,020✔
2833
                remoteSum := link.getDustSum(
1,010✔
2834
                        lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
1,010✔
2835
                )
1,010✔
2836
                remoteSum += remoteMailDust
1,010✔
2837

1,010✔
2838
                // Optionally include the HTLC amount only for outgoing
1,010✔
2839
                // HTLCs.
1,010✔
2840
                if !incoming {
1,980✔
2841
                        remoteSum += amount
970✔
2842
                }
970✔
2843

2844
                // Finally check against the defined fee threshold.
2845
                if remoteSum > s.cfg.MaxFeeExposure {
1,010✔
2846
                        return true
×
2847
                }
×
2848
        }
2849

2850
        // If we reached this point, this HTLC is fine to forward.
2851
        return false
1,010✔
2852
}
2853

2854
// failMailboxUpdate is passed to the mailbox orchestrator which in turn passes
2855
// it to individual mailboxes. It allows the mailboxes to construct a
2856
// FailureMessage when failing back HTLC's due to expiry and may include an
2857
// alias in the ShortChannelID field. The outgoingScid is the SCID originally
2858
// used in the onion. The mailboxScid is the SCID that the mailbox and link
2859
// use. The mailboxScid is only used in the non-alias case, so it is always
2860
// the confirmed SCID.
2861
func (s *Switch) failMailboxUpdate(outgoingScid,
2862
        mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage {
15✔
2863

15✔
2864
        // Try to use the failAliasUpdate function in case this is a channel
15✔
2865
        // that uses aliases. If it returns nil, we'll fallback to the original
15✔
2866
        // pre-alias behavior.
15✔
2867
        update := s.failAliasUpdate(outgoingScid, false)
15✔
2868
        if update == nil {
24✔
2869
                // Execute the fallback behavior.
9✔
2870
                var err error
9✔
2871
                update, err = s.cfg.FetchLastChannelUpdate(mailboxScid)
9✔
2872
                if err != nil {
9✔
2873
                        return &lnwire.FailTemporaryNodeFailure{}
×
2874
                }
×
2875
        }
2876

2877
        return lnwire.NewTemporaryChannelFailure(update)
15✔
2878
}
2879

2880
// failAliasUpdate prepares a ChannelUpdate for a failed incoming or outgoing
2881
// HTLC on a channel where the option-scid-alias feature bit was negotiated. If
2882
// the associated channel is not one of these, this function will return nil
2883
// and the caller is expected to handle this properly. In this case, a return
2884
// to the original non-alias behavior is expected.
2885
func (s *Switch) failAliasUpdate(scid lnwire.ShortChannelID,
2886
        incoming bool) *lnwire.ChannelUpdate {
38✔
2887

38✔
2888
        // This function does not defer the unlocking because of the database
38✔
2889
        // lookups for ChannelUpdate.
38✔
2890
        s.indexMtx.RLock()
38✔
2891

38✔
2892
        if s.cfg.IsAlias(scid) {
53✔
2893
                // The alias SCID was used. In the incoming case this means
15✔
2894
                // the channel is zero-conf as the link sets the scid. In the
15✔
2895
                // outgoing case, the sender set the scid to use and may be
15✔
2896
                // either the alias or the confirmed one, if it exists.
15✔
2897
                realScid, ok := s.aliasToReal[scid]
15✔
2898
                if !ok {
15✔
2899
                        // The real, confirmed SCID does not exist yet. Find
×
2900
                        // the "base" SCID that the link uses via the
×
2901
                        // baseIndex. If we can't find it, return nil. This
×
2902
                        // means the channel is zero-conf.
×
2903
                        baseScid, ok := s.baseIndex[scid]
×
2904
                        s.indexMtx.RUnlock()
×
2905
                        if !ok {
×
2906
                                return nil
×
2907
                        }
×
2908

2909
                        update, err := s.cfg.FetchLastChannelUpdate(baseScid)
×
2910
                        if err != nil {
×
2911
                                return nil
×
2912
                        }
×
2913

2914
                        // Replace the baseScid with the passed-in alias.
2915
                        update.ShortChannelID = scid
×
2916
                        sig, err := s.cfg.SignAliasUpdate(update)
×
2917
                        if err != nil {
×
2918
                                return nil
×
2919
                        }
×
2920

2921
                        update.Signature, err = lnwire.NewSigFromSignature(sig)
×
2922
                        if err != nil {
×
2923
                                return nil
×
2924
                        }
×
2925

2926
                        return update
×
2927
                }
2928

2929
                s.indexMtx.RUnlock()
15✔
2930

15✔
2931
                // Fetch the SCID via the confirmed SCID and replace it with
15✔
2932
                // the alias.
15✔
2933
                update, err := s.cfg.FetchLastChannelUpdate(realScid)
15✔
2934
                if err != nil {
19✔
2935
                        return nil
4✔
2936
                }
4✔
2937

2938
                // In the incoming case, we want to ensure that we don't leak
2939
                // the UTXO in case the channel is private. In the outgoing
2940
                // case, since the alias was used, we do the same thing.
2941
                update.ShortChannelID = scid
15✔
2942
                sig, err := s.cfg.SignAliasUpdate(update)
15✔
2943
                if err != nil {
15✔
2944
                        return nil
×
2945
                }
×
2946

2947
                update.Signature, err = lnwire.NewSigFromSignature(sig)
15✔
2948
                if err != nil {
15✔
2949
                        return nil
×
2950
                }
×
2951

2952
                return update
15✔
2953
        }
2954

2955
        // If the confirmed SCID is not in baseIndex, this is not an
2956
        // option-scid-alias or zero-conf channel.
2957
        baseScid, ok := s.baseIndex[scid]
27✔
2958
        if !ok {
49✔
2959
                s.indexMtx.RUnlock()
22✔
2960
                return nil
22✔
2961
        }
22✔
2962

2963
        // Fetch the link so we can get an alias to use in the ShortChannelID
2964
        // of the ChannelUpdate.
2965
        link, ok := s.forwardingIndex[baseScid]
5✔
2966
        s.indexMtx.RUnlock()
5✔
2967
        if !ok {
5✔
2968
                // This should never happen, but if it does for some reason,
×
2969
                // fallback to the old behavior.
×
2970
                return nil
×
2971
        }
×
2972

2973
        aliases := link.getAliases()
5✔
2974
        if len(aliases) == 0 {
5✔
2975
                // This should never happen, but if it does, fallback.
×
2976
                return nil
×
2977
        }
×
2978

2979
        // Fetch the ChannelUpdate via the real, confirmed SCID.
2980
        update, err := s.cfg.FetchLastChannelUpdate(scid)
5✔
2981
        if err != nil {
5✔
2982
                return nil
×
2983
        }
×
2984

2985
        // The incoming case will replace the ShortChannelID in the retrieved
2986
        // ChannelUpdate with the alias to ensure no privacy leak occurs. This
2987
        // would happen if a private non-zero-conf option-scid-alias
2988
        // feature-bit channel leaked its UTXO here rather than supplying an
2989
        // alias. In the outgoing case, the confirmed SCID was actually used
2990
        // for forwarding in the onion, so no replacement is necessary as the
2991
        // sender knows the scid.
2992
        if incoming {
7✔
2993
                // We will replace and sign the update with the first alias.
2✔
2994
                // Since this happens on the incoming side, it's not actually
2✔
2995
                // possible to know what the sender used in the onion.
2✔
2996
                update.ShortChannelID = aliases[0]
2✔
2997
                sig, err := s.cfg.SignAliasUpdate(update)
2✔
2998
                if err != nil {
2✔
2999
                        return nil
×
3000
                }
×
3001

3002
                update.Signature, err = lnwire.NewSigFromSignature(sig)
2✔
3003
                if err != nil {
2✔
3004
                        return nil
×
3005
                }
×
3006
        }
3007

3008
        return update
5✔
3009
}
3010

3011
// AddAliasForLink instructs the Switch to update its in-memory maps to reflect
3012
// that a link has a new alias.
3013
func (s *Switch) AddAliasForLink(chanID lnwire.ChannelID,
3014
        alias lnwire.ShortChannelID) error {
×
3015

×
3016
        // Fetch the link so that we can update the underlying channel's set of
×
3017
        // aliases.
×
3018
        s.indexMtx.RLock()
×
3019
        link, err := s.getLink(chanID)
×
3020
        s.indexMtx.RUnlock()
×
3021
        if err != nil {
×
3022
                return err
×
3023
        }
×
3024

3025
        // If the link is a channel where the option-scid-alias feature bit was
3026
        // not negotiated, we'll return an error.
3027
        if !link.negotiatedAliasFeature() {
×
3028
                return fmt.Errorf("attempted to update non-alias channel")
×
3029
        }
×
3030

3031
        linkScid := link.ShortChanID()
×
3032

×
3033
        // We'll update the maps so the Switch includes this alias in its
×
3034
        // forwarding decisions.
×
3035
        if link.isZeroConf() {
×
3036
                if link.zeroConfConfirmed() {
×
3037
                        // If the channel has confirmed on-chain, we'll
×
3038
                        // add this alias to the aliasToReal map.
×
3039
                        confirmedScid := link.confirmedScid()
×
3040

×
3041
                        s.aliasToReal[alias] = confirmedScid
×
3042
                }
×
3043

3044
                // Add this alias to the baseIndex mapping.
3045
                s.baseIndex[alias] = linkScid
×
3046
        } else if link.negotiatedAliasFeature() {
×
3047
                // The channel is confirmed, so we'll populate the aliasToReal
×
3048
                // and baseIndex maps.
×
3049
                s.aliasToReal[alias] = linkScid
×
3050
                s.baseIndex[alias] = linkScid
×
3051
        }
×
3052

3053
        return nil
×
3054
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc