• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 18197857992

02 Oct 2025 03:32PM UTC coverage: 66.622% (-0.02%) from 66.646%
18197857992

Pull #10267

github

web-flow
Merge 0d9bfccfe into 1c2ff4a7e
Pull Request #10267: [g175] multi: small G175 preparations

24 of 141 new or added lines in 12 files covered. (17.02%)

64 existing lines in 20 files now uncovered.

137216 of 205963 relevant lines covered (66.62%)

21302.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.7
/htlcswitch/switch.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "math/rand"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
14
        "github.com/btcsuite/btcd/btcutil"
15
        "github.com/btcsuite/btcd/wire"
16
        "github.com/lightningnetwork/lnd/chainntnfs"
17
        "github.com/lightningnetwork/lnd/channeldb"
18
        "github.com/lightningnetwork/lnd/clock"
19
        "github.com/lightningnetwork/lnd/contractcourt"
20
        "github.com/lightningnetwork/lnd/fn/v2"
21
        "github.com/lightningnetwork/lnd/graph/db/models"
22
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
23
        "github.com/lightningnetwork/lnd/kvdb"
24
        "github.com/lightningnetwork/lnd/lntypes"
25
        "github.com/lightningnetwork/lnd/lnutils"
26
        "github.com/lightningnetwork/lnd/lnwallet"
27
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
28
        "github.com/lightningnetwork/lnd/lnwire"
29
        "github.com/lightningnetwork/lnd/ticker"
30
)
31

32
const (
33
        // DefaultFwdEventInterval is the duration between attempts to flush
34
        // pending forwarding events to disk.
35
        DefaultFwdEventInterval = 15 * time.Second
36

37
        // DefaultLogInterval is the duration between attempts to log statistics
38
        // about forwarding events.
39
        DefaultLogInterval = 10 * time.Second
40

41
        // DefaultAckInterval is the duration between attempts to ack any settle
42
        // fails in a forwarding package.
43
        DefaultAckInterval = 15 * time.Second
44

45
        // DefaultMailboxDeliveryTimeout is the duration after which Adds will
46
        // be cancelled if they could not get added to an outgoing commitment.
47
        DefaultMailboxDeliveryTimeout = time.Minute
48
)
49

50
var (
51
        // ErrChannelLinkNotFound is used when channel link hasn't been found.
52
        ErrChannelLinkNotFound = errors.New("channel link not found")
53

54
        // ErrDuplicateAdd signals that the ADD htlc was already forwarded
55
        // through the switch and is locked into another commitment txn.
56
        ErrDuplicateAdd = errors.New("duplicate add HTLC detected")
57

58
        // ErrUnknownErrorDecryptor signals that we were unable to locate the
59
        // error decryptor for this payment. This is likely due to restarting
60
        // the daemon.
61
        ErrUnknownErrorDecryptor = errors.New("unknown error decryptor")
62

63
        // ErrSwitchExiting signaled when the switch has received a shutdown
64
        // request.
65
        ErrSwitchExiting = errors.New("htlcswitch shutting down")
66

67
        // ErrNoLinksFound is an error returned when we attempt to retrieve the
68
        // active links in the switch for a specific destination.
69
        ErrNoLinksFound = errors.New("no channel links found")
70

71
        // ErrUnreadableFailureMessage is returned when the failure message
72
        // cannot be decrypted.
73
        ErrUnreadableFailureMessage = errors.New("unreadable failure message")
74

75
        // ErrLocalAddFailed signals that the ADD htlc for a local payment
76
        // failed to be processed.
77
        ErrLocalAddFailed = errors.New("local add HTLC failed")
78

79
        // errFeeExposureExceeded is only surfaced to callers of SendHTLC and
80
        // signals that sending the HTLC would exceed the outgoing link's fee
81
        // exposure threshold.
82
        errFeeExposureExceeded = errors.New("fee exposure exceeded")
83

84
        // DefaultMaxFeeExposure is the default threshold after which we'll
85
        // fail payments if they increase our fee exposure. This is currently
86
        // set to 500m msats.
87
        DefaultMaxFeeExposure = lnwire.MilliSatoshi(500_000_000)
88
)
89

90
// plexPacket encapsulates switch packet and adds error channel to receive
91
// error from request handler.
92
type plexPacket struct {
93
        pkt *htlcPacket
94
        err chan error
95
}
96

97
// ChanClose represents a request which close a particular channel specified by
98
// its id.
99
type ChanClose struct {
100
        // CloseType is a variable which signals the type of channel closure the
101
        // peer should execute.
102
        CloseType contractcourt.ChannelCloseType
103

104
        // ChanPoint represent the id of the channel which should be closed.
105
        ChanPoint *wire.OutPoint
106

107
        // TargetFeePerKw is the ideal fee that was specified by the caller.
108
        // This value is only utilized if the closure type is CloseRegular.
109
        // This will be the starting offered fee when the fee negotiation
110
        // process for the cooperative closure transaction kicks off.
111
        TargetFeePerKw chainfee.SatPerKWeight
112

113
        // MaxFee is the highest fee the caller is willing to pay.
114
        //
115
        // NOTE: This field is only respected if the caller is the initiator of
116
        // the channel.
117
        MaxFee chainfee.SatPerKWeight
118

119
        // DeliveryScript is an optional delivery script to pay funds out to.
120
        DeliveryScript lnwire.DeliveryAddress
121

122
        // Updates is used by request creator to receive the notifications about
123
        // execution of the close channel request.
124
        Updates chan interface{}
125

126
        // Err is used by request creator to receive request execution error.
127
        Err chan error
128

129
        // Ctx is a context linked to the lifetime of the caller.
130
        Ctx context.Context //nolint:containedctx
131
}
132

133
// Config defines the configuration for the service. ALL elements within the
134
// configuration MUST be non-nil for the service to carry out its duties.
135
type Config struct {
136
        // FwdingLog is an interface that will be used by the switch to log
137
        // forwarding events. A forwarding event happens each time a payment
138
        // circuit is successfully completed. So when we forward an HTLC, and a
139
        // settle is eventually received.
140
        FwdingLog ForwardingLog
141

142
        // LocalChannelClose kicks-off the workflow to execute a cooperative or
143
        // forced unilateral closure of the channel initiated by a local
144
        // subsystem.
145
        LocalChannelClose func(pubKey []byte, request *ChanClose)
146

147
        // DB is the database backend that will be used to back the switch's
148
        // persistent circuit map.
149
        DB kvdb.Backend
150

151
        // FetchAllOpenChannels is a function that fetches all currently open
152
        // channels from the channel database.
153
        FetchAllOpenChannels func() ([]*channeldb.OpenChannel, error)
154

155
        // FetchAllChannels is a function that fetches all pending open, open,
156
        // and waiting close channels from the database.
157
        FetchAllChannels func() ([]*channeldb.OpenChannel, error)
158

159
        // FetchClosedChannels is a function that fetches all closed channels
160
        // from the channel database.
161
        FetchClosedChannels func(
162
                pendingOnly bool) ([]*channeldb.ChannelCloseSummary, error)
163

164
        // SwitchPackager provides access to the forwarding packages of all
165
        // active channels. This gives the switch the ability to read arbitrary
166
        // forwarding packages, and ack settles and fails contained within them.
167
        SwitchPackager channeldb.FwdOperator
168

169
        // ExtractErrorEncrypter is an interface allowing switch to reextract
170
        // error encrypters stored in the circuit map on restarts, since they
171
        // are not stored directly within the database.
172
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter
173

174
        // FetchLastChannelUpdate retrieves the latest routing policy for a
175
        // target channel. This channel will typically be the outgoing channel
176
        // specified when we receive an incoming HTLC.  This will be used to
177
        // provide payment senders our latest policy when sending encrypted
178
        // error messages.
179
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
180
                *lnwire.ChannelUpdate1, error)
181

182
        // Notifier is an instance of a chain notifier that we'll use to signal
183
        // the switch when a new block has arrived.
184
        Notifier chainntnfs.ChainNotifier
185

186
        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
187
        // events through.
188
        HtlcNotifier htlcNotifier
189

190
        // FwdEventTicker is a signal that instructs the htlcswitch to flush any
191
        // pending forwarding events.
192
        FwdEventTicker ticker.Ticker
193

194
        // LogEventTicker is a signal instructing the htlcswitch to log
195
        // aggregate stats about it's forwarding during the last interval.
196
        LogEventTicker ticker.Ticker
197

198
        // AckEventTicker is a signal instructing the htlcswitch to ack any settle
199
        // fails in forwarding packages.
200
        AckEventTicker ticker.Ticker
201

202
        // AllowCircularRoute is true if the user has configured their node to
203
        // allow forwards that arrive and depart our node over the same channel.
204
        AllowCircularRoute bool
205

206
        // RejectHTLC is a flag that instructs the htlcswitch to reject any
207
        // HTLCs that are not from the source hop.
208
        RejectHTLC bool
209

210
        // Clock is a time source for the switch.
211
        Clock clock.Clock
212

213
        // MailboxDeliveryTimeout is the interval after which Adds will be
214
        // cancelled if they have not been yet been delivered to a link. The
215
        // computed deadline will expiry this long after the Adds are added to
216
        // a mailbox via AddPacket.
217
        MailboxDeliveryTimeout time.Duration
218

219
        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
220
        // fail incoming or outgoing payments for a particular channel.
221
        MaxFeeExposure lnwire.MilliSatoshi
222

223
        // SignAliasUpdate is used when sending FailureMessages backwards for
224
        // option_scid_alias channels. This avoids a potential privacy leak by
225
        // replacing the public, confirmed SCID with the alias in the
226
        // ChannelUpdate.
227
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
228
                error)
229

230
        // IsAlias returns whether or not a given SCID is an alias.
231
        IsAlias func(scid lnwire.ShortChannelID) bool
232
}
233

234
// Switch is the central messaging bus for all incoming/outgoing HTLCs.
235
// Connected peers with active channels are treated as named interfaces which
236
// refer to active channels as links. A link is the switch's message
237
// communication point with the goroutine that manages an active channel. New
238
// links are registered each time a channel is created, and unregistered once
239
// the channel is closed. The switch manages the hand-off process for multi-hop
240
// HTLCs, forwarding HTLCs initiated from within the daemon, and finally
241
// notifies users local-systems concerning their outstanding payment requests.
242
type Switch struct {
243
        started  int32 // To be used atomically.
244
        shutdown int32 // To be used atomically.
245

246
        // bestHeight is the best known height of the main chain. The links will
247
        // be used this information to govern decisions based on HTLC timeouts.
248
        // This will be retrieved by the registered links atomically.
249
        bestHeight uint32
250

251
        wg   sync.WaitGroup
252
        quit chan struct{}
253

254
        // cfg is a copy of the configuration struct that the htlc switch
255
        // service was initialized with.
256
        cfg *Config
257

258
        // networkResults stores the results of payments initiated by the user.
259
        // The store is used to later look up the payments and notify the
260
        // user of the result when they are complete. Each payment attempt
261
        // should be given a unique integer ID when it is created, otherwise
262
        // results might be overwritten.
263
        networkResults *networkResultStore
264

265
        // circuits is storage for payment circuits which are used to
266
        // forward the settle/fail htlc updates back to the add htlc initiator.
267
        circuits CircuitMap
268

269
        // mailOrchestrator manages the lifecycle of mailboxes used throughout
270
        // the switch, and facilitates delayed delivery of packets to links that
271
        // later come online.
272
        mailOrchestrator *mailOrchestrator
273

274
        // indexMtx is a read/write mutex that protects the set of indexes
275
        // below.
276
        indexMtx sync.RWMutex
277

278
        // pendingLinkIndex holds links that have not had their final, live
279
        // short_chan_id assigned.
280
        pendingLinkIndex map[lnwire.ChannelID]ChannelLink
281

282
        // links is a map of channel id and channel link which manages
283
        // this channel.
284
        linkIndex map[lnwire.ChannelID]ChannelLink
285

286
        // forwardingIndex is an index which is consulted by the switch when it
287
        // needs to locate the next hop to forward an incoming/outgoing HTLC
288
        // update to/from.
289
        //
290
        // TODO(roasbeef): eventually add a NetworkHop mapping before the
291
        // ChannelLink
292
        forwardingIndex map[lnwire.ShortChannelID]ChannelLink
293

294
        // interfaceIndex maps the compressed public key of a peer to all the
295
        // channels that the switch maintains with that peer.
296
        interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink
297

298
        // linkStopIndex stores the currently stopping ChannelLinks,
299
        // represented by their ChannelID. The key is the link's ChannelID and
300
        // the value is a chan that is closed when the link has fully stopped.
301
        // This map is only added to if RemoveLink is called and is not added
302
        // to when the Switch is shutting down and calls Stop() on each link.
303
        //
304
        // MUST be used with the indexMtx.
305
        linkStopIndex map[lnwire.ChannelID]chan struct{}
306

307
        // htlcPlex is the channel which all connected links use to coordinate
308
        // the setup/teardown of Sphinx (onion routing) payment circuits.
309
        // Active links forward any add/settle messages over this channel each
310
        // state transition, sending new adds/settles which are fully locked
311
        // in.
312
        htlcPlex chan *plexPacket
313

314
        // chanCloseRequests is used to transfer the channel close request to
315
        // the channel close handler.
316
        chanCloseRequests chan *ChanClose
317

318
        // resolutionMsgs is the channel that all external contract resolution
319
        // messages will be sent over.
320
        resolutionMsgs chan *resolutionMsg
321

322
        // pendingFwdingEvents is the set of forwarding events which have been
323
        // collected during the current interval, but hasn't yet been written
324
        // to the forwarding log.
325
        fwdEventMtx         sync.Mutex
326
        pendingFwdingEvents []channeldb.ForwardingEvent
327

328
        // blockEpochStream is an active block epoch event stream backed by an
329
        // active ChainNotifier instance. This will be used to retrieve the
330
        // latest height of the chain.
331
        blockEpochStream *chainntnfs.BlockEpochEvent
332

333
        // pendingSettleFails is the set of settle/fail entries that we need to
334
        // ack in the forwarding package of the outgoing link. This was added to
335
        // make pipelining settles more efficient.
336
        pendingSettleFails []channeldb.SettleFailRef
337

338
        // resMsgStore is used to store the set of ResolutionMsg that come from
339
        // contractcourt. This is used so the Switch can properly forward them,
340
        // even on restarts.
341
        resMsgStore *resolutionStore
342

343
        // aliasToReal is a map used for option-scid-alias feature-bit links.
344
        // The alias SCID is the key and the real, confirmed SCID is the value.
345
        // If the channel is unconfirmed, there will not be a mapping for it.
346
        // Since channels can have multiple aliases, this map is essentially a
347
        // N->1 mapping for a channel. This MUST be accessed with the indexMtx.
348
        aliasToReal map[lnwire.ShortChannelID]lnwire.ShortChannelID
349

350
        // baseIndex is a map used for option-scid-alias feature-bit links.
351
        // The value is the SCID of the link's ShortChannelID. This value may
352
        // be an alias for zero-conf channels or a confirmed SCID for
353
        // non-zero-conf channels with the option-scid-alias feature-bit. The
354
        // key includes the value itself and also any other aliases. This MUST
355
        // be accessed with the indexMtx.
356
        baseIndex map[lnwire.ShortChannelID]lnwire.ShortChannelID
357
}
358

359
// New creates the new instance of htlc switch.
360
func New(cfg Config, currentHeight uint32) (*Switch, error) {
344✔
361
        resStore := newResolutionStore(cfg.DB)
344✔
362

344✔
363
        circuitMap, err := NewCircuitMap(&CircuitMapConfig{
344✔
364
                DB:                    cfg.DB,
344✔
365
                FetchAllOpenChannels:  cfg.FetchAllOpenChannels,
344✔
366
                FetchClosedChannels:   cfg.FetchClosedChannels,
344✔
367
                ExtractErrorEncrypter: cfg.ExtractErrorEncrypter,
344✔
368
                CheckResolutionMsg:    resStore.checkResolutionMsg,
344✔
369
        })
344✔
370
        if err != nil {
344✔
371
                return nil, err
×
372
        }
×
373

374
        s := &Switch{
344✔
375
                bestHeight:        currentHeight,
344✔
376
                cfg:               &cfg,
344✔
377
                circuits:          circuitMap,
344✔
378
                linkIndex:         make(map[lnwire.ChannelID]ChannelLink),
344✔
379
                forwardingIndex:   make(map[lnwire.ShortChannelID]ChannelLink),
344✔
380
                interfaceIndex:    make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
344✔
381
                pendingLinkIndex:  make(map[lnwire.ChannelID]ChannelLink),
344✔
382
                linkStopIndex:     make(map[lnwire.ChannelID]chan struct{}),
344✔
383
                networkResults:    newNetworkResultStore(cfg.DB),
344✔
384
                htlcPlex:          make(chan *plexPacket),
344✔
385
                chanCloseRequests: make(chan *ChanClose),
344✔
386
                resolutionMsgs:    make(chan *resolutionMsg),
344✔
387
                resMsgStore:       resStore,
344✔
388
                quit:              make(chan struct{}),
344✔
389
        }
344✔
390

344✔
391
        s.aliasToReal = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
344✔
392
        s.baseIndex = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
344✔
393

344✔
394
        s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{
344✔
395
                forwardPackets:    s.ForwardPackets,
344✔
396
                clock:             s.cfg.Clock,
344✔
397
                expiry:            s.cfg.MailboxDeliveryTimeout,
344✔
398
                failMailboxUpdate: s.failMailboxUpdate,
344✔
399
        })
344✔
400

344✔
401
        return s, nil
344✔
402
}
403

404
// resolutionMsg is a struct that wraps an existing ResolutionMsg with a done
405
// channel. We'll use this channel to synchronize delivery of the message with
406
// the caller.
407
type resolutionMsg struct {
408
        contractcourt.ResolutionMsg
409

410
        errChan chan error
411
}
412

413
// ProcessContractResolution is called by active contract resolvers once a
414
// contract they are watching over has been fully resolved. The message carries
415
// an external signal that *would* have been sent if the outgoing channel
416
// didn't need to go to the chain in order to fulfill a contract. We'll process
417
// this message just as if it came from an active outgoing channel.
418
func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) error {
4✔
419
        errChan := make(chan error, 1)
4✔
420

4✔
421
        select {
4✔
422
        case s.resolutionMsgs <- &resolutionMsg{
423
                ResolutionMsg: msg,
424
                errChan:       errChan,
425
        }:
4✔
426
        case <-s.quit:
×
427
                return ErrSwitchExiting
×
428
        }
429

430
        select {
4✔
431
        case err := <-errChan:
4✔
432
                return err
4✔
433
        case <-s.quit:
×
434
                return ErrSwitchExiting
×
435
        }
436
}
437

438
// HasAttemptResult reads the network result store to fetch the specified
439
// attempt. Returns true if the attempt result exists.
440
func (s *Switch) HasAttemptResult(attemptID uint64) (bool, error) {
3✔
441
        _, err := s.networkResults.getResult(attemptID)
3✔
442
        if err == nil {
3✔
443
                return true, nil
×
444
        }
×
445

446
        if !errors.Is(err, ErrPaymentIDNotFound) {
3✔
447
                return false, err
×
448
        }
×
449

450
        return false, nil
3✔
451
}
452

453
// GetAttemptResult returns the result of the HTLC attempt with the given
454
// attemptID. The paymentHash should be set to the payment's overall hash, or
455
// in case of AMP payments the payment's unique identifier.
456
//
457
// The method returns a channel where the HTLC attempt result will be sent when
458
// available, or an error is encountered during forwarding. When a result is
459
// received on the channel, the HTLC is guaranteed to no longer be in flight.
460
// The switch shutting down is signaled by closing the channel. If the
461
// attemptID is unknown, ErrPaymentIDNotFound will be returned.
462
func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
463
        deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) {
310✔
464

310✔
465
        var (
310✔
466
                nChan <-chan *networkResult
310✔
467
                err   error
310✔
468
                inKey = CircuitKey{
310✔
469
                        ChanID: hop.Source,
310✔
470
                        HtlcID: attemptID,
310✔
471
                }
310✔
472
        )
310✔
473

310✔
474
        // If the HTLC is not found in the circuit map, check whether a result
310✔
475
        // is already available.
310✔
476
        // Assumption: no one will add this attempt ID other than the caller.
310✔
477
        if s.circuits.LookupCircuit(inKey) == nil {
315✔
478
                res, err := s.networkResults.getResult(attemptID)
5✔
479
                if err != nil {
7✔
480
                        return nil, err
2✔
481
                }
2✔
482
                c := make(chan *networkResult, 1)
3✔
483
                c <- res
3✔
484
                nChan = c
3✔
485
        } else {
305✔
486
                // The HTLC was committed to the circuits, subscribe for a
305✔
487
                // result.
305✔
488
                nChan, err = s.networkResults.subscribeResult(attemptID)
305✔
489
                if err != nil {
305✔
490
                        return nil, err
×
491
                }
×
492
        }
493

494
        resultChan := make(chan *PaymentResult, 1)
308✔
495

308✔
496
        // Since the attempt was known, we can start a goroutine that can
308✔
497
        // extract the result when it is available, and pass it on to the
308✔
498
        // caller.
308✔
499
        s.wg.Add(1)
308✔
500
        go func() {
616✔
501
                defer s.wg.Done()
308✔
502

308✔
503
                var n *networkResult
308✔
504
                select {
308✔
505
                case n = <-nChan:
304✔
506
                case <-s.quit:
7✔
507
                        // We close the result channel to signal a shutdown. We
7✔
508
                        // don't send any result in this case since the HTLC is
7✔
509
                        // still in flight.
7✔
510
                        close(resultChan)
7✔
511
                        return
7✔
512
                }
513

514
                log.Debugf("Received network result %T for attemptID=%v", n.msg,
304✔
515
                        attemptID)
304✔
516

304✔
517
                // Extract the result and pass it to the result channel.
304✔
518
                result, err := s.extractResult(
304✔
519
                        deobfuscator, n, attemptID, paymentHash,
304✔
520
                )
304✔
521
                if err != nil {
304✔
522
                        e := fmt.Errorf("unable to extract result: %w", err)
×
523
                        log.Error(e)
×
524
                        resultChan <- &PaymentResult{
×
525
                                Error: e,
×
526
                        }
×
527
                        return
×
528
                }
×
529
                resultChan <- result
304✔
530
        }()
531

532
        return resultChan, nil
308✔
533
}
534

535
// CleanStore calls the underlying result store, telling it is safe to delete
536
// all entries except the ones in the keepPids map. This should be called
537
// preiodically to let the switch clean up payment results that we have
538
// handled.
539
func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error {
3✔
540
        return s.networkResults.cleanStore(keepPids)
3✔
541
}
3✔
542

543
// SendHTLC is used by other subsystems which aren't belong to htlc switch
544
// package in order to send the htlc update. The attemptID used MUST be unique
545
// for this HTLC, and MUST be used only once, otherwise the switch might reject
546
// it.
547
func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64,
548
        htlc *lnwire.UpdateAddHTLC) error {
419✔
549

419✔
550
        // Generate and send new update packet, if error will be received on
419✔
551
        // this stage it means that packet haven't left boundaries of our
419✔
552
        // system and something wrong happened.
419✔
553
        packet := &htlcPacket{
419✔
554
                incomingChanID: hop.Source,
419✔
555
                incomingHTLCID: attemptID,
419✔
556
                outgoingChanID: firstHop,
419✔
557
                htlc:           htlc,
419✔
558
                amount:         htlc.Amount,
419✔
559
        }
419✔
560

419✔
561
        // Attempt to fetch the target link before creating a circuit so that
419✔
562
        // we don't leave dangling circuits. The getLocalLink method does not
419✔
563
        // require the circuit variable to be set on the *htlcPacket.
419✔
564
        link, linkErr := s.getLocalLink(packet, htlc)
419✔
565
        if linkErr != nil {
426✔
566
                // Notify the htlc notifier of a link failure on our outgoing
7✔
567
                // link. Incoming timelock/amount values are not set because
7✔
568
                // they are not present for local sends.
7✔
569
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
7✔
570
                        newHtlcKey(packet),
7✔
571
                        HtlcInfo{
7✔
572
                                OutgoingTimeLock: htlc.Expiry,
7✔
573
                                OutgoingAmt:      htlc.Amount,
7✔
574
                        },
7✔
575
                        HtlcEventTypeSend,
7✔
576
                        linkErr,
7✔
577
                        false,
7✔
578
                )
7✔
579

7✔
580
                return linkErr
7✔
581
        }
7✔
582

583
        // Evaluate whether this HTLC would bypass our fee exposure. If it
584
        // does, don't send it out and instead return an error.
585
        if s.dustExceedsFeeThreshold(link, htlc.Amount, false) {
416✔
586
                // Notify the htlc notifier of a link failure on our outgoing
1✔
587
                // link. We use the FailTemporaryChannelFailure in place of a
1✔
588
                // more descriptive error message.
1✔
589
                linkErr := NewLinkError(
1✔
590
                        &lnwire.FailTemporaryChannelFailure{},
1✔
591
                )
1✔
592
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
1✔
593
                        newHtlcKey(packet),
1✔
594
                        HtlcInfo{
1✔
595
                                OutgoingTimeLock: htlc.Expiry,
1✔
596
                                OutgoingAmt:      htlc.Amount,
1✔
597
                        },
1✔
598
                        HtlcEventTypeSend,
1✔
599
                        linkErr,
1✔
600
                        false,
1✔
601
                )
1✔
602

1✔
603
                return errFeeExposureExceeded
1✔
604
        }
1✔
605

606
        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
414✔
607
        actions, err := s.circuits.CommitCircuits(circuit)
414✔
608
        if err != nil {
414✔
609
                log.Errorf("unable to commit circuit in switch: %v", err)
×
610
                return err
×
611
        }
×
612

613
        // Drop duplicate packet if it has already been seen.
614
        switch {
414✔
615
        case len(actions.Drops) == 1:
1✔
616
                return ErrDuplicateAdd
1✔
617

618
        case len(actions.Fails) == 1:
×
619
                return ErrLocalAddFailed
×
620
        }
621

622
        // Give the packet to the link's mailbox so that HTLC's are properly
623
        // canceled back if the mailbox timeout elapses.
624
        packet.circuit = circuit
413✔
625

413✔
626
        return link.handleSwitchPacket(packet)
413✔
627
}
628

629
// UpdateForwardingPolicies sends a message to the switch to update the
630
// forwarding policies for the set of target channels, keyed in chanPolicies.
631
//
632
// NOTE: This function is synchronous and will block until either the
633
// forwarding policies for all links have been updated, or the switch shuts
634
// down.
635
func (s *Switch) UpdateForwardingPolicies(
636
        chanPolicies map[wire.OutPoint]models.ForwardingPolicy) {
3✔
637

3✔
638
        log.Tracef("Updating link policies: %v", lnutils.SpewLogClosure(
3✔
639
                chanPolicies))
3✔
640

3✔
641
        s.indexMtx.RLock()
3✔
642

3✔
643
        // Update each link in chanPolicies.
3✔
644
        for targetLink, policy := range chanPolicies {
6✔
645
                cid := lnwire.NewChanIDFromOutPoint(targetLink)
3✔
646

3✔
647
                link, ok := s.linkIndex[cid]
3✔
648
                if !ok {
3✔
649
                        log.Debugf("Unable to find ChannelPoint(%v) to update "+
×
650
                                "link policy", targetLink)
×
651
                        continue
×
652
                }
653

654
                link.UpdateForwardingPolicy(policy)
3✔
655
        }
656

657
        s.indexMtx.RUnlock()
3✔
658
}
659

660
// IsForwardedHTLC checks for a given channel and htlc index if it is related
661
// to an opened circuit that represents a forwarded payment.
662
func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID,
663
        htlcIndex uint64) bool {
5✔
664

5✔
665
        circuit := s.circuits.LookupOpenCircuit(models.CircuitKey{
5✔
666
                ChanID: chanID,
5✔
667
                HtlcID: htlcIndex,
5✔
668
        })
5✔
669
        return circuit != nil && circuit.Incoming.ChanID != hop.Source
5✔
670
}
5✔
671

672
// ForwardPackets adds a list of packets to the switch for processing. Fails
673
// and settles are added on a first past, simultaneously constructing circuits
674
// for any adds. After persisting the circuits, another pass of the adds is
675
// given to forward them through the router. The sending link's quit channel is
676
// used to prevent deadlocks when the switch stops a link in the midst of
677
// forwarding.
678
func (s *Switch) ForwardPackets(linkQuit <-chan struct{},
679
        packets ...*htlcPacket) error {
872✔
680

872✔
681
        var (
872✔
682
                // fwdChan is a buffered channel used to receive err msgs from
872✔
683
                // the htlcPlex when forwarding this batch.
872✔
684
                fwdChan = make(chan error, len(packets))
872✔
685

872✔
686
                // numSent keeps a running count of how many packets are
872✔
687
                // forwarded to the switch, which determines how many responses
872✔
688
                // we will wait for on the fwdChan..
872✔
689
                numSent int
872✔
690
        )
872✔
691

872✔
692
        // No packets, nothing to do.
872✔
693
        if len(packets) == 0 {
1,095✔
694
                return nil
223✔
695
        }
223✔
696

697
        // Setup a barrier to prevent the background tasks from processing
698
        // responses until this function returns to the user.
699
        var wg sync.WaitGroup
652✔
700
        wg.Add(1)
652✔
701
        defer wg.Done()
652✔
702

652✔
703
        // Before spawning the following goroutine to proxy our error responses,
652✔
704
        // check to see if we have already been issued a shutdown request. If
652✔
705
        // so, we exit early to avoid incrementing the switch's waitgroup while
652✔
706
        // it is already in the process of shutting down.
652✔
707
        select {
652✔
708
        case <-linkQuit:
1✔
709
                return nil
1✔
710
        case <-s.quit:
1✔
711
                return nil
1✔
712
        default:
651✔
713
                // Spawn a goroutine to log the errors returned from failed packets.
651✔
714
                s.wg.Add(1)
651✔
715
                go s.logFwdErrs(&numSent, &wg, fwdChan)
651✔
716
        }
717

718
        // Make a first pass over the packets, forwarding any settles or fails.
719
        // As adds are found, we create a circuit and append it to our set of
720
        // circuits to be written to disk.
721
        var circuits []*PaymentCircuit
651✔
722
        var addBatch []*htlcPacket
651✔
723
        for _, packet := range packets {
1,302✔
724
                switch htlc := packet.htlc.(type) {
651✔
725
                case *lnwire.UpdateAddHTLC:
90✔
726
                        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
90✔
727
                        packet.circuit = circuit
90✔
728
                        circuits = append(circuits, circuit)
90✔
729
                        addBatch = append(addBatch, packet)
90✔
730
                default:
564✔
731
                        err := s.routeAsync(packet, fwdChan, linkQuit)
564✔
732
                        if err != nil {
574✔
733
                                return fmt.Errorf("failed to forward packet %w",
10✔
734
                                        err)
10✔
735
                        }
10✔
736
                        numSent++
539✔
737
                }
738
        }
739

740
        // If this batch did not contain any circuits to commit, we can return
741
        // early.
742
        if len(circuits) == 0 {
1,165✔
743
                return nil
539✔
744
        }
539✔
745

746
        // Write any circuits that we found to disk.
747
        actions, err := s.circuits.CommitCircuits(circuits...)
90✔
748
        if err != nil {
90✔
749
                log.Errorf("unable to commit circuits in switch: %v", err)
×
750
        }
×
751

752
        // Split the htlc packets by comparing an in-order seek to the head of
753
        // the added, dropped, or failed circuits.
754
        //
755
        // NOTE: This assumes each list is guaranteed to be a subsequence of the
756
        // circuits, and that the union of the sets results in the original set
757
        // of circuits.
758
        var addedPackets, failedPackets []*htlcPacket
90✔
759
        for _, packet := range addBatch {
180✔
760
                switch {
90✔
761
                case len(actions.Adds) > 0 && packet.circuit == actions.Adds[0]:
86✔
762
                        addedPackets = append(addedPackets, packet)
86✔
763
                        actions.Adds = actions.Adds[1:]
86✔
764

765
                case len(actions.Drops) > 0 && packet.circuit == actions.Drops[0]:
4✔
766
                        actions.Drops = actions.Drops[1:]
4✔
767

768
                case len(actions.Fails) > 0 && packet.circuit == actions.Fails[0]:
3✔
769
                        failedPackets = append(failedPackets, packet)
3✔
770
                        actions.Fails = actions.Fails[1:]
3✔
771
                }
772
        }
773

774
        // Now, forward any packets for circuits that were successfully added to
775
        // the switch's circuit map.
776
        for _, packet := range addedPackets {
176✔
777
                err := s.routeAsync(packet, fwdChan, linkQuit)
86✔
778
                if err != nil {
87✔
779
                        return fmt.Errorf("failed to forward packet %w", err)
1✔
780
                }
1✔
781
                numSent++
85✔
782
        }
783

784
        // Lastly, for any packets that failed, this implies that they were
785
        // left in a half added state, which can happen when recovering from
786
        // failures.
787
        if len(failedPackets) > 0 {
92✔
788
                var failure lnwire.FailureMessage
3✔
789
                incomingID := failedPackets[0].incomingChanID
3✔
790

3✔
791
                // If the incoming channel is an option_scid_alias channel,
3✔
792
                // then we'll need to replace the SCID in the ChannelUpdate.
3✔
793
                update := s.failAliasUpdate(incomingID, true)
3✔
794
                if update == nil {
4✔
795
                        // Fallback to the original non-option behavior.
1✔
796
                        update, err := s.cfg.FetchLastChannelUpdate(
1✔
797
                                incomingID,
1✔
798
                        )
1✔
799
                        if err != nil {
1✔
800
                                failure = &lnwire.FailTemporaryNodeFailure{}
×
801
                        } else {
1✔
802
                                failure = lnwire.NewTemporaryChannelFailure(
1✔
803
                                        update,
1✔
804
                                )
1✔
805
                        }
1✔
806
                } else {
2✔
807
                        // This is an option_scid_alias channel.
2✔
808
                        failure = lnwire.NewTemporaryChannelFailure(update)
2✔
809
                }
2✔
810

811
                linkError := NewDetailedLinkError(
3✔
812
                        failure, OutgoingFailureIncompleteForward,
3✔
813
                )
3✔
814

3✔
815
                for _, packet := range failedPackets {
6✔
816
                        // We don't handle the error here since this method
3✔
817
                        // always returns an error.
3✔
818
                        _ = s.failAddPacket(packet, linkError)
3✔
819
                }
3✔
820
        }
821

822
        return nil
89✔
823
}
824

825
// logFwdErrs logs any errors received on `fwdChan`.
826
func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) {
651✔
827
        defer s.wg.Done()
651✔
828

651✔
829
        // Wait here until the outer function has finished persisting
651✔
830
        // and routing the packets. This guarantees we don't read from num until
651✔
831
        // the value is accurate.
651✔
832
        wg.Wait()
651✔
833

651✔
834
        numSent := *num
651✔
835
        for i := 0; i < numSent; i++ {
1,272✔
836
                select {
621✔
837
                case err := <-fwdChan:
618✔
838
                        if err != nil {
644✔
839
                                log.Errorf("Unhandled error while reforwarding htlc "+
26✔
840
                                        "settle/fail over htlcswitch: %v", err)
26✔
841
                        }
26✔
842
                case <-s.quit:
3✔
843
                        log.Errorf("unable to forward htlc packet " +
3✔
844
                                "htlc switch was stopped")
3✔
845
                        return
3✔
846
                }
847
        }
848
}
849

850
// routeAsync sends a packet through the htlc switch, using the provided err
851
// chan to propagate errors back to the caller. The link's quit channel is
852
// provided so that the send can be canceled if either the link or the switch
853
// receive a shutdown requuest. This method does not wait for a response from
854
// the htlcForwarder before returning.
855
func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error,
856
        linkQuit <-chan struct{}) error {
647✔
857

647✔
858
        command := &plexPacket{
647✔
859
                pkt: packet,
647✔
860
                err: errChan,
647✔
861
        }
647✔
862

647✔
863
        select {
647✔
864
        case s.htlcPlex <- command:
621✔
865
                return nil
621✔
866
        case <-linkQuit:
11✔
867
                return ErrLinkShuttingDown
11✔
868
        case <-s.quit:
×
869
                return errors.New("htlc switch was stopped")
×
870
        }
871
}
872

873
// getLocalLink handles the addition of a htlc for a send that originates from
874
// our node. It returns the link that the htlc should be forwarded outwards on,
875
// and a link error if the htlc cannot be forwarded.
876
func (s *Switch) getLocalLink(pkt *htlcPacket, htlc *lnwire.UpdateAddHTLC) (
877
        ChannelLink, *LinkError) {
419✔
878

419✔
879
        // Try to find links by node destination.
419✔
880
        s.indexMtx.RLock()
419✔
881
        link, err := s.getLinkByShortID(pkt.outgoingChanID)
419✔
882
        if err != nil {
423✔
883
                // If the link was not found for the outgoingChanID, an outside
4✔
884
                // subsystem may be using the confirmed SCID of a zero-conf
4✔
885
                // channel. In this case, we'll consult the Switch maps to see
4✔
886
                // if an alias exists and use the alias to lookup the link.
4✔
887
                // This extra step is a consequence of not updating the Switch
4✔
888
                // forwardingIndex when a zero-conf channel is confirmed. We
4✔
889
                // don't need to change the outgoingChanID since the link will
4✔
890
                // do that upon receiving the packet.
4✔
891
                baseScid, ok := s.baseIndex[pkt.outgoingChanID]
4✔
892
                if !ok {
7✔
893
                        s.indexMtx.RUnlock()
3✔
894
                        log.Errorf("Link %v not found", pkt.outgoingChanID)
3✔
895
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
3✔
896
                }
3✔
897

898
                // The base SCID was found, so we'll use that to fetch the
899
                // link.
900
                link, err = s.getLinkByShortID(baseScid)
4✔
901
                if err != nil {
4✔
902
                        s.indexMtx.RUnlock()
×
903
                        log.Errorf("Link %v not found", baseScid)
×
904
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
×
905
                }
×
906
        }
907
        // We finished looking up the indexes, so we can unlock the mutex before
908
        // performing the link operations which might also acquire the lock
909
        // in case e.g. failAliasUpdate is called.
910
        s.indexMtx.RUnlock()
419✔
911

419✔
912
        if !link.EligibleToForward() {
420✔
913
                log.Errorf("Link %v is not available to forward",
1✔
914
                        pkt.outgoingChanID)
1✔
915

1✔
916
                // The update does not need to be populated as the error
1✔
917
                // will be returned back to the router.
1✔
918
                return nil, NewDetailedLinkError(
1✔
919
                        lnwire.NewTemporaryChannelFailure(nil),
1✔
920
                        OutgoingFailureLinkNotEligible,
1✔
921
                )
1✔
922
        }
1✔
923

924
        // Ensure that the htlc satisfies the outgoing channel policy.
925
        currentHeight := atomic.LoadUint32(&s.bestHeight)
418✔
926
        htlcErr := link.CheckHtlcTransit(
418✔
927
                htlc.PaymentHash, htlc.Amount, htlc.Expiry, currentHeight,
418✔
928
                htlc.CustomRecords,
418✔
929
        )
418✔
930
        if htlcErr != nil {
423✔
931
                log.Errorf("Link %v policy for local forward not "+
5✔
932
                        "satisfied", pkt.outgoingChanID)
5✔
933
                return nil, htlcErr
5✔
934
        }
5✔
935

936
        return link, nil
415✔
937
}
938

939
// handleLocalResponse processes a Settle or Fail responding to a
940
// locally-initiated payment. This is handled asynchronously to avoid blocking
941
// the main event loop within the switch, as these operations can require
942
// multiple db transactions. The guarantees of the circuit map are stringent
943
// enough such that we are able to tolerate reordering of these operations
944
// without side effects. The primary operations handled are:
945
//  1. Save the payment result to the pending payment store.
946
//  2. Notify subscribers about the payment result.
947
//  3. Ack settle/fail references, to avoid resending this response internally
948
//  4. Teardown the closing circuit in the circuit map
949
//
950
// NOTE: This method MUST be spawned as a goroutine.
951
func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
307✔
952
        defer s.wg.Done()
307✔
953

307✔
954
        attemptID := pkt.incomingHTLCID
307✔
955

307✔
956
        // The error reason will be unencypted in case this a local
307✔
957
        // failure or a converted error.
307✔
958
        unencrypted := pkt.localFailure || pkt.convertedError
307✔
959
        n := &networkResult{
307✔
960
                msg:          pkt.htlc,
307✔
961
                unencrypted:  unencrypted,
307✔
962
                isResolution: pkt.isResolution,
307✔
963
        }
307✔
964

307✔
965
        // Store the result to the db. This will also notify subscribers about
307✔
966
        // the result.
307✔
967
        if err := s.networkResults.storeResult(attemptID, n); err != nil {
307✔
968
                log.Errorf("Unable to store attempt result for pid=%v: %v",
×
969
                        attemptID, err)
×
970
                return
×
971
        }
×
972

973
        // First, we'll clean up any fwdpkg references, circuit entries, and
974
        // mark in our db that the payment for this payment hash has either
975
        // succeeded or failed.
976
        //
977
        // If this response is contained in a forwarding package, we'll start by
978
        // acking the settle/fail so that we don't continue to retransmit the
979
        // HTLC internally.
980
        if pkt.destRef != nil {
428✔
981
                if err := s.ackSettleFail(*pkt.destRef); err != nil {
121✔
982
                        log.Warnf("Unable to ack settle/fail reference: %s: %v",
×
983
                                *pkt.destRef, err)
×
984
                        return
×
985
                }
×
986
        }
987

988
        // Next, we'll remove the circuit since we are about to complete an
989
        // fulfill/fail of this HTLC. Since we've already removed the
990
        // settle/fail fwdpkg reference, the response from the peer cannot be
991
        // replayed internally if this step fails. If this happens, this logic
992
        // will be executed when a provided resolution message comes through.
993
        // This can only happen if the circuit is still open, which is why this
994
        // ordering is chosen.
995
        if err := s.teardownCircuit(pkt); err != nil {
307✔
996
                log.Errorf("Unable to teardown circuit %s: %v",
×
997
                        pkt.inKey(), err)
×
998
                return
×
999
        }
×
1000

1001
        // Finally, notify on the htlc failure or success that has been handled.
1002
        key := newHtlcKey(pkt)
307✔
1003
        eventType := getEventType(pkt)
307✔
1004

307✔
1005
        switch htlc := pkt.htlc.(type) {
307✔
1006
        case *lnwire.UpdateFulfillHTLC:
183✔
1007
                s.cfg.HtlcNotifier.NotifySettleEvent(key, htlc.PaymentPreimage,
183✔
1008
                        eventType)
183✔
1009

1010
        case *lnwire.UpdateFailHTLC:
127✔
1011
                s.cfg.HtlcNotifier.NotifyForwardingFailEvent(key, eventType)
127✔
1012
        }
1013
}
1014

1015
// extractResult uses the given deobfuscator to extract the payment result from
1016
// the given network message.
1017
func (s *Switch) extractResult(deobfuscator ErrorDecrypter, n *networkResult,
1018
        attemptID uint64, paymentHash lntypes.Hash) (*PaymentResult, error) {
304✔
1019

304✔
1020
        switch htlc := n.msg.(type) {
304✔
1021

1022
        // We've received a settle update which means we can finalize the user
1023
        // payment and return successful response.
1024
        case *lnwire.UpdateFulfillHTLC:
180✔
1025
                return &PaymentResult{
180✔
1026
                        Preimage: htlc.PaymentPreimage,
180✔
1027
                }, nil
180✔
1028

1029
        // We've received a fail update which means we can finalize the
1030
        // user payment and return fail response.
1031
        case *lnwire.UpdateFailHTLC:
127✔
1032
                // TODO(yy): construct deobfuscator here to avoid creating it
127✔
1033
                // in paymentLifecycle even for settled HTLCs.
127✔
1034
                paymentErr := s.parseFailedPayment(
127✔
1035
                        deobfuscator, attemptID, paymentHash, n.unencrypted,
127✔
1036
                        n.isResolution, htlc,
127✔
1037
                )
127✔
1038

127✔
1039
                return &PaymentResult{
127✔
1040
                        Error: paymentErr,
127✔
1041
                }, nil
127✔
1042

1043
        default:
×
1044
                return nil, fmt.Errorf("received unknown response type: %T",
×
1045
                        htlc)
×
1046
        }
1047
}
1048

1049
// parseFailedPayment determines the appropriate failure message to return to
1050
// a user initiated payment. The three cases handled are:
1051
//  1. An unencrypted failure, which should already plaintext.
1052
//  2. A resolution from the chain arbitrator, which possibly has no failure
1053
//     reason attached.
1054
//  3. A failure from the remote party, which will need to be decrypted using
1055
//     the payment deobfuscator.
1056
func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter,
1057
        attemptID uint64, paymentHash lntypes.Hash, unencrypted,
1058
        isResolution bool, htlc *lnwire.UpdateFailHTLC) error {
127✔
1059

127✔
1060
        switch {
127✔
1061

1062
        // The payment never cleared the link, so we don't need to
1063
        // decrypt the error, simply decode it them report back to the
1064
        // user.
1065
        case unencrypted:
7✔
1066
                r := bytes.NewReader(htlc.Reason)
7✔
1067
                failureMsg, err := lnwire.DecodeFailure(r, 0)
7✔
1068
                if err != nil {
7✔
1069
                        // If we could not decode the failure reason, return a link
×
1070
                        // error indicating that we failed to decode the onion.
×
1071
                        linkError := NewDetailedLinkError(
×
1072
                                // As this didn't even clear the link, we don't
×
1073
                                // need to apply an update here since it goes
×
1074
                                // directly to the router.
×
1075
                                lnwire.NewTemporaryChannelFailure(nil),
×
1076
                                OutgoingFailureDecodeError,
×
1077
                        )
×
1078

×
1079
                        log.Errorf("%v: (hash=%v, pid=%d): %v",
×
1080
                                linkError.FailureDetail.FailureString(),
×
1081
                                paymentHash, attemptID, err)
×
1082

×
1083
                        return linkError
×
1084
                }
×
1085

1086
                // If we successfully decoded the failure reason, return it.
1087
                return NewLinkError(failureMsg)
7✔
1088

1089
        // A payment had to be timed out on chain before it got past
1090
        // the first hop. In this case, we'll report a permanent
1091
        // channel failure as this means us, or the remote party had to
1092
        // go on chain.
1093
        case isResolution && htlc.Reason == nil:
3✔
1094
                linkError := NewDetailedLinkError(
3✔
1095
                        &lnwire.FailPermanentChannelFailure{},
3✔
1096
                        OutgoingFailureOnChainTimeout,
3✔
1097
                )
3✔
1098

3✔
1099
                log.Infof("%v: hash=%v, pid=%d",
3✔
1100
                        linkError.FailureDetail.FailureString(),
3✔
1101
                        paymentHash, attemptID)
3✔
1102

3✔
1103
                return linkError
3✔
1104

1105
        // A regular multi-hop payment error that we'll need to
1106
        // decrypt.
1107
        default:
122✔
1108
                // We'll attempt to fully decrypt the onion encrypted
122✔
1109
                // error. If we're unable to then we'll bail early.
122✔
1110
                failure, err := deobfuscator.DecryptError(htlc.Reason)
122✔
1111
                if err != nil {
123✔
1112
                        log.Errorf("unable to de-obfuscate onion failure "+
1✔
1113
                                "(hash=%v, pid=%d): %v",
1✔
1114
                                paymentHash, attemptID, err)
1✔
1115

1✔
1116
                        return ErrUnreadableFailureMessage
1✔
1117
                }
1✔
1118

1119
                return failure
121✔
1120
        }
1121
}
1122

1123
// handlePacketForward is used in cases when we need forward the htlc update
1124
// from one channel link to another and be able to propagate the settle/fail
1125
// updates back. This behaviour is achieved by creation of payment circuits.
1126
func (s *Switch) handlePacketForward(packet *htlcPacket) error {
622✔
1127
        switch htlc := packet.htlc.(type) {
622✔
1128
        // Channel link forwarded us a new htlc, therefore we initiate the
1129
        // payment circuit within our internal state so we can properly forward
1130
        // the ultimate settle message back latter.
1131
        case *lnwire.UpdateAddHTLC:
85✔
1132
                return s.handlePacketAdd(packet, htlc)
85✔
1133

1134
        case *lnwire.UpdateFulfillHTLC:
402✔
1135
                return s.handlePacketSettle(packet)
402✔
1136

1137
        // Channel link forwarded us an update_fail_htlc message.
1138
        //
1139
        // NOTE: when the channel link receives an update_fail_malformed_htlc
1140
        // from upstream, it will convert the message into update_fail_htlc and
1141
        // forward it. Thus there's no need to catch `UpdateFailMalformedHTLC`
1142
        // here.
1143
        case *lnwire.UpdateFailHTLC:
141✔
1144
                return s.handlePacketFail(packet, htlc)
141✔
1145

1146
        default:
×
1147
                return fmt.Errorf("wrong update type: %T", htlc)
×
1148
        }
1149
}
1150

1151
// checkCircularForward checks whether a forward is circular (arrives and
1152
// departs on the same link) and returns a link error if the switch is
1153
// configured to disallow this behaviour.
1154
func (s *Switch) checkCircularForward(incoming, outgoing lnwire.ShortChannelID,
1155
        allowCircular bool, paymentHash lntypes.Hash) *LinkError {
93✔
1156

93✔
1157
        log.Tracef("Checking for circular route: incoming=%v, outgoing=%v "+
93✔
1158
                "(payment hash: %x)", incoming, outgoing, paymentHash[:])
93✔
1159

93✔
1160
        // If they are equal, we can skip the alias mapping checks.
93✔
1161
        if incoming == outgoing {
97✔
1162
                // The switch may be configured to allow circular routes, so
4✔
1163
                // just log and return nil.
4✔
1164
                if allowCircular {
6✔
1165
                        log.Debugf("allowing circular route over link: %v "+
2✔
1166
                                "(payment hash: %x)", incoming, paymentHash)
2✔
1167
                        return nil
2✔
1168
                }
2✔
1169

1170
                // Otherwise, we'll return a temporary channel failure.
1171
                return NewDetailedLinkError(
2✔
1172
                        lnwire.NewTemporaryChannelFailure(nil),
2✔
1173
                        OutgoingFailureCircularRoute,
2✔
1174
                )
2✔
1175
        }
1176

1177
        // We'll fetch the "base" SCID from the baseIndex for the incoming and
1178
        // outgoing SCIDs. If either one does not have a base SCID, then the
1179
        // two channels are not equal since one will be a channel that does not
1180
        // need a mapping and SCID equality was checked above. If the "base"
1181
        // SCIDs are equal, then this is a circular route. Otherwise, it isn't.
1182
        s.indexMtx.RLock()
89✔
1183
        incomingBaseScid, ok := s.baseIndex[incoming]
89✔
1184
        if !ok {
172✔
1185
                // This channel does not use baseIndex, bail out.
83✔
1186
                s.indexMtx.RUnlock()
83✔
1187
                return nil
83✔
1188
        }
83✔
1189

1190
        outgoingBaseScid, ok := s.baseIndex[outgoing]
9✔
1191
        if !ok {
14✔
1192
                // This channel does not use baseIndex, bail out.
5✔
1193
                s.indexMtx.RUnlock()
5✔
1194
                return nil
5✔
1195
        }
5✔
1196
        s.indexMtx.RUnlock()
7✔
1197

7✔
1198
        // Check base SCID equality.
7✔
1199
        if incomingBaseScid != outgoingBaseScid {
10✔
1200
                log.Tracef("Incoming base SCID %v does not match outgoing "+
3✔
1201
                        "base SCID %v (payment hash: %x)", incomingBaseScid,
3✔
1202
                        outgoingBaseScid, paymentHash[:])
3✔
1203

3✔
1204
                // The base SCIDs are not equal so these are not the same
3✔
1205
                // channel.
3✔
1206
                return nil
3✔
1207
        }
3✔
1208

1209
        // If the incoming and outgoing link are equal, the htlc is part of a
1210
        // circular route which may be used to lock up our liquidity. If the
1211
        // switch is configured to allow circular routes, log that we are
1212
        // allowing the route then return nil.
1213
        if allowCircular {
6✔
1214
                log.Debugf("allowing circular route over link: %v "+
2✔
1215
                        "(payment hash: %x)", incoming, paymentHash)
2✔
1216
                return nil
2✔
1217
        }
2✔
1218

1219
        // If our node disallows circular routes, return a temporary channel
1220
        // failure. There is nothing wrong with the policy used by the remote
1221
        // node, so we do not include a channel update.
1222
        return NewDetailedLinkError(
2✔
1223
                lnwire.NewTemporaryChannelFailure(nil),
2✔
1224
                OutgoingFailureCircularRoute,
2✔
1225
        )
2✔
1226
}
1227

1228
// failAddPacket encrypts a fail packet back to an add packet's source.
1229
// The ciphertext will be derived from the failure message proivded by context.
1230
// This method returns the failErr if all other steps complete successfully.
1231
func (s *Switch) failAddPacket(packet *htlcPacket, failure *LinkError) error {
29✔
1232
        // Encrypt the failure so that the sender will be able to read the error
29✔
1233
        // message. Since we failed this packet, we use EncryptFirstHop to
29✔
1234
        // obfuscate the failure for their eyes only.
29✔
1235
        reason, err := packet.obfuscator.EncryptFirstHop(failure.WireMessage())
29✔
1236
        if err != nil {
29✔
1237
                err := fmt.Errorf("unable to obfuscate "+
×
1238
                        "error: %v", err)
×
1239
                log.Error(err)
×
1240
                return err
×
1241
        }
×
1242

1243
        log.Error(failure.Error())
29✔
1244

29✔
1245
        // Create a failure packet for this htlc. The full set of
29✔
1246
        // information about the htlc failure is included so that they can
29✔
1247
        // be included in link failure notifications.
29✔
1248
        failPkt := &htlcPacket{
29✔
1249
                sourceRef:       packet.sourceRef,
29✔
1250
                incomingChanID:  packet.incomingChanID,
29✔
1251
                incomingHTLCID:  packet.incomingHTLCID,
29✔
1252
                outgoingChanID:  packet.outgoingChanID,
29✔
1253
                outgoingHTLCID:  packet.outgoingHTLCID,
29✔
1254
                incomingAmount:  packet.incomingAmount,
29✔
1255
                amount:          packet.amount,
29✔
1256
                incomingTimeout: packet.incomingTimeout,
29✔
1257
                outgoingTimeout: packet.outgoingTimeout,
29✔
1258
                circuit:         packet.circuit,
29✔
1259
                obfuscator:      packet.obfuscator,
29✔
1260
                linkFailure:     failure,
29✔
1261
                htlc: &lnwire.UpdateFailHTLC{
29✔
1262
                        Reason: reason,
29✔
1263
                },
29✔
1264
        }
29✔
1265

29✔
1266
        // Route a fail packet back to the source link.
29✔
1267
        err = s.mailOrchestrator.Deliver(failPkt.incomingChanID, failPkt)
29✔
1268
        if err != nil {
29✔
1269
                err = fmt.Errorf("source chanid=%v unable to "+
×
1270
                        "handle switch packet: %v",
×
1271
                        packet.incomingChanID, err)
×
1272
                log.Error(err)
×
1273
                return err
×
1274
        }
×
1275

1276
        return failure
29✔
1277
}
1278

1279
// closeCircuit accepts a settle or fail htlc and the associated htlc packet and
1280
// attempts to determine the source that forwarded this htlc. This method will
1281
// set the incoming chan and htlc ID of the given packet if the source was
1282
// found, and will properly [re]encrypt any failure messages.
1283
func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
540✔
1284
        // If the packet has its source, that means it was failed locally by
540✔
1285
        // the outgoing link. We fail it here to make sure only one response
540✔
1286
        // makes it through the switch.
540✔
1287
        if pkt.hasSource {
553✔
1288
                circuit, err := s.circuits.FailCircuit(pkt.inKey())
13✔
1289
                switch err {
13✔
1290

1291
                // Circuit successfully closed.
1292
                case nil:
13✔
1293
                        return circuit, nil
13✔
1294

1295
                // Circuit was previously closed, but has not been deleted.
1296
                // We'll just drop this response until the circuit has been
1297
                // fully removed.
1298
                case ErrCircuitClosing:
×
1299
                        return nil, err
×
1300

1301
                // Failed to close circuit because it does not exist. This is
1302
                // likely because the circuit was already successfully closed.
1303
                // Since this packet failed locally, there is no forwarding
1304
                // package entry to acknowledge.
1305
                case ErrUnknownCircuit:
×
1306
                        return nil, err
×
1307

1308
                // Unexpected error.
1309
                default:
×
1310
                        return nil, err
×
1311
                }
1312
        }
1313

1314
        // Otherwise, this is packet was received from the remote party.  Use
1315
        // circuit map to find the incoming link to receive the settle/fail.
1316
        circuit, err := s.circuits.CloseCircuit(pkt.outKey())
529✔
1317
        switch err {
529✔
1318

1319
        // Open circuit successfully closed.
1320
        case nil:
339✔
1321
                pkt.incomingChanID = circuit.Incoming.ChanID
339✔
1322
                pkt.incomingHTLCID = circuit.Incoming.HtlcID
339✔
1323
                pkt.circuit = circuit
339✔
1324
                pkt.sourceRef = &circuit.AddRef
339✔
1325

339✔
1326
                pktType := "SETTLE"
339✔
1327
                if _, ok := pkt.htlc.(*lnwire.UpdateFailHTLC); ok {
469✔
1328
                        pktType = "FAIL"
130✔
1329
                }
130✔
1330

1331
                log.Debugf("Closed completed %s circuit for %x: "+
339✔
1332
                        "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash,
339✔
1333
                        pkt.incomingChanID, pkt.incomingHTLCID,
339✔
1334
                        pkt.outgoingChanID, pkt.outgoingHTLCID)
339✔
1335

339✔
1336
                return circuit, nil
339✔
1337

1338
        // Circuit was previously closed, but has not been deleted. We'll just
1339
        // drop this response until the circuit has been removed.
1340
        case ErrCircuitClosing:
3✔
1341
                return nil, err
3✔
1342

1343
        // Failed to close circuit because it does not exist. This is likely
1344
        // because the circuit was already successfully closed.
1345
        case ErrUnknownCircuit:
193✔
1346
                if pkt.destRef != nil {
385✔
1347
                        // Add this SettleFailRef to the set of pending settle/fail entries
192✔
1348
                        // awaiting acknowledgement.
192✔
1349
                        s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef)
192✔
1350
                }
192✔
1351

1352
                // If this is a settle, we will not log an error message as settles
1353
                // are expected to hit the ErrUnknownCircuit case. The only way fails
1354
                // can hit this case if the link restarts after having just sent a fail
1355
                // to the switch.
1356
                _, isSettle := pkt.htlc.(*lnwire.UpdateFulfillHTLC)
193✔
1357
                if !isSettle {
193✔
1358
                        err := fmt.Errorf("unable to find target channel "+
×
1359
                                "for HTLC fail: channel ID = %s, "+
×
1360
                                "HTLC ID = %d", pkt.outgoingChanID,
×
1361
                                pkt.outgoingHTLCID)
×
1362
                        log.Error(err)
×
1363

×
1364
                        return nil, err
×
1365
                }
×
1366

1367
                return nil, nil
193✔
1368

1369
        // Unexpected error.
1370
        default:
×
1371
                return nil, err
×
1372
        }
1373
}
1374

1375
// ackSettleFail is used by the switch to ACK any settle/fail entries in the
1376
// forwarding package of the outgoing link for a payment circuit. We do this if
1377
// we're the originator of the payment, so the link stops attempting to
1378
// re-broadcast.
1379
func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error {
121✔
1380
        return kvdb.Batch(s.cfg.DB, func(tx kvdb.RwTx) error {
242✔
1381
                return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...)
121✔
1382
        })
121✔
1383
}
1384

1385
// teardownCircuit removes a pending or open circuit from the switch's circuit
1386
// map and prints useful logging statements regarding the outcome.
1387
func (s *Switch) teardownCircuit(pkt *htlcPacket) error {
317✔
1388
        var pktType string
317✔
1389
        switch htlc := pkt.htlc.(type) {
317✔
1390
        case *lnwire.UpdateFulfillHTLC:
189✔
1391
                pktType = "SETTLE"
189✔
1392
        case *lnwire.UpdateFailHTLC:
131✔
1393
                pktType = "FAIL"
131✔
1394
        default:
×
1395
                return fmt.Errorf("cannot tear down packet of type: %T", htlc)
×
1396
        }
1397

1398
        var paymentHash lntypes.Hash
317✔
1399

317✔
1400
        // Perform a defensive check to make sure we don't try to access a nil
317✔
1401
        // circuit.
317✔
1402
        circuit := pkt.circuit
317✔
1403
        if circuit != nil {
634✔
1404
                copy(paymentHash[:], circuit.PaymentHash[:])
317✔
1405
        }
317✔
1406

1407
        log.Debugf("Tearing down circuit with %s pkt, removing circuit=%v "+
317✔
1408
                "with keystone=%v", pktType, pkt.inKey(), pkt.outKey())
317✔
1409

317✔
1410
        err := s.circuits.DeleteCircuits(pkt.inKey())
317✔
1411
        if err != nil {
317✔
1412
                log.Warnf("Failed to tear down circuit (%s, %d) <-> (%s, %d) "+
×
1413
                        "with payment_hash=%v using %s pkt", pkt.incomingChanID,
×
1414
                        pkt.incomingHTLCID, pkt.outgoingChanID,
×
1415
                        pkt.outgoingHTLCID, pkt.circuit.PaymentHash, pktType)
×
1416

×
1417
                return err
×
1418
        }
×
1419

1420
        log.Debugf("Closed %s circuit for %v: (%s, %d) <-> (%s, %d)", pktType,
317✔
1421
                paymentHash, pkt.incomingChanID, pkt.incomingHTLCID,
317✔
1422
                pkt.outgoingChanID, pkt.outgoingHTLCID)
317✔
1423

317✔
1424
        return nil
317✔
1425
}
1426

1427
// CloseLink creates and sends the close channel command to the target link
1428
// directing the specified closure type. If the closure type is CloseRegular,
1429
// targetFeePerKw parameter should be the ideal fee-per-kw that will be used as
1430
// a starting point for close negotiation. The deliveryScript parameter is an
1431
// optional parameter which sets a user specified script to close out to.
1432
func (s *Switch) CloseLink(ctx context.Context, chanPoint *wire.OutPoint,
1433
        closeType contractcourt.ChannelCloseType,
1434
        targetFeePerKw, maxFee chainfee.SatPerKWeight,
1435
        deliveryScript lnwire.DeliveryAddress) (chan interface{}, chan error) {
3✔
1436

3✔
1437
        // TODO(roasbeef) abstract out the close updates.
3✔
1438
        updateChan := make(chan interface{}, 2)
3✔
1439
        errChan := make(chan error, 1)
3✔
1440

3✔
1441
        command := &ChanClose{
3✔
1442
                CloseType:      closeType,
3✔
1443
                ChanPoint:      chanPoint,
3✔
1444
                Updates:        updateChan,
3✔
1445
                TargetFeePerKw: targetFeePerKw,
3✔
1446
                DeliveryScript: deliveryScript,
3✔
1447
                Err:            errChan,
3✔
1448
                MaxFee:         maxFee,
3✔
1449
                Ctx:            ctx,
3✔
1450
        }
3✔
1451

3✔
1452
        select {
3✔
1453
        case s.chanCloseRequests <- command:
3✔
1454
                return updateChan, errChan
3✔
1455

1456
        case <-s.quit:
×
1457
                errChan <- ErrSwitchExiting
×
1458
                close(updateChan)
×
1459
                return updateChan, errChan
×
1460
        }
1461
}
1462

1463
// htlcForwarder is responsible for optimally forwarding (and possibly
1464
// fragmenting) incoming/outgoing HTLCs amongst all active interfaces and their
1465
// links. The duties of the forwarder are similar to that of a network switch,
1466
// in that it facilitates multi-hop payments by acting as a central messaging
1467
// bus. The switch communicates will active links to create, manage, and tear
1468
// down active onion routed payments. Each active channel is modeled as
1469
// networked device with metadata such as the available payment bandwidth, and
1470
// total link capacity.
1471
//
1472
// NOTE: This MUST be run as a goroutine.
1473
func (s *Switch) htlcForwarder() {
210✔
1474
        defer s.wg.Done()
210✔
1475

210✔
1476
        defer func() {
420✔
1477
                s.blockEpochStream.Cancel()
210✔
1478

210✔
1479
                // Remove all links once we've been signalled for shutdown.
210✔
1480
                var linksToStop []ChannelLink
210✔
1481
                s.indexMtx.Lock()
210✔
1482
                for _, link := range s.linkIndex {
505✔
1483
                        activeLink := s.removeLink(link.ChanID())
295✔
1484
                        if activeLink == nil {
295✔
1485
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1486
                                        "on stop", link.ChanID())
×
1487
                                continue
×
1488
                        }
1489
                        linksToStop = append(linksToStop, activeLink)
295✔
1490
                }
1491
                for _, link := range s.pendingLinkIndex {
214✔
1492
                        pendingLink := s.removeLink(link.ChanID())
4✔
1493
                        if pendingLink == nil {
4✔
1494
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1495
                                        "on stop", link.ChanID())
×
1496
                                continue
×
1497
                        }
1498
                        linksToStop = append(linksToStop, pendingLink)
4✔
1499
                }
1500
                s.indexMtx.Unlock()
210✔
1501

210✔
1502
                // Now that all pending and live links have been removed from
210✔
1503
                // the forwarding indexes, stop each one before shutting down.
210✔
1504
                // We'll shut them down in parallel to make exiting as fast as
210✔
1505
                // possible.
210✔
1506
                var wg sync.WaitGroup
210✔
1507
                for _, link := range linksToStop {
506✔
1508
                        wg.Add(1)
296✔
1509
                        go func(l ChannelLink) {
592✔
1510
                                defer wg.Done()
296✔
1511

296✔
1512
                                l.Stop()
296✔
1513
                        }(link)
296✔
1514
                }
1515
                wg.Wait()
210✔
1516

210✔
1517
                // Before we exit fully, we'll attempt to flush out any
210✔
1518
                // forwarding events that may still be lingering since the last
210✔
1519
                // batch flush.
210✔
1520
                if err := s.FlushForwardingEvents(); err != nil {
210✔
1521
                        log.Errorf("unable to flush forwarding events: %v", err)
×
1522
                }
×
1523
        }()
1524

1525
        // TODO(roasbeef): cleared vs settled distinction
1526
        var (
210✔
1527
                totalNumUpdates uint64
210✔
1528
                totalSatSent    btcutil.Amount
210✔
1529
                totalSatRecv    btcutil.Amount
210✔
1530
        )
210✔
1531
        s.cfg.LogEventTicker.Resume()
210✔
1532
        defer s.cfg.LogEventTicker.Stop()
210✔
1533

210✔
1534
        // Every 15 seconds, we'll flush out the forwarding events that
210✔
1535
        // occurred during that period.
210✔
1536
        s.cfg.FwdEventTicker.Resume()
210✔
1537
        defer s.cfg.FwdEventTicker.Stop()
210✔
1538

210✔
1539
        defer s.cfg.AckEventTicker.Stop()
210✔
1540

210✔
1541
out:
210✔
1542
        for {
1,045✔
1543

835✔
1544
                // If the set of pending settle/fail entries is non-zero,
835✔
1545
                // reinstate the ack ticker so we can batch ack them.
835✔
1546
                if len(s.pendingSettleFails) > 0 {
1,215✔
1547
                        s.cfg.AckEventTicker.Resume()
380✔
1548
                }
380✔
1549

1550
                select {
835✔
1551
                case blockEpoch, ok := <-s.blockEpochStream.Epochs:
3✔
1552
                        if !ok {
3✔
1553
                                break out
×
1554
                        }
1555

1556
                        atomic.StoreUint32(&s.bestHeight, uint32(blockEpoch.Height))
3✔
1557

1558
                // A local close request has arrived, we'll forward this to the
1559
                // relevant link (if it exists) so the channel can be
1560
                // cooperatively closed (if possible).
1561
                case req := <-s.chanCloseRequests:
3✔
1562
                        chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)
3✔
1563

3✔
1564
                        s.indexMtx.RLock()
3✔
1565
                        link, ok := s.linkIndex[chanID]
3✔
1566
                        if !ok {
6✔
1567
                                s.indexMtx.RUnlock()
3✔
1568

3✔
1569
                                req.Err <- fmt.Errorf("no peer for channel with "+
3✔
1570
                                        "chan_id=%x", chanID[:])
3✔
1571
                                continue
3✔
1572
                        }
1573
                        s.indexMtx.RUnlock()
3✔
1574

3✔
1575
                        peerPub := link.PeerPubKey()
3✔
1576
                        log.Debugf("Requesting local channel close: peer=%x, "+
3✔
1577
                                "chan_id=%x", link.PeerPubKey(), chanID[:])
3✔
1578

3✔
1579
                        go s.cfg.LocalChannelClose(peerPub[:], req)
3✔
1580

1581
                case resolutionMsg := <-s.resolutionMsgs:
4✔
1582
                        // We'll persist the resolution message to the Switch's
4✔
1583
                        // resolution store.
4✔
1584
                        resMsg := resolutionMsg.ResolutionMsg
4✔
1585
                        err := s.resMsgStore.addResolutionMsg(&resMsg)
4✔
1586
                        if err != nil {
4✔
1587
                                // This will only fail if there is a database
×
1588
                                // error or a serialization error. Sending the
×
1589
                                // error prevents the contractcourt from being
×
1590
                                // in a state where it believes the send was
×
1591
                                // successful, when it wasn't.
×
1592
                                log.Errorf("unable to add resolution msg: %v",
×
1593
                                        err)
×
1594
                                resolutionMsg.errChan <- err
×
1595
                                continue
×
1596
                        }
1597

1598
                        // At this point, the resolution message has been
1599
                        // persisted. It is safe to signal success by sending
1600
                        // a nil error since the Switch will re-deliver the
1601
                        // resolution message on restart.
1602
                        resolutionMsg.errChan <- nil
4✔
1603

4✔
1604
                        // Create a htlc packet for this resolution. We do
4✔
1605
                        // not have some of the information that we'll need
4✔
1606
                        // for blinded error handling here , so we'll rely on
4✔
1607
                        // our forwarding logic to fill it in later.
4✔
1608
                        pkt := &htlcPacket{
4✔
1609
                                outgoingChanID: resolutionMsg.SourceChan,
4✔
1610
                                outgoingHTLCID: resolutionMsg.HtlcIndex,
4✔
1611
                                isResolution:   true,
4✔
1612
                        }
4✔
1613

4✔
1614
                        // Resolution messages will either be cancelling
4✔
1615
                        // backwards an existing HTLC, or settling a previously
4✔
1616
                        // outgoing HTLC. Based on this, we'll map the message
4✔
1617
                        // to the proper htlcPacket.
4✔
1618
                        if resolutionMsg.Failure != nil {
7✔
1619
                                pkt.htlc = &lnwire.UpdateFailHTLC{}
3✔
1620
                        } else {
7✔
1621
                                pkt.htlc = &lnwire.UpdateFulfillHTLC{
4✔
1622
                                        PaymentPreimage: *resolutionMsg.PreImage,
4✔
1623
                                }
4✔
1624
                        }
4✔
1625

1626
                        log.Debugf("Received outside contract resolution, "+
4✔
1627
                                "mapping to: %v", lnutils.SpewLogClosure(pkt))
4✔
1628

4✔
1629
                        // We don't check the error, as the only failure we can
4✔
1630
                        // encounter is due to the circuit already being
4✔
1631
                        // closed. This is fine, as processing this message is
4✔
1632
                        // meant to be idempotent.
4✔
1633
                        err = s.handlePacketForward(pkt)
4✔
1634
                        if err != nil {
4✔
1635
                                log.Errorf("Unable to forward resolution msg: %v", err)
×
1636
                        }
×
1637

1638
                // A new packet has arrived for forwarding, we'll interpret the
1639
                // packet concretely, then either forward it along, or
1640
                // interpret a return packet to a locally initialized one.
1641
                case cmd := <-s.htlcPlex:
621✔
1642
                        cmd.err <- s.handlePacketForward(cmd.pkt)
621✔
1643

1644
                // When this time ticks, then it indicates that we should
1645
                // collect all the forwarding events since the last internal,
1646
                // and write them out to our log.
1647
                case <-s.cfg.FwdEventTicker.Ticks():
5✔
1648
                        s.wg.Add(1)
5✔
1649
                        go func() {
10✔
1650
                                defer s.wg.Done()
5✔
1651

5✔
1652
                                if err := s.FlushForwardingEvents(); err != nil {
5✔
1653
                                        log.Errorf("Unable to flush "+
×
1654
                                                "forwarding events: %v", err)
×
1655
                                }
×
1656
                        }()
1657

1658
                // The log ticker has fired, so we'll calculate some forwarding
1659
                // stats for the last 10 seconds to display within the logs to
1660
                // users.
1661
                case <-s.cfg.LogEventTicker.Ticks():
7✔
1662
                        // First, we'll collate the current running tally of
7✔
1663
                        // our forwarding stats.
7✔
1664
                        prevSatSent := totalSatSent
7✔
1665
                        prevSatRecv := totalSatRecv
7✔
1666
                        prevNumUpdates := totalNumUpdates
7✔
1667

7✔
1668
                        var (
7✔
1669
                                newNumUpdates uint64
7✔
1670
                                newSatSent    btcutil.Amount
7✔
1671
                                newSatRecv    btcutil.Amount
7✔
1672
                        )
7✔
1673

7✔
1674
                        // Next, we'll run through all the registered links and
7✔
1675
                        // compute their up-to-date forwarding stats.
7✔
1676
                        s.indexMtx.RLock()
7✔
1677
                        for _, link := range s.linkIndex {
16✔
1678
                                // TODO(roasbeef): when links first registered
9✔
1679
                                // stats printed.
9✔
1680
                                updates, sent, recv := link.Stats()
9✔
1681
                                newNumUpdates += updates
9✔
1682
                                newSatSent += sent.ToSatoshis()
9✔
1683
                                newSatRecv += recv.ToSatoshis()
9✔
1684
                        }
9✔
1685
                        s.indexMtx.RUnlock()
7✔
1686

7✔
1687
                        var (
7✔
1688
                                diffNumUpdates uint64
7✔
1689
                                diffSatSent    btcutil.Amount
7✔
1690
                                diffSatRecv    btcutil.Amount
7✔
1691
                        )
7✔
1692

7✔
1693
                        // If this is the first time we're computing these
7✔
1694
                        // stats, then the diff is just the new value. We do
7✔
1695
                        // this in order to avoid integer underflow issues.
7✔
1696
                        if prevNumUpdates == 0 {
14✔
1697
                                diffNumUpdates = newNumUpdates
7✔
1698
                                diffSatSent = newSatSent
7✔
1699
                                diffSatRecv = newSatRecv
7✔
1700
                        } else {
10✔
1701
                                diffNumUpdates = newNumUpdates - prevNumUpdates
3✔
1702
                                diffSatSent = newSatSent - prevSatSent
3✔
1703
                                diffSatRecv = newSatRecv - prevSatRecv
3✔
1704
                        }
3✔
1705

1706
                        // If the diff of num updates is zero, then we haven't
1707
                        // forwarded anything in the last 10 seconds, so we can
1708
                        // skip this update.
1709
                        if diffNumUpdates == 0 {
12✔
1710
                                continue
5✔
1711
                        }
1712

1713
                        // If the diff of num updates is negative, then some
1714
                        // links may have been unregistered from the switch, so
1715
                        // we'll update our stats to only include our registered
1716
                        // links.
1717
                        if int64(diffNumUpdates) < 0 {
8✔
1718
                                totalNumUpdates = newNumUpdates
3✔
1719
                                totalSatSent = newSatSent
3✔
1720
                                totalSatRecv = newSatRecv
3✔
1721
                                continue
3✔
1722
                        }
1723

1724
                        // Otherwise, we'll log this diff, then accumulate the
1725
                        // new stats into the running total.
1726
                        log.Debugf("Sent %d satoshis and received %d satoshis "+
5✔
1727
                                "in the last 10 seconds (%f tx/sec)",
5✔
1728
                                diffSatSent, diffSatRecv,
5✔
1729
                                float64(diffNumUpdates)/10)
5✔
1730

5✔
1731
                        totalNumUpdates += diffNumUpdates
5✔
1732
                        totalSatSent += diffSatSent
5✔
1733
                        totalSatRecv += diffSatRecv
5✔
1734

1735
                // The ack ticker has fired so if we have any settle/fail entries
1736
                // for a forwarding package to ack, we will do so here in a batch
1737
                // db call.
1738
                case <-s.cfg.AckEventTicker.Ticks():
3✔
1739
                        // If the current set is empty, pause the ticker.
3✔
1740
                        if len(s.pendingSettleFails) == 0 {
6✔
1741
                                s.cfg.AckEventTicker.Pause()
3✔
1742
                                continue
3✔
1743
                        }
1744

1745
                        // Batch ack the settle/fail entries.
1746
                        if err := s.ackSettleFail(s.pendingSettleFails...); err != nil {
3✔
1747
                                log.Errorf("Unable to ack batch of settle/fails: %v", err)
×
1748
                                continue
×
1749
                        }
1750

1751
                        log.Tracef("Acked %d settle fails: %v",
3✔
1752
                                len(s.pendingSettleFails),
3✔
1753
                                lnutils.SpewLogClosure(s.pendingSettleFails))
3✔
1754

3✔
1755
                        // Reset the pendingSettleFails buffer while keeping acquired
3✔
1756
                        // memory.
3✔
1757
                        s.pendingSettleFails = s.pendingSettleFails[:0]
3✔
1758

1759
                case <-s.quit:
210✔
1760
                        return
210✔
1761
                }
1762
        }
1763
}
1764

1765
// Start starts all helper goroutines required for the operation of the switch.
1766
func (s *Switch) Start() error {
210✔
1767
        if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
210✔
1768
                log.Warn("Htlc Switch already started")
×
1769
                return errors.New("htlc switch already started")
×
1770
        }
×
1771

1772
        log.Infof("HTLC Switch starting")
210✔
1773

210✔
1774
        blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
210✔
1775
        if err != nil {
210✔
1776
                return err
×
1777
        }
×
1778
        s.blockEpochStream = blockEpochStream
210✔
1779

210✔
1780
        s.wg.Add(1)
210✔
1781
        go s.htlcForwarder()
210✔
1782

210✔
1783
        if err := s.reforwardResponses(); err != nil {
210✔
1784
                s.Stop()
×
1785
                log.Errorf("unable to reforward responses: %v", err)
×
1786
                return err
×
1787
        }
×
1788

1789
        if err := s.reforwardResolutions(); err != nil {
210✔
1790
                // We are already stopping so we can ignore the error.
×
1791
                _ = s.Stop()
×
1792
                log.Errorf("unable to reforward resolutions: %v", err)
×
1793
                return err
×
1794
        }
×
1795

1796
        return nil
210✔
1797
}
1798

1799
// reforwardResolutions fetches the set of resolution messages stored on-disk
1800
// and reforwards them if their circuits are still open. If the circuits have
1801
// been deleted, then we will delete the resolution message from the database.
1802
func (s *Switch) reforwardResolutions() error {
210✔
1803
        // Fetch all stored resolution messages, deleting the ones that are
210✔
1804
        // resolved.
210✔
1805
        resMsgs, err := s.resMsgStore.fetchAllResolutionMsg()
210✔
1806
        if err != nil {
210✔
1807
                return err
×
1808
        }
×
1809

1810
        switchPackets := make([]*htlcPacket, 0, len(resMsgs))
210✔
1811
        for _, resMsg := range resMsgs {
214✔
1812
                // If the open circuit no longer exists, then we can remove the
4✔
1813
                // message from the store.
4✔
1814
                outKey := CircuitKey{
4✔
1815
                        ChanID: resMsg.SourceChan,
4✔
1816
                        HtlcID: resMsg.HtlcIndex,
4✔
1817
                }
4✔
1818

4✔
1819
                if s.circuits.LookupOpenCircuit(outKey) == nil {
8✔
1820
                        // The open circuit doesn't exist.
4✔
1821
                        err := s.resMsgStore.deleteResolutionMsg(&outKey)
4✔
1822
                        if err != nil {
4✔
1823
                                return err
×
1824
                        }
×
1825

1826
                        continue
4✔
1827
                }
1828

1829
                // The circuit is still open, so we can assume that the link or
1830
                // switch (if we are the source) hasn't cleaned it up yet.
1831
                // We rely on our forwarding logic to fill in details that
1832
                // are not currently available to us.
1833
                resPkt := &htlcPacket{
3✔
1834
                        outgoingChanID: resMsg.SourceChan,
3✔
1835
                        outgoingHTLCID: resMsg.HtlcIndex,
3✔
1836
                        isResolution:   true,
3✔
1837
                }
3✔
1838

3✔
1839
                if resMsg.Failure != nil {
6✔
1840
                        resPkt.htlc = &lnwire.UpdateFailHTLC{}
3✔
1841
                } else {
3✔
1842
                        resPkt.htlc = &lnwire.UpdateFulfillHTLC{
×
1843
                                PaymentPreimage: *resMsg.PreImage,
×
1844
                        }
×
1845
                }
×
1846

1847
                switchPackets = append(switchPackets, resPkt)
3✔
1848
        }
1849

1850
        // We'll now dispatch the set of resolution messages to the proper
1851
        // destination. An error is only encountered here if the switch is
1852
        // shutting down.
1853
        if err := s.ForwardPackets(nil, switchPackets...); err != nil {
210✔
1854
                return err
×
1855
        }
×
1856

1857
        return nil
210✔
1858
}
1859

1860
// reforwardResponses for every known, non-pending channel, loads all associated
1861
// forwarding packages and reforwards any Settle or Fail HTLCs found. This is
1862
// used to resurrect the switch's mailboxes after a restart. This also runs for
1863
// waiting close channels since there may be settles or fails that need to be
1864
// reforwarded before they completely close.
1865
func (s *Switch) reforwardResponses() error {
210✔
1866
        openChannels, err := s.cfg.FetchAllChannels()
210✔
1867
        if err != nil {
210✔
1868
                return err
×
1869
        }
×
1870

1871
        for _, openChannel := range openChannels {
342✔
1872
                shortChanID := openChannel.ShortChanID()
132✔
1873

132✔
1874
                // Locally-initiated payments never need reforwarding.
132✔
1875
                if shortChanID == hop.Source {
135✔
1876
                        continue
3✔
1877
                }
1878

1879
                // If the channel is pending, it should have no forwarding
1880
                // packages, and nothing to reforward.
1881
                if openChannel.IsPending {
132✔
1882
                        continue
×
1883
                }
1884

1885
                // Channels in open or waiting-close may still have responses in
1886
                // their forwarding packages. We will continue to reattempt
1887
                // forwarding on startup until the channel is fully-closed.
1888
                //
1889
                // Load this channel's forwarding packages, and deliver them to
1890
                // the switch.
1891
                fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID)
132✔
1892
                if err != nil {
132✔
1893
                        log.Errorf("unable to load forwarding "+
×
1894
                                "packages for %v: %v", shortChanID, err)
×
1895
                        return err
×
1896
                }
×
1897

1898
                s.reforwardSettleFails(fwdPkgs)
132✔
1899
        }
1900

1901
        return nil
210✔
1902
}
1903

1904
// loadChannelFwdPkgs loads all forwarding packages owned by the `source` short
1905
// channel identifier.
1906
func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) {
132✔
1907

132✔
1908
        var fwdPkgs []*channeldb.FwdPkg
132✔
1909
        if err := kvdb.View(s.cfg.DB, func(tx kvdb.RTx) error {
264✔
1910
                var err error
132✔
1911
                fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgs(
132✔
1912
                        tx, source,
132✔
1913
                )
132✔
1914
                return err
132✔
1915
        }, func() {
264✔
1916
                fwdPkgs = nil
132✔
1917
        }); err != nil {
132✔
1918
                return nil, err
×
1919
        }
×
1920

1921
        return fwdPkgs, nil
132✔
1922
}
1923

1924
// reforwardSettleFails parses the Settle and Fail HTLCs from the list of
1925
// forwarding packages, and reforwards those that have not been acknowledged.
1926
// This is intended to occur on startup, in order to recover the switch's
1927
// mailboxes, and to ensure that responses can be propagated in case the
1928
// outgoing link never comes back online.
1929
//
1930
// NOTE: This should mimic the behavior processRemoteSettleFails.
1931
func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) {
132✔
1932
        for _, fwdPkg := range fwdPkgs {
136✔
1933
                switchPackets := make([]*htlcPacket, 0, len(fwdPkg.SettleFails))
4✔
1934
                for i, update := range fwdPkg.SettleFails {
7✔
1935
                        // Skip any settles or fails that have already been
3✔
1936
                        // acknowledged by the incoming link that originated the
3✔
1937
                        // forwarded Add.
3✔
1938
                        if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
6✔
1939
                                continue
3✔
1940
                        }
1941

1942
                        switch msg := update.UpdateMsg.(type) {
3✔
1943
                        // A settle for an HTLC we previously forwarded HTLC has
1944
                        // been received. So we'll forward the HTLC to the
1945
                        // switch which will handle propagating the settle to
1946
                        // the prior hop.
1947
                        case *lnwire.UpdateFulfillHTLC:
3✔
1948
                                destRef := fwdPkg.DestRef(uint16(i))
3✔
1949
                                settlePacket := &htlcPacket{
3✔
1950
                                        outgoingChanID: fwdPkg.Source,
3✔
1951
                                        outgoingHTLCID: msg.ID,
3✔
1952
                                        destRef:        &destRef,
3✔
1953
                                        htlc:           msg,
3✔
1954
                                }
3✔
1955

3✔
1956
                                // Add the packet to the batch to be forwarded, and
3✔
1957
                                // notify the overflow queue that a spare spot has been
3✔
1958
                                // freed up within the commitment state.
3✔
1959
                                switchPackets = append(switchPackets, settlePacket)
3✔
1960

1961
                        // A failureCode message for a previously forwarded HTLC has been
1962
                        // received. As a result a new slot will be freed up in our
1963
                        // commitment state, so we'll forward this to the switch so the
1964
                        // backwards undo can continue.
1965
                        case *lnwire.UpdateFailHTLC:
×
1966
                                // Fetch the reason the HTLC was canceled so
×
1967
                                // we can continue to propagate it. This
×
1968
                                // failure originated from another node, so
×
1969
                                // the linkFailure field is not set on this
×
1970
                                // packet. We rely on the link to fill in
×
1971
                                // additional circuit information for us.
×
1972
                                failPacket := &htlcPacket{
×
1973
                                        outgoingChanID: fwdPkg.Source,
×
1974
                                        outgoingHTLCID: msg.ID,
×
1975
                                        destRef: &channeldb.SettleFailRef{
×
1976
                                                Source: fwdPkg.Source,
×
1977
                                                Height: fwdPkg.Height,
×
1978
                                                Index:  uint16(i),
×
1979
                                        },
×
1980
                                        htlc: msg,
×
1981
                                }
×
1982

×
1983
                                // Add the packet to the batch to be forwarded, and
×
1984
                                // notify the overflow queue that a spare spot has been
×
1985
                                // freed up within the commitment state.
×
1986
                                switchPackets = append(switchPackets, failPacket)
×
1987
                        }
1988
                }
1989

1990
                // Since this send isn't tied to a specific link, we pass a nil
1991
                // link quit channel, meaning the send will fail only if the
1992
                // switch receives a shutdown request.
1993
                if err := s.ForwardPackets(nil, switchPackets...); err != nil {
4✔
1994
                        log.Errorf("Unhandled error while reforwarding packets "+
×
1995
                                "settle/fail over htlcswitch: %v", err)
×
1996
                }
×
1997
        }
1998
}
1999

2000
// Stop gracefully stops all active helper goroutines, then waits until they've
2001
// exited.
2002
func (s *Switch) Stop() error {
442✔
2003
        if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) {
572✔
2004
                log.Warn("Htlc Switch already stopped")
130✔
2005
                return errors.New("htlc switch already shutdown")
130✔
2006
        }
130✔
2007

2008
        log.Info("HTLC Switch shutting down...")
312✔
2009
        defer log.Debug("HTLC Switch shutdown complete")
312✔
2010

312✔
2011
        close(s.quit)
312✔
2012

312✔
2013
        s.wg.Wait()
312✔
2014

312✔
2015
        // Wait until all active goroutines have finished exiting before
312✔
2016
        // stopping the mailboxes, otherwise the mailbox map could still be
312✔
2017
        // accessed and modified.
312✔
2018
        s.mailOrchestrator.Stop()
312✔
2019

312✔
2020
        return nil
312✔
2021
}
2022

2023
// CreateAndAddLink will create a link and then add it to the internal maps
2024
// when given a ChannelLinkConfig and LightningChannel.
2025
func (s *Switch) CreateAndAddLink(linkCfg ChannelLinkConfig,
2026
        lnChan *lnwallet.LightningChannel) error {
3✔
2027

3✔
2028
        link := NewChannelLink(linkCfg, lnChan)
3✔
2029
        return s.AddLink(link)
3✔
2030
}
3✔
2031

2032
// AddLink is used to initiate the handling of the add link command. The
2033
// request will be propagated and handled in the main goroutine.
2034
func (s *Switch) AddLink(link ChannelLink) error {
339✔
2035
        s.indexMtx.Lock()
339✔
2036
        defer s.indexMtx.Unlock()
339✔
2037

339✔
2038
        chanID := link.ChanID()
339✔
2039

339✔
2040
        // First, ensure that this link is not already active in the switch.
339✔
2041
        _, err := s.getLink(chanID)
339✔
2042
        if err == nil {
340✔
2043
                return fmt.Errorf("unable to add ChannelLink(%v), already "+
1✔
2044
                        "active", chanID)
1✔
2045
        }
1✔
2046

2047
        // Get and attach the mailbox for this link, which buffers packets in
2048
        // case there packets that we tried to deliver while this link was
2049
        // offline.
2050
        shortChanID := link.ShortChanID()
338✔
2051
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID)
338✔
2052
        link.AttachMailBox(mailbox)
338✔
2053

338✔
2054
        // Attach the Switch's failAliasUpdate function to the link.
338✔
2055
        link.attachFailAliasUpdate(s.failAliasUpdate)
338✔
2056

338✔
2057
        if err := link.Start(); err != nil {
338✔
2058
                log.Errorf("AddLink failed to start link with chanID=%v: %v",
×
2059
                        chanID, err)
×
2060
                s.removeLink(chanID)
×
2061
                return err
×
2062
        }
×
2063

2064
        if shortChanID == hop.Source {
342✔
2065
                log.Infof("Adding pending link chan_id=%v, short_chan_id=%v",
4✔
2066
                        chanID, shortChanID)
4✔
2067

4✔
2068
                s.pendingLinkIndex[chanID] = link
4✔
2069
        } else {
341✔
2070
                log.Infof("Adding live link chan_id=%v, short_chan_id=%v",
337✔
2071
                        chanID, shortChanID)
337✔
2072

337✔
2073
                s.addLiveLink(link)
337✔
2074
                s.mailOrchestrator.BindLiveShortChanID(
337✔
2075
                        mailbox, chanID, shortChanID,
337✔
2076
                )
337✔
2077
        }
337✔
2078

2079
        return nil
338✔
2080
}
2081

2082
// addLiveLink adds a link to all associated forwarding index, this makes it a
2083
// candidate for forwarding HTLCs.
2084
func (s *Switch) addLiveLink(link ChannelLink) {
337✔
2085
        linkScid := link.ShortChanID()
337✔
2086

337✔
2087
        // We'll add the link to the linkIndex which lets us quickly
337✔
2088
        // look up a channel when we need to close or register it, and
337✔
2089
        // the forwarding index which'll be used when forwarding HTLC's
337✔
2090
        // in the multi-hop setting.
337✔
2091
        s.linkIndex[link.ChanID()] = link
337✔
2092
        s.forwardingIndex[linkScid] = link
337✔
2093

337✔
2094
        // Next we'll add the link to the interface index so we can
337✔
2095
        // quickly look up all the channels for a particular node.
337✔
2096
        peerPub := link.PeerPubKey()
337✔
2097
        if _, ok := s.interfaceIndex[peerPub]; !ok {
669✔
2098
                s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink)
332✔
2099
        }
332✔
2100
        s.interfaceIndex[peerPub][link.ChanID()] = link
337✔
2101

337✔
2102
        s.updateLinkAliases(link)
337✔
2103
}
2104

2105
// UpdateLinkAliases is the externally exposed wrapper for updating link
2106
// aliases. It acquires the indexMtx and calls the internal method.
2107
func (s *Switch) UpdateLinkAliases(link ChannelLink) {
3✔
2108
        s.indexMtx.Lock()
3✔
2109
        defer s.indexMtx.Unlock()
3✔
2110

3✔
2111
        s.updateLinkAliases(link)
3✔
2112
}
3✔
2113

2114
// updateLinkAliases updates the aliases for a given link. This will cause the
2115
// htlcswitch to consult the alias manager on the up to date values of its
2116
// alias maps.
2117
//
2118
// NOTE: this MUST be called with the indexMtx held.
2119
func (s *Switch) updateLinkAliases(link ChannelLink) {
337✔
2120
        linkScid := link.ShortChanID()
337✔
2121

337✔
2122
        aliases := link.getAliases()
337✔
2123
        if link.isZeroConf() {
359✔
2124
                if link.zeroConfConfirmed() {
40✔
2125
                        // Since the zero-conf channel has confirmed, we can
18✔
2126
                        // populate the aliasToReal mapping.
18✔
2127
                        confirmedScid := link.confirmedScid()
18✔
2128

18✔
2129
                        for _, alias := range aliases {
43✔
2130
                                s.aliasToReal[alias] = confirmedScid
25✔
2131
                        }
25✔
2132

2133
                        // Add the confirmed SCID as a key in the baseIndex.
2134
                        s.baseIndex[confirmedScid] = linkScid
18✔
2135
                }
2136

2137
                // Now we populate the baseIndex which will be used to fetch
2138
                // the link given any of the channel's alias SCIDs or the real
2139
                // SCID. The link's SCID is an alias, so we don't need to
2140
                // special-case it like the option-scid-alias feature-bit case
2141
                // further down.
2142
                for _, alias := range aliases {
52✔
2143
                        s.baseIndex[alias] = linkScid
30✔
2144
                }
30✔
2145
        } else if link.negotiatedAliasFeature() {
337✔
2146
                // First, we flush any alias mappings for this link's scid
19✔
2147
                // before we populate the map again, in order to get rid of old
19✔
2148
                // values that no longer exist.
19✔
2149
                for alias, real := range s.aliasToReal {
24✔
2150
                        if real == linkScid {
8✔
2151
                                delete(s.aliasToReal, alias)
3✔
2152
                        }
3✔
2153
                }
2154

2155
                for alias, real := range s.baseIndex {
25✔
2156
                        if real == linkScid {
9✔
2157
                                delete(s.baseIndex, alias)
3✔
2158
                        }
3✔
2159
                }
2160

2161
                // The link's SCID is the confirmed SCID for non-zero-conf
2162
                // option-scid-alias feature bit channels.
2163
                for _, alias := range aliases {
45✔
2164
                        s.aliasToReal[alias] = linkScid
26✔
2165
                        s.baseIndex[alias] = linkScid
26✔
2166
                }
26✔
2167

2168
                // Since the link's SCID is confirmed, it was not included in
2169
                // the baseIndex above as a key. Add it now.
2170
                s.baseIndex[linkScid] = linkScid
19✔
2171
        }
2172
}
2173

2174
// GetLink is used to initiate the handling of the get link command. The
2175
// request will be propagated/handled to/in the main goroutine.
2176
func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelUpdateHandler,
2177
        error) {
3,259✔
2178

3,259✔
2179
        s.indexMtx.RLock()
3,259✔
2180
        defer s.indexMtx.RUnlock()
3,259✔
2181

3,259✔
2182
        return s.getLink(chanID)
3,259✔
2183
}
3,259✔
2184

2185
// getLink returns the link stored in either the pending index or the live
2186
// lindex.
2187
func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) {
3,924✔
2188
        link, ok := s.linkIndex[chanID]
3,924✔
2189
        if !ok {
4,263✔
2190
                link, ok = s.pendingLinkIndex[chanID]
339✔
2191
                if !ok {
677✔
2192
                        return nil, ErrChannelLinkNotFound
338✔
2193
                }
338✔
2194
        }
2195

2196
        return link, nil
3,589✔
2197
}
2198

2199
// GetLinkByShortID attempts to return the link which possesses the target short
2200
// channel ID.
2201
func (s *Switch) GetLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink,
2202
        error) {
3✔
2203

3✔
2204
        s.indexMtx.RLock()
3✔
2205
        defer s.indexMtx.RUnlock()
3✔
2206

3✔
2207
        link, err := s.getLinkByShortID(chanID)
3✔
2208
        if err != nil {
6✔
2209
                // If we failed to find the link under the passed-in SCID, we
3✔
2210
                // consult the Switch's baseIndex map to see if the confirmed
3✔
2211
                // SCID was used for a zero-conf channel.
3✔
2212
                aliasID, ok := s.baseIndex[chanID]
3✔
2213
                if !ok {
6✔
2214
                        return nil, err
3✔
2215
                }
3✔
2216

2217
                // An alias was found, use it to lookup if a link exists.
2218
                return s.getLinkByShortID(aliasID)
3✔
2219
        }
2220

2221
        return link, nil
3✔
2222
}
2223

2224
// getLinkByShortID attempts to return the link which possesses the target
2225
// short channel ID.
2226
//
2227
// NOTE: This MUST be called with the indexMtx held.
2228
func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) {
480✔
2229
        link, ok := s.forwardingIndex[chanID]
480✔
2230
        if !ok {
484✔
2231
                log.Debugf("Link not found in forwarding index using "+
4✔
2232
                        "chanID=%v", chanID)
4✔
2233

4✔
2234
                return nil, ErrChannelLinkNotFound
4✔
2235
        }
4✔
2236

2237
        return link, nil
479✔
2238
}
2239

2240
// getLinkByMapping attempts to fetch the link via the htlcPacket's
2241
// outgoingChanID, possibly using a mapping. If it finds the link via mapping,
2242
// the outgoingChanID will be changed so that an error can be properly
2243
// attributed when looping over linkErrs in handlePacketForward.
2244
//
2245
// * If the outgoingChanID is an alias, we'll fetch the link regardless if it's
2246
// public or not.
2247
//
2248
// * If the outgoingChanID is a confirmed SCID, we'll need to do more checks.
2249
//   - If there is no entry found in baseIndex, fetch the link. This channel
2250
//     did not have the option-scid-alias feature negotiated (which includes
2251
//     zero-conf and option-scid-alias channel-types).
2252
//   - If there is an entry found, fetch the link from forwardingIndex and
2253
//     fail if this is a private link.
2254
//
2255
// NOTE: This MUST be called with the indexMtx read lock held.
2256
func (s *Switch) getLinkByMapping(pkt *htlcPacket) (ChannelLink, error) {
83✔
2257
        // Determine if this ShortChannelID is an alias or a confirmed SCID.
83✔
2258
        chanID := pkt.outgoingChanID
83✔
2259
        aliasID := s.cfg.IsAlias(chanID)
83✔
2260

83✔
2261
        log.Debugf("Querying outgoing link using chanID=%v, aliasID=%v", chanID,
83✔
2262
                aliasID)
83✔
2263

83✔
2264
        // Set the originalOutgoingChanID so the proper channel_update can be
83✔
2265
        // sent back if the option-scid-alias feature bit was negotiated.
83✔
2266
        pkt.originalOutgoingChanID = chanID
83✔
2267

83✔
2268
        if aliasID {
101✔
2269
                // Since outgoingChanID is an alias, we'll fetch the link via
18✔
2270
                // baseIndex.
18✔
2271
                baseScid, ok := s.baseIndex[chanID]
18✔
2272
                if !ok {
18✔
2273
                        // No mapping exists, bail.
×
2274
                        return nil, ErrChannelLinkNotFound
×
2275
                }
×
2276

2277
                // A mapping exists, so use baseScid to find the link in the
2278
                // forwardingIndex.
2279
                link, ok := s.forwardingIndex[baseScid]
18✔
2280
                if !ok {
18✔
2281
                        log.Debugf("Forwarding index not found using "+
×
2282
                                "baseScid=%v", baseScid)
×
2283

×
2284
                        // Link not found, bail.
×
2285
                        return nil, ErrChannelLinkNotFound
×
2286
                }
×
2287

2288
                // Change the packet's outgoingChanID field so that errors are
2289
                // properly attributed.
2290
                pkt.outgoingChanID = baseScid
18✔
2291

18✔
2292
                // Return the link without checking if it's private or not.
18✔
2293
                return link, nil
18✔
2294
        }
2295

2296
        // The outgoingChanID is a confirmed SCID. Attempt to fetch the base
2297
        // SCID from baseIndex.
2298
        baseScid, ok := s.baseIndex[chanID]
68✔
2299
        if !ok {
129✔
2300
                // outgoingChanID is not a key in base index meaning this
61✔
2301
                // channel did not have the option-scid-alias feature bit
61✔
2302
                // negotiated. We'll fetch the link and return it.
61✔
2303
                link, ok := s.forwardingIndex[chanID]
61✔
2304
                if !ok {
66✔
2305
                        log.Debugf("Forwarding index not found using "+
5✔
2306
                                "chanID=%v", chanID)
5✔
2307

5✔
2308
                        // The link wasn't found, bail out.
5✔
2309
                        return nil, ErrChannelLinkNotFound
5✔
2310
                }
5✔
2311

2312
                return link, nil
59✔
2313
        }
2314

2315
        // Fetch the link whose internal SCID is baseScid.
2316
        link, ok := s.forwardingIndex[baseScid]
10✔
2317
        if !ok {
10✔
2318
                log.Debugf("Forwarding index not found using baseScid=%v",
×
2319
                        baseScid)
×
2320

×
2321
                // Link wasn't found, bail out.
×
2322
                return nil, ErrChannelLinkNotFound
×
2323
        }
×
2324

2325
        // If the link is unadvertised, we fail since the real SCID was used to
2326
        // forward over it and this is a channel where the option-scid-alias
2327
        // feature bit was negotiated.
2328
        if link.IsUnadvertised() {
12✔
2329
                log.Debugf("Link is unadvertised, chanID=%v, baseScid=%v",
2✔
2330
                        chanID, baseScid)
2✔
2331

2✔
2332
                return nil, ErrChannelLinkNotFound
2✔
2333
        }
2✔
2334

2335
        // The link is public so the confirmed SCID can be used to forward over
2336
        // it. We'll also replace pkt's outgoingChanID field so errors can
2337
        // properly be attributed in the calling function.
2338
        pkt.outgoingChanID = baseScid
8✔
2339
        return link, nil
8✔
2340
}
2341

2342
// HasActiveLink returns true if the given channel ID has a link in the link
2343
// index AND the link is eligible to forward.
2344
func (s *Switch) HasActiveLink(chanID lnwire.ChannelID) bool {
5✔
2345
        s.indexMtx.RLock()
5✔
2346
        defer s.indexMtx.RUnlock()
5✔
2347

5✔
2348
        if link, ok := s.linkIndex[chanID]; ok {
10✔
2349
                return link.EligibleToForward()
5✔
2350
        }
5✔
2351

2352
        return false
3✔
2353
}
2354

2355
// RemoveLink purges the switch of any link associated with chanID. If a pending
2356
// or active link is not found, this method does nothing. Otherwise, the method
2357
// returns after the link has been completely shutdown.
2358
func (s *Switch) RemoveLink(chanID lnwire.ChannelID) {
21✔
2359
        s.indexMtx.Lock()
21✔
2360
        link, err := s.getLink(chanID)
21✔
2361
        if err != nil {
24✔
2362
                // If err is non-nil, this means that link is also nil. The
3✔
2363
                // link variable cannot be nil without err being non-nil.
3✔
2364
                s.indexMtx.Unlock()
3✔
2365
                log.Tracef("Unable to remove link for ChannelID(%v): %v",
3✔
2366
                        chanID, err)
3✔
2367
                return
3✔
2368
        }
3✔
2369

2370
        // Check if the link is already stopping and grab the stop chan if it
2371
        // is.
2372
        stopChan, ok := s.linkStopIndex[chanID]
21✔
2373
        if !ok {
42✔
2374
                // If the link is non-nil, it is not currently stopping, so
21✔
2375
                // we'll add a stop chan to the linkStopIndex.
21✔
2376
                stopChan = make(chan struct{})
21✔
2377
                s.linkStopIndex[chanID] = stopChan
21✔
2378
        }
21✔
2379
        s.indexMtx.Unlock()
21✔
2380

21✔
2381
        if ok {
21✔
2382
                // If the stop chan exists, we will wait for it to be closed.
×
2383
                // Once it is closed, we will exit.
×
2384
                select {
×
2385
                case <-stopChan:
×
2386
                        return
×
2387
                case <-s.quit:
×
2388
                        return
×
2389
                }
2390
        }
2391

2392
        // Stop the link before removing it from the maps.
2393
        link.Stop()
21✔
2394

21✔
2395
        s.indexMtx.Lock()
21✔
2396
        _ = s.removeLink(chanID)
21✔
2397

21✔
2398
        // Close stopChan and remove this link from the linkStopIndex.
21✔
2399
        // Deleting from the index and removing from the link must be done
21✔
2400
        // in the same block while the mutex is held.
21✔
2401
        close(stopChan)
21✔
2402
        delete(s.linkStopIndex, chanID)
21✔
2403
        s.indexMtx.Unlock()
21✔
2404
}
2405

2406
// removeLink is used to remove and stop the channel link.
2407
//
2408
// NOTE: This MUST be called with the indexMtx held.
2409
func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink {
314✔
2410
        log.Infof("Removing channel link with ChannelID(%v)", chanID)
314✔
2411

314✔
2412
        link, err := s.getLink(chanID)
314✔
2413
        if err != nil {
314✔
UNCOV
2414
                return nil
×
UNCOV
2415
        }
×
2416

2417
        // Remove the channel from live link indexes.
2418
        delete(s.pendingLinkIndex, link.ChanID())
314✔
2419
        delete(s.linkIndex, link.ChanID())
314✔
2420
        delete(s.forwardingIndex, link.ShortChanID())
314✔
2421

314✔
2422
        // If the link has been added to the peer index, then we'll move to
314✔
2423
        // delete the entry within the index.
314✔
2424
        peerPub := link.PeerPubKey()
314✔
2425
        if peerIndex, ok := s.interfaceIndex[peerPub]; ok {
627✔
2426
                delete(peerIndex, link.ChanID())
313✔
2427

313✔
2428
                // If after deletion, there are no longer any links, then we'll
313✔
2429
                // remove the interface map all together.
313✔
2430
                if len(peerIndex) == 0 {
621✔
2431
                        delete(s.interfaceIndex, peerPub)
308✔
2432
                }
308✔
2433
        }
2434

2435
        return link
314✔
2436
}
2437

2438
// UpdateShortChanID locates the link with the passed-in chanID and updates the
2439
// underlying channel state. This is only used in zero-conf channels to allow
2440
// the confirmed SCID to be updated.
2441
func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error {
4✔
2442
        s.indexMtx.Lock()
4✔
2443
        defer s.indexMtx.Unlock()
4✔
2444

4✔
2445
        // Locate the target link in the link index. If no such link exists,
4✔
2446
        // then we will ignore the request.
4✔
2447
        link, ok := s.linkIndex[chanID]
4✔
2448
        if !ok {
4✔
2449
                return fmt.Errorf("link %v not found", chanID)
×
2450
        }
×
2451

2452
        // Try to update the link's underlying channel state, returning early
2453
        // if this update failed.
2454
        _, err := link.UpdateShortChanID()
4✔
2455
        if err != nil {
4✔
2456
                return err
×
2457
        }
×
2458

2459
        // Since the zero-conf channel is confirmed, we should populate the
2460
        // aliasToReal map and update the baseIndex.
2461
        aliases := link.getAliases()
4✔
2462

4✔
2463
        confirmedScid := link.confirmedScid()
4✔
2464

4✔
2465
        for _, alias := range aliases {
9✔
2466
                s.aliasToReal[alias] = confirmedScid
5✔
2467
        }
5✔
2468

2469
        s.baseIndex[confirmedScid] = link.ShortChanID()
4✔
2470

4✔
2471
        return nil
4✔
2472
}
2473

2474
// GetLinksByInterface fetches all the links connected to a particular node
2475
// identified by the serialized compressed form of its public key.
2476
func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelUpdateHandler,
2477
        error) {
3✔
2478

3✔
2479
        s.indexMtx.RLock()
3✔
2480
        defer s.indexMtx.RUnlock()
3✔
2481

3✔
2482
        var handlers []ChannelUpdateHandler
3✔
2483

3✔
2484
        links, err := s.getLinks(hop)
3✔
2485
        if err != nil {
6✔
2486
                return nil, err
3✔
2487
        }
3✔
2488

2489
        // Range over the returned []ChannelLink to convert them into
2490
        // []ChannelUpdateHandler.
2491
        for _, link := range links {
6✔
2492
                handlers = append(handlers, link)
3✔
2493
        }
3✔
2494

2495
        return handlers, nil
3✔
2496
}
2497

2498
// getLinks is function which returns the channel links of the peer by hop
2499
// destination id.
2500
//
2501
// NOTE: This MUST be called with the indexMtx held.
2502
func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
79✔
2503
        links, ok := s.interfaceIndex[destination]
79✔
2504
        if !ok {
82✔
2505
                return nil, ErrNoLinksFound
3✔
2506
        }
3✔
2507

2508
        channelLinks := make([]ChannelLink, 0, len(links))
79✔
2509
        for _, link := range links {
162✔
2510
                channelLinks = append(channelLinks, link)
83✔
2511
        }
83✔
2512

2513
        return channelLinks, nil
79✔
2514
}
2515

2516
// CircuitModifier returns a reference to subset of the interfaces provided by
2517
// the circuit map, to allow links to open and close circuits.
2518
func (s *Switch) CircuitModifier() CircuitModifier {
218✔
2519
        return s.circuits
218✔
2520
}
218✔
2521

2522
// CircuitLookup returns a reference to subset of the interfaces provided by the
2523
// circuit map, to allow looking up circuits.
2524
func (s *Switch) CircuitLookup() CircuitLookup {
3✔
2525
        return s.circuits
3✔
2526
}
3✔
2527

2528
// commitCircuits persistently adds a circuit to the switch's circuit map.
2529
func (s *Switch) commitCircuits(circuits ...*PaymentCircuit) (
2530
        *CircuitFwdActions, error) {
17✔
2531

17✔
2532
        return s.circuits.CommitCircuits(circuits...)
17✔
2533
}
17✔
2534

2535
// FlushForwardingEvents flushes out the set of pending forwarding events to
2536
// the persistent log. This will be used by the switch to periodically flush
2537
// out the set of forwarding events to disk. External callers can also use this
2538
// method to ensure all data is flushed to dis before querying the log.
2539
func (s *Switch) FlushForwardingEvents() error {
212✔
2540
        // First, we'll obtain a copy of the current set of pending forwarding
212✔
2541
        // events.
212✔
2542
        s.fwdEventMtx.Lock()
212✔
2543

212✔
2544
        // If we won't have any forwarding events, then we can exit early.
212✔
2545
        if len(s.pendingFwdingEvents) == 0 {
407✔
2546
                s.fwdEventMtx.Unlock()
195✔
2547
                return nil
195✔
2548
        }
195✔
2549

2550
        events := make([]channeldb.ForwardingEvent, len(s.pendingFwdingEvents))
20✔
2551
        copy(events[:], s.pendingFwdingEvents[:])
20✔
2552

20✔
2553
        // With the copy obtained, we can now clear out the header pointer of
20✔
2554
        // the current slice. This way, we can re-use the underlying storage
20✔
2555
        // allocated for the slice.
20✔
2556
        s.pendingFwdingEvents = s.pendingFwdingEvents[:0]
20✔
2557
        s.fwdEventMtx.Unlock()
20✔
2558

20✔
2559
        // Finally, we'll write out the copied events to the persistent
20✔
2560
        // forwarding log.
20✔
2561
        return s.cfg.FwdingLog.AddForwardingEvents(events)
20✔
2562
}
2563

2564
// BestHeight returns the best height known to the switch.
2565
func (s *Switch) BestHeight() uint32 {
450✔
2566
        return atomic.LoadUint32(&s.bestHeight)
450✔
2567
}
450✔
2568

2569
// dustExceedsFeeThreshold takes in a ChannelLink, HTLC amount, and a boolean
2570
// to determine whether the default fee threshold has been exceeded. This
2571
// heuristic takes into account the trimmed-to-dust mechanism. The sum of the
2572
// commitment's dust with the mailbox's dust with the amount is checked against
2573
// the fee exposure threshold. If incoming is true, then the amount is not
2574
// included in the sum as it was already included in the commitment's dust. A
2575
// boolean is returned telling the caller whether the HTLC should be failed
2576
// back.
2577
func (s *Switch) dustExceedsFeeThreshold(link ChannelLink,
2578
        amount lnwire.MilliSatoshi, incoming bool) bool {
535✔
2579

535✔
2580
        // Retrieve the link's current commitment feerate and dustClosure.
535✔
2581
        feeRate := link.getFeeRate()
535✔
2582
        isDust := link.getDustClosure()
535✔
2583

535✔
2584
        // Evaluate if the HTLC is dust on either sides' commitment.
535✔
2585
        isLocalDust := isDust(
535✔
2586
                feeRate, incoming, lntypes.Local, amount.ToSatoshis(),
535✔
2587
        )
535✔
2588
        isRemoteDust := isDust(
535✔
2589
                feeRate, incoming, lntypes.Remote, amount.ToSatoshis(),
535✔
2590
        )
535✔
2591

535✔
2592
        if !(isLocalDust || isRemoteDust) {
670✔
2593
                // If the HTLC is not dust on either commitment, it's fine to
135✔
2594
                // forward.
135✔
2595
                return false
135✔
2596
        }
135✔
2597

2598
        // Fetch the dust sums currently in the mailbox for this link.
2599
        cid := link.ChanID()
403✔
2600
        sid := link.ShortChanID()
403✔
2601
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(cid, sid)
403✔
2602
        localMailDust, remoteMailDust := mailbox.DustPackets()
403✔
2603

403✔
2604
        // If the htlc is dust on the local commitment, we'll obtain the dust
403✔
2605
        // sum for it.
403✔
2606
        if isLocalDust {
806✔
2607
                localSum := link.getDustSum(
403✔
2608
                        lntypes.Local, fn.None[chainfee.SatPerKWeight](),
403✔
2609
                )
403✔
2610
                localSum += localMailDust
403✔
2611

403✔
2612
                // Optionally include the HTLC amount only for outgoing
403✔
2613
                // HTLCs.
403✔
2614
                if !incoming {
766✔
2615
                        localSum += amount
363✔
2616
                }
363✔
2617

2618
                // Finally check against the defined fee threshold.
2619
                if localSum > s.cfg.MaxFeeExposure {
405✔
2620
                        return true
2✔
2621
                }
2✔
2622
        }
2623

2624
        // Also check if the htlc is dust on the remote commitment, if we've
2625
        // reached this point.
2626
        if isRemoteDust {
802✔
2627
                remoteSum := link.getDustSum(
401✔
2628
                        lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
401✔
2629
                )
401✔
2630
                remoteSum += remoteMailDust
401✔
2631

401✔
2632
                // Optionally include the HTLC amount only for outgoing
401✔
2633
                // HTLCs.
401✔
2634
                if !incoming {
762✔
2635
                        remoteSum += amount
361✔
2636
                }
361✔
2637

2638
                // Finally check against the defined fee threshold.
2639
                if remoteSum > s.cfg.MaxFeeExposure {
401✔
2640
                        return true
×
2641
                }
×
2642
        }
2643

2644
        // If we reached this point, this HTLC is fine to forward.
2645
        return false
401✔
2646
}
2647

2648
// failMailboxUpdate is passed to the mailbox orchestrator which in turn passes
2649
// it to individual mailboxes. It allows the mailboxes to construct a
2650
// FailureMessage when failing back HTLC's due to expiry and may include an
2651
// alias in the ShortChannelID field. The outgoingScid is the SCID originally
2652
// used in the onion. The mailboxScid is the SCID that the mailbox and link
2653
// use. The mailboxScid is only used in the non-alias case, so it is always
2654
// the confirmed SCID.
2655
func (s *Switch) failMailboxUpdate(outgoingScid,
2656
        mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage {
13✔
2657

13✔
2658
        // Try to use the failAliasUpdate function in case this is a channel
13✔
2659
        // that uses aliases. If it returns nil, we'll fallback to the original
13✔
2660
        // pre-alias behavior.
13✔
2661
        update := s.failAliasUpdate(outgoingScid, false)
13✔
2662
        if update == nil {
20✔
2663
                // Execute the fallback behavior.
7✔
2664
                var err error
7✔
2665
                update, err = s.cfg.FetchLastChannelUpdate(mailboxScid)
7✔
2666
                if err != nil {
7✔
2667
                        return &lnwire.FailTemporaryNodeFailure{}
×
2668
                }
×
2669
        }
2670

2671
        return lnwire.NewTemporaryChannelFailure(update)
13✔
2672
}
2673

2674
// failAliasUpdate prepares a ChannelUpdate for a failed incoming or outgoing
2675
// HTLC on a channel where the option-scid-alias feature bit was negotiated. If
2676
// the associated channel is not one of these, this function will return nil
2677
// and the caller is expected to handle this properly. In this case, a return
2678
// to the original non-alias behavior is expected.
2679
func (s *Switch) failAliasUpdate(scid lnwire.ShortChannelID,
2680
        incoming bool) *lnwire.ChannelUpdate1 {
37✔
2681

37✔
2682
        // This function does not defer the unlocking because of the database
37✔
2683
        // lookups for ChannelUpdate.
37✔
2684
        s.indexMtx.RLock()
37✔
2685

37✔
2686
        if s.cfg.IsAlias(scid) {
51✔
2687
                // The alias SCID was used. In the incoming case this means
14✔
2688
                // the channel is zero-conf as the link sets the scid. In the
14✔
2689
                // outgoing case, the sender set the scid to use and may be
14✔
2690
                // either the alias or the confirmed one, if it exists.
14✔
2691
                realScid, ok := s.aliasToReal[scid]
14✔
2692
                if !ok {
14✔
2693
                        // The real, confirmed SCID does not exist yet. Find
×
2694
                        // the "base" SCID that the link uses via the
×
2695
                        // baseIndex. If we can't find it, return nil. This
×
2696
                        // means the channel is zero-conf.
×
2697
                        baseScid, ok := s.baseIndex[scid]
×
2698
                        s.indexMtx.RUnlock()
×
2699
                        if !ok {
×
2700
                                return nil
×
2701
                        }
×
2702

2703
                        update, err := s.cfg.FetchLastChannelUpdate(baseScid)
×
2704
                        if err != nil {
×
2705
                                return nil
×
2706
                        }
×
2707

2708
                        // Replace the baseScid with the passed-in alias.
2709
                        update.ShortChannelID = scid
×
2710
                        sig, err := s.cfg.SignAliasUpdate(update)
×
2711
                        if err != nil {
×
2712
                                return nil
×
2713
                        }
×
2714

2715
                        update.Signature, err = lnwire.NewSigFromSignature(sig)
×
2716
                        if err != nil {
×
2717
                                return nil
×
2718
                        }
×
2719

2720
                        return update
×
2721
                }
2722

2723
                s.indexMtx.RUnlock()
14✔
2724

14✔
2725
                // Fetch the SCID via the confirmed SCID and replace it with
14✔
2726
                // the alias.
14✔
2727
                update, err := s.cfg.FetchLastChannelUpdate(realScid)
14✔
2728
                if err != nil {
17✔
2729
                        return nil
3✔
2730
                }
3✔
2731

2732
                // In the incoming case, we want to ensure that we don't leak
2733
                // the UTXO in case the channel is private. In the outgoing
2734
                // case, since the alias was used, we do the same thing.
2735
                update.ShortChannelID = scid
14✔
2736
                sig, err := s.cfg.SignAliasUpdate(update)
14✔
2737
                if err != nil {
14✔
2738
                        return nil
×
2739
                }
×
2740

2741
                update.Signature, err = lnwire.NewSigFromSignature(sig)
14✔
2742
                if err != nil {
14✔
2743
                        return nil
×
2744
                }
×
2745

2746
                return update
14✔
2747
        }
2748

2749
        // If the confirmed SCID is not in baseIndex, this is not an
2750
        // option-scid-alias or zero-conf channel.
2751
        baseScid, ok := s.baseIndex[scid]
26✔
2752
        if !ok {
47✔
2753
                s.indexMtx.RUnlock()
21✔
2754
                return nil
21✔
2755
        }
21✔
2756

2757
        // Fetch the link so we can get an alias to use in the ShortChannelID
2758
        // of the ChannelUpdate.
2759
        link, ok := s.forwardingIndex[baseScid]
5✔
2760
        s.indexMtx.RUnlock()
5✔
2761
        if !ok {
5✔
2762
                // This should never happen, but if it does for some reason,
×
2763
                // fallback to the old behavior.
×
2764
                return nil
×
2765
        }
×
2766

2767
        aliases := link.getAliases()
5✔
2768
        if len(aliases) == 0 {
5✔
2769
                // This should never happen, but if it does, fallback.
×
2770
                return nil
×
2771
        }
×
2772

2773
        // Fetch the ChannelUpdate via the real, confirmed SCID.
2774
        update, err := s.cfg.FetchLastChannelUpdate(scid)
5✔
2775
        if err != nil {
5✔
2776
                return nil
×
2777
        }
×
2778

2779
        // The incoming case will replace the ShortChannelID in the retrieved
2780
        // ChannelUpdate with the alias to ensure no privacy leak occurs. This
2781
        // would happen if a private non-zero-conf option-scid-alias
2782
        // feature-bit channel leaked its UTXO here rather than supplying an
2783
        // alias. In the outgoing case, the confirmed SCID was actually used
2784
        // for forwarding in the onion, so no replacement is necessary as the
2785
        // sender knows the scid.
2786
        if incoming {
7✔
2787
                // We will replace and sign the update with the first alias.
2✔
2788
                // Since this happens on the incoming side, it's not actually
2✔
2789
                // possible to know what the sender used in the onion.
2✔
2790
                update.ShortChannelID = aliases[0]
2✔
2791
                sig, err := s.cfg.SignAliasUpdate(update)
2✔
2792
                if err != nil {
2✔
2793
                        return nil
×
2794
                }
×
2795

2796
                update.Signature, err = lnwire.NewSigFromSignature(sig)
2✔
2797
                if err != nil {
2✔
2798
                        return nil
×
2799
                }
×
2800
        }
2801

2802
        return update
5✔
2803
}
2804

2805
// AddAliasForLink instructs the Switch to update its in-memory maps to reflect
2806
// that a link has a new alias.
2807
func (s *Switch) AddAliasForLink(chanID lnwire.ChannelID,
2808
        alias lnwire.ShortChannelID) error {
×
2809

×
2810
        // Fetch the link so that we can update the underlying channel's set of
×
2811
        // aliases.
×
2812
        s.indexMtx.RLock()
×
2813
        link, err := s.getLink(chanID)
×
2814
        s.indexMtx.RUnlock()
×
2815
        if err != nil {
×
2816
                return err
×
2817
        }
×
2818

2819
        // If the link is a channel where the option-scid-alias feature bit was
2820
        // not negotiated, we'll return an error.
2821
        if !link.negotiatedAliasFeature() {
×
2822
                return fmt.Errorf("attempted to update non-alias channel")
×
2823
        }
×
2824

2825
        linkScid := link.ShortChanID()
×
2826

×
2827
        // We'll update the maps so the Switch includes this alias in its
×
2828
        // forwarding decisions.
×
2829
        if link.isZeroConf() {
×
2830
                if link.zeroConfConfirmed() {
×
2831
                        // If the channel has confirmed on-chain, we'll
×
2832
                        // add this alias to the aliasToReal map.
×
2833
                        confirmedScid := link.confirmedScid()
×
2834

×
2835
                        s.aliasToReal[alias] = confirmedScid
×
2836
                }
×
2837

2838
                // Add this alias to the baseIndex mapping.
2839
                s.baseIndex[alias] = linkScid
×
2840
        } else if link.negotiatedAliasFeature() {
×
2841
                // The channel is confirmed, so we'll populate the aliasToReal
×
2842
                // and baseIndex maps.
×
2843
                s.aliasToReal[alias] = linkScid
×
2844
                s.baseIndex[alias] = linkScid
×
2845
        }
×
2846

2847
        return nil
×
2848
}
2849

2850
// handlePacketAdd handles forwarding an Add packet.
2851
func (s *Switch) handlePacketAdd(packet *htlcPacket,
2852
        htlc *lnwire.UpdateAddHTLC) error {
85✔
2853

85✔
2854
        // Check if the node is set to reject all onward HTLCs and also make
85✔
2855
        // sure that HTLC is not from the source node.
85✔
2856
        if s.cfg.RejectHTLC {
89✔
2857
                failure := NewDetailedLinkError(
4✔
2858
                        &lnwire.FailChannelDisabled{},
4✔
2859
                        OutgoingFailureForwardsDisabled,
4✔
2860
                )
4✔
2861

4✔
2862
                return s.failAddPacket(packet, failure)
4✔
2863
        }
4✔
2864

2865
        // Before we attempt to find a non-strict forwarding path for this
2866
        // htlc, check whether the htlc is being routed over the same incoming
2867
        // and outgoing channel. If our node does not allow forwards of this
2868
        // nature, we fail the htlc early. This check is in place to disallow
2869
        // inefficiently routed htlcs from locking up our balance. With
2870
        // channels where the option-scid-alias feature was negotiated, we also
2871
        // have to be sure that the IDs aren't the same since one or both could
2872
        // be an alias.
2873
        linkErr := s.checkCircularForward(
84✔
2874
                packet.incomingChanID, packet.outgoingChanID,
84✔
2875
                s.cfg.AllowCircularRoute, htlc.PaymentHash,
84✔
2876
        )
84✔
2877
        if linkErr != nil {
85✔
2878
                return s.failAddPacket(packet, linkErr)
1✔
2879
        }
1✔
2880

2881
        s.indexMtx.RLock()
83✔
2882
        targetLink, err := s.getLinkByMapping(packet)
83✔
2883
        if err != nil {
90✔
2884
                s.indexMtx.RUnlock()
7✔
2885

7✔
2886
                log.Debugf("unable to find link with "+
7✔
2887
                        "destination %v", packet.outgoingChanID)
7✔
2888

7✔
2889
                // If packet was forwarded from another channel link than we
7✔
2890
                // should notify this link that some error occurred.
7✔
2891
                linkError := NewLinkError(
7✔
2892
                        &lnwire.FailUnknownNextPeer{},
7✔
2893
                )
7✔
2894

7✔
2895
                return s.failAddPacket(packet, linkError)
7✔
2896
        }
7✔
2897
        targetPeerKey := targetLink.PeerPubKey()
79✔
2898
        interfaceLinks, _ := s.getLinks(targetPeerKey)
79✔
2899
        s.indexMtx.RUnlock()
79✔
2900

79✔
2901
        // We'll keep track of any HTLC failures during the link selection
79✔
2902
        // process. This way we can return the error for precise link that the
79✔
2903
        // sender selected, while optimistically trying all links to utilize
79✔
2904
        // our available bandwidth.
79✔
2905
        linkErrs := make(map[lnwire.ShortChannelID]*LinkError)
79✔
2906

79✔
2907
        // Find all destination channel links with appropriate bandwidth.
79✔
2908
        var destinations []ChannelLink
79✔
2909
        for _, link := range interfaceLinks {
162✔
2910
                var failure *LinkError
83✔
2911

83✔
2912
                // We'll skip any links that aren't yet eligible for
83✔
2913
                // forwarding.
83✔
2914
                if !link.EligibleToForward() {
87✔
2915
                        failure = NewDetailedLinkError(
4✔
2916
                                &lnwire.FailUnknownNextPeer{},
4✔
2917
                                OutgoingFailureLinkNotEligible,
4✔
2918
                        )
4✔
2919
                } else {
83✔
2920
                        // We'll ensure that the HTLC satisfies the current
79✔
2921
                        // forwarding conditions of this target link.
79✔
2922
                        currentHeight := atomic.LoadUint32(&s.bestHeight)
79✔
2923
                        failure = link.CheckHtlcForward(
79✔
2924
                                htlc.PaymentHash, packet.incomingAmount,
79✔
2925
                                packet.amount, packet.incomingTimeout,
79✔
2926
                                packet.outgoingTimeout, packet.inboundFee,
79✔
2927
                                currentHeight, packet.originalOutgoingChanID,
79✔
2928
                                htlc.CustomRecords,
79✔
2929
                        )
79✔
2930
                }
79✔
2931

2932
                // If this link can forward the htlc, add it to the set of
2933
                // destinations.
2934
                if failure == nil {
146✔
2935
                        destinations = append(destinations, link)
63✔
2936
                        continue
63✔
2937
                }
2938

2939
                linkErrs[link.ShortChanID()] = failure
23✔
2940
        }
2941

2942
        // If we had a forwarding failure due to the HTLC not satisfying the
2943
        // current policy, then we'll send back an error, but ensure we send
2944
        // back the error sourced at the *target* link.
2945
        if len(destinations) == 0 {
98✔
2946
                // At this point, some or all of the links rejected the HTLC so
19✔
2947
                // we couldn't forward it. So we'll try to look up the error
19✔
2948
                // that came from the source.
19✔
2949
                linkErr, ok := linkErrs[packet.outgoingChanID]
19✔
2950
                if !ok {
19✔
2951
                        // If we can't find the error of the source, then we'll
×
2952
                        // return an unknown next peer, though this should
×
2953
                        // never happen.
×
2954
                        linkErr = NewLinkError(
×
2955
                                &lnwire.FailUnknownNextPeer{},
×
2956
                        )
×
2957
                        log.Warnf("unable to find err source for "+
×
2958
                                "outgoing_link=%v, errors=%v",
×
2959
                                packet.outgoingChanID,
×
2960
                                lnutils.SpewLogClosure(linkErrs))
×
2961
                }
×
2962

2963
                log.Tracef("incoming HTLC(%x) violated "+
19✔
2964
                        "target outgoing link (id=%v) policy: %v",
19✔
2965
                        htlc.PaymentHash[:], packet.outgoingChanID,
19✔
2966
                        linkErr)
19✔
2967

19✔
2968
                return s.failAddPacket(packet, linkErr)
19✔
2969
        }
2970

2971
        // Choose a random link out of the set of links that can forward this
2972
        // htlc. The reason for randomization is to evenly distribute the htlc
2973
        // load without making assumptions about what the best channel is.
2974
        //nolint:gosec
2975
        destination := destinations[rand.Intn(len(destinations))]
63✔
2976

63✔
2977
        // Retrieve the incoming link by its ShortChannelID. Note that the
63✔
2978
        // incomingChanID is never set to hop.Source here.
63✔
2979
        s.indexMtx.RLock()
63✔
2980
        incomingLink, err := s.getLinkByShortID(packet.incomingChanID)
63✔
2981
        s.indexMtx.RUnlock()
63✔
2982
        if err != nil {
63✔
2983
                // If we couldn't find the incoming link, we can't evaluate the
×
2984
                // incoming's exposure to dust, so we just fail the HTLC back.
×
2985
                linkErr := NewLinkError(
×
2986
                        &lnwire.FailTemporaryChannelFailure{},
×
2987
                )
×
2988

×
2989
                return s.failAddPacket(packet, linkErr)
×
2990
        }
×
2991

2992
        // Evaluate whether this HTLC would increase our fee exposure over the
2993
        // threshold on the incoming link. If it does, fail it backwards.
2994
        if s.dustExceedsFeeThreshold(
63✔
2995
                incomingLink, packet.incomingAmount, true,
63✔
2996
        ) {
63✔
2997
                // The incoming dust exceeds the threshold, so we fail the add
×
2998
                // back.
×
2999
                linkErr := NewLinkError(
×
3000
                        &lnwire.FailTemporaryChannelFailure{},
×
3001
                )
×
3002

×
3003
                return s.failAddPacket(packet, linkErr)
×
3004
        }
×
3005

3006
        // Also evaluate whether this HTLC would increase our fee exposure over
3007
        // the threshold on the destination link. If it does, fail it back.
3008
        if s.dustExceedsFeeThreshold(
63✔
3009
                destination, packet.amount, false,
63✔
3010
        ) {
64✔
3011
                // The outgoing dust exceeds the threshold, so we fail the add
1✔
3012
                // back.
1✔
3013
                linkErr := NewLinkError(
1✔
3014
                        &lnwire.FailTemporaryChannelFailure{},
1✔
3015
                )
1✔
3016

1✔
3017
                return s.failAddPacket(packet, linkErr)
1✔
3018
        }
1✔
3019

3020
        // Send the packet to the destination channel link which manages the
3021
        // channel.
3022
        packet.outgoingChanID = destination.ShortChanID()
62✔
3023

62✔
3024
        return destination.handleSwitchPacket(packet)
62✔
3025
}
3026

3027
// handlePacketSettle handles forwarding a settle packet.
3028
func (s *Switch) handlePacketSettle(packet *htlcPacket) error {
402✔
3029
        // If the source of this packet has not been set, use the circuit map
402✔
3030
        // to lookup the origin.
402✔
3031
        circuit, err := s.closeCircuit(packet)
402✔
3032

402✔
3033
        // If the circuit is in the process of closing, we will return a nil as
402✔
3034
        // there's another packet handling undergoing.
402✔
3035
        if errors.Is(err, ErrCircuitClosing) {
405✔
3036
                log.Debugf("Circuit is closing for packet=%v", packet)
3✔
3037
                return nil
3✔
3038
        }
3✔
3039

3040
        // Exit early if there's another error.
3041
        if err != nil {
402✔
3042
                return err
×
3043
        }
×
3044

3045
        // closeCircuit returns a nil circuit when a settle packet returns an
3046
        // ErrUnknownCircuit error upon the inner call to CloseCircuit.
3047
        //
3048
        // NOTE: We can only get a nil circuit when it has already been deleted
3049
        // and when `UpdateFulfillHTLC` is received. After which `RevokeAndAck`
3050
        // is received, which invokes `processRemoteSettleFails` in its link.
3051
        if circuit == nil {
595✔
3052
                log.Debugf("Circuit already closed for packet=%v", packet)
193✔
3053
                return nil
193✔
3054
        }
193✔
3055

3056
        localHTLC := packet.incomingChanID == hop.Source
212✔
3057

212✔
3058
        // If this is a locally initiated HTLC, we need to handle the packet by
212✔
3059
        // storing the network result.
212✔
3060
        //
212✔
3061
        // A blank IncomingChanID in a circuit indicates that it is a pending
212✔
3062
        // user-initiated payment.
212✔
3063
        //
212✔
3064
        // NOTE: `closeCircuit` modifies the state of `packet`.
212✔
3065
        if localHTLC {
395✔
3066
                // TODO(yy): remove the goroutine and send back the error here.
183✔
3067
                s.wg.Add(1)
183✔
3068
                go s.handleLocalResponse(packet)
183✔
3069

183✔
3070
                // If this is a locally initiated HTLC, there's no need to
183✔
3071
                // forward it so we exit.
183✔
3072
                return nil
183✔
3073
        }
183✔
3074

3075
        // If this is an HTLC settle, and it wasn't from a locally initiated
3076
        // HTLC, then we'll log a forwarding event so we can flush it to disk
3077
        // later.
3078
        if circuit.Outgoing != nil {
64✔
3079
                log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+
32✔
3080
                        "from IncomingChanID(%v) to OutgoingChanID(%v)",
32✔
3081
                        circuit.PaymentHash[:], circuit.OutgoingAmount,
32✔
3082
                        circuit.IncomingAmount-circuit.OutgoingAmount,
32✔
3083
                        circuit.Incoming.ChanID, circuit.Outgoing.ChanID)
32✔
3084

32✔
3085
                s.fwdEventMtx.Lock()
32✔
3086
                s.pendingFwdingEvents = append(
32✔
3087
                        s.pendingFwdingEvents,
32✔
3088
                        channeldb.ForwardingEvent{
32✔
3089
                                Timestamp:      time.Now(),
32✔
3090
                                IncomingChanID: circuit.Incoming.ChanID,
32✔
3091
                                OutgoingChanID: circuit.Outgoing.ChanID,
32✔
3092
                                AmtIn:          circuit.IncomingAmount,
32✔
3093
                                AmtOut:         circuit.OutgoingAmount,
32✔
3094
                                IncomingHtlcID: fn.Some(
32✔
3095
                                        circuit.Incoming.HtlcID,
32✔
3096
                                ),
32✔
3097
                                OutgoingHtlcID: fn.Some(
32✔
3098
                                        circuit.Outgoing.HtlcID,
32✔
3099
                                ),
32✔
3100
                        },
32✔
3101
                )
32✔
3102
                s.fwdEventMtx.Unlock()
32✔
3103
        }
32✔
3104

3105
        // Deliver this packet.
3106
        return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
32✔
3107
}
3108

3109
// handlePacketFail handles forwarding a fail packet.
3110
func (s *Switch) handlePacketFail(packet *htlcPacket,
3111
        htlc *lnwire.UpdateFailHTLC) error {
141✔
3112

141✔
3113
        // If the source of this packet has not been set, use the circuit map
141✔
3114
        // to lookup the origin.
141✔
3115
        circuit, err := s.closeCircuit(packet)
141✔
3116
        if err != nil {
141✔
3117
                return err
×
3118
        }
×
3119

3120
        // If this is a locally initiated HTLC, we need to handle the packet by
3121
        // storing the network result.
3122
        //
3123
        // A blank IncomingChanID in a circuit indicates that it is a pending
3124
        // user-initiated payment.
3125
        //
3126
        // NOTE: `closeCircuit` modifies the state of `packet`.
3127
        if packet.incomingChanID == hop.Source {
268✔
3128
                // TODO(yy): remove the goroutine and send back the error here.
127✔
3129
                s.wg.Add(1)
127✔
3130
                go s.handleLocalResponse(packet)
127✔
3131

127✔
3132
                // If this is a locally initiated HTLC, there's no need to
127✔
3133
                // forward it so we exit.
127✔
3134
                return nil
127✔
3135
        }
127✔
3136

3137
        // Exit early if this hasSource is true. This flag is only set via
3138
        // mailbox's `FailAdd`. This method has two callsites,
3139
        // - the packet has timed out after `MailboxDeliveryTimeout`, defaults
3140
        //   to 1 min.
3141
        // - the HTLC fails the validation in `channel.AddHTLC`.
3142
        // In either case, the `Reason` field is populated. Thus there's no
3143
        // need to proceed and extract the failure reason below.
3144
        if packet.hasSource {
24✔
3145
                // Deliver this packet.
7✔
3146
                return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
7✔
3147
        }
7✔
3148

3149
        // HTLC resolutions and messages restored from disk don't have the
3150
        // obfuscator set from the original htlc add packet - set it here for
3151
        // use in blinded errors.
3152
        packet.obfuscator = circuit.ErrorEncrypter
10✔
3153

10✔
3154
        switch {
10✔
3155
        // No message to encrypt, locally sourced payment.
3156
        case circuit.ErrorEncrypter == nil:
×
3157
                // TODO(yy) further check this case as we shouldn't end up here
3158
                // as `isLocal` is already false.
3159

3160
        // If this is a resolution message, then we'll need to encrypt it as
3161
        // it's actually internally sourced.
3162
        case packet.isResolution:
3✔
3163
                var err error
3✔
3164
                // TODO(roasbeef): don't need to pass actually?
3✔
3165
                failure := &lnwire.FailPermanentChannelFailure{}
3✔
3166
                htlc.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop(
3✔
3167
                        failure,
3✔
3168
                )
3✔
3169
                if err != nil {
3✔
3170
                        err = fmt.Errorf("unable to obfuscate error: %w", err)
×
3171
                        log.Error(err)
×
3172
                }
×
3173

3174
        // Alternatively, if the remote party sends us an
3175
        // UpdateFailMalformedHTLC, then we'll need to convert this into a
3176
        // proper well formatted onion error as there's no HMAC currently.
3177
        case packet.convertedError:
5✔
3178
                log.Infof("Converting malformed HTLC error for circuit for "+
5✔
3179
                        "Circuit(%x: (%s, %d) <-> (%s, %d))",
5✔
3180
                        packet.circuit.PaymentHash,
5✔
3181
                        packet.incomingChanID, packet.incomingHTLCID,
5✔
3182
                        packet.outgoingChanID, packet.outgoingHTLCID)
5✔
3183

5✔
3184
                htlc.Reason = circuit.ErrorEncrypter.EncryptMalformedError(
5✔
3185
                        htlc.Reason,
5✔
3186
                )
5✔
3187

3188
        default:
8✔
3189
                // Otherwise, it's a forwarded error, so we'll perform a
8✔
3190
                // wrapper encryption as normal.
8✔
3191
                htlc.Reason = circuit.ErrorEncrypter.IntermediateEncrypt(
8✔
3192
                        htlc.Reason,
8✔
3193
                )
8✔
3194
        }
3195

3196
        // Deliver this packet.
3197
        return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
10✔
3198
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc