• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9935147745

15 Jul 2024 07:07AM UTC coverage: 49.819% (+0.6%) from 49.268%
9935147745

Pull #8900

github

guggero
Makefile: add GOCC variable
Pull Request #8900: Makefile: add GOCC variable

93876 of 188433 relevant lines covered (49.82%)

2.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.0
/htlcswitch/switch.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "math/rand"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/davecgh/go-spew/spew"
16
        "github.com/lightningnetwork/lnd/chainntnfs"
17
        "github.com/lightningnetwork/lnd/channeldb"
18
        "github.com/lightningnetwork/lnd/channeldb/models"
19
        "github.com/lightningnetwork/lnd/clock"
20
        "github.com/lightningnetwork/lnd/contractcourt"
21
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
22
        "github.com/lightningnetwork/lnd/kvdb"
23
        "github.com/lightningnetwork/lnd/lntypes"
24
        "github.com/lightningnetwork/lnd/lnwallet"
25
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
26
        "github.com/lightningnetwork/lnd/lnwire"
27
        "github.com/lightningnetwork/lnd/ticker"
28
)
29

30
const (
31
        // DefaultFwdEventInterval is the duration between attempts to flush
32
        // pending forwarding events to disk.
33
        DefaultFwdEventInterval = 15 * time.Second
34

35
        // DefaultLogInterval is the duration between attempts to log statistics
36
        // about forwarding events.
37
        DefaultLogInterval = 10 * time.Second
38

39
        // DefaultAckInterval is the duration between attempts to ack any settle
40
        // fails in a forwarding package.
41
        DefaultAckInterval = 15 * time.Second
42

43
        // DefaultMailboxDeliveryTimeout is the duration after which Adds will
44
        // be cancelled if they could not get added to an outgoing commitment.
45
        DefaultMailboxDeliveryTimeout = time.Minute
46
)
47

48
var (
49
        // ErrChannelLinkNotFound is used when channel link hasn't been found.
50
        ErrChannelLinkNotFound = errors.New("channel link not found")
51

52
        // ErrDuplicateAdd signals that the ADD htlc was already forwarded
53
        // through the switch and is locked into another commitment txn.
54
        ErrDuplicateAdd = errors.New("duplicate add HTLC detected")
55

56
        // ErrUnknownErrorDecryptor signals that we were unable to locate the
57
        // error decryptor for this payment. This is likely due to restarting
58
        // the daemon.
59
        ErrUnknownErrorDecryptor = errors.New("unknown error decryptor")
60

61
        // ErrSwitchExiting signaled when the switch has received a shutdown
62
        // request.
63
        ErrSwitchExiting = errors.New("htlcswitch shutting down")
64

65
        // ErrNoLinksFound is an error returned when we attempt to retrieve the
66
        // active links in the switch for a specific destination.
67
        ErrNoLinksFound = errors.New("no channel links found")
68

69
        // ErrUnreadableFailureMessage is returned when the failure message
70
        // cannot be decrypted.
71
        ErrUnreadableFailureMessage = errors.New("unreadable failure message")
72

73
        // ErrLocalAddFailed signals that the ADD htlc for a local payment
74
        // failed to be processed.
75
        ErrLocalAddFailed = errors.New("local add HTLC failed")
76

77
        // errDustThresholdExceeded is only surfaced to callers of SendHTLC and
78
        // signals that sending the HTLC would exceed the outgoing link's dust
79
        // threshold.
80
        errDustThresholdExceeded = errors.New("dust threshold exceeded")
81

82
        // DefaultDustThreshold is the default threshold after which we'll fail
83
        // payments if they are dust. This is currently set to 500m msats.
84
        DefaultDustThreshold = lnwire.MilliSatoshi(500_000_000)
85
)
86

87
// plexPacket encapsulates switch packet and adds error channel to receive
88
// error from request handler.
89
type plexPacket struct {
90
        pkt *htlcPacket
91
        err chan error
92
}
93

94
// ChanClose represents a request which close a particular channel specified by
95
// its id.
96
type ChanClose struct {
97
        // CloseType is a variable which signals the type of channel closure the
98
        // peer should execute.
99
        CloseType contractcourt.ChannelCloseType
100

101
        // ChanPoint represent the id of the channel which should be closed.
102
        ChanPoint *wire.OutPoint
103

104
        // TargetFeePerKw is the ideal fee that was specified by the caller.
105
        // This value is only utilized if the closure type is CloseRegular.
106
        // This will be the starting offered fee when the fee negotiation
107
        // process for the cooperative closure transaction kicks off.
108
        TargetFeePerKw chainfee.SatPerKWeight
109

110
        // MaxFee is the highest fee the caller is willing to pay.
111
        //
112
        // NOTE: This field is only respected if the caller is the initiator of
113
        // the channel.
114
        MaxFee chainfee.SatPerKWeight
115

116
        // DeliveryScript is an optional delivery script to pay funds out to.
117
        DeliveryScript lnwire.DeliveryAddress
118

119
        // Updates is used by request creator to receive the notifications about
120
        // execution of the close channel request.
121
        Updates chan interface{}
122

123
        // Err is used by request creator to receive request execution error.
124
        Err chan error
125
}
126

127
// Config defines the configuration for the service. ALL elements within the
128
// configuration MUST be non-nil for the service to carry out its duties.
129
type Config struct {
130
        // FwdingLog is an interface that will be used by the switch to log
131
        // forwarding events. A forwarding event happens each time a payment
132
        // circuit is successfully completed. So when we forward an HTLC, and a
133
        // settle is eventually received.
134
        FwdingLog ForwardingLog
135

136
        // LocalChannelClose kicks-off the workflow to execute a cooperative or
137
        // forced unilateral closure of the channel initiated by a local
138
        // subsystem.
139
        LocalChannelClose func(pubKey []byte, request *ChanClose)
140

141
        // DB is the database backend that will be used to back the switch's
142
        // persistent circuit map.
143
        DB kvdb.Backend
144

145
        // FetchAllOpenChannels is a function that fetches all currently open
146
        // channels from the channel database.
147
        FetchAllOpenChannels func() ([]*channeldb.OpenChannel, error)
148

149
        // FetchAllChannels is a function that fetches all pending open, open,
150
        // and waiting close channels from the database.
151
        FetchAllChannels func() ([]*channeldb.OpenChannel, error)
152

153
        // FetchClosedChannels is a function that fetches all closed channels
154
        // from the channel database.
155
        FetchClosedChannels func(
156
                pendingOnly bool) ([]*channeldb.ChannelCloseSummary, error)
157

158
        // SwitchPackager provides access to the forwarding packages of all
159
        // active channels. This gives the switch the ability to read arbitrary
160
        // forwarding packages, and ack settles and fails contained within them.
161
        SwitchPackager channeldb.FwdOperator
162

163
        // ExtractErrorEncrypter is an interface allowing switch to reextract
164
        // error encrypters stored in the circuit map on restarts, since they
165
        // are not stored directly within the database.
166
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter
167

168
        // FetchLastChannelUpdate retrieves the latest routing policy for a
169
        // target channel. This channel will typically be the outgoing channel
170
        // specified when we receive an incoming HTLC.  This will be used to
171
        // provide payment senders our latest policy when sending encrypted
172
        // error messages.
173
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error)
174

175
        // Notifier is an instance of a chain notifier that we'll use to signal
176
        // the switch when a new block has arrived.
177
        Notifier chainntnfs.ChainNotifier
178

179
        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
180
        // events through.
181
        HtlcNotifier htlcNotifier
182

183
        // FwdEventTicker is a signal that instructs the htlcswitch to flush any
184
        // pending forwarding events.
185
        FwdEventTicker ticker.Ticker
186

187
        // LogEventTicker is a signal instructing the htlcswitch to log
188
        // aggregate stats about it's forwarding during the last interval.
189
        LogEventTicker ticker.Ticker
190

191
        // AckEventTicker is a signal instructing the htlcswitch to ack any settle
192
        // fails in forwarding packages.
193
        AckEventTicker ticker.Ticker
194

195
        // AllowCircularRoute is true if the user has configured their node to
196
        // allow forwards that arrive and depart our node over the same channel.
197
        AllowCircularRoute bool
198

199
        // RejectHTLC is a flag that instructs the htlcswitch to reject any
200
        // HTLCs that are not from the source hop.
201
        RejectHTLC bool
202

203
        // Clock is a time source for the switch.
204
        Clock clock.Clock
205

206
        // MailboxDeliveryTimeout is the interval after which Adds will be
207
        // cancelled if they have not been yet been delivered to a link. The
208
        // computed deadline will expiry this long after the Adds are added to
209
        // a mailbox via AddPacket.
210
        MailboxDeliveryTimeout time.Duration
211

212
        // DustThreshold is the threshold in milli-satoshis after which we'll
213
        // fail incoming or outgoing dust payments for a particular channel.
214
        DustThreshold lnwire.MilliSatoshi
215

216
        // SignAliasUpdate is used when sending FailureMessages backwards for
217
        // option_scid_alias channels. This avoids a potential privacy leak by
218
        // replacing the public, confirmed SCID with the alias in the
219
        // ChannelUpdate.
220
        SignAliasUpdate func(u *lnwire.ChannelUpdate) (*ecdsa.Signature,
221
                error)
222

223
        // IsAlias returns whether or not a given SCID is an alias.
224
        IsAlias func(scid lnwire.ShortChannelID) bool
225
}
226

227
// Switch is the central messaging bus for all incoming/outgoing HTLCs.
228
// Connected peers with active channels are treated as named interfaces which
229
// refer to active channels as links. A link is the switch's message
230
// communication point with the goroutine that manages an active channel. New
231
// links are registered each time a channel is created, and unregistered once
232
// the channel is closed. The switch manages the hand-off process for multi-hop
233
// HTLCs, forwarding HTLCs initiated from within the daemon, and finally
234
// notifies users local-systems concerning their outstanding payment requests.
235
type Switch struct {
236
        started  int32 // To be used atomically.
237
        shutdown int32 // To be used atomically.
238

239
        // bestHeight is the best known height of the main chain. The links will
240
        // be used this information to govern decisions based on HTLC timeouts.
241
        // This will be retrieved by the registered links atomically.
242
        bestHeight uint32
243

244
        wg   sync.WaitGroup
245
        quit chan struct{}
246

247
        // cfg is a copy of the configuration struct that the htlc switch
248
        // service was initialized with.
249
        cfg *Config
250

251
        // networkResults stores the results of payments initiated by the user.
252
        // The store is used to later look up the payments and notify the
253
        // user of the result when they are complete. Each payment attempt
254
        // should be given a unique integer ID when it is created, otherwise
255
        // results might be overwritten.
256
        networkResults *networkResultStore
257

258
        // circuits is storage for payment circuits which are used to
259
        // forward the settle/fail htlc updates back to the add htlc initiator.
260
        circuits CircuitMap
261

262
        // mailOrchestrator manages the lifecycle of mailboxes used throughout
263
        // the switch, and facilitates delayed delivery of packets to links that
264
        // later come online.
265
        mailOrchestrator *mailOrchestrator
266

267
        // indexMtx is a read/write mutex that protects the set of indexes
268
        // below.
269
        indexMtx sync.RWMutex
270

271
        // pendingLinkIndex holds links that have not had their final, live
272
        // short_chan_id assigned.
273
        pendingLinkIndex map[lnwire.ChannelID]ChannelLink
274

275
        // links is a map of channel id and channel link which manages
276
        // this channel.
277
        linkIndex map[lnwire.ChannelID]ChannelLink
278

279
        // forwardingIndex is an index which is consulted by the switch when it
280
        // needs to locate the next hop to forward an incoming/outgoing HTLC
281
        // update to/from.
282
        //
283
        // TODO(roasbeef): eventually add a NetworkHop mapping before the
284
        // ChannelLink
285
        forwardingIndex map[lnwire.ShortChannelID]ChannelLink
286

287
        // interfaceIndex maps the compressed public key of a peer to all the
288
        // channels that the switch maintains with that peer.
289
        interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink
290

291
        // linkStopIndex stores the currently stopping ChannelLinks,
292
        // represented by their ChannelID. The key is the link's ChannelID and
293
        // the value is a chan that is closed when the link has fully stopped.
294
        // This map is only added to if RemoveLink is called and is not added
295
        // to when the Switch is shutting down and calls Stop() on each link.
296
        //
297
        // MUST be used with the indexMtx.
298
        linkStopIndex map[lnwire.ChannelID]chan struct{}
299

300
        // htlcPlex is the channel which all connected links use to coordinate
301
        // the setup/teardown of Sphinx (onion routing) payment circuits.
302
        // Active links forward any add/settle messages over this channel each
303
        // state transition, sending new adds/settles which are fully locked
304
        // in.
305
        htlcPlex chan *plexPacket
306

307
        // chanCloseRequests is used to transfer the channel close request to
308
        // the channel close handler.
309
        chanCloseRequests chan *ChanClose
310

311
        // resolutionMsgs is the channel that all external contract resolution
312
        // messages will be sent over.
313
        resolutionMsgs chan *resolutionMsg
314

315
        // pendingFwdingEvents is the set of forwarding events which have been
316
        // collected during the current interval, but hasn't yet been written
317
        // to the forwarding log.
318
        fwdEventMtx         sync.Mutex
319
        pendingFwdingEvents []channeldb.ForwardingEvent
320

321
        // blockEpochStream is an active block epoch event stream backed by an
322
        // active ChainNotifier instance. This will be used to retrieve the
323
        // latest height of the chain.
324
        blockEpochStream *chainntnfs.BlockEpochEvent
325

326
        // pendingSettleFails is the set of settle/fail entries that we need to
327
        // ack in the forwarding package of the outgoing link. This was added to
328
        // make pipelining settles more efficient.
329
        pendingSettleFails []channeldb.SettleFailRef
330

331
        // resMsgStore is used to store the set of ResolutionMsg that come from
332
        // contractcourt. This is used so the Switch can properly forward them,
333
        // even on restarts.
334
        resMsgStore *resolutionStore
335

336
        // aliasToReal is a map used for option-scid-alias feature-bit links.
337
        // The alias SCID is the key and the real, confirmed SCID is the value.
338
        // If the channel is unconfirmed, there will not be a mapping for it.
339
        // Since channels can have multiple aliases, this map is essentially a
340
        // N->1 mapping for a channel. This MUST be accessed with the indexMtx.
341
        aliasToReal map[lnwire.ShortChannelID]lnwire.ShortChannelID
342

343
        // baseIndex is a map used for option-scid-alias feature-bit links.
344
        // The value is the SCID of the link's ShortChannelID. This value may
345
        // be an alias for zero-conf channels or a confirmed SCID for
346
        // non-zero-conf channels with the option-scid-alias feature-bit. The
347
        // key includes the value itself and also any other aliases. This MUST
348
        // be accessed with the indexMtx.
349
        baseIndex map[lnwire.ShortChannelID]lnwire.ShortChannelID
350
}
351

352
// New creates the new instance of htlc switch.
353
func New(cfg Config, currentHeight uint32) (*Switch, error) {
4✔
354
        resStore := newResolutionStore(cfg.DB)
4✔
355

4✔
356
        circuitMap, err := NewCircuitMap(&CircuitMapConfig{
4✔
357
                DB:                    cfg.DB,
4✔
358
                FetchAllOpenChannels:  cfg.FetchAllOpenChannels,
4✔
359
                FetchClosedChannels:   cfg.FetchClosedChannels,
4✔
360
                ExtractErrorEncrypter: cfg.ExtractErrorEncrypter,
4✔
361
                CheckResolutionMsg:    resStore.checkResolutionMsg,
4✔
362
        })
4✔
363
        if err != nil {
4✔
364
                return nil, err
×
365
        }
×
366

367
        s := &Switch{
4✔
368
                bestHeight:        currentHeight,
4✔
369
                cfg:               &cfg,
4✔
370
                circuits:          circuitMap,
4✔
371
                linkIndex:         make(map[lnwire.ChannelID]ChannelLink),
4✔
372
                forwardingIndex:   make(map[lnwire.ShortChannelID]ChannelLink),
4✔
373
                interfaceIndex:    make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
4✔
374
                pendingLinkIndex:  make(map[lnwire.ChannelID]ChannelLink),
4✔
375
                linkStopIndex:     make(map[lnwire.ChannelID]chan struct{}),
4✔
376
                networkResults:    newNetworkResultStore(cfg.DB),
4✔
377
                htlcPlex:          make(chan *plexPacket),
4✔
378
                chanCloseRequests: make(chan *ChanClose),
4✔
379
                resolutionMsgs:    make(chan *resolutionMsg),
4✔
380
                resMsgStore:       resStore,
4✔
381
                quit:              make(chan struct{}),
4✔
382
        }
4✔
383

4✔
384
        s.aliasToReal = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
4✔
385
        s.baseIndex = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
4✔
386

4✔
387
        s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{
4✔
388
                forwardPackets:    s.ForwardPackets,
4✔
389
                clock:             s.cfg.Clock,
4✔
390
                expiry:            s.cfg.MailboxDeliveryTimeout,
4✔
391
                failMailboxUpdate: s.failMailboxUpdate,
4✔
392
        })
4✔
393

4✔
394
        return s, nil
4✔
395
}
396

397
// resolutionMsg is a struct that wraps an existing ResolutionMsg with a done
398
// channel. We'll use this channel to synchronize delivery of the message with
399
// the caller.
400
type resolutionMsg struct {
401
        contractcourt.ResolutionMsg
402

403
        errChan chan error
404
}
405

406
// ProcessContractResolution is called by active contract resolvers once a
407
// contract they are watching over has been fully resolved. The message carries
408
// an external signal that *would* have been sent if the outgoing channel
409
// didn't need to go to the chain in order to fulfill a contract. We'll process
410
// this message just as if it came from an active outgoing channel.
411
func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) error {
4✔
412
        errChan := make(chan error, 1)
4✔
413

4✔
414
        select {
4✔
415
        case s.resolutionMsgs <- &resolutionMsg{
416
                ResolutionMsg: msg,
417
                errChan:       errChan,
418
        }:
4✔
419
        case <-s.quit:
×
420
                return ErrSwitchExiting
×
421
        }
422

423
        select {
4✔
424
        case err := <-errChan:
4✔
425
                return err
4✔
426
        case <-s.quit:
×
427
                return ErrSwitchExiting
×
428
        }
429
}
430

431
// GetAttemptResult returns the result of the payment attempt with the given
432
// attemptID. The paymentHash should be set to the payment's overall hash, or
433
// in case of AMP payments the payment's unique identifier.
434
//
435
// The method returns a channel where the payment result will be sent when
436
// available, or an error is encountered during forwarding. When a result is
437
// received on the channel, the HTLC is guaranteed to no longer be in flight.
438
// The switch shutting down is signaled by closing the channel. If the
439
// attemptID is unknown, ErrPaymentIDNotFound will be returned.
440
func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
441
        deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) {
4✔
442

4✔
443
        var (
4✔
444
                nChan <-chan *networkResult
4✔
445
                err   error
4✔
446
                inKey = CircuitKey{
4✔
447
                        ChanID: hop.Source,
4✔
448
                        HtlcID: attemptID,
4✔
449
                }
4✔
450
        )
4✔
451

4✔
452
        // If the payment is not found in the circuit map, check whether a
4✔
453
        // result is already available.
4✔
454
        // Assumption: no one will add this payment ID other than the caller.
4✔
455
        if s.circuits.LookupCircuit(inKey) == nil {
7✔
456
                res, err := s.networkResults.getResult(attemptID)
3✔
457
                if err != nil {
3✔
458
                        return nil, err
×
459
                }
×
460
                c := make(chan *networkResult, 1)
3✔
461
                c <- res
3✔
462
                nChan = c
3✔
463
        } else {
4✔
464
                // The payment was committed to the circuits, subscribe for a
4✔
465
                // result.
4✔
466
                nChan, err = s.networkResults.subscribeResult(attemptID)
4✔
467
                if err != nil {
4✔
468
                        return nil, err
×
469
                }
×
470
        }
471

472
        resultChan := make(chan *PaymentResult, 1)
4✔
473

4✔
474
        // Since the payment was known, we can start a goroutine that can
4✔
475
        // extract the result when it is available, and pass it on to the
4✔
476
        // caller.
4✔
477
        s.wg.Add(1)
4✔
478
        go func() {
8✔
479
                defer s.wg.Done()
4✔
480

4✔
481
                var n *networkResult
4✔
482
                select {
4✔
483
                case n = <-nChan:
4✔
484
                case <-s.quit:
4✔
485
                        // We close the result channel to signal a shutdown. We
4✔
486
                        // don't send any result in this case since the HTLC is
4✔
487
                        // still in flight.
4✔
488
                        close(resultChan)
4✔
489
                        return
4✔
490
                }
491

492
                log.Debugf("Received network result %T for attemptID=%v", n.msg,
4✔
493
                        attemptID)
4✔
494

4✔
495
                // Extract the result and pass it to the result channel.
4✔
496
                result, err := s.extractResult(
4✔
497
                        deobfuscator, n, attemptID, paymentHash,
4✔
498
                )
4✔
499
                if err != nil {
4✔
500
                        e := fmt.Errorf("unable to extract result: %w", err)
×
501
                        log.Error(e)
×
502
                        resultChan <- &PaymentResult{
×
503
                                Error: e,
×
504
                        }
×
505
                        return
×
506
                }
×
507
                resultChan <- result
4✔
508
        }()
509

510
        return resultChan, nil
4✔
511
}
512

513
// CleanStore calls the underlying result store, telling it is safe to delete
514
// all entries except the ones in the keepPids map. This should be called
515
// preiodically to let the switch clean up payment results that we have
516
// handled.
517
func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error {
4✔
518
        return s.networkResults.cleanStore(keepPids)
4✔
519
}
4✔
520

521
// SendHTLC is used by other subsystems which aren't belong to htlc switch
522
// package in order to send the htlc update. The attemptID used MUST be unique
523
// for this HTLC, and MUST be used only once, otherwise the switch might reject
524
// it.
525
func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64,
526
        htlc *lnwire.UpdateAddHTLC) error {
4✔
527

4✔
528
        // Generate and send new update packet, if error will be received on
4✔
529
        // this stage it means that packet haven't left boundaries of our
4✔
530
        // system and something wrong happened.
4✔
531
        packet := &htlcPacket{
4✔
532
                incomingChanID: hop.Source,
4✔
533
                incomingHTLCID: attemptID,
4✔
534
                outgoingChanID: firstHop,
4✔
535
                htlc:           htlc,
4✔
536
                amount:         htlc.Amount,
4✔
537
        }
4✔
538

4✔
539
        // Attempt to fetch the target link before creating a circuit so that
4✔
540
        // we don't leave dangling circuits. The getLocalLink method does not
4✔
541
        // require the circuit variable to be set on the *htlcPacket.
4✔
542
        link, linkErr := s.getLocalLink(packet, htlc)
4✔
543
        if linkErr != nil {
8✔
544
                // Notify the htlc notifier of a link failure on our outgoing
4✔
545
                // link. Incoming timelock/amount values are not set because
4✔
546
                // they are not present for local sends.
4✔
547
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
4✔
548
                        newHtlcKey(packet),
4✔
549
                        HtlcInfo{
4✔
550
                                OutgoingTimeLock: htlc.Expiry,
4✔
551
                                OutgoingAmt:      htlc.Amount,
4✔
552
                        },
4✔
553
                        HtlcEventTypeSend,
4✔
554
                        linkErr,
4✔
555
                        false,
4✔
556
                )
4✔
557

4✔
558
                return linkErr
4✔
559
        }
4✔
560

561
        // Evaluate whether this HTLC would increase our exposure to dust. If
562
        // it does, don't send it out and instead return an error.
563
        if s.evaluateDustThreshold(link, htlc.Amount, false) {
4✔
564
                // Notify the htlc notifier of a link failure on our outgoing
×
565
                // link. We use the FailTemporaryChannelFailure in place of a
×
566
                // more descriptive error message.
×
567
                linkErr := NewLinkError(
×
568
                        &lnwire.FailTemporaryChannelFailure{},
×
569
                )
×
570
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
×
571
                        newHtlcKey(packet),
×
572
                        HtlcInfo{
×
573
                                OutgoingTimeLock: htlc.Expiry,
×
574
                                OutgoingAmt:      htlc.Amount,
×
575
                        },
×
576
                        HtlcEventTypeSend,
×
577
                        linkErr,
×
578
                        false,
×
579
                )
×
580

×
581
                return errDustThresholdExceeded
×
582
        }
×
583

584
        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
4✔
585
        actions, err := s.circuits.CommitCircuits(circuit)
4✔
586
        if err != nil {
4✔
587
                log.Errorf("unable to commit circuit in switch: %v", err)
×
588
                return err
×
589
        }
×
590

591
        // Drop duplicate packet if it has already been seen.
592
        switch {
4✔
593
        case len(actions.Drops) == 1:
×
594
                return ErrDuplicateAdd
×
595

596
        case len(actions.Fails) == 1:
×
597
                return ErrLocalAddFailed
×
598
        }
599

600
        // Give the packet to the link's mailbox so that HTLC's are properly
601
        // canceled back if the mailbox timeout elapses.
602
        packet.circuit = circuit
4✔
603

4✔
604
        return link.handleSwitchPacket(packet)
4✔
605
}
606

607
// UpdateForwardingPolicies sends a message to the switch to update the
608
// forwarding policies for the set of target channels, keyed in chanPolicies.
609
//
610
// NOTE: This function is synchronous and will block until either the
611
// forwarding policies for all links have been updated, or the switch shuts
612
// down.
613
func (s *Switch) UpdateForwardingPolicies(
614
        chanPolicies map[wire.OutPoint]models.ForwardingPolicy) {
4✔
615

4✔
616
        log.Tracef("Updating link policies: %v", newLogClosure(func() string {
4✔
617
                return spew.Sdump(chanPolicies)
×
618
        }))
×
619

620
        s.indexMtx.RLock()
4✔
621

4✔
622
        // Update each link in chanPolicies.
4✔
623
        for targetLink, policy := range chanPolicies {
8✔
624
                cid := lnwire.NewChanIDFromOutPoint(targetLink)
4✔
625

4✔
626
                link, ok := s.linkIndex[cid]
4✔
627
                if !ok {
4✔
628
                        log.Debugf("Unable to find ChannelPoint(%v) to update "+
×
629
                                "link policy", targetLink)
×
630
                        continue
×
631
                }
632

633
                link.UpdateForwardingPolicy(policy)
4✔
634
        }
635

636
        s.indexMtx.RUnlock()
4✔
637
}
638

639
// IsForwardedHTLC checks for a given channel and htlc index if it is related
640
// to an opened circuit that represents a forwarded payment.
641
func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID,
642
        htlcIndex uint64) bool {
4✔
643

4✔
644
        circuit := s.circuits.LookupOpenCircuit(models.CircuitKey{
4✔
645
                ChanID: chanID,
4✔
646
                HtlcID: htlcIndex,
4✔
647
        })
4✔
648
        return circuit != nil && circuit.Incoming.ChanID != hop.Source
4✔
649
}
4✔
650

651
// ForwardPackets adds a list of packets to the switch for processing. Fails
652
// and settles are added on a first past, simultaneously constructing circuits
653
// for any adds. After persisting the circuits, another pass of the adds is
654
// given to forward them through the router. The sending link's quit channel is
655
// used to prevent deadlocks when the switch stops a link in the midst of
656
// forwarding.
657
func (s *Switch) ForwardPackets(linkQuit chan struct{},
658
        packets ...*htlcPacket) error {
4✔
659

4✔
660
        var (
4✔
661
                // fwdChan is a buffered channel used to receive err msgs from
4✔
662
                // the htlcPlex when forwarding this batch.
4✔
663
                fwdChan = make(chan error, len(packets))
4✔
664

4✔
665
                // numSent keeps a running count of how many packets are
4✔
666
                // forwarded to the switch, which determines how many responses
4✔
667
                // we will wait for on the fwdChan..
4✔
668
                numSent int
4✔
669
        )
4✔
670

4✔
671
        // No packets, nothing to do.
4✔
672
        if len(packets) == 0 {
8✔
673
                return nil
4✔
674
        }
4✔
675

676
        // Setup a barrier to prevent the background tasks from processing
677
        // responses until this function returns to the user.
678
        var wg sync.WaitGroup
4✔
679
        wg.Add(1)
4✔
680
        defer wg.Done()
4✔
681

4✔
682
        // Before spawning the following goroutine to proxy our error responses,
4✔
683
        // check to see if we have already been issued a shutdown request. If
4✔
684
        // so, we exit early to avoid incrementing the switch's waitgroup while
4✔
685
        // it is already in the process of shutting down.
4✔
686
        select {
4✔
687
        case <-linkQuit:
×
688
                return nil
×
689
        case <-s.quit:
×
690
                return nil
×
691
        default:
4✔
692
                // Spawn a goroutine to log the errors returned from failed packets.
4✔
693
                s.wg.Add(1)
4✔
694
                go s.logFwdErrs(&numSent, &wg, fwdChan)
4✔
695
        }
696

697
        // Make a first pass over the packets, forwarding any settles or fails.
698
        // As adds are found, we create a circuit and append it to our set of
699
        // circuits to be written to disk.
700
        var circuits []*PaymentCircuit
4✔
701
        var addBatch []*htlcPacket
4✔
702
        for _, packet := range packets {
8✔
703
                switch htlc := packet.htlc.(type) {
4✔
704
                case *lnwire.UpdateAddHTLC:
4✔
705
                        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
4✔
706
                        packet.circuit = circuit
4✔
707
                        circuits = append(circuits, circuit)
4✔
708
                        addBatch = append(addBatch, packet)
4✔
709
                default:
4✔
710
                        err := s.routeAsync(packet, fwdChan, linkQuit)
4✔
711
                        if err != nil {
4✔
712
                                return fmt.Errorf("failed to forward packet %w",
×
713
                                        err)
×
714
                        }
×
715
                        numSent++
4✔
716
                }
717
        }
718

719
        // If this batch did not contain any circuits to commit, we can return
720
        // early.
721
        if len(circuits) == 0 {
8✔
722
                return nil
4✔
723
        }
4✔
724

725
        // Write any circuits that we found to disk.
726
        actions, err := s.circuits.CommitCircuits(circuits...)
4✔
727
        if err != nil {
4✔
728
                log.Errorf("unable to commit circuits in switch: %v", err)
×
729
        }
×
730

731
        // Split the htlc packets by comparing an in-order seek to the head of
732
        // the added, dropped, or failed circuits.
733
        //
734
        // NOTE: This assumes each list is guaranteed to be a subsequence of the
735
        // circuits, and that the union of the sets results in the original set
736
        // of circuits.
737
        var addedPackets, failedPackets []*htlcPacket
4✔
738
        for _, packet := range addBatch {
8✔
739
                switch {
4✔
740
                case len(actions.Adds) > 0 && packet.circuit == actions.Adds[0]:
4✔
741
                        addedPackets = append(addedPackets, packet)
4✔
742
                        actions.Adds = actions.Adds[1:]
4✔
743

744
                case len(actions.Drops) > 0 && packet.circuit == actions.Drops[0]:
4✔
745
                        actions.Drops = actions.Drops[1:]
4✔
746

747
                case len(actions.Fails) > 0 && packet.circuit == actions.Fails[0]:
×
748
                        failedPackets = append(failedPackets, packet)
×
749
                        actions.Fails = actions.Fails[1:]
×
750
                }
751
        }
752

753
        // Now, forward any packets for circuits that were successfully added to
754
        // the switch's circuit map.
755
        for _, packet := range addedPackets {
8✔
756
                err := s.routeAsync(packet, fwdChan, linkQuit)
4✔
757
                if err != nil {
4✔
758
                        return fmt.Errorf("failed to forward packet %w", err)
×
759
                }
×
760
                numSent++
4✔
761
        }
762

763
        // Lastly, for any packets that failed, this implies that they were
764
        // left in a half added state, which can happen when recovering from
765
        // failures.
766
        if len(failedPackets) > 0 {
4✔
767
                var failure lnwire.FailureMessage
×
768
                incomingID := failedPackets[0].incomingChanID
×
769

×
770
                // If the incoming channel is an option_scid_alias channel,
×
771
                // then we'll need to replace the SCID in the ChannelUpdate.
×
772
                update := s.failAliasUpdate(incomingID, true)
×
773
                if update == nil {
×
774
                        // Fallback to the original non-option behavior.
×
775
                        update, err := s.cfg.FetchLastChannelUpdate(
×
776
                                incomingID,
×
777
                        )
×
778
                        if err != nil {
×
779
                                failure = &lnwire.FailTemporaryNodeFailure{}
×
780
                        } else {
×
781
                                failure = lnwire.NewTemporaryChannelFailure(
×
782
                                        update,
×
783
                                )
×
784
                        }
×
785
                } else {
×
786
                        // This is an option_scid_alias channel.
×
787
                        failure = lnwire.NewTemporaryChannelFailure(update)
×
788
                }
×
789

790
                linkError := NewDetailedLinkError(
×
791
                        failure, OutgoingFailureIncompleteForward,
×
792
                )
×
793

×
794
                for _, packet := range failedPackets {
×
795
                        // We don't handle the error here since this method
×
796
                        // always returns an error.
×
797
                        _ = s.failAddPacket(packet, linkError)
×
798
                }
×
799
        }
800

801
        return nil
4✔
802
}
803

804
// logFwdErrs logs any errors received on `fwdChan`.
805
func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) {
4✔
806
        defer s.wg.Done()
4✔
807

4✔
808
        // Wait here until the outer function has finished persisting
4✔
809
        // and routing the packets. This guarantees we don't read from num until
4✔
810
        // the value is accurate.
4✔
811
        wg.Wait()
4✔
812

4✔
813
        numSent := *num
4✔
814
        for i := 0; i < numSent; i++ {
8✔
815
                select {
4✔
816
                case err := <-fwdChan:
4✔
817
                        if err != nil {
8✔
818
                                log.Errorf("Unhandled error while reforwarding htlc "+
4✔
819
                                        "settle/fail over htlcswitch: %v", err)
4✔
820
                        }
4✔
821
                case <-s.quit:
×
822
                        log.Errorf("unable to forward htlc packet " +
×
823
                                "htlc switch was stopped")
×
824
                        return
×
825
                }
826
        }
827
}
828

829
// routeAsync sends a packet through the htlc switch, using the provided err
830
// chan to propagate errors back to the caller. The link's quit channel is
831
// provided so that the send can be canceled if either the link or the switch
832
// receive a shutdown requuest. This method does not wait for a response from
833
// the htlcForwarder before returning.
834
func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error,
835
        linkQuit chan struct{}) error {
4✔
836

4✔
837
        command := &plexPacket{
4✔
838
                pkt: packet,
4✔
839
                err: errChan,
4✔
840
        }
4✔
841

4✔
842
        select {
4✔
843
        case s.htlcPlex <- command:
4✔
844
                return nil
4✔
845
        case <-linkQuit:
×
846
                return ErrLinkShuttingDown
×
847
        case <-s.quit:
×
848
                return errors.New("htlc switch was stopped")
×
849
        }
850
}
851

852
// getLocalLink handles the addition of a htlc for a send that originates from
853
// our node. It returns the link that the htlc should be forwarded outwards on,
854
// and a link error if the htlc cannot be forwarded.
855
func (s *Switch) getLocalLink(pkt *htlcPacket, htlc *lnwire.UpdateAddHTLC) (
856
        ChannelLink, *LinkError) {
4✔
857

4✔
858
        // Try to find links by node destination.
4✔
859
        s.indexMtx.RLock()
4✔
860
        link, err := s.getLinkByShortID(pkt.outgoingChanID)
4✔
861
        defer s.indexMtx.RUnlock()
4✔
862
        if err != nil {
8✔
863
                // If the link was not found for the outgoingChanID, an outside
4✔
864
                // subsystem may be using the confirmed SCID of a zero-conf
4✔
865
                // channel. In this case, we'll consult the Switch maps to see
4✔
866
                // if an alias exists and use the alias to lookup the link.
4✔
867
                // This extra step is a consequence of not updating the Switch
4✔
868
                // forwardingIndex when a zero-conf channel is confirmed. We
4✔
869
                // don't need to change the outgoingChanID since the link will
4✔
870
                // do that upon receiving the packet.
4✔
871
                baseScid, ok := s.baseIndex[pkt.outgoingChanID]
4✔
872
                if !ok {
8✔
873
                        log.Errorf("Link %v not found", pkt.outgoingChanID)
4✔
874
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
4✔
875
                }
4✔
876

877
                // The base SCID was found, so we'll use that to fetch the
878
                // link.
879
                link, err = s.getLinkByShortID(baseScid)
4✔
880
                if err != nil {
4✔
881
                        log.Errorf("Link %v not found", baseScid)
×
882
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
×
883
                }
×
884
        }
885

886
        if !link.EligibleToForward() {
4✔
887
                log.Errorf("Link %v is not available to forward",
×
888
                        pkt.outgoingChanID)
×
889

×
890
                // The update does not need to be populated as the error
×
891
                // will be returned back to the router.
×
892
                return nil, NewDetailedLinkError(
×
893
                        lnwire.NewTemporaryChannelFailure(nil),
×
894
                        OutgoingFailureLinkNotEligible,
×
895
                )
×
896
        }
×
897

898
        // Ensure that the htlc satisfies the outgoing channel policy.
899
        currentHeight := atomic.LoadUint32(&s.bestHeight)
4✔
900
        htlcErr := link.CheckHtlcTransit(
4✔
901
                htlc.PaymentHash, htlc.Amount, htlc.Expiry, currentHeight,
4✔
902
        )
4✔
903
        if htlcErr != nil {
4✔
904
                log.Errorf("Link %v policy for local forward not "+
×
905
                        "satisfied", pkt.outgoingChanID)
×
906
                return nil, htlcErr
×
907
        }
×
908
        return link, nil
4✔
909
}
910

911
// handleLocalResponse processes a Settle or Fail responding to a
912
// locally-initiated payment. This is handled asynchronously to avoid blocking
913
// the main event loop within the switch, as these operations can require
914
// multiple db transactions. The guarantees of the circuit map are stringent
915
// enough such that we are able to tolerate reordering of these operations
916
// without side effects. The primary operations handled are:
917
//  1. Save the payment result to the pending payment store.
918
//  2. Notify subscribers about the payment result.
919
//  3. Ack settle/fail references, to avoid resending this response internally
920
//  4. Teardown the closing circuit in the circuit map
921
//
922
// NOTE: This method MUST be spawned as a goroutine.
923
func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
4✔
924
        defer s.wg.Done()
4✔
925

4✔
926
        attemptID := pkt.incomingHTLCID
4✔
927

4✔
928
        // The error reason will be unencypted in case this a local
4✔
929
        // failure or a converted error.
4✔
930
        unencrypted := pkt.localFailure || pkt.convertedError
4✔
931
        n := &networkResult{
4✔
932
                msg:          pkt.htlc,
4✔
933
                unencrypted:  unencrypted,
4✔
934
                isResolution: pkt.isResolution,
4✔
935
        }
4✔
936

4✔
937
        // Store the result to the db. This will also notify subscribers about
4✔
938
        // the result.
4✔
939
        if err := s.networkResults.storeResult(attemptID, n); err != nil {
4✔
940
                log.Errorf("Unable to complete payment for pid=%v: %v",
×
941
                        attemptID, err)
×
942
                return
×
943
        }
×
944

945
        // First, we'll clean up any fwdpkg references, circuit entries, and
946
        // mark in our db that the payment for this payment hash has either
947
        // succeeded or failed.
948
        //
949
        // If this response is contained in a forwarding package, we'll start by
950
        // acking the settle/fail so that we don't continue to retransmit the
951
        // HTLC internally.
952
        if pkt.destRef != nil {
8✔
953
                if err := s.ackSettleFail(*pkt.destRef); err != nil {
4✔
954
                        log.Warnf("Unable to ack settle/fail reference: %s: %v",
×
955
                                *pkt.destRef, err)
×
956
                        return
×
957
                }
×
958
        }
959

960
        // Next, we'll remove the circuit since we are about to complete an
961
        // fulfill/fail of this HTLC. Since we've already removed the
962
        // settle/fail fwdpkg reference, the response from the peer cannot be
963
        // replayed internally if this step fails. If this happens, this logic
964
        // will be executed when a provided resolution message comes through.
965
        // This can only happen if the circuit is still open, which is why this
966
        // ordering is chosen.
967
        if err := s.teardownCircuit(pkt); err != nil {
4✔
968
                log.Warnf("Unable to teardown circuit %s: %v",
×
969
                        pkt.inKey(), err)
×
970
                return
×
971
        }
×
972

973
        // Finally, notify on the htlc failure or success that has been handled.
974
        key := newHtlcKey(pkt)
4✔
975
        eventType := getEventType(pkt)
4✔
976

4✔
977
        switch htlc := pkt.htlc.(type) {
4✔
978
        case *lnwire.UpdateFulfillHTLC:
4✔
979
                s.cfg.HtlcNotifier.NotifySettleEvent(key, htlc.PaymentPreimage,
4✔
980
                        eventType)
4✔
981

982
        case *lnwire.UpdateFailHTLC:
4✔
983
                s.cfg.HtlcNotifier.NotifyForwardingFailEvent(key, eventType)
4✔
984
        }
985
}
986

987
// extractResult uses the given deobfuscator to extract the payment result from
988
// the given network message.
989
func (s *Switch) extractResult(deobfuscator ErrorDecrypter, n *networkResult,
990
        attemptID uint64, paymentHash lntypes.Hash) (*PaymentResult, error) {
4✔
991

4✔
992
        switch htlc := n.msg.(type) {
4✔
993

994
        // We've received a settle update which means we can finalize the user
995
        // payment and return successful response.
996
        case *lnwire.UpdateFulfillHTLC:
4✔
997
                return &PaymentResult{
4✔
998
                        Preimage: htlc.PaymentPreimage,
4✔
999
                }, nil
4✔
1000

1001
        // We've received a fail update which means we can finalize the
1002
        // user payment and return fail response.
1003
        case *lnwire.UpdateFailHTLC:
4✔
1004
                // TODO(yy): construct deobfuscator here to avoid creating it
4✔
1005
                // in paymentLifecycle even for settled HTLCs.
4✔
1006
                paymentErr := s.parseFailedPayment(
4✔
1007
                        deobfuscator, attemptID, paymentHash, n.unencrypted,
4✔
1008
                        n.isResolution, htlc,
4✔
1009
                )
4✔
1010

4✔
1011
                return &PaymentResult{
4✔
1012
                        Error: paymentErr,
4✔
1013
                }, nil
4✔
1014

1015
        default:
×
1016
                return nil, fmt.Errorf("received unknown response type: %T",
×
1017
                        htlc)
×
1018
        }
1019
}
1020

1021
// parseFailedPayment determines the appropriate failure message to return to
1022
// a user initiated payment. The three cases handled are:
1023
//  1. An unencrypted failure, which should already plaintext.
1024
//  2. A resolution from the chain arbitrator, which possibly has no failure
1025
//     reason attached.
1026
//  3. A failure from the remote party, which will need to be decrypted using
1027
//     the payment deobfuscator.
1028
func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter,
1029
        attemptID uint64, paymentHash lntypes.Hash, unencrypted,
1030
        isResolution bool, htlc *lnwire.UpdateFailHTLC) error {
4✔
1031

4✔
1032
        switch {
4✔
1033

1034
        // The payment never cleared the link, so we don't need to
1035
        // decrypt the error, simply decode it them report back to the
1036
        // user.
1037
        case unencrypted:
4✔
1038
                r := bytes.NewReader(htlc.Reason)
4✔
1039
                failureMsg, err := lnwire.DecodeFailure(r, 0)
4✔
1040
                if err != nil {
4✔
1041
                        // If we could not decode the failure reason, return a link
×
1042
                        // error indicating that we failed to decode the onion.
×
1043
                        linkError := NewDetailedLinkError(
×
1044
                                // As this didn't even clear the link, we don't
×
1045
                                // need to apply an update here since it goes
×
1046
                                // directly to the router.
×
1047
                                lnwire.NewTemporaryChannelFailure(nil),
×
1048
                                OutgoingFailureDecodeError,
×
1049
                        )
×
1050

×
1051
                        log.Errorf("%v: (hash=%v, pid=%d): %v",
×
1052
                                linkError.FailureDetail.FailureString(),
×
1053
                                paymentHash, attemptID, err)
×
1054

×
1055
                        return linkError
×
1056
                }
×
1057

1058
                // If we successfully decoded the failure reason, return it.
1059
                return NewLinkError(failureMsg)
4✔
1060

1061
        // A payment had to be timed out on chain before it got past
1062
        // the first hop. In this case, we'll report a permanent
1063
        // channel failure as this means us, or the remote party had to
1064
        // go on chain.
1065
        case isResolution && htlc.Reason == nil:
4✔
1066
                linkError := NewDetailedLinkError(
4✔
1067
                        &lnwire.FailPermanentChannelFailure{},
4✔
1068
                        OutgoingFailureOnChainTimeout,
4✔
1069
                )
4✔
1070

4✔
1071
                log.Infof("%v: hash=%v, pid=%d",
4✔
1072
                        linkError.FailureDetail.FailureString(),
4✔
1073
                        paymentHash, attemptID)
4✔
1074

4✔
1075
                return linkError
4✔
1076

1077
        // A regular multi-hop payment error that we'll need to
1078
        // decrypt.
1079
        default:
4✔
1080
                // We'll attempt to fully decrypt the onion encrypted
4✔
1081
                // error. If we're unable to then we'll bail early.
4✔
1082
                failure, err := deobfuscator.DecryptError(htlc.Reason)
4✔
1083
                if err != nil {
4✔
1084
                        log.Errorf("unable to de-obfuscate onion failure "+
×
1085
                                "(hash=%v, pid=%d): %v",
×
1086
                                paymentHash, attemptID, err)
×
1087

×
1088
                        return ErrUnreadableFailureMessage
×
1089
                }
×
1090

1091
                return failure
4✔
1092
        }
1093
}
1094

1095
// handlePacketForward is used in cases when we need forward the htlc update
1096
// from one channel link to another and be able to propagate the settle/fail
1097
// updates back. This behaviour is achieved by creation of payment circuits.
1098
func (s *Switch) handlePacketForward(packet *htlcPacket) error {
4✔
1099
        switch htlc := packet.htlc.(type) {
4✔
1100

1101
        // Channel link forwarded us a new htlc, therefore we initiate the
1102
        // payment circuit within our internal state so we can properly forward
1103
        // the ultimate settle message back latter.
1104
        case *lnwire.UpdateAddHTLC:
4✔
1105
                // Check if the node is set to reject all onward HTLCs and also make
4✔
1106
                // sure that HTLC is not from the source node.
4✔
1107
                if s.cfg.RejectHTLC {
8✔
1108
                        failure := NewDetailedLinkError(
4✔
1109
                                &lnwire.FailChannelDisabled{},
4✔
1110
                                OutgoingFailureForwardsDisabled,
4✔
1111
                        )
4✔
1112

4✔
1113
                        return s.failAddPacket(packet, failure)
4✔
1114
                }
4✔
1115

1116
                // Before we attempt to find a non-strict forwarding path for
1117
                // this htlc, check whether the htlc is being routed over the
1118
                // same incoming and outgoing channel. If our node does not
1119
                // allow forwards of this nature, we fail the htlc early. This
1120
                // check is in place to disallow inefficiently routed htlcs from
1121
                // locking up our balance. With channels where the
1122
                // option-scid-alias feature was negotiated, we also have to be
1123
                // sure that the IDs aren't the same since one or both could be
1124
                // an alias.
1125
                linkErr := s.checkCircularForward(
4✔
1126
                        packet.incomingChanID, packet.outgoingChanID,
4✔
1127
                        s.cfg.AllowCircularRoute, htlc.PaymentHash,
4✔
1128
                )
4✔
1129
                if linkErr != nil {
4✔
1130
                        return s.failAddPacket(packet, linkErr)
×
1131
                }
×
1132

1133
                s.indexMtx.RLock()
4✔
1134
                targetLink, err := s.getLinkByMapping(packet)
4✔
1135
                if err != nil {
8✔
1136
                        s.indexMtx.RUnlock()
4✔
1137

4✔
1138
                        log.Debugf("unable to find link with "+
4✔
1139
                                "destination %v", packet.outgoingChanID)
4✔
1140

4✔
1141
                        // If packet was forwarded from another channel link
4✔
1142
                        // than we should notify this link that some error
4✔
1143
                        // occurred.
4✔
1144
                        linkError := NewLinkError(
4✔
1145
                                &lnwire.FailUnknownNextPeer{},
4✔
1146
                        )
4✔
1147

4✔
1148
                        return s.failAddPacket(packet, linkError)
4✔
1149
                }
4✔
1150
                targetPeerKey := targetLink.PeerPubKey()
4✔
1151
                interfaceLinks, _ := s.getLinks(targetPeerKey)
4✔
1152
                s.indexMtx.RUnlock()
4✔
1153

4✔
1154
                // We'll keep track of any HTLC failures during the link
4✔
1155
                // selection process. This way we can return the error for
4✔
1156
                // precise link that the sender selected, while optimistically
4✔
1157
                // trying all links to utilize our available bandwidth.
4✔
1158
                linkErrs := make(map[lnwire.ShortChannelID]*LinkError)
4✔
1159

4✔
1160
                // Find all destination channel links with appropriate
4✔
1161
                // bandwidth.
4✔
1162
                var destinations []ChannelLink
4✔
1163
                for _, link := range interfaceLinks {
8✔
1164
                        var failure *LinkError
4✔
1165

4✔
1166
                        // We'll skip any links that aren't yet eligible for
4✔
1167
                        // forwarding.
4✔
1168
                        if !link.EligibleToForward() {
4✔
1169
                                failure = NewDetailedLinkError(
×
1170
                                        &lnwire.FailUnknownNextPeer{},
×
1171
                                        OutgoingFailureLinkNotEligible,
×
1172
                                )
×
1173
                        } else {
4✔
1174
                                // We'll ensure that the HTLC satisfies the
4✔
1175
                                // current forwarding conditions of this target
4✔
1176
                                // link.
4✔
1177
                                currentHeight := atomic.LoadUint32(&s.bestHeight)
4✔
1178
                                failure = link.CheckHtlcForward(
4✔
1179
                                        htlc.PaymentHash, packet.incomingAmount,
4✔
1180
                                        packet.amount, packet.incomingTimeout,
4✔
1181
                                        packet.outgoingTimeout,
4✔
1182
                                        packet.inboundFee,
4✔
1183
                                        currentHeight,
4✔
1184
                                        packet.originalOutgoingChanID,
4✔
1185
                                )
4✔
1186
                        }
4✔
1187

1188
                        // If this link can forward the htlc, add it to the set
1189
                        // of destinations.
1190
                        if failure == nil {
8✔
1191
                                destinations = append(destinations, link)
4✔
1192
                                continue
4✔
1193
                        }
1194

1195
                        linkErrs[link.ShortChanID()] = failure
4✔
1196
                }
1197

1198
                // If we had a forwarding failure due to the HTLC not
1199
                // satisfying the current policy, then we'll send back an
1200
                // error, but ensure we send back the error sourced at the
1201
                // *target* link.
1202
                if len(destinations) == 0 {
8✔
1203
                        // At this point, some or all of the links rejected the
4✔
1204
                        // HTLC so we couldn't forward it. So we'll try to look
4✔
1205
                        // up the error that came from the source.
4✔
1206
                        linkErr, ok := linkErrs[packet.outgoingChanID]
4✔
1207
                        if !ok {
4✔
1208
                                // If we can't find the error of the source,
×
1209
                                // then we'll return an unknown next peer,
×
1210
                                // though this should never happen.
×
1211
                                linkErr = NewLinkError(
×
1212
                                        &lnwire.FailUnknownNextPeer{},
×
1213
                                )
×
1214
                                log.Warnf("unable to find err source for "+
×
1215
                                        "outgoing_link=%v, errors=%v",
×
1216
                                        packet.outgoingChanID, newLogClosure(func() string {
×
1217
                                                return spew.Sdump(linkErrs)
×
1218
                                        }))
×
1219
                        }
1220

1221
                        log.Tracef("incoming HTLC(%x) violated "+
4✔
1222
                                "target outgoing link (id=%v) policy: %v",
4✔
1223
                                htlc.PaymentHash[:], packet.outgoingChanID,
4✔
1224
                                linkErr)
4✔
1225

4✔
1226
                        return s.failAddPacket(packet, linkErr)
4✔
1227
                }
1228

1229
                // Choose a random link out of the set of links that can forward
1230
                // this htlc. The reason for randomization is to evenly
1231
                // distribute the htlc load without making assumptions about
1232
                // what the best channel is.
1233
                destination := destinations[rand.Intn(len(destinations))] // nolint:gosec
4✔
1234

4✔
1235
                // Retrieve the incoming link by its ShortChannelID. Note that
4✔
1236
                // the incomingChanID is never set to hop.Source here.
4✔
1237
                s.indexMtx.RLock()
4✔
1238
                incomingLink, err := s.getLinkByShortID(packet.incomingChanID)
4✔
1239
                s.indexMtx.RUnlock()
4✔
1240
                if err != nil {
4✔
1241
                        // If we couldn't find the incoming link, we can't
×
1242
                        // evaluate the incoming's exposure to dust, so we just
×
1243
                        // fail the HTLC back.
×
1244
                        linkErr := NewLinkError(
×
1245
                                &lnwire.FailTemporaryChannelFailure{},
×
1246
                        )
×
1247

×
1248
                        return s.failAddPacket(packet, linkErr)
×
1249
                }
×
1250

1251
                // Evaluate whether this HTLC would increase our exposure to
1252
                // dust on the incoming link. If it does, fail it backwards.
1253
                if s.evaluateDustThreshold(
4✔
1254
                        incomingLink, packet.incomingAmount, true,
4✔
1255
                ) {
4✔
1256
                        // The incoming dust exceeds the threshold, so we fail
×
1257
                        // the add back.
×
1258
                        linkErr := NewLinkError(
×
1259
                                &lnwire.FailTemporaryChannelFailure{},
×
1260
                        )
×
1261

×
1262
                        return s.failAddPacket(packet, linkErr)
×
1263
                }
×
1264

1265
                // Also evaluate whether this HTLC would increase our exposure
1266
                // to dust on the destination link. If it does, fail it back.
1267
                if s.evaluateDustThreshold(
4✔
1268
                        destination, packet.amount, false,
4✔
1269
                ) {
4✔
1270
                        // The outgoing dust exceeds the threshold, so we fail
×
1271
                        // the add back.
×
1272
                        linkErr := NewLinkError(
×
1273
                                &lnwire.FailTemporaryChannelFailure{},
×
1274
                        )
×
1275

×
1276
                        return s.failAddPacket(packet, linkErr)
×
1277
                }
×
1278

1279
                // Send the packet to the destination channel link which
1280
                // manages the channel.
1281
                packet.outgoingChanID = destination.ShortChanID()
4✔
1282
                return destination.handleSwitchPacket(packet)
4✔
1283

1284
        case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC:
4✔
1285
                // If the source of this packet has not been set, use the
4✔
1286
                // circuit map to lookup the origin.
4✔
1287
                circuit, err := s.closeCircuit(packet)
4✔
1288
                if err != nil {
8✔
1289
                        return err
4✔
1290
                }
4✔
1291

1292
                // closeCircuit returns a nil circuit when a settle packet returns an
1293
                // ErrUnknownCircuit error upon the inner call to CloseCircuit.
1294
                if circuit == nil {
8✔
1295
                        return nil
4✔
1296
                }
4✔
1297

1298
                fail, isFail := htlc.(*lnwire.UpdateFailHTLC)
4✔
1299
                if isFail && !packet.hasSource {
8✔
1300
                        // HTLC resolutions and messages restored from disk
4✔
1301
                        // don't have the obfuscator set from the original htlc
4✔
1302
                        // add packet - set it here for use in blinded errors.
4✔
1303
                        packet.obfuscator = circuit.ErrorEncrypter
4✔
1304

4✔
1305
                        switch {
4✔
1306
                        // No message to encrypt, locally sourced payment.
1307
                        case circuit.ErrorEncrypter == nil:
4✔
1308

1309
                        // If this is a resolution message, then we'll need to
1310
                        // encrypt it as it's actually internally sourced.
1311
                        case packet.isResolution:
4✔
1312
                                var err error
4✔
1313
                                // TODO(roasbeef): don't need to pass actually?
4✔
1314
                                failure := &lnwire.FailPermanentChannelFailure{}
4✔
1315
                                fail.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop(
4✔
1316
                                        failure,
4✔
1317
                                )
4✔
1318
                                if err != nil {
4✔
1319
                                        err = fmt.Errorf("unable to obfuscate "+
×
1320
                                                "error: %v", err)
×
1321
                                        log.Error(err)
×
1322
                                }
×
1323

1324
                        // Alternatively, if the remote party send us an
1325
                        // UpdateFailMalformedHTLC, then we'll need to convert
1326
                        // this into a proper well formatted onion error as
1327
                        // there's no HMAC currently.
1328
                        case packet.convertedError:
4✔
1329
                                log.Infof("Converting malformed HTLC error "+
4✔
1330
                                        "for circuit for Circuit(%x: "+
4✔
1331
                                        "(%s, %d) <-> (%s, %d))", packet.circuit.PaymentHash,
4✔
1332
                                        packet.incomingChanID, packet.incomingHTLCID,
4✔
1333
                                        packet.outgoingChanID, packet.outgoingHTLCID)
4✔
1334

4✔
1335
                                fail.Reason = circuit.ErrorEncrypter.EncryptMalformedError(
4✔
1336
                                        fail.Reason,
4✔
1337
                                )
4✔
1338

1339
                        default:
4✔
1340
                                // Otherwise, it's a forwarded error, so we'll perform a
4✔
1341
                                // wrapper encryption as normal.
4✔
1342
                                fail.Reason = circuit.ErrorEncrypter.IntermediateEncrypt(
4✔
1343
                                        fail.Reason,
4✔
1344
                                )
4✔
1345
                        }
1346
                } else if !isFail && circuit.Outgoing != nil {
8✔
1347
                        // If this is an HTLC settle, and it wasn't from a
4✔
1348
                        // locally initiated HTLC, then we'll log a forwarding
4✔
1349
                        // event so we can flush it to disk later.
4✔
1350
                        //
4✔
1351
                        // TODO(roasbeef): only do this once link actually
4✔
1352
                        // fully settles?
4✔
1353
                        localHTLC := packet.incomingChanID == hop.Source
4✔
1354
                        if !localHTLC {
8✔
1355
                                log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+
4✔
1356
                                        "from IncomingChanID(%v) to OutgoingChanID(%v)",
4✔
1357
                                        circuit.PaymentHash[:], circuit.OutgoingAmount,
4✔
1358
                                        circuit.IncomingAmount-circuit.OutgoingAmount,
4✔
1359
                                        circuit.Incoming.ChanID, circuit.Outgoing.ChanID)
4✔
1360
                                s.fwdEventMtx.Lock()
4✔
1361
                                s.pendingFwdingEvents = append(
4✔
1362
                                        s.pendingFwdingEvents,
4✔
1363
                                        channeldb.ForwardingEvent{
4✔
1364
                                                Timestamp:      time.Now(),
4✔
1365
                                                IncomingChanID: circuit.Incoming.ChanID,
4✔
1366
                                                OutgoingChanID: circuit.Outgoing.ChanID,
4✔
1367
                                                AmtIn:          circuit.IncomingAmount,
4✔
1368
                                                AmtOut:         circuit.OutgoingAmount,
4✔
1369
                                        },
4✔
1370
                                )
4✔
1371
                                s.fwdEventMtx.Unlock()
4✔
1372
                        }
4✔
1373
                }
1374

1375
                // A blank IncomingChanID in a circuit indicates that it is a pending
1376
                // user-initiated payment.
1377
                if packet.incomingChanID == hop.Source {
8✔
1378
                        s.wg.Add(1)
4✔
1379
                        go s.handleLocalResponse(packet)
4✔
1380
                        return nil
4✔
1381
                }
4✔
1382

1383
                // Check to see that the source link is online before removing
1384
                // the circuit.
1385
                return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
4✔
1386

1387
        default:
×
1388
                return errors.New("wrong update type")
×
1389
        }
1390
}
1391

1392
// checkCircularForward checks whether a forward is circular (arrives and
1393
// departs on the same link) and returns a link error if the switch is
1394
// configured to disallow this behaviour.
1395
func (s *Switch) checkCircularForward(incoming, outgoing lnwire.ShortChannelID,
1396
        allowCircular bool, paymentHash lntypes.Hash) *LinkError {
4✔
1397

4✔
1398
        // If they are equal, we can skip the alias mapping checks.
4✔
1399
        if incoming == outgoing {
4✔
1400
                // The switch may be configured to allow circular routes, so
×
1401
                // just log and return nil.
×
1402
                if allowCircular {
×
1403
                        log.Debugf("allowing circular route over link: %v "+
×
1404
                                "(payment hash: %x)", incoming, paymentHash)
×
1405
                        return nil
×
1406
                }
×
1407

1408
                // Otherwise, we'll return a temporary channel failure.
1409
                return NewDetailedLinkError(
×
1410
                        lnwire.NewTemporaryChannelFailure(nil),
×
1411
                        OutgoingFailureCircularRoute,
×
1412
                )
×
1413
        }
1414

1415
        // We'll fetch the "base" SCID from the baseIndex for the incoming and
1416
        // outgoing SCIDs. If either one does not have a base SCID, then the
1417
        // two channels are not equal since one will be a channel that does not
1418
        // need a mapping and SCID equality was checked above. If the "base"
1419
        // SCIDs are equal, then this is a circular route. Otherwise, it isn't.
1420
        s.indexMtx.RLock()
4✔
1421
        incomingBaseScid, ok := s.baseIndex[incoming]
4✔
1422
        if !ok {
8✔
1423
                // This channel does not use baseIndex, bail out.
4✔
1424
                s.indexMtx.RUnlock()
4✔
1425
                return nil
4✔
1426
        }
4✔
1427

1428
        outgoingBaseScid, ok := s.baseIndex[outgoing]
4✔
1429
        if !ok {
8✔
1430
                // This channel does not use baseIndex, bail out.
4✔
1431
                s.indexMtx.RUnlock()
4✔
1432
                return nil
4✔
1433
        }
4✔
1434
        s.indexMtx.RUnlock()
4✔
1435

4✔
1436
        // Check base SCID equality.
4✔
1437
        if incomingBaseScid != outgoingBaseScid {
8✔
1438
                // The base SCIDs are not equal so these are not the same
4✔
1439
                // channel.
4✔
1440
                return nil
4✔
1441
        }
4✔
1442

1443
        // If the incoming and outgoing link are equal, the htlc is part of a
1444
        // circular route which may be used to lock up our liquidity. If the
1445
        // switch is configured to allow circular routes, log that we are
1446
        // allowing the route then return nil.
1447
        if allowCircular {
×
1448
                log.Debugf("allowing circular route over link: %v "+
×
1449
                        "(payment hash: %x)", incoming, paymentHash)
×
1450
                return nil
×
1451
        }
×
1452

1453
        // If our node disallows circular routes, return a temporary channel
1454
        // failure. There is nothing wrong with the policy used by the remote
1455
        // node, so we do not include a channel update.
1456
        return NewDetailedLinkError(
×
1457
                lnwire.NewTemporaryChannelFailure(nil),
×
1458
                OutgoingFailureCircularRoute,
×
1459
        )
×
1460
}
1461

1462
// failAddPacket encrypts a fail packet back to an add packet's source.
1463
// The ciphertext will be derived from the failure message proivded by context.
1464
// This method returns the failErr if all other steps complete successfully.
1465
func (s *Switch) failAddPacket(packet *htlcPacket, failure *LinkError) error {
4✔
1466
        // Encrypt the failure so that the sender will be able to read the error
4✔
1467
        // message. Since we failed this packet, we use EncryptFirstHop to
4✔
1468
        // obfuscate the failure for their eyes only.
4✔
1469
        reason, err := packet.obfuscator.EncryptFirstHop(failure.WireMessage())
4✔
1470
        if err != nil {
4✔
1471
                err := fmt.Errorf("unable to obfuscate "+
×
1472
                        "error: %v", err)
×
1473
                log.Error(err)
×
1474
                return err
×
1475
        }
×
1476

1477
        log.Error(failure.Error())
4✔
1478

4✔
1479
        // Create a failure packet for this htlc. The full set of
4✔
1480
        // information about the htlc failure is included so that they can
4✔
1481
        // be included in link failure notifications.
4✔
1482
        failPkt := &htlcPacket{
4✔
1483
                sourceRef:       packet.sourceRef,
4✔
1484
                incomingChanID:  packet.incomingChanID,
4✔
1485
                incomingHTLCID:  packet.incomingHTLCID,
4✔
1486
                outgoingChanID:  packet.outgoingChanID,
4✔
1487
                outgoingHTLCID:  packet.outgoingHTLCID,
4✔
1488
                incomingAmount:  packet.incomingAmount,
4✔
1489
                amount:          packet.amount,
4✔
1490
                incomingTimeout: packet.incomingTimeout,
4✔
1491
                outgoingTimeout: packet.outgoingTimeout,
4✔
1492
                circuit:         packet.circuit,
4✔
1493
                obfuscator:      packet.obfuscator,
4✔
1494
                linkFailure:     failure,
4✔
1495
                htlc: &lnwire.UpdateFailHTLC{
4✔
1496
                        Reason: reason,
4✔
1497
                },
4✔
1498
        }
4✔
1499

4✔
1500
        // Route a fail packet back to the source link.
4✔
1501
        err = s.mailOrchestrator.Deliver(failPkt.incomingChanID, failPkt)
4✔
1502
        if err != nil {
4✔
1503
                err = fmt.Errorf("source chanid=%v unable to "+
×
1504
                        "handle switch packet: %v",
×
1505
                        packet.incomingChanID, err)
×
1506
                log.Error(err)
×
1507
                return err
×
1508
        }
×
1509

1510
        return failure
4✔
1511
}
1512

1513
// closeCircuit accepts a settle or fail htlc and the associated htlc packet and
1514
// attempts to determine the source that forwarded this htlc. This method will
1515
// set the incoming chan and htlc ID of the given packet if the source was
1516
// found, and will properly [re]encrypt any failure messages.
1517
func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
4✔
1518
        // If the packet has its source, that means it was failed locally by
4✔
1519
        // the outgoing link. We fail it here to make sure only one response
4✔
1520
        // makes it through the switch.
4✔
1521
        if pkt.hasSource {
8✔
1522
                circuit, err := s.circuits.FailCircuit(pkt.inKey())
4✔
1523
                switch err {
4✔
1524

1525
                // Circuit successfully closed.
1526
                case nil:
4✔
1527
                        return circuit, nil
4✔
1528

1529
                // Circuit was previously closed, but has not been deleted.
1530
                // We'll just drop this response until the circuit has been
1531
                // fully removed.
1532
                case ErrCircuitClosing:
×
1533
                        return nil, err
×
1534

1535
                // Failed to close circuit because it does not exist. This is
1536
                // likely because the circuit was already successfully closed.
1537
                // Since this packet failed locally, there is no forwarding
1538
                // package entry to acknowledge.
1539
                case ErrUnknownCircuit:
×
1540
                        return nil, err
×
1541

1542
                // Unexpected error.
1543
                default:
×
1544
                        return nil, err
×
1545
                }
1546
        }
1547

1548
        // Otherwise, this is packet was received from the remote party.  Use
1549
        // circuit map to find the incoming link to receive the settle/fail.
1550
        circuit, err := s.circuits.CloseCircuit(pkt.outKey())
4✔
1551
        switch err {
4✔
1552

1553
        // Open circuit successfully closed.
1554
        case nil:
4✔
1555
                pkt.incomingChanID = circuit.Incoming.ChanID
4✔
1556
                pkt.incomingHTLCID = circuit.Incoming.HtlcID
4✔
1557
                pkt.circuit = circuit
4✔
1558
                pkt.sourceRef = &circuit.AddRef
4✔
1559

4✔
1560
                pktType := "SETTLE"
4✔
1561
                if _, ok := pkt.htlc.(*lnwire.UpdateFailHTLC); ok {
8✔
1562
                        pktType = "FAIL"
4✔
1563
                }
4✔
1564

1565
                log.Debugf("Closed completed %s circuit for %x: "+
4✔
1566
                        "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash,
4✔
1567
                        pkt.incomingChanID, pkt.incomingHTLCID,
4✔
1568
                        pkt.outgoingChanID, pkt.outgoingHTLCID)
4✔
1569

4✔
1570
                return circuit, nil
4✔
1571

1572
        // Circuit was previously closed, but has not been deleted. We'll just
1573
        // drop this response until the circuit has been removed.
1574
        case ErrCircuitClosing:
4✔
1575
                return nil, err
4✔
1576

1577
        // Failed to close circuit because it does not exist. This is likely
1578
        // because the circuit was already successfully closed.
1579
        case ErrUnknownCircuit:
4✔
1580
                if pkt.destRef != nil {
8✔
1581
                        // Add this SettleFailRef to the set of pending settle/fail entries
4✔
1582
                        // awaiting acknowledgement.
4✔
1583
                        s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef)
4✔
1584
                }
4✔
1585

1586
                // If this is a settle, we will not log an error message as settles
1587
                // are expected to hit the ErrUnknownCircuit case. The only way fails
1588
                // can hit this case if the link restarts after having just sent a fail
1589
                // to the switch.
1590
                _, isSettle := pkt.htlc.(*lnwire.UpdateFulfillHTLC)
4✔
1591
                if !isSettle {
8✔
1592
                        err := fmt.Errorf("unable to find target channel "+
4✔
1593
                                "for HTLC fail: channel ID = %s, "+
4✔
1594
                                "HTLC ID = %d", pkt.outgoingChanID,
4✔
1595
                                pkt.outgoingHTLCID)
4✔
1596
                        log.Error(err)
4✔
1597

4✔
1598
                        return nil, err
4✔
1599
                }
4✔
1600

1601
                return nil, nil
4✔
1602

1603
        // Unexpected error.
1604
        default:
×
1605
                return nil, err
×
1606
        }
1607
}
1608

1609
// ackSettleFail is used by the switch to ACK any settle/fail entries in the
1610
// forwarding package of the outgoing link for a payment circuit. We do this if
1611
// we're the originator of the payment, so the link stops attempting to
1612
// re-broadcast.
1613
func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error {
4✔
1614
        return kvdb.Batch(s.cfg.DB, func(tx kvdb.RwTx) error {
8✔
1615
                return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...)
4✔
1616
        })
4✔
1617
}
1618

1619
// teardownCircuit removes a pending or open circuit from the switch's circuit
1620
// map and prints useful logging statements regarding the outcome.
1621
func (s *Switch) teardownCircuit(pkt *htlcPacket) error {
4✔
1622
        var pktType string
4✔
1623
        switch htlc := pkt.htlc.(type) {
4✔
1624
        case *lnwire.UpdateFulfillHTLC:
4✔
1625
                pktType = "SETTLE"
4✔
1626
        case *lnwire.UpdateFailHTLC:
4✔
1627
                pktType = "FAIL"
4✔
1628
        default:
×
1629
                err := fmt.Errorf("cannot tear down packet of type: %T", htlc)
×
1630
                log.Errorf(err.Error())
×
1631
                return err
×
1632
        }
1633

1634
        switch {
4✔
1635
        case pkt.circuit.HasKeystone():
4✔
1636
                log.Debugf("Tearing down open circuit with %s pkt, removing circuit=%v "+
4✔
1637
                        "with keystone=%v", pktType, pkt.inKey(), pkt.outKey())
4✔
1638

4✔
1639
                err := s.circuits.DeleteCircuits(pkt.inKey())
4✔
1640
                if err != nil {
4✔
1641
                        log.Warnf("Failed to tear down open circuit (%s, %d) <-> (%s, %d) "+
×
1642
                                "with payment_hash-%v using %s pkt",
×
1643
                                pkt.incomingChanID, pkt.incomingHTLCID,
×
1644
                                pkt.outgoingChanID, pkt.outgoingHTLCID,
×
1645
                                pkt.circuit.PaymentHash, pktType)
×
1646
                        return err
×
1647
                }
×
1648

1649
                log.Debugf("Closed completed %s circuit for %x: "+
4✔
1650
                        "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash,
4✔
1651
                        pkt.incomingChanID, pkt.incomingHTLCID,
4✔
1652
                        pkt.outgoingChanID, pkt.outgoingHTLCID)
4✔
1653

1654
        default:
4✔
1655
                log.Debugf("Tearing down incomplete circuit with %s for inkey=%v",
4✔
1656
                        pktType, pkt.inKey())
4✔
1657

4✔
1658
                err := s.circuits.DeleteCircuits(pkt.inKey())
4✔
1659
                if err != nil {
4✔
1660
                        log.Warnf("Failed to tear down pending %s circuit for %x: "+
×
1661
                                "(%s, %d)", pktType, pkt.circuit.PaymentHash,
×
1662
                                pkt.incomingChanID, pkt.incomingHTLCID)
×
1663
                        return err
×
1664
                }
×
1665

1666
                log.Debugf("Removed pending onion circuit for %x: "+
4✔
1667
                        "(%s, %d)", pkt.circuit.PaymentHash,
4✔
1668
                        pkt.incomingChanID, pkt.incomingHTLCID)
4✔
1669
        }
1670

1671
        return nil
4✔
1672
}
1673

1674
// CloseLink creates and sends the close channel command to the target link
1675
// directing the specified closure type. If the closure type is CloseRegular,
1676
// targetFeePerKw parameter should be the ideal fee-per-kw that will be used as
1677
// a starting point for close negotiation. The deliveryScript parameter is an
1678
// optional parameter which sets a user specified script to close out to.
1679
func (s *Switch) CloseLink(chanPoint *wire.OutPoint,
1680
        closeType contractcourt.ChannelCloseType,
1681
        targetFeePerKw, maxFee chainfee.SatPerKWeight,
1682
        deliveryScript lnwire.DeliveryAddress) (chan interface{}, chan error) {
4✔
1683

4✔
1684
        // TODO(roasbeef) abstract out the close updates.
4✔
1685
        updateChan := make(chan interface{}, 2)
4✔
1686
        errChan := make(chan error, 1)
4✔
1687

4✔
1688
        command := &ChanClose{
4✔
1689
                CloseType:      closeType,
4✔
1690
                ChanPoint:      chanPoint,
4✔
1691
                Updates:        updateChan,
4✔
1692
                TargetFeePerKw: targetFeePerKw,
4✔
1693
                MaxFee:         maxFee,
4✔
1694
                DeliveryScript: deliveryScript,
4✔
1695
                Err:            errChan,
4✔
1696
        }
4✔
1697

4✔
1698
        select {
4✔
1699
        case s.chanCloseRequests <- command:
4✔
1700
                return updateChan, errChan
4✔
1701

1702
        case <-s.quit:
×
1703
                errChan <- ErrSwitchExiting
×
1704
                close(updateChan)
×
1705
                return updateChan, errChan
×
1706
        }
1707
}
1708

1709
// htlcForwarder is responsible for optimally forwarding (and possibly
1710
// fragmenting) incoming/outgoing HTLCs amongst all active interfaces and their
1711
// links. The duties of the forwarder are similar to that of a network switch,
1712
// in that it facilitates multi-hop payments by acting as a central messaging
1713
// bus. The switch communicates will active links to create, manage, and tear
1714
// down active onion routed payments. Each active channel is modeled as
1715
// networked device with metadata such as the available payment bandwidth, and
1716
// total link capacity.
1717
//
1718
// NOTE: This MUST be run as a goroutine.
1719
func (s *Switch) htlcForwarder() {
4✔
1720
        defer s.wg.Done()
4✔
1721

4✔
1722
        defer func() {
8✔
1723
                s.blockEpochStream.Cancel()
4✔
1724

4✔
1725
                // Remove all links once we've been signalled for shutdown.
4✔
1726
                var linksToStop []ChannelLink
4✔
1727
                s.indexMtx.Lock()
4✔
1728
                for _, link := range s.linkIndex {
8✔
1729
                        activeLink := s.removeLink(link.ChanID())
4✔
1730
                        if activeLink == nil {
4✔
1731
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1732
                                        "on stop", link.ChanID())
×
1733
                                continue
×
1734
                        }
1735
                        linksToStop = append(linksToStop, activeLink)
4✔
1736
                }
1737
                for _, link := range s.pendingLinkIndex {
8✔
1738
                        pendingLink := s.removeLink(link.ChanID())
4✔
1739
                        if pendingLink == nil {
4✔
1740
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1741
                                        "on stop", link.ChanID())
×
1742
                                continue
×
1743
                        }
1744
                        linksToStop = append(linksToStop, pendingLink)
4✔
1745
                }
1746
                s.indexMtx.Unlock()
4✔
1747

4✔
1748
                // Now that all pending and live links have been removed from
4✔
1749
                // the forwarding indexes, stop each one before shutting down.
4✔
1750
                // We'll shut them down in parallel to make exiting as fast as
4✔
1751
                // possible.
4✔
1752
                var wg sync.WaitGroup
4✔
1753
                for _, link := range linksToStop {
8✔
1754
                        wg.Add(1)
4✔
1755
                        go func(l ChannelLink) {
8✔
1756
                                defer wg.Done()
4✔
1757

4✔
1758
                                l.Stop()
4✔
1759
                        }(link)
4✔
1760
                }
1761
                wg.Wait()
4✔
1762

4✔
1763
                // Before we exit fully, we'll attempt to flush out any
4✔
1764
                // forwarding events that may still be lingering since the last
4✔
1765
                // batch flush.
4✔
1766
                if err := s.FlushForwardingEvents(); err != nil {
4✔
1767
                        log.Errorf("unable to flush forwarding events: %v", err)
×
1768
                }
×
1769
        }()
1770

1771
        // TODO(roasbeef): cleared vs settled distinction
1772
        var (
4✔
1773
                totalNumUpdates uint64
4✔
1774
                totalSatSent    btcutil.Amount
4✔
1775
                totalSatRecv    btcutil.Amount
4✔
1776
        )
4✔
1777
        s.cfg.LogEventTicker.Resume()
4✔
1778
        defer s.cfg.LogEventTicker.Stop()
4✔
1779

4✔
1780
        // Every 15 seconds, we'll flush out the forwarding events that
4✔
1781
        // occurred during that period.
4✔
1782
        s.cfg.FwdEventTicker.Resume()
4✔
1783
        defer s.cfg.FwdEventTicker.Stop()
4✔
1784

4✔
1785
        defer s.cfg.AckEventTicker.Stop()
4✔
1786

4✔
1787
out:
4✔
1788
        for {
8✔
1789

4✔
1790
                // If the set of pending settle/fail entries is non-zero,
4✔
1791
                // reinstate the ack ticker so we can batch ack them.
4✔
1792
                if len(s.pendingSettleFails) > 0 {
8✔
1793
                        s.cfg.AckEventTicker.Resume()
4✔
1794
                }
4✔
1795

1796
                select {
4✔
1797
                case blockEpoch, ok := <-s.blockEpochStream.Epochs:
4✔
1798
                        if !ok {
4✔
1799
                                break out
×
1800
                        }
1801

1802
                        atomic.StoreUint32(&s.bestHeight, uint32(blockEpoch.Height))
4✔
1803

1804
                // A local close request has arrived, we'll forward this to the
1805
                // relevant link (if it exists) so the channel can be
1806
                // cooperatively closed (if possible).
1807
                case req := <-s.chanCloseRequests:
4✔
1808
                        chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)
4✔
1809

4✔
1810
                        s.indexMtx.RLock()
4✔
1811
                        link, ok := s.linkIndex[chanID]
4✔
1812
                        if !ok {
8✔
1813
                                s.indexMtx.RUnlock()
4✔
1814

4✔
1815
                                req.Err <- fmt.Errorf("no peer for channel with "+
4✔
1816
                                        "chan_id=%x", chanID[:])
4✔
1817
                                continue
4✔
1818
                        }
1819
                        s.indexMtx.RUnlock()
4✔
1820

4✔
1821
                        peerPub := link.PeerPubKey()
4✔
1822
                        log.Debugf("Requesting local channel close: peer=%v, "+
4✔
1823
                                "chan_id=%x", link.PeerPubKey(), chanID[:])
4✔
1824

4✔
1825
                        go s.cfg.LocalChannelClose(peerPub[:], req)
4✔
1826

1827
                case resolutionMsg := <-s.resolutionMsgs:
4✔
1828
                        // We'll persist the resolution message to the Switch's
4✔
1829
                        // resolution store.
4✔
1830
                        resMsg := resolutionMsg.ResolutionMsg
4✔
1831
                        err := s.resMsgStore.addResolutionMsg(&resMsg)
4✔
1832
                        if err != nil {
4✔
1833
                                // This will only fail if there is a database
×
1834
                                // error or a serialization error. Sending the
×
1835
                                // error prevents the contractcourt from being
×
1836
                                // in a state where it believes the send was
×
1837
                                // successful, when it wasn't.
×
1838
                                log.Errorf("unable to add resolution msg: %v",
×
1839
                                        err)
×
1840
                                resolutionMsg.errChan <- err
×
1841
                                continue
×
1842
                        }
1843

1844
                        // At this point, the resolution message has been
1845
                        // persisted. It is safe to signal success by sending
1846
                        // a nil error since the Switch will re-deliver the
1847
                        // resolution message on restart.
1848
                        resolutionMsg.errChan <- nil
4✔
1849

4✔
1850
                        // Create a htlc packet for this resolution. We do
4✔
1851
                        // not have some of the information that we'll need
4✔
1852
                        // for blinded error handling here , so we'll rely on
4✔
1853
                        // our forwarding logic to fill it in later.
4✔
1854
                        pkt := &htlcPacket{
4✔
1855
                                outgoingChanID: resolutionMsg.SourceChan,
4✔
1856
                                outgoingHTLCID: resolutionMsg.HtlcIndex,
4✔
1857
                                isResolution:   true,
4✔
1858
                        }
4✔
1859

4✔
1860
                        // Resolution messages will either be cancelling
4✔
1861
                        // backwards an existing HTLC, or settling a previously
4✔
1862
                        // outgoing HTLC. Based on this, we'll map the message
4✔
1863
                        // to the proper htlcPacket.
4✔
1864
                        if resolutionMsg.Failure != nil {
8✔
1865
                                pkt.htlc = &lnwire.UpdateFailHTLC{}
4✔
1866
                        } else {
8✔
1867
                                pkt.htlc = &lnwire.UpdateFulfillHTLC{
4✔
1868
                                        PaymentPreimage: *resolutionMsg.PreImage,
4✔
1869
                                }
4✔
1870
                        }
4✔
1871

1872
                        log.Infof("Received outside contract resolution, "+
4✔
1873
                                "mapping to: %v", spew.Sdump(pkt))
4✔
1874

4✔
1875
                        // We don't check the error, as the only failure we can
4✔
1876
                        // encounter is due to the circuit already being
4✔
1877
                        // closed. This is fine, as processing this message is
4✔
1878
                        // meant to be idempotent.
4✔
1879
                        err = s.handlePacketForward(pkt)
4✔
1880
                        if err != nil {
8✔
1881
                                log.Errorf("Unable to forward resolution msg: %v", err)
4✔
1882
                        }
4✔
1883

1884
                // A new packet has arrived for forwarding, we'll interpret the
1885
                // packet concretely, then either forward it along, or
1886
                // interpret a return packet to a locally initialized one.
1887
                case cmd := <-s.htlcPlex:
4✔
1888
                        cmd.err <- s.handlePacketForward(cmd.pkt)
4✔
1889

1890
                // When this time ticks, then it indicates that we should
1891
                // collect all the forwarding events since the last internal,
1892
                // and write them out to our log.
1893
                case <-s.cfg.FwdEventTicker.Ticks():
4✔
1894
                        s.wg.Add(1)
4✔
1895
                        go func() {
8✔
1896
                                defer s.wg.Done()
4✔
1897

4✔
1898
                                if err := s.FlushForwardingEvents(); err != nil {
4✔
1899
                                        log.Errorf("unable to flush "+
×
1900
                                                "forwarding events: %v", err)
×
1901
                                }
×
1902
                        }()
1903

1904
                // The log ticker has fired, so we'll calculate some forwarding
1905
                // stats for the last 10 seconds to display within the logs to
1906
                // users.
1907
                case <-s.cfg.LogEventTicker.Ticks():
4✔
1908
                        // First, we'll collate the current running tally of
4✔
1909
                        // our forwarding stats.
4✔
1910
                        prevSatSent := totalSatSent
4✔
1911
                        prevSatRecv := totalSatRecv
4✔
1912
                        prevNumUpdates := totalNumUpdates
4✔
1913

4✔
1914
                        var (
4✔
1915
                                newNumUpdates uint64
4✔
1916
                                newSatSent    btcutil.Amount
4✔
1917
                                newSatRecv    btcutil.Amount
4✔
1918
                        )
4✔
1919

4✔
1920
                        // Next, we'll run through all the registered links and
4✔
1921
                        // compute their up-to-date forwarding stats.
4✔
1922
                        s.indexMtx.RLock()
4✔
1923
                        for _, link := range s.linkIndex {
8✔
1924
                                // TODO(roasbeef): when links first registered
4✔
1925
                                // stats printed.
4✔
1926
                                updates, sent, recv := link.Stats()
4✔
1927
                                newNumUpdates += updates
4✔
1928
                                newSatSent += sent.ToSatoshis()
4✔
1929
                                newSatRecv += recv.ToSatoshis()
4✔
1930
                        }
4✔
1931
                        s.indexMtx.RUnlock()
4✔
1932

4✔
1933
                        var (
4✔
1934
                                diffNumUpdates uint64
4✔
1935
                                diffSatSent    btcutil.Amount
4✔
1936
                                diffSatRecv    btcutil.Amount
4✔
1937
                        )
4✔
1938

4✔
1939
                        // If this is the first time we're computing these
4✔
1940
                        // stats, then the diff is just the new value. We do
4✔
1941
                        // this in order to avoid integer underflow issues.
4✔
1942
                        if prevNumUpdates == 0 {
8✔
1943
                                diffNumUpdates = newNumUpdates
4✔
1944
                                diffSatSent = newSatSent
4✔
1945
                                diffSatRecv = newSatRecv
4✔
1946
                        } else {
8✔
1947
                                diffNumUpdates = newNumUpdates - prevNumUpdates
4✔
1948
                                diffSatSent = newSatSent - prevSatSent
4✔
1949
                                diffSatRecv = newSatRecv - prevSatRecv
4✔
1950
                        }
4✔
1951

1952
                        // If the diff of num updates is zero, then we haven't
1953
                        // forwarded anything in the last 10 seconds, so we can
1954
                        // skip this update.
1955
                        if diffNumUpdates == 0 {
8✔
1956
                                continue
4✔
1957
                        }
1958

1959
                        // If the diff of num updates is negative, then some
1960
                        // links may have been unregistered from the switch, so
1961
                        // we'll update our stats to only include our registered
1962
                        // links.
1963
                        if int64(diffNumUpdates) < 0 {
8✔
1964
                                totalNumUpdates = newNumUpdates
4✔
1965
                                totalSatSent = newSatSent
4✔
1966
                                totalSatRecv = newSatRecv
4✔
1967
                                continue
4✔
1968
                        }
1969

1970
                        // Otherwise, we'll log this diff, then accumulate the
1971
                        // new stats into the running total.
1972
                        log.Debugf("Sent %d satoshis and received %d satoshis "+
4✔
1973
                                "in the last 10 seconds (%f tx/sec)",
4✔
1974
                                diffSatSent, diffSatRecv,
4✔
1975
                                float64(diffNumUpdates)/10)
4✔
1976

4✔
1977
                        totalNumUpdates += diffNumUpdates
4✔
1978
                        totalSatSent += diffSatSent
4✔
1979
                        totalSatRecv += diffSatRecv
4✔
1980

1981
                // The ack ticker has fired so if we have any settle/fail entries
1982
                // for a forwarding package to ack, we will do so here in a batch
1983
                // db call.
1984
                case <-s.cfg.AckEventTicker.Ticks():
4✔
1985
                        // If the current set is empty, pause the ticker.
4✔
1986
                        if len(s.pendingSettleFails) == 0 {
8✔
1987
                                s.cfg.AckEventTicker.Pause()
4✔
1988
                                continue
4✔
1989
                        }
1990

1991
                        // Batch ack the settle/fail entries.
1992
                        if err := s.ackSettleFail(s.pendingSettleFails...); err != nil {
4✔
1993
                                log.Errorf("Unable to ack batch of settle/fails: %v", err)
×
1994
                                continue
×
1995
                        }
1996

1997
                        log.Tracef("Acked %d settle fails: %v", len(s.pendingSettleFails),
4✔
1998
                                newLogClosure(func() string {
4✔
1999
                                        return spew.Sdump(s.pendingSettleFails)
×
2000
                                }))
×
2001

2002
                        // Reset the pendingSettleFails buffer while keeping acquired
2003
                        // memory.
2004
                        s.pendingSettleFails = s.pendingSettleFails[:0]
4✔
2005

2006
                case <-s.quit:
4✔
2007
                        return
4✔
2008
                }
2009
        }
2010
}
2011

2012
// Start starts all helper goroutines required for the operation of the switch.
2013
func (s *Switch) Start() error {
4✔
2014
        if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
4✔
2015
                log.Warn("Htlc Switch already started")
×
2016
                return errors.New("htlc switch already started")
×
2017
        }
×
2018

2019
        log.Infof("HTLC Switch starting")
4✔
2020

4✔
2021
        blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
4✔
2022
        if err != nil {
4✔
2023
                return err
×
2024
        }
×
2025
        s.blockEpochStream = blockEpochStream
4✔
2026

4✔
2027
        s.wg.Add(1)
4✔
2028
        go s.htlcForwarder()
4✔
2029

4✔
2030
        if err := s.reforwardResponses(); err != nil {
4✔
2031
                s.Stop()
×
2032
                log.Errorf("unable to reforward responses: %v", err)
×
2033
                return err
×
2034
        }
×
2035

2036
        if err := s.reforwardResolutions(); err != nil {
4✔
2037
                // We are already stopping so we can ignore the error.
×
2038
                _ = s.Stop()
×
2039
                log.Errorf("unable to reforward resolutions: %v", err)
×
2040
                return err
×
2041
        }
×
2042

2043
        return nil
4✔
2044
}
2045

2046
// reforwardResolutions fetches the set of resolution messages stored on-disk
2047
// and reforwards them if their circuits are still open. If the circuits have
2048
// been deleted, then we will delete the resolution message from the database.
2049
func (s *Switch) reforwardResolutions() error {
4✔
2050
        // Fetch all stored resolution messages, deleting the ones that are
4✔
2051
        // resolved.
4✔
2052
        resMsgs, err := s.resMsgStore.fetchAllResolutionMsg()
4✔
2053
        if err != nil {
4✔
2054
                return err
×
2055
        }
×
2056

2057
        switchPackets := make([]*htlcPacket, 0, len(resMsgs))
4✔
2058
        for _, resMsg := range resMsgs {
8✔
2059
                // If the open circuit no longer exists, then we can remove the
4✔
2060
                // message from the store.
4✔
2061
                outKey := CircuitKey{
4✔
2062
                        ChanID: resMsg.SourceChan,
4✔
2063
                        HtlcID: resMsg.HtlcIndex,
4✔
2064
                }
4✔
2065

4✔
2066
                if s.circuits.LookupOpenCircuit(outKey) == nil {
8✔
2067
                        // The open circuit doesn't exist.
4✔
2068
                        err := s.resMsgStore.deleteResolutionMsg(&outKey)
4✔
2069
                        if err != nil {
4✔
2070
                                return err
×
2071
                        }
×
2072

2073
                        continue
4✔
2074
                }
2075

2076
                // The circuit is still open, so we can assume that the link or
2077
                // switch (if we are the source) hasn't cleaned it up yet.
2078
                // We rely on our forwarding logic to fill in details that
2079
                // are not currently available to us.
2080
                resPkt := &htlcPacket{
4✔
2081
                        outgoingChanID: resMsg.SourceChan,
4✔
2082
                        outgoingHTLCID: resMsg.HtlcIndex,
4✔
2083
                        isResolution:   true,
4✔
2084
                }
4✔
2085

4✔
2086
                if resMsg.Failure != nil {
8✔
2087
                        resPkt.htlc = &lnwire.UpdateFailHTLC{}
4✔
2088
                } else {
4✔
2089
                        resPkt.htlc = &lnwire.UpdateFulfillHTLC{
×
2090
                                PaymentPreimage: *resMsg.PreImage,
×
2091
                        }
×
2092
                }
×
2093

2094
                switchPackets = append(switchPackets, resPkt)
4✔
2095
        }
2096

2097
        // We'll now dispatch the set of resolution messages to the proper
2098
        // destination. An error is only encountered here if the switch is
2099
        // shutting down.
2100
        if err := s.ForwardPackets(nil, switchPackets...); err != nil {
4✔
2101
                return err
×
2102
        }
×
2103

2104
        return nil
4✔
2105
}
2106

2107
// reforwardResponses for every known, non-pending channel, loads all associated
2108
// forwarding packages and reforwards any Settle or Fail HTLCs found. This is
2109
// used to resurrect the switch's mailboxes after a restart. This also runs for
2110
// waiting close channels since there may be settles or fails that need to be
2111
// reforwarded before they completely close.
2112
func (s *Switch) reforwardResponses() error {
4✔
2113
        openChannels, err := s.cfg.FetchAllChannels()
4✔
2114
        if err != nil {
4✔
2115
                return err
×
2116
        }
×
2117

2118
        for _, openChannel := range openChannels {
8✔
2119
                shortChanID := openChannel.ShortChanID()
4✔
2120

4✔
2121
                // Locally-initiated payments never need reforwarding.
4✔
2122
                if shortChanID == hop.Source {
8✔
2123
                        continue
4✔
2124
                }
2125

2126
                // If the channel is pending, it should have no forwarding
2127
                // packages, and nothing to reforward.
2128
                if openChannel.IsPending {
4✔
2129
                        continue
×
2130
                }
2131

2132
                // Channels in open or waiting-close may still have responses in
2133
                // their forwarding packages. We will continue to reattempt
2134
                // forwarding on startup until the channel is fully-closed.
2135
                //
2136
                // Load this channel's forwarding packages, and deliver them to
2137
                // the switch.
2138
                fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID)
4✔
2139
                if err != nil {
4✔
2140
                        log.Errorf("unable to load forwarding "+
×
2141
                                "packages for %v: %v", shortChanID, err)
×
2142
                        return err
×
2143
                }
×
2144

2145
                s.reforwardSettleFails(fwdPkgs)
4✔
2146
        }
2147

2148
        return nil
4✔
2149
}
2150

2151
// loadChannelFwdPkgs loads all forwarding packages owned by the `source` short
2152
// channel identifier.
2153
func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) {
4✔
2154

4✔
2155
        var fwdPkgs []*channeldb.FwdPkg
4✔
2156
        if err := kvdb.View(s.cfg.DB, func(tx kvdb.RTx) error {
8✔
2157
                var err error
4✔
2158
                fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgs(
4✔
2159
                        tx, source,
4✔
2160
                )
4✔
2161
                return err
4✔
2162
        }, func() {
8✔
2163
                fwdPkgs = nil
4✔
2164
        }); err != nil {
4✔
2165
                return nil, err
×
2166
        }
×
2167

2168
        return fwdPkgs, nil
4✔
2169
}
2170

2171
// reforwardSettleFails parses the Settle and Fail HTLCs from the list of
2172
// forwarding packages, and reforwards those that have not been acknowledged.
2173
// This is intended to occur on startup, in order to recover the switch's
2174
// mailboxes, and to ensure that responses can be propagated in case the
2175
// outgoing link never comes back online.
2176
//
2177
// NOTE: This should mimic the behavior processRemoteSettleFails.
2178
func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) {
4✔
2179
        for _, fwdPkg := range fwdPkgs {
8✔
2180
                settleFails, err := lnwallet.PayDescsFromRemoteLogUpdates(
4✔
2181
                        fwdPkg.Source, fwdPkg.Height, fwdPkg.SettleFails,
4✔
2182
                )
4✔
2183
                if err != nil {
4✔
2184
                        log.Errorf("Unable to process remote log updates: %v",
×
2185
                                err)
×
2186
                        continue
×
2187
                }
2188

2189
                switchPackets := make([]*htlcPacket, 0, len(settleFails))
4✔
2190
                for i, pd := range settleFails {
8✔
2191

4✔
2192
                        // Skip any settles or fails that have already been
4✔
2193
                        // acknowledged by the incoming link that originated the
4✔
2194
                        // forwarded Add.
4✔
2195
                        if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
8✔
2196
                                continue
4✔
2197
                        }
2198

2199
                        switch pd.EntryType {
4✔
2200

2201
                        // A settle for an HTLC we previously forwarded HTLC has
2202
                        // been received. So we'll forward the HTLC to the
2203
                        // switch which will handle propagating the settle to
2204
                        // the prior hop.
2205
                        case lnwallet.Settle:
4✔
2206
                                settlePacket := &htlcPacket{
4✔
2207
                                        outgoingChanID: fwdPkg.Source,
4✔
2208
                                        outgoingHTLCID: pd.ParentIndex,
4✔
2209
                                        destRef:        pd.DestRef,
4✔
2210
                                        htlc: &lnwire.UpdateFulfillHTLC{
4✔
2211
                                                PaymentPreimage: pd.RPreimage,
4✔
2212
                                        },
4✔
2213
                                }
4✔
2214

4✔
2215
                                // Add the packet to the batch to be forwarded, and
4✔
2216
                                // notify the overflow queue that a spare spot has been
4✔
2217
                                // freed up within the commitment state.
4✔
2218
                                switchPackets = append(switchPackets, settlePacket)
4✔
2219

2220
                        // A failureCode message for a previously forwarded HTLC has been
2221
                        // received. As a result a new slot will be freed up in our
2222
                        // commitment state, so we'll forward this to the switch so the
2223
                        // backwards undo can continue.
2224
                        case lnwallet.Fail:
×
2225
                                // Fetch the reason the HTLC was canceled so
×
2226
                                // we can continue to propagate it. This
×
2227
                                // failure originated from another node, so
×
2228
                                // the linkFailure field is not set on this
×
2229
                                // packet. We rely on the link to fill in
×
2230
                                // additional circuit information for us.
×
2231
                                failPacket := &htlcPacket{
×
2232
                                        outgoingChanID: fwdPkg.Source,
×
2233
                                        outgoingHTLCID: pd.ParentIndex,
×
2234
                                        destRef:        pd.DestRef,
×
2235
                                        htlc: &lnwire.UpdateFailHTLC{
×
2236
                                                Reason: lnwire.OpaqueReason(pd.FailReason),
×
2237
                                        },
×
2238
                                }
×
2239

×
2240
                                // Add the packet to the batch to be forwarded, and
×
2241
                                // notify the overflow queue that a spare spot has been
×
2242
                                // freed up within the commitment state.
×
2243
                                switchPackets = append(switchPackets, failPacket)
×
2244
                        }
2245
                }
2246

2247
                // Since this send isn't tied to a specific link, we pass a nil
2248
                // link quit channel, meaning the send will fail only if the
2249
                // switch receives a shutdown request.
2250
                if err := s.ForwardPackets(nil, switchPackets...); err != nil {
4✔
2251
                        log.Errorf("Unhandled error while reforwarding packets "+
×
2252
                                "settle/fail over htlcswitch: %v", err)
×
2253
                }
×
2254
        }
2255
}
2256

2257
// Stop gracefully stops all active helper goroutines, then waits until they've
2258
// exited.
2259
func (s *Switch) Stop() error {
4✔
2260
        if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) {
4✔
2261
                log.Warn("Htlc Switch already stopped")
×
2262
                return errors.New("htlc switch already shutdown")
×
2263
        }
×
2264

2265
        log.Info("HTLC Switch shutting down...")
4✔
2266
        defer log.Debug("HTLC Switch shutdown complete")
4✔
2267

4✔
2268
        close(s.quit)
4✔
2269

4✔
2270
        s.wg.Wait()
4✔
2271

4✔
2272
        // Wait until all active goroutines have finished exiting before
4✔
2273
        // stopping the mailboxes, otherwise the mailbox map could still be
4✔
2274
        // accessed and modified.
4✔
2275
        s.mailOrchestrator.Stop()
4✔
2276

4✔
2277
        return nil
4✔
2278
}
2279

2280
// CreateAndAddLink will create a link and then add it to the internal maps
2281
// when given a ChannelLinkConfig and LightningChannel.
2282
func (s *Switch) CreateAndAddLink(linkCfg ChannelLinkConfig,
2283
        lnChan *lnwallet.LightningChannel) error {
4✔
2284

4✔
2285
        link := NewChannelLink(linkCfg, lnChan)
4✔
2286
        return s.AddLink(link)
4✔
2287
}
4✔
2288

2289
// AddLink is used to initiate the handling of the add link command. The
2290
// request will be propagated and handled in the main goroutine.
2291
func (s *Switch) AddLink(link ChannelLink) error {
4✔
2292
        s.indexMtx.Lock()
4✔
2293
        defer s.indexMtx.Unlock()
4✔
2294

4✔
2295
        chanID := link.ChanID()
4✔
2296

4✔
2297
        // First, ensure that this link is not already active in the switch.
4✔
2298
        _, err := s.getLink(chanID)
4✔
2299
        if err == nil {
4✔
2300
                return fmt.Errorf("unable to add ChannelLink(%v), already "+
×
2301
                        "active", chanID)
×
2302
        }
×
2303

2304
        // Get and attach the mailbox for this link, which buffers packets in
2305
        // case there packets that we tried to deliver while this link was
2306
        // offline.
2307
        shortChanID := link.ShortChanID()
4✔
2308
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID)
4✔
2309
        link.AttachMailBox(mailbox)
4✔
2310

4✔
2311
        // Attach the Switch's failAliasUpdate function to the link.
4✔
2312
        link.attachFailAliasUpdate(s.failAliasUpdate)
4✔
2313

4✔
2314
        if err := link.Start(); err != nil {
4✔
2315
                log.Errorf("AddLink failed to start link with chanID=%v: %v",
×
2316
                        chanID, err)
×
2317
                s.removeLink(chanID)
×
2318
                return err
×
2319
        }
×
2320

2321
        if shortChanID == hop.Source {
8✔
2322
                log.Infof("Adding pending link chan_id=%v, short_chan_id=%v",
4✔
2323
                        chanID, shortChanID)
4✔
2324

4✔
2325
                s.pendingLinkIndex[chanID] = link
4✔
2326
        } else {
8✔
2327
                log.Infof("Adding live link chan_id=%v, short_chan_id=%v",
4✔
2328
                        chanID, shortChanID)
4✔
2329

4✔
2330
                s.addLiveLink(link)
4✔
2331
                s.mailOrchestrator.BindLiveShortChanID(
4✔
2332
                        mailbox, chanID, shortChanID,
4✔
2333
                )
4✔
2334
        }
4✔
2335

2336
        return nil
4✔
2337
}
2338

2339
// addLiveLink adds a link to all associated forwarding index, this makes it a
2340
// candidate for forwarding HTLCs.
2341
func (s *Switch) addLiveLink(link ChannelLink) {
4✔
2342
        linkScid := link.ShortChanID()
4✔
2343

4✔
2344
        // We'll add the link to the linkIndex which lets us quickly
4✔
2345
        // look up a channel when we need to close or register it, and
4✔
2346
        // the forwarding index which'll be used when forwarding HTLC's
4✔
2347
        // in the multi-hop setting.
4✔
2348
        s.linkIndex[link.ChanID()] = link
4✔
2349
        s.forwardingIndex[linkScid] = link
4✔
2350

4✔
2351
        // Next we'll add the link to the interface index so we can
4✔
2352
        // quickly look up all the channels for a particular node.
4✔
2353
        peerPub := link.PeerPubKey()
4✔
2354
        if _, ok := s.interfaceIndex[peerPub]; !ok {
8✔
2355
                s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink)
4✔
2356
        }
4✔
2357
        s.interfaceIndex[peerPub][link.ChanID()] = link
4✔
2358

4✔
2359
        aliases := link.getAliases()
4✔
2360
        if link.isZeroConf() {
8✔
2361
                if link.zeroConfConfirmed() {
8✔
2362
                        // Since the zero-conf channel has confirmed, we can
4✔
2363
                        // populate the aliasToReal mapping.
4✔
2364
                        confirmedScid := link.confirmedScid()
4✔
2365

4✔
2366
                        for _, alias := range aliases {
8✔
2367
                                s.aliasToReal[alias] = confirmedScid
4✔
2368
                        }
4✔
2369

2370
                        // Add the confirmed SCID as a key in the baseIndex.
2371
                        s.baseIndex[confirmedScid] = linkScid
4✔
2372
                }
2373

2374
                // Now we populate the baseIndex which will be used to fetch
2375
                // the link given any of the channel's alias SCIDs or the real
2376
                // SCID. The link's SCID is an alias, so we don't need to
2377
                // special-case it like the option-scid-alias feature-bit case
2378
                // further down.
2379
                for _, alias := range aliases {
8✔
2380
                        s.baseIndex[alias] = linkScid
4✔
2381
                }
4✔
2382
        } else if link.negotiatedAliasFeature() {
8✔
2383
                // The link's SCID is the confirmed SCID for non-zero-conf
4✔
2384
                // option-scid-alias feature bit channels.
4✔
2385
                for _, alias := range aliases {
8✔
2386
                        s.aliasToReal[alias] = linkScid
4✔
2387
                        s.baseIndex[alias] = linkScid
4✔
2388
                }
4✔
2389

2390
                // Since the link's SCID is confirmed, it was not included in
2391
                // the baseIndex above as a key. Add it now.
2392
                s.baseIndex[linkScid] = linkScid
4✔
2393
        }
2394
}
2395

2396
// GetLink is used to initiate the handling of the get link command. The
2397
// request will be propagated/handled to/in the main goroutine.
2398
func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelUpdateHandler,
2399
        error) {
4✔
2400

4✔
2401
        s.indexMtx.RLock()
4✔
2402
        defer s.indexMtx.RUnlock()
4✔
2403

4✔
2404
        return s.getLink(chanID)
4✔
2405
}
4✔
2406

2407
// getLink returns the link stored in either the pending index or the live
2408
// lindex.
2409
func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) {
4✔
2410
        link, ok := s.linkIndex[chanID]
4✔
2411
        if !ok {
8✔
2412
                link, ok = s.pendingLinkIndex[chanID]
4✔
2413
                if !ok {
8✔
2414
                        return nil, ErrChannelLinkNotFound
4✔
2415
                }
4✔
2416
        }
2417

2418
        return link, nil
4✔
2419
}
2420

2421
// GetLinkByShortID attempts to return the link which possesses the target short
2422
// channel ID.
2423
func (s *Switch) GetLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink,
2424
        error) {
4✔
2425

4✔
2426
        s.indexMtx.RLock()
4✔
2427
        defer s.indexMtx.RUnlock()
4✔
2428

4✔
2429
        link, err := s.getLinkByShortID(chanID)
4✔
2430
        if err != nil {
8✔
2431
                // If we failed to find the link under the passed-in SCID, we
4✔
2432
                // consult the Switch's baseIndex map to see if the confirmed
4✔
2433
                // SCID was used for a zero-conf channel.
4✔
2434
                aliasID, ok := s.baseIndex[chanID]
4✔
2435
                if !ok {
4✔
2436
                        return nil, err
×
2437
                }
×
2438

2439
                // An alias was found, use it to lookup if a link exists.
2440
                return s.getLinkByShortID(aliasID)
4✔
2441
        }
2442

2443
        return link, nil
4✔
2444
}
2445

2446
// getLinkByShortID attempts to return the link which possesses the target
2447
// short channel ID.
2448
//
2449
// NOTE: This MUST be called with the indexMtx held.
2450
func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) {
4✔
2451
        link, ok := s.forwardingIndex[chanID]
4✔
2452
        if !ok {
8✔
2453
                return nil, ErrChannelLinkNotFound
4✔
2454
        }
4✔
2455

2456
        return link, nil
4✔
2457
}
2458

2459
// getLinkByMapping attempts to fetch the link via the htlcPacket's
2460
// outgoingChanID, possibly using a mapping. If it finds the link via mapping,
2461
// the outgoingChanID will be changed so that an error can be properly
2462
// attributed when looping over linkErrs in handlePacketForward.
2463
//
2464
// * If the outgoingChanID is an alias, we'll fetch the link regardless if it's
2465
// public or not.
2466
//
2467
// * If the outgoingChanID is a confirmed SCID, we'll need to do more checks.
2468
//   - If there is no entry found in baseIndex, fetch the link. This channel
2469
//     did not have the option-scid-alias feature negotiated (which includes
2470
//     zero-conf and option-scid-alias channel-types).
2471
//   - If there is an entry found, fetch the link from forwardingIndex and
2472
//     fail if this is a private link.
2473
//
2474
// NOTE: This MUST be called with the indexMtx read lock held.
2475
func (s *Switch) getLinkByMapping(pkt *htlcPacket) (ChannelLink, error) {
4✔
2476
        // Determine if this ShortChannelID is an alias or a confirmed SCID.
4✔
2477
        chanID := pkt.outgoingChanID
4✔
2478
        aliasID := s.cfg.IsAlias(chanID)
4✔
2479

4✔
2480
        // Set the originalOutgoingChanID so the proper channel_update can be
4✔
2481
        // sent back if the option-scid-alias feature bit was negotiated.
4✔
2482
        pkt.originalOutgoingChanID = chanID
4✔
2483

4✔
2484
        if aliasID {
8✔
2485
                // Since outgoingChanID is an alias, we'll fetch the link via
4✔
2486
                // baseIndex.
4✔
2487
                baseScid, ok := s.baseIndex[chanID]
4✔
2488
                if !ok {
4✔
2489
                        // No mapping exists, bail.
×
2490
                        return nil, ErrChannelLinkNotFound
×
2491
                }
×
2492

2493
                // A mapping exists, so use baseScid to find the link in the
2494
                // forwardingIndex.
2495
                link, ok := s.forwardingIndex[baseScid]
4✔
2496
                if !ok {
4✔
2497
                        // Link not found, bail.
×
2498
                        return nil, ErrChannelLinkNotFound
×
2499
                }
×
2500

2501
                // Change the packet's outgoingChanID field so that errors are
2502
                // properly attributed.
2503
                pkt.outgoingChanID = baseScid
4✔
2504

4✔
2505
                // Return the link without checking if it's private or not.
4✔
2506
                return link, nil
4✔
2507
        }
2508

2509
        // The outgoingChanID is a confirmed SCID. Attempt to fetch the base
2510
        // SCID from baseIndex.
2511
        baseScid, ok := s.baseIndex[chanID]
4✔
2512
        if !ok {
8✔
2513
                // outgoingChanID is not a key in base index meaning this
4✔
2514
                // channel did not have the option-scid-alias feature bit
4✔
2515
                // negotiated. We'll fetch the link and return it.
4✔
2516
                link, ok := s.forwardingIndex[chanID]
4✔
2517
                if !ok {
8✔
2518
                        // The link wasn't found, bail out.
4✔
2519
                        return nil, ErrChannelLinkNotFound
4✔
2520
                }
4✔
2521

2522
                return link, nil
4✔
2523
        }
2524

2525
        // Fetch the link whose internal SCID is baseScid.
2526
        link, ok := s.forwardingIndex[baseScid]
4✔
2527
        if !ok {
4✔
2528
                // Link wasn't found, bail out.
×
2529
                return nil, ErrChannelLinkNotFound
×
2530
        }
×
2531

2532
        // If the link is unadvertised, we fail since the real SCID was used to
2533
        // forward over it and this is a channel where the option-scid-alias
2534
        // feature bit was negotiated.
2535
        if link.IsUnadvertised() {
4✔
2536
                return nil, ErrChannelLinkNotFound
×
2537
        }
×
2538

2539
        // The link is public so the confirmed SCID can be used to forward over
2540
        // it. We'll also replace pkt's outgoingChanID field so errors can
2541
        // properly be attributed in the calling function.
2542
        pkt.outgoingChanID = baseScid
4✔
2543
        return link, nil
4✔
2544
}
2545

2546
// HasActiveLink returns true if the given channel ID has a link in the link
2547
// index AND the link is eligible to forward.
2548
func (s *Switch) HasActiveLink(chanID lnwire.ChannelID) bool {
4✔
2549
        s.indexMtx.RLock()
4✔
2550
        defer s.indexMtx.RUnlock()
4✔
2551

4✔
2552
        if link, ok := s.linkIndex[chanID]; ok {
8✔
2553
                return link.EligibleToForward()
4✔
2554
        }
4✔
2555

2556
        return false
4✔
2557
}
2558

2559
// RemoveLink purges the switch of any link associated with chanID. If a pending
2560
// or active link is not found, this method does nothing. Otherwise, the method
2561
// returns after the link has been completely shutdown.
2562
func (s *Switch) RemoveLink(chanID lnwire.ChannelID) {
4✔
2563
        s.indexMtx.Lock()
4✔
2564
        link, err := s.getLink(chanID)
4✔
2565
        if err != nil {
8✔
2566
                // If err is non-nil, this means that link is also nil. The
4✔
2567
                // link variable cannot be nil without err being non-nil.
4✔
2568
                s.indexMtx.Unlock()
4✔
2569
                log.Tracef("Unable to remove link for ChannelID(%v): %v",
4✔
2570
                        chanID, err)
4✔
2571
                return
4✔
2572
        }
4✔
2573

2574
        // Check if the link is already stopping and grab the stop chan if it
2575
        // is.
2576
        stopChan, ok := s.linkStopIndex[chanID]
4✔
2577
        if !ok {
8✔
2578
                // If the link is non-nil, it is not currently stopping, so
4✔
2579
                // we'll add a stop chan to the linkStopIndex.
4✔
2580
                stopChan = make(chan struct{})
4✔
2581
                s.linkStopIndex[chanID] = stopChan
4✔
2582
        }
4✔
2583
        s.indexMtx.Unlock()
4✔
2584

4✔
2585
        if ok {
4✔
2586
                // If the stop chan exists, we will wait for it to be closed.
×
2587
                // Once it is closed, we will exit.
×
2588
                select {
×
2589
                case <-stopChan:
×
2590
                        return
×
2591
                case <-s.quit:
×
2592
                        return
×
2593
                }
2594
        }
2595

2596
        // Stop the link before removing it from the maps.
2597
        link.Stop()
4✔
2598

4✔
2599
        s.indexMtx.Lock()
4✔
2600
        _ = s.removeLink(chanID)
4✔
2601

4✔
2602
        // Close stopChan and remove this link from the linkStopIndex.
4✔
2603
        // Deleting from the index and removing from the link must be done
4✔
2604
        // in the same block while the mutex is held.
4✔
2605
        close(stopChan)
4✔
2606
        delete(s.linkStopIndex, chanID)
4✔
2607
        s.indexMtx.Unlock()
4✔
2608
}
2609

2610
// removeLink is used to remove and stop the channel link.
2611
//
2612
// NOTE: This MUST be called with the indexMtx held.
2613
func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink {
4✔
2614
        log.Infof("Removing channel link with ChannelID(%v)", chanID)
4✔
2615

4✔
2616
        link, err := s.getLink(chanID)
4✔
2617
        if err != nil {
4✔
2618
                return nil
×
2619
        }
×
2620

2621
        // Remove the channel from live link indexes.
2622
        delete(s.pendingLinkIndex, link.ChanID())
4✔
2623
        delete(s.linkIndex, link.ChanID())
4✔
2624
        delete(s.forwardingIndex, link.ShortChanID())
4✔
2625

4✔
2626
        // If the link has been added to the peer index, then we'll move to
4✔
2627
        // delete the entry within the index.
4✔
2628
        peerPub := link.PeerPubKey()
4✔
2629
        if peerIndex, ok := s.interfaceIndex[peerPub]; ok {
8✔
2630
                delete(peerIndex, link.ChanID())
4✔
2631

4✔
2632
                // If after deletion, there are no longer any links, then we'll
4✔
2633
                // remove the interface map all together.
4✔
2634
                if len(peerIndex) == 0 {
8✔
2635
                        delete(s.interfaceIndex, peerPub)
4✔
2636
                }
4✔
2637
        }
2638

2639
        return link
4✔
2640
}
2641

2642
// UpdateShortChanID locates the link with the passed-in chanID and updates the
2643
// underlying channel state. This is only used in zero-conf channels to allow
2644
// the confirmed SCID to be updated.
2645
func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error {
4✔
2646
        s.indexMtx.Lock()
4✔
2647
        defer s.indexMtx.Unlock()
4✔
2648

4✔
2649
        // Locate the target link in the link index. If no such link exists,
4✔
2650
        // then we will ignore the request.
4✔
2651
        link, ok := s.linkIndex[chanID]
4✔
2652
        if !ok {
4✔
2653
                return fmt.Errorf("link %v not found", chanID)
×
2654
        }
×
2655

2656
        // Try to update the link's underlying channel state, returning early
2657
        // if this update failed.
2658
        _, err := link.UpdateShortChanID()
4✔
2659
        if err != nil {
4✔
2660
                return err
×
2661
        }
×
2662

2663
        // Since the zero-conf channel is confirmed, we should populate the
2664
        // aliasToReal map and update the baseIndex.
2665
        aliases := link.getAliases()
4✔
2666

4✔
2667
        confirmedScid := link.confirmedScid()
4✔
2668

4✔
2669
        for _, alias := range aliases {
8✔
2670
                s.aliasToReal[alias] = confirmedScid
4✔
2671
        }
4✔
2672

2673
        s.baseIndex[confirmedScid] = link.ShortChanID()
4✔
2674

4✔
2675
        return nil
4✔
2676
}
2677

2678
// GetLinksByInterface fetches all the links connected to a particular node
2679
// identified by the serialized compressed form of its public key.
2680
func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelUpdateHandler,
2681
        error) {
4✔
2682

4✔
2683
        s.indexMtx.RLock()
4✔
2684
        defer s.indexMtx.RUnlock()
4✔
2685

4✔
2686
        var handlers []ChannelUpdateHandler
4✔
2687

4✔
2688
        links, err := s.getLinks(hop)
4✔
2689
        if err != nil {
8✔
2690
                return nil, err
4✔
2691
        }
4✔
2692

2693
        // Range over the returned []ChannelLink to convert them into
2694
        // []ChannelUpdateHandler.
2695
        for _, link := range links {
8✔
2696
                handlers = append(handlers, link)
4✔
2697
        }
4✔
2698

2699
        return handlers, nil
4✔
2700
}
2701

2702
// getLinks is function which returns the channel links of the peer by hop
2703
// destination id.
2704
//
2705
// NOTE: This MUST be called with the indexMtx held.
2706
func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
4✔
2707
        links, ok := s.interfaceIndex[destination]
4✔
2708
        if !ok {
8✔
2709
                return nil, ErrNoLinksFound
4✔
2710
        }
4✔
2711

2712
        channelLinks := make([]ChannelLink, 0, len(links))
4✔
2713
        for _, link := range links {
8✔
2714
                channelLinks = append(channelLinks, link)
4✔
2715
        }
4✔
2716

2717
        return channelLinks, nil
4✔
2718
}
2719

2720
// CircuitModifier returns a reference to subset of the interfaces provided by
2721
// the circuit map, to allow links to open and close circuits.
2722
func (s *Switch) CircuitModifier() CircuitModifier {
4✔
2723
        return s.circuits
4✔
2724
}
4✔
2725

2726
// CircuitLookup returns a reference to subset of the interfaces provided by the
2727
// circuit map, to allow looking up circuits.
2728
func (s *Switch) CircuitLookup() CircuitLookup {
4✔
2729
        return s.circuits
4✔
2730
}
4✔
2731

2732
// commitCircuits persistently adds a circuit to the switch's circuit map.
2733
func (s *Switch) commitCircuits(circuits ...*PaymentCircuit) (
2734
        *CircuitFwdActions, error) {
×
2735

×
2736
        return s.circuits.CommitCircuits(circuits...)
×
2737
}
×
2738

2739
// FlushForwardingEvents flushes out the set of pending forwarding events to
2740
// the persistent log. This will be used by the switch to periodically flush
2741
// out the set of forwarding events to disk. External callers can also use this
2742
// method to ensure all data is flushed to dis before querying the log.
2743
func (s *Switch) FlushForwardingEvents() error {
4✔
2744
        // First, we'll obtain a copy of the current set of pending forwarding
4✔
2745
        // events.
4✔
2746
        s.fwdEventMtx.Lock()
4✔
2747

4✔
2748
        // If we won't have any forwarding events, then we can exit early.
4✔
2749
        if len(s.pendingFwdingEvents) == 0 {
8✔
2750
                s.fwdEventMtx.Unlock()
4✔
2751
                return nil
4✔
2752
        }
4✔
2753

2754
        events := make([]channeldb.ForwardingEvent, len(s.pendingFwdingEvents))
4✔
2755
        copy(events[:], s.pendingFwdingEvents[:])
4✔
2756

4✔
2757
        // With the copy obtained, we can now clear out the header pointer of
4✔
2758
        // the current slice. This way, we can re-use the underlying storage
4✔
2759
        // allocated for the slice.
4✔
2760
        s.pendingFwdingEvents = s.pendingFwdingEvents[:0]
4✔
2761
        s.fwdEventMtx.Unlock()
4✔
2762

4✔
2763
        // Finally, we'll write out the copied events to the persistent
4✔
2764
        // forwarding log.
4✔
2765
        return s.cfg.FwdingLog.AddForwardingEvents(events)
4✔
2766
}
2767

2768
// BestHeight returns the best height known to the switch.
2769
func (s *Switch) BestHeight() uint32 {
4✔
2770
        return atomic.LoadUint32(&s.bestHeight)
4✔
2771
}
4✔
2772

2773
// evaluateDustThreshold takes in a ChannelLink, HTLC amount, and a boolean to
2774
// determine whether the default dust threshold has been exceeded. This
2775
// heuristic takes into account the trimmed-to-dust mechanism. The sum of the
2776
// commitment's dust with the mailbox's dust with the amount is checked against
2777
// the default threshold. If incoming is true, then the amount is not included
2778
// in the sum as it was already included in the commitment's dust. A boolean is
2779
// returned telling the caller whether the HTLC should be failed back.
2780
func (s *Switch) evaluateDustThreshold(link ChannelLink,
2781
        amount lnwire.MilliSatoshi, incoming bool) bool {
4✔
2782

4✔
2783
        // Retrieve the link's current commitment feerate and dustClosure.
4✔
2784
        feeRate := link.getFeeRate()
4✔
2785
        isDust := link.getDustClosure()
4✔
2786

4✔
2787
        // Evaluate if the HTLC is dust on either sides' commitment.
4✔
2788
        isLocalDust := isDust(feeRate, incoming, true, amount.ToSatoshis())
4✔
2789
        isRemoteDust := isDust(feeRate, incoming, false, amount.ToSatoshis())
4✔
2790

4✔
2791
        if !(isLocalDust || isRemoteDust) {
8✔
2792
                // If the HTLC is not dust on either commitment, it's fine to
4✔
2793
                // forward.
4✔
2794
                return false
4✔
2795
        }
4✔
2796

2797
        // Fetch the dust sums currently in the mailbox for this link.
2798
        cid := link.ChanID()
4✔
2799
        sid := link.ShortChanID()
4✔
2800
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(cid, sid)
4✔
2801
        localMailDust, remoteMailDust := mailbox.DustPackets()
4✔
2802

4✔
2803
        // If the htlc is dust on the local commitment, we'll obtain the dust
4✔
2804
        // sum for it.
4✔
2805
        if isLocalDust {
8✔
2806
                localSum := link.getDustSum(false)
4✔
2807
                localSum += localMailDust
4✔
2808

4✔
2809
                // Optionally include the HTLC amount only for outgoing
4✔
2810
                // HTLCs.
4✔
2811
                if !incoming {
8✔
2812
                        localSum += amount
4✔
2813
                }
4✔
2814

2815
                // Finally check against the defined dust threshold.
2816
                if localSum > s.cfg.DustThreshold {
4✔
2817
                        return true
×
2818
                }
×
2819
        }
2820

2821
        // Also check if the htlc is dust on the remote commitment, if we've
2822
        // reached this point.
2823
        if isRemoteDust {
8✔
2824
                remoteSum := link.getDustSum(true)
4✔
2825
                remoteSum += remoteMailDust
4✔
2826

4✔
2827
                // Optionally include the HTLC amount only for outgoing
4✔
2828
                // HTLCs.
4✔
2829
                if !incoming {
8✔
2830
                        remoteSum += amount
4✔
2831
                }
4✔
2832

2833
                // Finally check against the defined dust threshold.
2834
                if remoteSum > s.cfg.DustThreshold {
4✔
2835
                        return true
×
2836
                }
×
2837
        }
2838

2839
        // If we reached this point, this HTLC is fine to forward.
2840
        return false
4✔
2841
}
2842

2843
// failMailboxUpdate is passed to the mailbox orchestrator which in turn passes
2844
// it to individual mailboxes. It allows the mailboxes to construct a
2845
// FailureMessage when failing back HTLC's due to expiry and may include an
2846
// alias in the ShortChannelID field. The outgoingScid is the SCID originally
2847
// used in the onion. The mailboxScid is the SCID that the mailbox and link
2848
// use. The mailboxScid is only used in the non-alias case, so it is always
2849
// the confirmed SCID.
2850
func (s *Switch) failMailboxUpdate(outgoingScid,
2851
        mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage {
4✔
2852

4✔
2853
        // Try to use the failAliasUpdate function in case this is a channel
4✔
2854
        // that uses aliases. If it returns nil, we'll fallback to the original
4✔
2855
        // pre-alias behavior.
4✔
2856
        update := s.failAliasUpdate(outgoingScid, false)
4✔
2857
        if update == nil {
8✔
2858
                // Execute the fallback behavior.
4✔
2859
                var err error
4✔
2860
                update, err = s.cfg.FetchLastChannelUpdate(mailboxScid)
4✔
2861
                if err != nil {
4✔
2862
                        return &lnwire.FailTemporaryNodeFailure{}
×
2863
                }
×
2864
        }
2865

2866
        return lnwire.NewTemporaryChannelFailure(update)
4✔
2867
}
2868

2869
// failAliasUpdate prepares a ChannelUpdate for a failed incoming or outgoing
2870
// HTLC on a channel where the option-scid-alias feature bit was negotiated. If
2871
// the associated channel is not one of these, this function will return nil
2872
// and the caller is expected to handle this properly. In this case, a return
2873
// to the original non-alias behavior is expected.
2874
func (s *Switch) failAliasUpdate(scid lnwire.ShortChannelID,
2875
        incoming bool) *lnwire.ChannelUpdate {
4✔
2876

4✔
2877
        // This function does not defer the unlocking because of the database
4✔
2878
        // lookups for ChannelUpdate.
4✔
2879
        s.indexMtx.RLock()
4✔
2880

4✔
2881
        if s.cfg.IsAlias(scid) {
8✔
2882
                // The alias SCID was used. In the incoming case this means
4✔
2883
                // the channel is zero-conf as the link sets the scid. In the
4✔
2884
                // outgoing case, the sender set the scid to use and may be
4✔
2885
                // either the alias or the confirmed one, if it exists.
4✔
2886
                realScid, ok := s.aliasToReal[scid]
4✔
2887
                if !ok {
4✔
2888
                        // The real, confirmed SCID does not exist yet. Find
×
2889
                        // the "base" SCID that the link uses via the
×
2890
                        // baseIndex. If we can't find it, return nil. This
×
2891
                        // means the channel is zero-conf.
×
2892
                        baseScid, ok := s.baseIndex[scid]
×
2893
                        s.indexMtx.RUnlock()
×
2894
                        if !ok {
×
2895
                                return nil
×
2896
                        }
×
2897

2898
                        update, err := s.cfg.FetchLastChannelUpdate(baseScid)
×
2899
                        if err != nil {
×
2900
                                return nil
×
2901
                        }
×
2902

2903
                        // Replace the baseScid with the passed-in alias.
2904
                        update.ShortChannelID = scid
×
2905
                        sig, err := s.cfg.SignAliasUpdate(update)
×
2906
                        if err != nil {
×
2907
                                return nil
×
2908
                        }
×
2909

2910
                        update.Signature, err = lnwire.NewSigFromSignature(sig)
×
2911
                        if err != nil {
×
2912
                                return nil
×
2913
                        }
×
2914

2915
                        return update
×
2916
                }
2917

2918
                s.indexMtx.RUnlock()
4✔
2919

4✔
2920
                // Fetch the SCID via the confirmed SCID and replace it with
4✔
2921
                // the alias.
4✔
2922
                update, err := s.cfg.FetchLastChannelUpdate(realScid)
4✔
2923
                if err != nil {
8✔
2924
                        return nil
4✔
2925
                }
4✔
2926

2927
                // In the incoming case, we want to ensure that we don't leak
2928
                // the UTXO in case the channel is private. In the outgoing
2929
                // case, since the alias was used, we do the same thing.
2930
                update.ShortChannelID = scid
4✔
2931
                sig, err := s.cfg.SignAliasUpdate(update)
4✔
2932
                if err != nil {
4✔
2933
                        return nil
×
2934
                }
×
2935

2936
                update.Signature, err = lnwire.NewSigFromSignature(sig)
4✔
2937
                if err != nil {
4✔
2938
                        return nil
×
2939
                }
×
2940

2941
                return update
4✔
2942
        }
2943

2944
        // If the confirmed SCID is not in baseIndex, this is not an
2945
        // option-scid-alias or zero-conf channel.
2946
        baseScid, ok := s.baseIndex[scid]
4✔
2947
        if !ok {
8✔
2948
                s.indexMtx.RUnlock()
4✔
2949
                return nil
4✔
2950
        }
4✔
2951

2952
        // Fetch the link so we can get an alias to use in the ShortChannelID
2953
        // of the ChannelUpdate.
2954
        link, ok := s.forwardingIndex[baseScid]
×
2955
        s.indexMtx.RUnlock()
×
2956
        if !ok {
×
2957
                // This should never happen, but if it does for some reason,
×
2958
                // fallback to the old behavior.
×
2959
                return nil
×
2960
        }
×
2961

2962
        aliases := link.getAliases()
×
2963
        if len(aliases) == 0 {
×
2964
                // This should never happen, but if it does, fallback.
×
2965
                return nil
×
2966
        }
×
2967

2968
        // Fetch the ChannelUpdate via the real, confirmed SCID.
2969
        update, err := s.cfg.FetchLastChannelUpdate(scid)
×
2970
        if err != nil {
×
2971
                return nil
×
2972
        }
×
2973

2974
        // The incoming case will replace the ShortChannelID in the retrieved
2975
        // ChannelUpdate with the alias to ensure no privacy leak occurs. This
2976
        // would happen if a private non-zero-conf option-scid-alias
2977
        // feature-bit channel leaked its UTXO here rather than supplying an
2978
        // alias. In the outgoing case, the confirmed SCID was actually used
2979
        // for forwarding in the onion, so no replacement is necessary as the
2980
        // sender knows the scid.
2981
        if incoming {
×
2982
                // We will replace and sign the update with the first alias.
×
2983
                // Since this happens on the incoming side, it's not actually
×
2984
                // possible to know what the sender used in the onion.
×
2985
                update.ShortChannelID = aliases[0]
×
2986
                sig, err := s.cfg.SignAliasUpdate(update)
×
2987
                if err != nil {
×
2988
                        return nil
×
2989
                }
×
2990

2991
                update.Signature, err = lnwire.NewSigFromSignature(sig)
×
2992
                if err != nil {
×
2993
                        return nil
×
2994
                }
×
2995
        }
2996

2997
        return update
×
2998
}
2999

3000
// AddAliasForLink instructs the Switch to update its in-memory maps to reflect
3001
// that a link has a new alias.
3002
func (s *Switch) AddAliasForLink(chanID lnwire.ChannelID,
3003
        alias lnwire.ShortChannelID) error {
×
3004

×
3005
        // Fetch the link so that we can update the underlying channel's set of
×
3006
        // aliases.
×
3007
        s.indexMtx.RLock()
×
3008
        link, err := s.getLink(chanID)
×
3009
        s.indexMtx.RUnlock()
×
3010
        if err != nil {
×
3011
                return err
×
3012
        }
×
3013

3014
        // If the link is a channel where the option-scid-alias feature bit was
3015
        // not negotiated, we'll return an error.
3016
        if !link.negotiatedAliasFeature() {
×
3017
                return fmt.Errorf("attempted to update non-alias channel")
×
3018
        }
×
3019

3020
        linkScid := link.ShortChanID()
×
3021

×
3022
        // We'll update the maps so the Switch includes this alias in its
×
3023
        // forwarding decisions.
×
3024
        if link.isZeroConf() {
×
3025
                if link.zeroConfConfirmed() {
×
3026
                        // If the channel has confirmed on-chain, we'll
×
3027
                        // add this alias to the aliasToReal map.
×
3028
                        confirmedScid := link.confirmedScid()
×
3029

×
3030
                        s.aliasToReal[alias] = confirmedScid
×
3031
                }
×
3032

3033
                // Add this alias to the baseIndex mapping.
3034
                s.baseIndex[alias] = linkScid
×
3035
        } else if link.negotiatedAliasFeature() {
×
3036
                // The channel is confirmed, so we'll populate the aliasToReal
×
3037
                // and baseIndex maps.
×
3038
                s.aliasToReal[alias] = linkScid
×
3039
                s.baseIndex[alias] = linkScid
×
3040
        }
×
3041

3042
        return nil
×
3043
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc