• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12231552240

09 Dec 2024 08:17AM UTC coverage: 58.955% (+0.02%) from 58.933%
12231552240

Pull #9242

github

aakselrod
go.mod: update btcwallet to latest to eliminate waddrmgr deadlock
Pull Request #9242: Reapply #8644

24 of 40 new or added lines in 3 files covered. (60.0%)

89 existing lines in 18 files now uncovered.

133525 of 226485 relevant lines covered (58.96%)

19398.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.42
/htlcswitch/switch.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "math/rand"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
13
        "github.com/btcsuite/btcd/btcutil"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/davecgh/go-spew/spew"
16
        "github.com/lightningnetwork/lnd/chainntnfs"
17
        "github.com/lightningnetwork/lnd/channeldb"
18
        "github.com/lightningnetwork/lnd/clock"
19
        "github.com/lightningnetwork/lnd/contractcourt"
20
        "github.com/lightningnetwork/lnd/fn"
21
        "github.com/lightningnetwork/lnd/graph/db/models"
22
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
23
        "github.com/lightningnetwork/lnd/kvdb"
24
        "github.com/lightningnetwork/lnd/lntypes"
25
        "github.com/lightningnetwork/lnd/lnutils"
26
        "github.com/lightningnetwork/lnd/lnwallet"
27
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
28
        "github.com/lightningnetwork/lnd/lnwire"
29
        "github.com/lightningnetwork/lnd/ticker"
30
)
31

32
const (
33
        // DefaultFwdEventInterval is the duration between attempts to flush
34
        // pending forwarding events to disk.
35
        DefaultFwdEventInterval = 15 * time.Second
36

37
        // DefaultLogInterval is the duration between attempts to log statistics
38
        // about forwarding events.
39
        DefaultLogInterval = 10 * time.Second
40

41
        // DefaultAckInterval is the duration between attempts to ack any settle
42
        // fails in a forwarding package.
43
        DefaultAckInterval = 15 * time.Second
44

45
        // DefaultMailboxDeliveryTimeout is the duration after which Adds will
46
        // be cancelled if they could not get added to an outgoing commitment.
47
        DefaultMailboxDeliveryTimeout = time.Minute
48
)
49

50
var (
51
        // ErrChannelLinkNotFound is used when channel link hasn't been found.
52
        ErrChannelLinkNotFound = errors.New("channel link not found")
53

54
        // ErrDuplicateAdd signals that the ADD htlc was already forwarded
55
        // through the switch and is locked into another commitment txn.
56
        ErrDuplicateAdd = errors.New("duplicate add HTLC detected")
57

58
        // ErrUnknownErrorDecryptor signals that we were unable to locate the
59
        // error decryptor for this payment. This is likely due to restarting
60
        // the daemon.
61
        ErrUnknownErrorDecryptor = errors.New("unknown error decryptor")
62

63
        // ErrSwitchExiting signaled when the switch has received a shutdown
64
        // request.
65
        ErrSwitchExiting = errors.New("htlcswitch shutting down")
66

67
        // ErrNoLinksFound is an error returned when we attempt to retrieve the
68
        // active links in the switch for a specific destination.
69
        ErrNoLinksFound = errors.New("no channel links found")
70

71
        // ErrUnreadableFailureMessage is returned when the failure message
72
        // cannot be decrypted.
73
        ErrUnreadableFailureMessage = errors.New("unreadable failure message")
74

75
        // ErrLocalAddFailed signals that the ADD htlc for a local payment
76
        // failed to be processed.
77
        ErrLocalAddFailed = errors.New("local add HTLC failed")
78

79
        // errFeeExposureExceeded is only surfaced to callers of SendHTLC and
80
        // signals that sending the HTLC would exceed the outgoing link's fee
81
        // exposure threshold.
82
        errFeeExposureExceeded = errors.New("fee exposure exceeded")
83

84
        // DefaultMaxFeeExposure is the default threshold after which we'll
85
        // fail payments if they increase our fee exposure. This is currently
86
        // set to 500m msats.
87
        DefaultMaxFeeExposure = lnwire.MilliSatoshi(500_000_000)
88
)
89

90
// plexPacket encapsulates switch packet and adds error channel to receive
91
// error from request handler.
92
type plexPacket struct {
93
        pkt *htlcPacket
94
        err chan error
95
}
96

97
// ChanClose represents a request which close a particular channel specified by
98
// its id.
99
type ChanClose struct {
100
        // CloseType is a variable which signals the type of channel closure the
101
        // peer should execute.
102
        CloseType contractcourt.ChannelCloseType
103

104
        // ChanPoint represent the id of the channel which should be closed.
105
        ChanPoint *wire.OutPoint
106

107
        // TargetFeePerKw is the ideal fee that was specified by the caller.
108
        // This value is only utilized if the closure type is CloseRegular.
109
        // This will be the starting offered fee when the fee negotiation
110
        // process for the cooperative closure transaction kicks off.
111
        TargetFeePerKw chainfee.SatPerKWeight
112

113
        // MaxFee is the highest fee the caller is willing to pay.
114
        //
115
        // NOTE: This field is only respected if the caller is the initiator of
116
        // the channel.
117
        MaxFee chainfee.SatPerKWeight
118

119
        // DeliveryScript is an optional delivery script to pay funds out to.
120
        DeliveryScript lnwire.DeliveryAddress
121

122
        // Updates is used by request creator to receive the notifications about
123
        // execution of the close channel request.
124
        Updates chan interface{}
125

126
        // Err is used by request creator to receive request execution error.
127
        Err chan error
128
}
129

130
// Config defines the configuration for the service. ALL elements within the
131
// configuration MUST be non-nil for the service to carry out its duties.
132
type Config struct {
133
        // FwdingLog is an interface that will be used by the switch to log
134
        // forwarding events. A forwarding event happens each time a payment
135
        // circuit is successfully completed. So when we forward an HTLC, and a
136
        // settle is eventually received.
137
        FwdingLog ForwardingLog
138

139
        // LocalChannelClose kicks-off the workflow to execute a cooperative or
140
        // forced unilateral closure of the channel initiated by a local
141
        // subsystem.
142
        LocalChannelClose func(pubKey []byte, request *ChanClose)
143

144
        // DB is the database backend that will be used to back the switch's
145
        // persistent circuit map.
146
        DB kvdb.Backend
147

148
        // FetchAllOpenChannels is a function that fetches all currently open
149
        // channels from the channel database.
150
        FetchAllOpenChannels func() ([]*channeldb.OpenChannel, error)
151

152
        // FetchAllChannels is a function that fetches all pending open, open,
153
        // and waiting close channels from the database.
154
        FetchAllChannels func() ([]*channeldb.OpenChannel, error)
155

156
        // FetchClosedChannels is a function that fetches all closed channels
157
        // from the channel database.
158
        FetchClosedChannels func(
159
                pendingOnly bool) ([]*channeldb.ChannelCloseSummary, error)
160

161
        // SwitchPackager provides access to the forwarding packages of all
162
        // active channels. This gives the switch the ability to read arbitrary
163
        // forwarding packages, and ack settles and fails contained within them.
164
        SwitchPackager channeldb.FwdOperator
165

166
        // ExtractErrorEncrypter is an interface allowing switch to reextract
167
        // error encrypters stored in the circuit map on restarts, since they
168
        // are not stored directly within the database.
169
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter
170

171
        // FetchLastChannelUpdate retrieves the latest routing policy for a
172
        // target channel. This channel will typically be the outgoing channel
173
        // specified when we receive an incoming HTLC.  This will be used to
174
        // provide payment senders our latest policy when sending encrypted
175
        // error messages.
176
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
177
                *lnwire.ChannelUpdate1, error)
178

179
        // Notifier is an instance of a chain notifier that we'll use to signal
180
        // the switch when a new block has arrived.
181
        Notifier chainntnfs.ChainNotifier
182

183
        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
184
        // events through.
185
        HtlcNotifier htlcNotifier
186

187
        // FwdEventTicker is a signal that instructs the htlcswitch to flush any
188
        // pending forwarding events.
189
        FwdEventTicker ticker.Ticker
190

191
        // LogEventTicker is a signal instructing the htlcswitch to log
192
        // aggregate stats about it's forwarding during the last interval.
193
        LogEventTicker ticker.Ticker
194

195
        // AckEventTicker is a signal instructing the htlcswitch to ack any settle
196
        // fails in forwarding packages.
197
        AckEventTicker ticker.Ticker
198

199
        // AllowCircularRoute is true if the user has configured their node to
200
        // allow forwards that arrive and depart our node over the same channel.
201
        AllowCircularRoute bool
202

203
        // RejectHTLC is a flag that instructs the htlcswitch to reject any
204
        // HTLCs that are not from the source hop.
205
        RejectHTLC bool
206

207
        // Clock is a time source for the switch.
208
        Clock clock.Clock
209

210
        // MailboxDeliveryTimeout is the interval after which Adds will be
211
        // cancelled if they have not been yet been delivered to a link. The
212
        // computed deadline will expiry this long after the Adds are added to
213
        // a mailbox via AddPacket.
214
        MailboxDeliveryTimeout time.Duration
215

216
        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
217
        // fail incoming or outgoing payments for a particular channel.
218
        MaxFeeExposure lnwire.MilliSatoshi
219

220
        // SignAliasUpdate is used when sending FailureMessages backwards for
221
        // option_scid_alias channels. This avoids a potential privacy leak by
222
        // replacing the public, confirmed SCID with the alias in the
223
        // ChannelUpdate.
224
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
225
                error)
226

227
        // IsAlias returns whether or not a given SCID is an alias.
228
        IsAlias func(scid lnwire.ShortChannelID) bool
229
}
230

231
// Switch is the central messaging bus for all incoming/outgoing HTLCs.
232
// Connected peers with active channels are treated as named interfaces which
233
// refer to active channels as links. A link is the switch's message
234
// communication point with the goroutine that manages an active channel. New
235
// links are registered each time a channel is created, and unregistered once
236
// the channel is closed. The switch manages the hand-off process for multi-hop
237
// HTLCs, forwarding HTLCs initiated from within the daemon, and finally
238
// notifies users local-systems concerning their outstanding payment requests.
239
type Switch struct {
240
        started  int32 // To be used atomically.
241
        shutdown int32 // To be used atomically.
242

243
        // bestHeight is the best known height of the main chain. The links will
244
        // be used this information to govern decisions based on HTLC timeouts.
245
        // This will be retrieved by the registered links atomically.
246
        bestHeight uint32
247

248
        wg   sync.WaitGroup
249
        quit chan struct{}
250

251
        // cfg is a copy of the configuration struct that the htlc switch
252
        // service was initialized with.
253
        cfg *Config
254

255
        // networkResults stores the results of payments initiated by the user.
256
        // The store is used to later look up the payments and notify the
257
        // user of the result when they are complete. Each payment attempt
258
        // should be given a unique integer ID when it is created, otherwise
259
        // results might be overwritten.
260
        networkResults *networkResultStore
261

262
        // circuits is storage for payment circuits which are used to
263
        // forward the settle/fail htlc updates back to the add htlc initiator.
264
        circuits CircuitMap
265

266
        // mailOrchestrator manages the lifecycle of mailboxes used throughout
267
        // the switch, and facilitates delayed delivery of packets to links that
268
        // later come online.
269
        mailOrchestrator *mailOrchestrator
270

271
        // indexMtx is a read/write mutex that protects the set of indexes
272
        // below.
273
        indexMtx sync.RWMutex
274

275
        // pendingLinkIndex holds links that have not had their final, live
276
        // short_chan_id assigned.
277
        pendingLinkIndex map[lnwire.ChannelID]ChannelLink
278

279
        // links is a map of channel id and channel link which manages
280
        // this channel.
281
        linkIndex map[lnwire.ChannelID]ChannelLink
282

283
        // forwardingIndex is an index which is consulted by the switch when it
284
        // needs to locate the next hop to forward an incoming/outgoing HTLC
285
        // update to/from.
286
        //
287
        // TODO(roasbeef): eventually add a NetworkHop mapping before the
288
        // ChannelLink
289
        forwardingIndex map[lnwire.ShortChannelID]ChannelLink
290

291
        // interfaceIndex maps the compressed public key of a peer to all the
292
        // channels that the switch maintains with that peer.
293
        interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink
294

295
        // linkStopIndex stores the currently stopping ChannelLinks,
296
        // represented by their ChannelID. The key is the link's ChannelID and
297
        // the value is a chan that is closed when the link has fully stopped.
298
        // This map is only added to if RemoveLink is called and is not added
299
        // to when the Switch is shutting down and calls Stop() on each link.
300
        //
301
        // MUST be used with the indexMtx.
302
        linkStopIndex map[lnwire.ChannelID]chan struct{}
303

304
        // htlcPlex is the channel which all connected links use to coordinate
305
        // the setup/teardown of Sphinx (onion routing) payment circuits.
306
        // Active links forward any add/settle messages over this channel each
307
        // state transition, sending new adds/settles which are fully locked
308
        // in.
309
        htlcPlex chan *plexPacket
310

311
        // chanCloseRequests is used to transfer the channel close request to
312
        // the channel close handler.
313
        chanCloseRequests chan *ChanClose
314

315
        // resolutionMsgs is the channel that all external contract resolution
316
        // messages will be sent over.
317
        resolutionMsgs chan *resolutionMsg
318

319
        // pendingFwdingEvents is the set of forwarding events which have been
320
        // collected during the current interval, but hasn't yet been written
321
        // to the forwarding log.
322
        fwdEventMtx         sync.Mutex
323
        pendingFwdingEvents []channeldb.ForwardingEvent
324

325
        // blockEpochStream is an active block epoch event stream backed by an
326
        // active ChainNotifier instance. This will be used to retrieve the
327
        // latest height of the chain.
328
        blockEpochStream *chainntnfs.BlockEpochEvent
329

330
        // pendingSettleFails is the set of settle/fail entries that we need to
331
        // ack in the forwarding package of the outgoing link. This was added to
332
        // make pipelining settles more efficient.
333
        pendingSettleFails []channeldb.SettleFailRef
334

335
        // resMsgStore is used to store the set of ResolutionMsg that come from
336
        // contractcourt. This is used so the Switch can properly forward them,
337
        // even on restarts.
338
        resMsgStore *resolutionStore
339

340
        // aliasToReal is a map used for option-scid-alias feature-bit links.
341
        // The alias SCID is the key and the real, confirmed SCID is the value.
342
        // If the channel is unconfirmed, there will not be a mapping for it.
343
        // Since channels can have multiple aliases, this map is essentially a
344
        // N->1 mapping for a channel. This MUST be accessed with the indexMtx.
345
        aliasToReal map[lnwire.ShortChannelID]lnwire.ShortChannelID
346

347
        // baseIndex is a map used for option-scid-alias feature-bit links.
348
        // The value is the SCID of the link's ShortChannelID. This value may
349
        // be an alias for zero-conf channels or a confirmed SCID for
350
        // non-zero-conf channels with the option-scid-alias feature-bit. The
351
        // key includes the value itself and also any other aliases. This MUST
352
        // be accessed with the indexMtx.
353
        baseIndex map[lnwire.ShortChannelID]lnwire.ShortChannelID
354
}
355

356
// New creates the new instance of htlc switch.
357
func New(cfg Config, currentHeight uint32) (*Switch, error) {
345✔
358
        resStore := newResolutionStore(cfg.DB)
345✔
359

345✔
360
        circuitMap, err := NewCircuitMap(&CircuitMapConfig{
345✔
361
                DB:                    cfg.DB,
345✔
362
                FetchAllOpenChannels:  cfg.FetchAllOpenChannels,
345✔
363
                FetchClosedChannels:   cfg.FetchClosedChannels,
345✔
364
                ExtractErrorEncrypter: cfg.ExtractErrorEncrypter,
345✔
365
                CheckResolutionMsg:    resStore.checkResolutionMsg,
345✔
366
        })
345✔
367
        if err != nil {
345✔
368
                return nil, err
×
369
        }
×
370

371
        s := &Switch{
345✔
372
                bestHeight:        currentHeight,
345✔
373
                cfg:               &cfg,
345✔
374
                circuits:          circuitMap,
345✔
375
                linkIndex:         make(map[lnwire.ChannelID]ChannelLink),
345✔
376
                forwardingIndex:   make(map[lnwire.ShortChannelID]ChannelLink),
345✔
377
                interfaceIndex:    make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
345✔
378
                pendingLinkIndex:  make(map[lnwire.ChannelID]ChannelLink),
345✔
379
                linkStopIndex:     make(map[lnwire.ChannelID]chan struct{}),
345✔
380
                networkResults:    newNetworkResultStore(cfg.DB),
345✔
381
                htlcPlex:          make(chan *plexPacket),
345✔
382
                chanCloseRequests: make(chan *ChanClose),
345✔
383
                resolutionMsgs:    make(chan *resolutionMsg),
345✔
384
                resMsgStore:       resStore,
345✔
385
                quit:              make(chan struct{}),
345✔
386
        }
345✔
387

345✔
388
        s.aliasToReal = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
345✔
389
        s.baseIndex = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
345✔
390

345✔
391
        s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{
345✔
392
                forwardPackets:    s.ForwardPackets,
345✔
393
                clock:             s.cfg.Clock,
345✔
394
                expiry:            s.cfg.MailboxDeliveryTimeout,
345✔
395
                failMailboxUpdate: s.failMailboxUpdate,
345✔
396
        })
345✔
397

345✔
398
        return s, nil
345✔
399
}
400

401
// resolutionMsg is a struct that wraps an existing ResolutionMsg with a done
402
// channel. We'll use this channel to synchronize delivery of the message with
403
// the caller.
404
type resolutionMsg struct {
405
        contractcourt.ResolutionMsg
406

407
        errChan chan error
408
}
409

410
// ProcessContractResolution is called by active contract resolvers once a
411
// contract they are watching over has been fully resolved. The message carries
412
// an external signal that *would* have been sent if the outgoing channel
413
// didn't need to go to the chain in order to fulfill a contract. We'll process
414
// this message just as if it came from an active outgoing channel.
415
func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) error {
5✔
416
        errChan := make(chan error, 1)
5✔
417

5✔
418
        select {
5✔
419
        case s.resolutionMsgs <- &resolutionMsg{
420
                ResolutionMsg: msg,
421
                errChan:       errChan,
422
        }:
5✔
423
        case <-s.quit:
1✔
424
                return ErrSwitchExiting
1✔
425
        }
426

427
        select {
5✔
428
        case err := <-errChan:
5✔
429
                return err
5✔
430
        case <-s.quit:
×
431
                return ErrSwitchExiting
×
432
        }
433
}
434

435
// HasAttemptResult reads the network result store to fetch the specified
436
// attempt. Returns true if the attempt result exists.
437
func (s *Switch) HasAttemptResult(attemptID uint64) (bool, error) {
4✔
438
        _, err := s.networkResults.getResult(attemptID)
4✔
439
        if err == nil {
4✔
440
                return true, nil
×
441
        }
×
442

443
        if !errors.Is(err, ErrPaymentIDNotFound) {
4✔
444
                return false, err
×
445
        }
×
446

447
        return false, nil
4✔
448
}
449

450
// GetAttemptResult returns the result of the HTLC attempt with the given
451
// attemptID. The paymentHash should be set to the payment's overall hash, or
452
// in case of AMP payments the payment's unique identifier.
453
//
454
// The method returns a channel where the HTLC attempt result will be sent when
455
// available, or an error is encountered during forwarding. When a result is
456
// received on the channel, the HTLC is guaranteed to no longer be in flight.
457
// The switch shutting down is signaled by closing the channel. If the
458
// attemptID is unknown, ErrPaymentIDNotFound will be returned.
459
func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
460
        deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) {
311✔
461

311✔
462
        var (
311✔
463
                nChan <-chan *networkResult
311✔
464
                err   error
311✔
465
                inKey = CircuitKey{
311✔
466
                        ChanID: hop.Source,
311✔
467
                        HtlcID: attemptID,
311✔
468
                }
311✔
469
        )
311✔
470

311✔
471
        // If the HTLC is not found in the circuit map, check whether a result
311✔
472
        // is already available.
311✔
473
        // Assumption: no one will add this attempt ID other than the caller.
311✔
474
        if s.circuits.LookupCircuit(inKey) == nil {
320✔
475
                res, err := s.networkResults.getResult(attemptID)
9✔
476
                if err != nil {
11✔
477
                        return nil, err
2✔
478
                }
2✔
479
                c := make(chan *networkResult, 1)
7✔
480
                c <- res
7✔
481
                nChan = c
7✔
482
        } else {
306✔
483
                // The HTLC was committed to the circuits, subscribe for a
306✔
484
                // result.
306✔
485
                nChan, err = s.networkResults.subscribeResult(attemptID)
306✔
486
                if err != nil {
306✔
487
                        return nil, err
×
488
                }
×
489
        }
490

491
        resultChan := make(chan *PaymentResult, 1)
309✔
492

309✔
493
        // Since the attempt was known, we can start a goroutine that can
309✔
494
        // extract the result when it is available, and pass it on to the
309✔
495
        // caller.
309✔
496
        s.wg.Add(1)
309✔
497
        go func() {
618✔
498
                defer s.wg.Done()
309✔
499

309✔
500
                var n *networkResult
309✔
501
                select {
309✔
502
                case n = <-nChan:
305✔
503
                case <-s.quit:
8✔
504
                        // We close the result channel to signal a shutdown. We
8✔
505
                        // don't send any result in this case since the HTLC is
8✔
506
                        // still in flight.
8✔
507
                        close(resultChan)
8✔
508
                        return
8✔
509
                }
510

511
                log.Debugf("Received network result %T for attemptID=%v", n.msg,
305✔
512
                        attemptID)
305✔
513

305✔
514
                // Extract the result and pass it to the result channel.
305✔
515
                result, err := s.extractResult(
305✔
516
                        deobfuscator, n, attemptID, paymentHash,
305✔
517
                )
305✔
518
                if err != nil {
305✔
519
                        e := fmt.Errorf("unable to extract result: %w", err)
×
520
                        log.Error(e)
×
521
                        resultChan <- &PaymentResult{
×
522
                                Error: e,
×
523
                        }
×
524
                        return
×
525
                }
×
526
                resultChan <- result
305✔
527
        }()
528

529
        return resultChan, nil
309✔
530
}
531

532
// CleanStore calls the underlying result store, telling it is safe to delete
533
// all entries except the ones in the keepPids map. This should be called
534
// preiodically to let the switch clean up payment results that we have
535
// handled.
536
func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error {
4✔
537
        return s.networkResults.cleanStore(keepPids)
4✔
538
}
4✔
539

540
// SendHTLC is used by other subsystems which aren't belong to htlc switch
541
// package in order to send the htlc update. The attemptID used MUST be unique
542
// for this HTLC, and MUST be used only once, otherwise the switch might reject
543
// it.
544
func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64,
545
        htlc *lnwire.UpdateAddHTLC) error {
420✔
546

420✔
547
        // Generate and send new update packet, if error will be received on
420✔
548
        // this stage it means that packet haven't left boundaries of our
420✔
549
        // system and something wrong happened.
420✔
550
        packet := &htlcPacket{
420✔
551
                incomingChanID: hop.Source,
420✔
552
                incomingHTLCID: attemptID,
420✔
553
                outgoingChanID: firstHop,
420✔
554
                htlc:           htlc,
420✔
555
                amount:         htlc.Amount,
420✔
556
        }
420✔
557

420✔
558
        // Attempt to fetch the target link before creating a circuit so that
420✔
559
        // we don't leave dangling circuits. The getLocalLink method does not
420✔
560
        // require the circuit variable to be set on the *htlcPacket.
420✔
561
        link, linkErr := s.getLocalLink(packet, htlc)
420✔
562
        if linkErr != nil {
428✔
563
                // Notify the htlc notifier of a link failure on our outgoing
8✔
564
                // link. Incoming timelock/amount values are not set because
8✔
565
                // they are not present for local sends.
8✔
566
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
8✔
567
                        newHtlcKey(packet),
8✔
568
                        HtlcInfo{
8✔
569
                                OutgoingTimeLock: htlc.Expiry,
8✔
570
                                OutgoingAmt:      htlc.Amount,
8✔
571
                        },
8✔
572
                        HtlcEventTypeSend,
8✔
573
                        linkErr,
8✔
574
                        false,
8✔
575
                )
8✔
576

8✔
577
                return linkErr
8✔
578
        }
8✔
579

580
        // Evaluate whether this HTLC would bypass our fee exposure. If it
581
        // does, don't send it out and instead return an error.
582
        if s.dustExceedsFeeThreshold(link, htlc.Amount, false) {
417✔
583
                // Notify the htlc notifier of a link failure on our outgoing
1✔
584
                // link. We use the FailTemporaryChannelFailure in place of a
1✔
585
                // more descriptive error message.
1✔
586
                linkErr := NewLinkError(
1✔
587
                        &lnwire.FailTemporaryChannelFailure{},
1✔
588
                )
1✔
589
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
1✔
590
                        newHtlcKey(packet),
1✔
591
                        HtlcInfo{
1✔
592
                                OutgoingTimeLock: htlc.Expiry,
1✔
593
                                OutgoingAmt:      htlc.Amount,
1✔
594
                        },
1✔
595
                        HtlcEventTypeSend,
1✔
596
                        linkErr,
1✔
597
                        false,
1✔
598
                )
1✔
599

1✔
600
                return errFeeExposureExceeded
1✔
601
        }
1✔
602

603
        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
415✔
604
        actions, err := s.circuits.CommitCircuits(circuit)
415✔
605
        if err != nil {
415✔
606
                log.Errorf("unable to commit circuit in switch: %v", err)
×
607
                return err
×
608
        }
×
609

610
        // Drop duplicate packet if it has already been seen.
611
        switch {
415✔
612
        case len(actions.Drops) == 1:
1✔
613
                return ErrDuplicateAdd
1✔
614

615
        case len(actions.Fails) == 1:
×
616
                return ErrLocalAddFailed
×
617
        }
618

619
        // Give the packet to the link's mailbox so that HTLC's are properly
620
        // canceled back if the mailbox timeout elapses.
621
        packet.circuit = circuit
414✔
622

414✔
623
        return link.handleSwitchPacket(packet)
414✔
624
}
625

626
// UpdateForwardingPolicies sends a message to the switch to update the
627
// forwarding policies for the set of target channels, keyed in chanPolicies.
628
//
629
// NOTE: This function is synchronous and will block until either the
630
// forwarding policies for all links have been updated, or the switch shuts
631
// down.
632
func (s *Switch) UpdateForwardingPolicies(
633
        chanPolicies map[wire.OutPoint]models.ForwardingPolicy) {
4✔
634

4✔
635
        log.Tracef("Updating link policies: %v", lnutils.SpewLogClosure(
4✔
636
                chanPolicies))
4✔
637

4✔
638
        s.indexMtx.RLock()
4✔
639

4✔
640
        // Update each link in chanPolicies.
4✔
641
        for targetLink, policy := range chanPolicies {
8✔
642
                cid := lnwire.NewChanIDFromOutPoint(targetLink)
4✔
643

4✔
644
                link, ok := s.linkIndex[cid]
4✔
645
                if !ok {
4✔
646
                        log.Debugf("Unable to find ChannelPoint(%v) to update "+
×
647
                                "link policy", targetLink)
×
648
                        continue
×
649
                }
650

651
                link.UpdateForwardingPolicy(policy)
4✔
652
        }
653

654
        s.indexMtx.RUnlock()
4✔
655
}
656

657
// IsForwardedHTLC checks for a given channel and htlc index if it is related
658
// to an opened circuit that represents a forwarded payment.
659
func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID,
660
        htlcIndex uint64) bool {
6✔
661

6✔
662
        circuit := s.circuits.LookupOpenCircuit(models.CircuitKey{
6✔
663
                ChanID: chanID,
6✔
664
                HtlcID: htlcIndex,
6✔
665
        })
6✔
666
        return circuit != nil && circuit.Incoming.ChanID != hop.Source
6✔
667
}
6✔
668

669
// ForwardPackets adds a list of packets to the switch for processing. Fails
670
// and settles are added on a first past, simultaneously constructing circuits
671
// for any adds. After persisting the circuits, another pass of the adds is
672
// given to forward them through the router. The sending link's quit channel is
673
// used to prevent deadlocks when the switch stops a link in the midst of
674
// forwarding.
675
func (s *Switch) ForwardPackets(linkQuit <-chan struct{},
676
        packets ...*htlcPacket) error {
873✔
677

873✔
678
        var (
873✔
679
                // fwdChan is a buffered channel used to receive err msgs from
873✔
680
                // the htlcPlex when forwarding this batch.
873✔
681
                fwdChan = make(chan error, len(packets))
873✔
682

873✔
683
                // numSent keeps a running count of how many packets are
873✔
684
                // forwarded to the switch, which determines how many responses
873✔
685
                // we will wait for on the fwdChan..
873✔
686
                numSent int
873✔
687
        )
873✔
688

873✔
689
        // No packets, nothing to do.
873✔
690
        if len(packets) == 0 {
1,097✔
691
                return nil
224✔
692
        }
224✔
693

694
        // Setup a barrier to prevent the background tasks from processing
695
        // responses until this function returns to the user.
696
        var wg sync.WaitGroup
653✔
697
        wg.Add(1)
653✔
698
        defer wg.Done()
653✔
699

653✔
700
        // Before spawning the following goroutine to proxy our error responses,
653✔
701
        // check to see if we have already been issued a shutdown request. If
653✔
702
        // so, we exit early to avoid incrementing the switch's waitgroup while
653✔
703
        // it is already in the process of shutting down.
653✔
704
        select {
653✔
UNCOV
705
        case <-linkQuit:
×
UNCOV
706
                return nil
×
707
        case <-s.quit:
1✔
708
                return nil
1✔
709
        default:
652✔
710
                // Spawn a goroutine to log the errors returned from failed packets.
652✔
711
                s.wg.Add(1)
652✔
712
                go s.logFwdErrs(&numSent, &wg, fwdChan)
652✔
713
        }
714

715
        // Make a first pass over the packets, forwarding any settles or fails.
716
        // As adds are found, we create a circuit and append it to our set of
717
        // circuits to be written to disk.
718
        var circuits []*PaymentCircuit
652✔
719
        var addBatch []*htlcPacket
652✔
720
        for _, packet := range packets {
1,304✔
721
                switch htlc := packet.htlc.(type) {
652✔
722
                case *lnwire.UpdateAddHTLC:
91✔
723
                        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
91✔
724
                        packet.circuit = circuit
91✔
725
                        circuits = append(circuits, circuit)
91✔
726
                        addBatch = append(addBatch, packet)
91✔
727
                default:
565✔
728
                        err := s.routeAsync(packet, fwdChan, linkQuit)
565✔
729
                        if err != nil {
575✔
730
                                return fmt.Errorf("failed to forward packet %w",
10✔
731
                                        err)
10✔
732
                        }
10✔
733
                        numSent++
540✔
734
                }
735
        }
736

737
        // If this batch did not contain any circuits to commit, we can return
738
        // early.
739
        if len(circuits) == 0 {
1,167✔
740
                return nil
540✔
741
        }
540✔
742

743
        // Write any circuits that we found to disk.
744
        actions, err := s.circuits.CommitCircuits(circuits...)
91✔
745
        if err != nil {
91✔
746
                log.Errorf("unable to commit circuits in switch: %v", err)
×
747
        }
×
748

749
        // Split the htlc packets by comparing an in-order seek to the head of
750
        // the added, dropped, or failed circuits.
751
        //
752
        // NOTE: This assumes each list is guaranteed to be a subsequence of the
753
        // circuits, and that the union of the sets results in the original set
754
        // of circuits.
755
        var addedPackets, failedPackets []*htlcPacket
91✔
756
        for _, packet := range addBatch {
182✔
757
                switch {
91✔
758
                case len(actions.Adds) > 0 && packet.circuit == actions.Adds[0]:
87✔
759
                        addedPackets = append(addedPackets, packet)
87✔
760
                        actions.Adds = actions.Adds[1:]
87✔
761

762
                case len(actions.Drops) > 0 && packet.circuit == actions.Drops[0]:
5✔
763
                        actions.Drops = actions.Drops[1:]
5✔
764

765
                case len(actions.Fails) > 0 && packet.circuit == actions.Fails[0]:
3✔
766
                        failedPackets = append(failedPackets, packet)
3✔
767
                        actions.Fails = actions.Fails[1:]
3✔
768
                }
769
        }
770

771
        // Now, forward any packets for circuits that were successfully added to
772
        // the switch's circuit map.
773
        for _, packet := range addedPackets {
178✔
774
                err := s.routeAsync(packet, fwdChan, linkQuit)
87✔
775
                if err != nil {
88✔
776
                        return fmt.Errorf("failed to forward packet %w", err)
1✔
777
                }
1✔
778
                numSent++
86✔
779
        }
780

781
        // Lastly, for any packets that failed, this implies that they were
782
        // left in a half added state, which can happen when recovering from
783
        // failures.
784
        if len(failedPackets) > 0 {
93✔
785
                var failure lnwire.FailureMessage
3✔
786
                incomingID := failedPackets[0].incomingChanID
3✔
787

3✔
788
                // If the incoming channel is an option_scid_alias channel,
3✔
789
                // then we'll need to replace the SCID in the ChannelUpdate.
3✔
790
                update := s.failAliasUpdate(incomingID, true)
3✔
791
                if update == nil {
4✔
792
                        // Fallback to the original non-option behavior.
1✔
793
                        update, err := s.cfg.FetchLastChannelUpdate(
1✔
794
                                incomingID,
1✔
795
                        )
1✔
796
                        if err != nil {
1✔
797
                                failure = &lnwire.FailTemporaryNodeFailure{}
×
798
                        } else {
1✔
799
                                failure = lnwire.NewTemporaryChannelFailure(
1✔
800
                                        update,
1✔
801
                                )
1✔
802
                        }
1✔
803
                } else {
2✔
804
                        // This is an option_scid_alias channel.
2✔
805
                        failure = lnwire.NewTemporaryChannelFailure(update)
2✔
806
                }
2✔
807

808
                linkError := NewDetailedLinkError(
3✔
809
                        failure, OutgoingFailureIncompleteForward,
3✔
810
                )
3✔
811

3✔
812
                for _, packet := range failedPackets {
6✔
813
                        // We don't handle the error here since this method
3✔
814
                        // always returns an error.
3✔
815
                        _ = s.failAddPacket(packet, linkError)
3✔
816
                }
3✔
817
        }
818

819
        return nil
90✔
820
}
821

822
// logFwdErrs logs any errors received on `fwdChan`.
823
func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) {
652✔
824
        defer s.wg.Done()
652✔
825

652✔
826
        // Wait here until the outer function has finished persisting
652✔
827
        // and routing the packets. This guarantees we don't read from num until
652✔
828
        // the value is accurate.
652✔
829
        wg.Wait()
652✔
830

652✔
831
        numSent := *num
652✔
832
        for i := 0; i < numSent; i++ {
1,274✔
833
                select {
622✔
834
                case err := <-fwdChan:
621✔
835
                        if err != nil {
648✔
836
                                log.Errorf("Unhandled error while reforwarding htlc "+
27✔
837
                                        "settle/fail over htlcswitch: %v", err)
27✔
838
                        }
27✔
839
                case <-s.quit:
1✔
840
                        log.Errorf("unable to forward htlc packet " +
1✔
841
                                "htlc switch was stopped")
1✔
842
                        return
1✔
843
                }
844
        }
845
}
846

847
// routeAsync sends a packet through the htlc switch, using the provided err
848
// chan to propagate errors back to the caller. The link's quit channel is
849
// provided so that the send can be canceled if either the link or the switch
850
// receive a shutdown requuest. This method does not wait for a response from
851
// the htlcForwarder before returning.
852
func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error,
853
        linkQuit <-chan struct{}) error {
648✔
854

648✔
855
        command := &plexPacket{
648✔
856
                pkt: packet,
648✔
857
                err: errChan,
648✔
858
        }
648✔
859

648✔
860
        select {
648✔
861
        case s.htlcPlex <- command:
622✔
862
                return nil
622✔
863
        case <-linkQuit:
11✔
864
                return ErrLinkShuttingDown
11✔
865
        case <-s.quit:
×
866
                return errors.New("htlc switch was stopped")
×
867
        }
868
}
869

870
// getLocalLink handles the addition of a htlc for a send that originates from
871
// our node. It returns the link that the htlc should be forwarded outwards on,
872
// and a link error if the htlc cannot be forwarded.
873
func (s *Switch) getLocalLink(pkt *htlcPacket, htlc *lnwire.UpdateAddHTLC) (
874
        ChannelLink, *LinkError) {
420✔
875

420✔
876
        // Try to find links by node destination.
420✔
877
        s.indexMtx.RLock()
420✔
878
        link, err := s.getLinkByShortID(pkt.outgoingChanID)
420✔
879
        defer s.indexMtx.RUnlock()
420✔
880
        if err != nil {
425✔
881
                // If the link was not found for the outgoingChanID, an outside
5✔
882
                // subsystem may be using the confirmed SCID of a zero-conf
5✔
883
                // channel. In this case, we'll consult the Switch maps to see
5✔
884
                // if an alias exists and use the alias to lookup the link.
5✔
885
                // This extra step is a consequence of not updating the Switch
5✔
886
                // forwardingIndex when a zero-conf channel is confirmed. We
5✔
887
                // don't need to change the outgoingChanID since the link will
5✔
888
                // do that upon receiving the packet.
5✔
889
                baseScid, ok := s.baseIndex[pkt.outgoingChanID]
5✔
890
                if !ok {
9✔
891
                        log.Errorf("Link %v not found", pkt.outgoingChanID)
4✔
892
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
4✔
893
                }
4✔
894

895
                // The base SCID was found, so we'll use that to fetch the
896
                // link.
897
                link, err = s.getLinkByShortID(baseScid)
5✔
898
                if err != nil {
5✔
899
                        log.Errorf("Link %v not found", baseScid)
×
900
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
×
901
                }
×
902
        }
903

904
        if !link.EligibleToForward() {
421✔
905
                log.Errorf("Link %v is not available to forward",
1✔
906
                        pkt.outgoingChanID)
1✔
907

1✔
908
                // The update does not need to be populated as the error
1✔
909
                // will be returned back to the router.
1✔
910
                return nil, NewDetailedLinkError(
1✔
911
                        lnwire.NewTemporaryChannelFailure(nil),
1✔
912
                        OutgoingFailureLinkNotEligible,
1✔
913
                )
1✔
914
        }
1✔
915

916
        // Ensure that the htlc satisfies the outgoing channel policy.
917
        currentHeight := atomic.LoadUint32(&s.bestHeight)
419✔
918
        htlcErr := link.CheckHtlcTransit(
419✔
919
                htlc.PaymentHash, htlc.Amount, htlc.Expiry, currentHeight,
419✔
920
                htlc.CustomRecords,
419✔
921
        )
419✔
922
        if htlcErr != nil {
423✔
923
                log.Errorf("Link %v policy for local forward not "+
4✔
924
                        "satisfied", pkt.outgoingChanID)
4✔
925
                return nil, htlcErr
4✔
926
        }
4✔
927
        return link, nil
416✔
928
}
929

930
// handleLocalResponse processes a Settle or Fail responding to a
931
// locally-initiated payment. This is handled asynchronously to avoid blocking
932
// the main event loop within the switch, as these operations can require
933
// multiple db transactions. The guarantees of the circuit map are stringent
934
// enough such that we are able to tolerate reordering of these operations
935
// without side effects. The primary operations handled are:
936
//  1. Save the payment result to the pending payment store.
937
//  2. Notify subscribers about the payment result.
938
//  3. Ack settle/fail references, to avoid resending this response internally
939
//  4. Teardown the closing circuit in the circuit map
940
//
941
// NOTE: This method MUST be spawned as a goroutine.
942
func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
308✔
943
        defer s.wg.Done()
308✔
944

308✔
945
        attemptID := pkt.incomingHTLCID
308✔
946

308✔
947
        // The error reason will be unencypted in case this a local
308✔
948
        // failure or a converted error.
308✔
949
        unencrypted := pkt.localFailure || pkt.convertedError
308✔
950
        n := &networkResult{
308✔
951
                msg:          pkt.htlc,
308✔
952
                unencrypted:  unencrypted,
308✔
953
                isResolution: pkt.isResolution,
308✔
954
        }
308✔
955

308✔
956
        // Store the result to the db. This will also notify subscribers about
308✔
957
        // the result.
308✔
958
        if err := s.networkResults.storeResult(attemptID, n); err != nil {
308✔
959
                log.Errorf("Unable to store attempt result for pid=%v: %v",
×
960
                        attemptID, err)
×
961
                return
×
962
        }
×
963

964
        // First, we'll clean up any fwdpkg references, circuit entries, and
965
        // mark in our db that the payment for this payment hash has either
966
        // succeeded or failed.
967
        //
968
        // If this response is contained in a forwarding package, we'll start by
969
        // acking the settle/fail so that we don't continue to retransmit the
970
        // HTLC internally.
971
        if pkt.destRef != nil {
430✔
972
                if err := s.ackSettleFail(*pkt.destRef); err != nil {
122✔
973
                        log.Warnf("Unable to ack settle/fail reference: %s: %v",
×
974
                                *pkt.destRef, err)
×
975
                        return
×
976
                }
×
977
        }
978

979
        // Next, we'll remove the circuit since we are about to complete an
980
        // fulfill/fail of this HTLC. Since we've already removed the
981
        // settle/fail fwdpkg reference, the response from the peer cannot be
982
        // replayed internally if this step fails. If this happens, this logic
983
        // will be executed when a provided resolution message comes through.
984
        // This can only happen if the circuit is still open, which is why this
985
        // ordering is chosen.
986
        if err := s.teardownCircuit(pkt); err != nil {
308✔
987
                log.Errorf("Unable to teardown circuit %s: %v",
×
988
                        pkt.inKey(), err)
×
989
                return
×
990
        }
×
991

992
        // Finally, notify on the htlc failure or success that has been handled.
993
        key := newHtlcKey(pkt)
308✔
994
        eventType := getEventType(pkt)
308✔
995

308✔
996
        switch htlc := pkt.htlc.(type) {
308✔
997
        case *lnwire.UpdateFulfillHTLC:
184✔
998
                s.cfg.HtlcNotifier.NotifySettleEvent(key, htlc.PaymentPreimage,
184✔
999
                        eventType)
184✔
1000

1001
        case *lnwire.UpdateFailHTLC:
128✔
1002
                s.cfg.HtlcNotifier.NotifyForwardingFailEvent(key, eventType)
128✔
1003
        }
1004
}
1005

1006
// extractResult uses the given deobfuscator to extract the payment result from
1007
// the given network message.
1008
func (s *Switch) extractResult(deobfuscator ErrorDecrypter, n *networkResult,
1009
        attemptID uint64, paymentHash lntypes.Hash) (*PaymentResult, error) {
305✔
1010

305✔
1011
        switch htlc := n.msg.(type) {
305✔
1012

1013
        // We've received a settle update which means we can finalize the user
1014
        // payment and return successful response.
1015
        case *lnwire.UpdateFulfillHTLC:
181✔
1016
                return &PaymentResult{
181✔
1017
                        Preimage: htlc.PaymentPreimage,
181✔
1018
                }, nil
181✔
1019

1020
        // We've received a fail update which means we can finalize the
1021
        // user payment and return fail response.
1022
        case *lnwire.UpdateFailHTLC:
128✔
1023
                // TODO(yy): construct deobfuscator here to avoid creating it
128✔
1024
                // in paymentLifecycle even for settled HTLCs.
128✔
1025
                paymentErr := s.parseFailedPayment(
128✔
1026
                        deobfuscator, attemptID, paymentHash, n.unencrypted,
128✔
1027
                        n.isResolution, htlc,
128✔
1028
                )
128✔
1029

128✔
1030
                return &PaymentResult{
128✔
1031
                        Error: paymentErr,
128✔
1032
                }, nil
128✔
1033

1034
        default:
×
1035
                return nil, fmt.Errorf("received unknown response type: %T",
×
1036
                        htlc)
×
1037
        }
1038
}
1039

1040
// parseFailedPayment determines the appropriate failure message to return to
1041
// a user initiated payment. The three cases handled are:
1042
//  1. An unencrypted failure, which should already plaintext.
1043
//  2. A resolution from the chain arbitrator, which possibly has no failure
1044
//     reason attached.
1045
//  3. A failure from the remote party, which will need to be decrypted using
1046
//     the payment deobfuscator.
1047
func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter,
1048
        attemptID uint64, paymentHash lntypes.Hash, unencrypted,
1049
        isResolution bool, htlc *lnwire.UpdateFailHTLC) error {
128✔
1050

128✔
1051
        switch {
128✔
1052

1053
        // The payment never cleared the link, so we don't need to
1054
        // decrypt the error, simply decode it them report back to the
1055
        // user.
1056
        case unencrypted:
9✔
1057
                r := bytes.NewReader(htlc.Reason)
9✔
1058
                failureMsg, err := lnwire.DecodeFailure(r, 0)
9✔
1059
                if err != nil {
9✔
1060
                        // If we could not decode the failure reason, return a link
×
1061
                        // error indicating that we failed to decode the onion.
×
1062
                        linkError := NewDetailedLinkError(
×
1063
                                // As this didn't even clear the link, we don't
×
1064
                                // need to apply an update here since it goes
×
1065
                                // directly to the router.
×
1066
                                lnwire.NewTemporaryChannelFailure(nil),
×
1067
                                OutgoingFailureDecodeError,
×
1068
                        )
×
1069

×
1070
                        log.Errorf("%v: (hash=%v, pid=%d): %v",
×
1071
                                linkError.FailureDetail.FailureString(),
×
1072
                                paymentHash, attemptID, err)
×
1073

×
1074
                        return linkError
×
1075
                }
×
1076

1077
                // If we successfully decoded the failure reason, return it.
1078
                return NewLinkError(failureMsg)
9✔
1079

1080
        // A payment had to be timed out on chain before it got past
1081
        // the first hop. In this case, we'll report a permanent
1082
        // channel failure as this means us, or the remote party had to
1083
        // go on chain.
1084
        case isResolution && htlc.Reason == nil:
4✔
1085
                linkError := NewDetailedLinkError(
4✔
1086
                        &lnwire.FailPermanentChannelFailure{},
4✔
1087
                        OutgoingFailureOnChainTimeout,
4✔
1088
                )
4✔
1089

4✔
1090
                log.Infof("%v: hash=%v, pid=%d",
4✔
1091
                        linkError.FailureDetail.FailureString(),
4✔
1092
                        paymentHash, attemptID)
4✔
1093

4✔
1094
                return linkError
4✔
1095

1096
        // A regular multi-hop payment error that we'll need to
1097
        // decrypt.
1098
        default:
123✔
1099
                // We'll attempt to fully decrypt the onion encrypted
123✔
1100
                // error. If we're unable to then we'll bail early.
123✔
1101
                failure, err := deobfuscator.DecryptError(htlc.Reason)
123✔
1102
                if err != nil {
124✔
1103
                        log.Errorf("unable to de-obfuscate onion failure "+
1✔
1104
                                "(hash=%v, pid=%d): %v",
1✔
1105
                                paymentHash, attemptID, err)
1✔
1106

1✔
1107
                        return ErrUnreadableFailureMessage
1✔
1108
                }
1✔
1109

1110
                return failure
122✔
1111
        }
1112
}
1113

1114
// handlePacketForward is used in cases when we need forward the htlc update
1115
// from one channel link to another and be able to propagate the settle/fail
1116
// updates back. This behaviour is achieved by creation of payment circuits.
1117
func (s *Switch) handlePacketForward(packet *htlcPacket) error {
623✔
1118
        switch htlc := packet.htlc.(type) {
623✔
1119
        // Channel link forwarded us a new htlc, therefore we initiate the
1120
        // payment circuit within our internal state so we can properly forward
1121
        // the ultimate settle message back latter.
1122
        case *lnwire.UpdateAddHTLC:
86✔
1123
                return s.handlePacketAdd(packet, htlc)
86✔
1124

1125
        case *lnwire.UpdateFulfillHTLC:
403✔
1126
                return s.handlePacketSettle(packet)
403✔
1127

1128
        // Channel link forwarded us an update_fail_htlc message.
1129
        //
1130
        // NOTE: when the channel link receives an update_fail_malformed_htlc
1131
        // from upstream, it will convert the message into update_fail_htlc and
1132
        // forward it. Thus there's no need to catch `UpdateFailMalformedHTLC`
1133
        // here.
1134
        case *lnwire.UpdateFailHTLC:
142✔
1135
                return s.handlePacketFail(packet, htlc)
142✔
1136

1137
        default:
×
1138
                return fmt.Errorf("wrong update type: %T", htlc)
×
1139
        }
1140
}
1141

1142
// checkCircularForward checks whether a forward is circular (arrives and
1143
// departs on the same link) and returns a link error if the switch is
1144
// configured to disallow this behaviour.
1145
func (s *Switch) checkCircularForward(incoming, outgoing lnwire.ShortChannelID,
1146
        allowCircular bool, paymentHash lntypes.Hash) *LinkError {
94✔
1147

94✔
1148
        // If they are equal, we can skip the alias mapping checks.
94✔
1149
        if incoming == outgoing {
98✔
1150
                // The switch may be configured to allow circular routes, so
4✔
1151
                // just log and return nil.
4✔
1152
                if allowCircular {
6✔
1153
                        log.Debugf("allowing circular route over link: %v "+
2✔
1154
                                "(payment hash: %x)", incoming, paymentHash)
2✔
1155
                        return nil
2✔
1156
                }
2✔
1157

1158
                // Otherwise, we'll return a temporary channel failure.
1159
                return NewDetailedLinkError(
2✔
1160
                        lnwire.NewTemporaryChannelFailure(nil),
2✔
1161
                        OutgoingFailureCircularRoute,
2✔
1162
                )
2✔
1163
        }
1164

1165
        // We'll fetch the "base" SCID from the baseIndex for the incoming and
1166
        // outgoing SCIDs. If either one does not have a base SCID, then the
1167
        // two channels are not equal since one will be a channel that does not
1168
        // need a mapping and SCID equality was checked above. If the "base"
1169
        // SCIDs are equal, then this is a circular route. Otherwise, it isn't.
1170
        s.indexMtx.RLock()
90✔
1171
        incomingBaseScid, ok := s.baseIndex[incoming]
90✔
1172
        if !ok {
174✔
1173
                // This channel does not use baseIndex, bail out.
84✔
1174
                s.indexMtx.RUnlock()
84✔
1175
                return nil
84✔
1176
        }
84✔
1177

1178
        outgoingBaseScid, ok := s.baseIndex[outgoing]
10✔
1179
        if !ok {
16✔
1180
                // This channel does not use baseIndex, bail out.
6✔
1181
                s.indexMtx.RUnlock()
6✔
1182
                return nil
6✔
1183
        }
6✔
1184
        s.indexMtx.RUnlock()
8✔
1185

8✔
1186
        // Check base SCID equality.
8✔
1187
        if incomingBaseScid != outgoingBaseScid {
12✔
1188
                // The base SCIDs are not equal so these are not the same
4✔
1189
                // channel.
4✔
1190
                return nil
4✔
1191
        }
4✔
1192

1193
        // If the incoming and outgoing link are equal, the htlc is part of a
1194
        // circular route which may be used to lock up our liquidity. If the
1195
        // switch is configured to allow circular routes, log that we are
1196
        // allowing the route then return nil.
1197
        if allowCircular {
6✔
1198
                log.Debugf("allowing circular route over link: %v "+
2✔
1199
                        "(payment hash: %x)", incoming, paymentHash)
2✔
1200
                return nil
2✔
1201
        }
2✔
1202

1203
        // If our node disallows circular routes, return a temporary channel
1204
        // failure. There is nothing wrong with the policy used by the remote
1205
        // node, so we do not include a channel update.
1206
        return NewDetailedLinkError(
2✔
1207
                lnwire.NewTemporaryChannelFailure(nil),
2✔
1208
                OutgoingFailureCircularRoute,
2✔
1209
        )
2✔
1210
}
1211

1212
// failAddPacket encrypts a fail packet back to an add packet's source.
1213
// The ciphertext will be derived from the failure message proivded by context.
1214
// This method returns the failErr if all other steps complete successfully.
1215
func (s *Switch) failAddPacket(packet *htlcPacket, failure *LinkError) error {
30✔
1216
        // Encrypt the failure so that the sender will be able to read the error
30✔
1217
        // message. Since we failed this packet, we use EncryptFirstHop to
30✔
1218
        // obfuscate the failure for their eyes only.
30✔
1219
        reason, err := packet.obfuscator.EncryptFirstHop(failure.WireMessage())
30✔
1220
        if err != nil {
30✔
1221
                err := fmt.Errorf("unable to obfuscate "+
×
1222
                        "error: %v", err)
×
1223
                log.Error(err)
×
1224
                return err
×
1225
        }
×
1226

1227
        log.Error(failure.Error())
30✔
1228

30✔
1229
        // Create a failure packet for this htlc. The full set of
30✔
1230
        // information about the htlc failure is included so that they can
30✔
1231
        // be included in link failure notifications.
30✔
1232
        failPkt := &htlcPacket{
30✔
1233
                sourceRef:       packet.sourceRef,
30✔
1234
                incomingChanID:  packet.incomingChanID,
30✔
1235
                incomingHTLCID:  packet.incomingHTLCID,
30✔
1236
                outgoingChanID:  packet.outgoingChanID,
30✔
1237
                outgoingHTLCID:  packet.outgoingHTLCID,
30✔
1238
                incomingAmount:  packet.incomingAmount,
30✔
1239
                amount:          packet.amount,
30✔
1240
                incomingTimeout: packet.incomingTimeout,
30✔
1241
                outgoingTimeout: packet.outgoingTimeout,
30✔
1242
                circuit:         packet.circuit,
30✔
1243
                obfuscator:      packet.obfuscator,
30✔
1244
                linkFailure:     failure,
30✔
1245
                htlc: &lnwire.UpdateFailHTLC{
30✔
1246
                        Reason: reason,
30✔
1247
                },
30✔
1248
        }
30✔
1249

30✔
1250
        // Route a fail packet back to the source link.
30✔
1251
        err = s.mailOrchestrator.Deliver(failPkt.incomingChanID, failPkt)
30✔
1252
        if err != nil {
30✔
1253
                err = fmt.Errorf("source chanid=%v unable to "+
×
1254
                        "handle switch packet: %v",
×
1255
                        packet.incomingChanID, err)
×
1256
                log.Error(err)
×
1257
                return err
×
1258
        }
×
1259

1260
        return failure
30✔
1261
}
1262

1263
// closeCircuit accepts a settle or fail htlc and the associated htlc packet and
1264
// attempts to determine the source that forwarded this htlc. This method will
1265
// set the incoming chan and htlc ID of the given packet if the source was
1266
// found, and will properly [re]encrypt any failure messages.
1267
func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
541✔
1268
        // If the packet has its source, that means it was failed locally by
541✔
1269
        // the outgoing link. We fail it here to make sure only one response
541✔
1270
        // makes it through the switch.
541✔
1271
        if pkt.hasSource {
556✔
1272
                circuit, err := s.circuits.FailCircuit(pkt.inKey())
15✔
1273
                switch err {
15✔
1274

1275
                // Circuit successfully closed.
1276
                case nil:
15✔
1277
                        return circuit, nil
15✔
1278

1279
                // Circuit was previously closed, but has not been deleted.
1280
                // We'll just drop this response until the circuit has been
1281
                // fully removed.
1282
                case ErrCircuitClosing:
×
1283
                        return nil, err
×
1284

1285
                // Failed to close circuit because it does not exist. This is
1286
                // likely because the circuit was already successfully closed.
1287
                // Since this packet failed locally, there is no forwarding
1288
                // package entry to acknowledge.
1289
                case ErrUnknownCircuit:
×
1290
                        return nil, err
×
1291

1292
                // Unexpected error.
1293
                default:
×
1294
                        return nil, err
×
1295
                }
1296
        }
1297

1298
        // Otherwise, this is packet was received from the remote party.  Use
1299
        // circuit map to find the incoming link to receive the settle/fail.
1300
        circuit, err := s.circuits.CloseCircuit(pkt.outKey())
530✔
1301
        switch err {
530✔
1302

1303
        // Open circuit successfully closed.
1304
        case nil:
340✔
1305
                pkt.incomingChanID = circuit.Incoming.ChanID
340✔
1306
                pkt.incomingHTLCID = circuit.Incoming.HtlcID
340✔
1307
                pkt.circuit = circuit
340✔
1308
                pkt.sourceRef = &circuit.AddRef
340✔
1309

340✔
1310
                pktType := "SETTLE"
340✔
1311
                if _, ok := pkt.htlc.(*lnwire.UpdateFailHTLC); ok {
471✔
1312
                        pktType = "FAIL"
131✔
1313
                }
131✔
1314

1315
                log.Debugf("Closed completed %s circuit for %x: "+
340✔
1316
                        "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash,
340✔
1317
                        pkt.incomingChanID, pkt.incomingHTLCID,
340✔
1318
                        pkt.outgoingChanID, pkt.outgoingHTLCID)
340✔
1319

340✔
1320
                return circuit, nil
340✔
1321

1322
        // Circuit was previously closed, but has not been deleted. We'll just
1323
        // drop this response until the circuit has been removed.
1324
        case ErrCircuitClosing:
4✔
1325
                return nil, err
4✔
1326

1327
        // Failed to close circuit because it does not exist. This is likely
1328
        // because the circuit was already successfully closed.
1329
        case ErrUnknownCircuit:
194✔
1330
                if pkt.destRef != nil {
387✔
1331
                        // Add this SettleFailRef to the set of pending settle/fail entries
193✔
1332
                        // awaiting acknowledgement.
193✔
1333
                        s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef)
193✔
1334
                }
193✔
1335

1336
                // If this is a settle, we will not log an error message as settles
1337
                // are expected to hit the ErrUnknownCircuit case. The only way fails
1338
                // can hit this case if the link restarts after having just sent a fail
1339
                // to the switch.
1340
                _, isSettle := pkt.htlc.(*lnwire.UpdateFulfillHTLC)
194✔
1341
                if !isSettle {
198✔
1342
                        err := fmt.Errorf("unable to find target channel "+
4✔
1343
                                "for HTLC fail: channel ID = %s, "+
4✔
1344
                                "HTLC ID = %d", pkt.outgoingChanID,
4✔
1345
                                pkt.outgoingHTLCID)
4✔
1346
                        log.Error(err)
4✔
1347

4✔
1348
                        return nil, err
4✔
1349
                }
4✔
1350

1351
                return nil, nil
194✔
1352

1353
        // Unexpected error.
1354
        default:
×
1355
                return nil, err
×
1356
        }
1357
}
1358

1359
// ackSettleFail is used by the switch to ACK any settle/fail entries in the
1360
// forwarding package of the outgoing link for a payment circuit. We do this if
1361
// we're the originator of the payment, so the link stops attempting to
1362
// re-broadcast.
1363
func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error {
122✔
1364
        return kvdb.Batch(s.cfg.DB, func(tx kvdb.RwTx) error {
244✔
1365
                return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...)
122✔
1366
        })
122✔
1367
}
1368

1369
// teardownCircuit removes a pending or open circuit from the switch's circuit
1370
// map and prints useful logging statements regarding the outcome.
1371
func (s *Switch) teardownCircuit(pkt *htlcPacket) error {
318✔
1372
        var pktType string
318✔
1373
        switch htlc := pkt.htlc.(type) {
318✔
1374
        case *lnwire.UpdateFulfillHTLC:
190✔
1375
                pktType = "SETTLE"
190✔
1376
        case *lnwire.UpdateFailHTLC:
132✔
1377
                pktType = "FAIL"
132✔
1378
        default:
×
1379
                return fmt.Errorf("cannot tear down packet of type: %T", htlc)
×
1380
        }
1381

1382
        var paymentHash lntypes.Hash
318✔
1383

318✔
1384
        // Perform a defensive check to make sure we don't try to access a nil
318✔
1385
        // circuit.
318✔
1386
        circuit := pkt.circuit
318✔
1387
        if circuit != nil {
636✔
1388
                copy(paymentHash[:], circuit.PaymentHash[:])
318✔
1389
        }
318✔
1390

1391
        log.Debugf("Tearing down circuit with %s pkt, removing circuit=%v "+
318✔
1392
                "with keystone=%v", pktType, pkt.inKey(), pkt.outKey())
318✔
1393

318✔
1394
        err := s.circuits.DeleteCircuits(pkt.inKey())
318✔
1395
        if err != nil {
318✔
1396
                log.Warnf("Failed to tear down circuit (%s, %d) <-> (%s, %d) "+
×
1397
                        "with payment_hash=%v using %s pkt", pkt.incomingChanID,
×
1398
                        pkt.incomingHTLCID, pkt.outgoingChanID,
×
1399
                        pkt.outgoingHTLCID, pkt.circuit.PaymentHash, pktType)
×
1400

×
1401
                return err
×
1402
        }
×
1403

1404
        log.Debugf("Closed %s circuit for %v: (%s, %d) <-> (%s, %d)", pktType,
318✔
1405
                paymentHash, pkt.incomingChanID, pkt.incomingHTLCID,
318✔
1406
                pkt.outgoingChanID, pkt.outgoingHTLCID)
318✔
1407

318✔
1408
        return nil
318✔
1409
}
1410

1411
// CloseLink creates and sends the close channel command to the target link
1412
// directing the specified closure type. If the closure type is CloseRegular,
1413
// targetFeePerKw parameter should be the ideal fee-per-kw that will be used as
1414
// a starting point for close negotiation. The deliveryScript parameter is an
1415
// optional parameter which sets a user specified script to close out to.
1416
func (s *Switch) CloseLink(chanPoint *wire.OutPoint,
1417
        closeType contractcourt.ChannelCloseType,
1418
        targetFeePerKw, maxFee chainfee.SatPerKWeight,
1419
        deliveryScript lnwire.DeliveryAddress) (chan interface{}, chan error) {
4✔
1420

4✔
1421
        // TODO(roasbeef) abstract out the close updates.
4✔
1422
        updateChan := make(chan interface{}, 2)
4✔
1423
        errChan := make(chan error, 1)
4✔
1424

4✔
1425
        command := &ChanClose{
4✔
1426
                CloseType:      closeType,
4✔
1427
                ChanPoint:      chanPoint,
4✔
1428
                Updates:        updateChan,
4✔
1429
                TargetFeePerKw: targetFeePerKw,
4✔
1430
                MaxFee:         maxFee,
4✔
1431
                DeliveryScript: deliveryScript,
4✔
1432
                Err:            errChan,
4✔
1433
        }
4✔
1434

4✔
1435
        select {
4✔
1436
        case s.chanCloseRequests <- command:
4✔
1437
                return updateChan, errChan
4✔
1438

1439
        case <-s.quit:
×
1440
                errChan <- ErrSwitchExiting
×
1441
                close(updateChan)
×
1442
                return updateChan, errChan
×
1443
        }
1444
}
1445

1446
// htlcForwarder is responsible for optimally forwarding (and possibly
1447
// fragmenting) incoming/outgoing HTLCs amongst all active interfaces and their
1448
// links. The duties of the forwarder are similar to that of a network switch,
1449
// in that it facilitates multi-hop payments by acting as a central messaging
1450
// bus. The switch communicates will active links to create, manage, and tear
1451
// down active onion routed payments. Each active channel is modeled as
1452
// networked device with metadata such as the available payment bandwidth, and
1453
// total link capacity.
1454
//
1455
// NOTE: This MUST be run as a goroutine.
1456
func (s *Switch) htlcForwarder() {
211✔
1457
        defer s.wg.Done()
211✔
1458

211✔
1459
        defer func() {
422✔
1460
                s.blockEpochStream.Cancel()
211✔
1461

211✔
1462
                // Remove all links once we've been signalled for shutdown.
211✔
1463
                var linksToStop []ChannelLink
211✔
1464
                s.indexMtx.Lock()
211✔
1465
                for _, link := range s.linkIndex {
507✔
1466
                        activeLink := s.removeLink(link.ChanID())
296✔
1467
                        if activeLink == nil {
296✔
1468
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1469
                                        "on stop", link.ChanID())
×
1470
                                continue
×
1471
                        }
1472
                        linksToStop = append(linksToStop, activeLink)
296✔
1473
                }
1474
                for _, link := range s.pendingLinkIndex {
216✔
1475
                        pendingLink := s.removeLink(link.ChanID())
5✔
1476
                        if pendingLink == nil {
5✔
1477
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1478
                                        "on stop", link.ChanID())
×
1479
                                continue
×
1480
                        }
1481
                        linksToStop = append(linksToStop, pendingLink)
5✔
1482
                }
1483
                s.indexMtx.Unlock()
211✔
1484

211✔
1485
                // Now that all pending and live links have been removed from
211✔
1486
                // the forwarding indexes, stop each one before shutting down.
211✔
1487
                // We'll shut them down in parallel to make exiting as fast as
211✔
1488
                // possible.
211✔
1489
                var wg sync.WaitGroup
211✔
1490
                for _, link := range linksToStop {
508✔
1491
                        wg.Add(1)
297✔
1492
                        go func(l ChannelLink) {
594✔
1493
                                defer wg.Done()
297✔
1494

297✔
1495
                                l.Stop()
297✔
1496
                        }(link)
297✔
1497
                }
1498
                wg.Wait()
211✔
1499

211✔
1500
                // Before we exit fully, we'll attempt to flush out any
211✔
1501
                // forwarding events that may still be lingering since the last
211✔
1502
                // batch flush.
211✔
1503
                if err := s.FlushForwardingEvents(); err != nil {
211✔
1504
                        log.Errorf("unable to flush forwarding events: %v", err)
×
1505
                }
×
1506
        }()
1507

1508
        // TODO(roasbeef): cleared vs settled distinction
1509
        var (
211✔
1510
                totalNumUpdates uint64
211✔
1511
                totalSatSent    btcutil.Amount
211✔
1512
                totalSatRecv    btcutil.Amount
211✔
1513
        )
211✔
1514
        s.cfg.LogEventTicker.Resume()
211✔
1515
        defer s.cfg.LogEventTicker.Stop()
211✔
1516

211✔
1517
        // Every 15 seconds, we'll flush out the forwarding events that
211✔
1518
        // occurred during that period.
211✔
1519
        s.cfg.FwdEventTicker.Resume()
211✔
1520
        defer s.cfg.FwdEventTicker.Stop()
211✔
1521

211✔
1522
        defer s.cfg.AckEventTicker.Stop()
211✔
1523

211✔
1524
out:
211✔
1525
        for {
1,047✔
1526

836✔
1527
                // If the set of pending settle/fail entries is non-zero,
836✔
1528
                // reinstate the ack ticker so we can batch ack them.
836✔
1529
                if len(s.pendingSettleFails) > 0 {
1,217✔
1530
                        s.cfg.AckEventTicker.Resume()
381✔
1531
                }
381✔
1532

1533
                select {
836✔
1534
                case blockEpoch, ok := <-s.blockEpochStream.Epochs:
4✔
1535
                        if !ok {
4✔
1536
                                break out
×
1537
                        }
1538

1539
                        atomic.StoreUint32(&s.bestHeight, uint32(blockEpoch.Height))
4✔
1540

1541
                // A local close request has arrived, we'll forward this to the
1542
                // relevant link (if it exists) so the channel can be
1543
                // cooperatively closed (if possible).
1544
                case req := <-s.chanCloseRequests:
4✔
1545
                        chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)
4✔
1546

4✔
1547
                        s.indexMtx.RLock()
4✔
1548
                        link, ok := s.linkIndex[chanID]
4✔
1549
                        if !ok {
8✔
1550
                                s.indexMtx.RUnlock()
4✔
1551

4✔
1552
                                req.Err <- fmt.Errorf("no peer for channel with "+
4✔
1553
                                        "chan_id=%x", chanID[:])
4✔
1554
                                continue
4✔
1555
                        }
1556
                        s.indexMtx.RUnlock()
4✔
1557

4✔
1558
                        peerPub := link.PeerPubKey()
4✔
1559
                        log.Debugf("Requesting local channel close: peer=%v, "+
4✔
1560
                                "chan_id=%x", link.PeerPubKey(), chanID[:])
4✔
1561

4✔
1562
                        go s.cfg.LocalChannelClose(peerPub[:], req)
4✔
1563

1564
                case resolutionMsg := <-s.resolutionMsgs:
5✔
1565
                        // We'll persist the resolution message to the Switch's
5✔
1566
                        // resolution store.
5✔
1567
                        resMsg := resolutionMsg.ResolutionMsg
5✔
1568
                        err := s.resMsgStore.addResolutionMsg(&resMsg)
5✔
1569
                        if err != nil {
5✔
1570
                                // This will only fail if there is a database
×
1571
                                // error or a serialization error. Sending the
×
1572
                                // error prevents the contractcourt from being
×
1573
                                // in a state where it believes the send was
×
1574
                                // successful, when it wasn't.
×
1575
                                log.Errorf("unable to add resolution msg: %v",
×
1576
                                        err)
×
1577
                                resolutionMsg.errChan <- err
×
1578
                                continue
×
1579
                        }
1580

1581
                        // At this point, the resolution message has been
1582
                        // persisted. It is safe to signal success by sending
1583
                        // a nil error since the Switch will re-deliver the
1584
                        // resolution message on restart.
1585
                        resolutionMsg.errChan <- nil
5✔
1586

5✔
1587
                        // Create a htlc packet for this resolution. We do
5✔
1588
                        // not have some of the information that we'll need
5✔
1589
                        // for blinded error handling here , so we'll rely on
5✔
1590
                        // our forwarding logic to fill it in later.
5✔
1591
                        pkt := &htlcPacket{
5✔
1592
                                outgoingChanID: resolutionMsg.SourceChan,
5✔
1593
                                outgoingHTLCID: resolutionMsg.HtlcIndex,
5✔
1594
                                isResolution:   true,
5✔
1595
                        }
5✔
1596

5✔
1597
                        // Resolution messages will either be cancelling
5✔
1598
                        // backwards an existing HTLC, or settling a previously
5✔
1599
                        // outgoing HTLC. Based on this, we'll map the message
5✔
1600
                        // to the proper htlcPacket.
5✔
1601
                        if resolutionMsg.Failure != nil {
9✔
1602
                                pkt.htlc = &lnwire.UpdateFailHTLC{}
4✔
1603
                        } else {
9✔
1604
                                pkt.htlc = &lnwire.UpdateFulfillHTLC{
5✔
1605
                                        PaymentPreimage: *resolutionMsg.PreImage,
5✔
1606
                                }
5✔
1607
                        }
5✔
1608

1609
                        log.Infof("Received outside contract resolution, "+
5✔
1610
                                "mapping to: %v", spew.Sdump(pkt))
5✔
1611

5✔
1612
                        // We don't check the error, as the only failure we can
5✔
1613
                        // encounter is due to the circuit already being
5✔
1614
                        // closed. This is fine, as processing this message is
5✔
1615
                        // meant to be idempotent.
5✔
1616
                        err = s.handlePacketForward(pkt)
5✔
1617
                        if err != nil {
9✔
1618
                                log.Errorf("Unable to forward resolution msg: %v", err)
4✔
1619
                        }
4✔
1620

1621
                // A new packet has arrived for forwarding, we'll interpret the
1622
                // packet concretely, then either forward it along, or
1623
                // interpret a return packet to a locally initialized one.
1624
                case cmd := <-s.htlcPlex:
622✔
1625
                        cmd.err <- s.handlePacketForward(cmd.pkt)
622✔
1626

1627
                // When this time ticks, then it indicates that we should
1628
                // collect all the forwarding events since the last internal,
1629
                // and write them out to our log.
1630
                case <-s.cfg.FwdEventTicker.Ticks():
6✔
1631
                        s.wg.Add(1)
6✔
1632
                        go func() {
12✔
1633
                                defer s.wg.Done()
6✔
1634

6✔
1635
                                if err := s.FlushForwardingEvents(); err != nil {
6✔
1636
                                        log.Errorf("Unable to flush "+
×
1637
                                                "forwarding events: %v", err)
×
1638
                                }
×
1639
                        }()
1640

1641
                // The log ticker has fired, so we'll calculate some forwarding
1642
                // stats for the last 10 seconds to display within the logs to
1643
                // users.
1644
                case <-s.cfg.LogEventTicker.Ticks():
8✔
1645
                        // First, we'll collate the current running tally of
8✔
1646
                        // our forwarding stats.
8✔
1647
                        prevSatSent := totalSatSent
8✔
1648
                        prevSatRecv := totalSatRecv
8✔
1649
                        prevNumUpdates := totalNumUpdates
8✔
1650

8✔
1651
                        var (
8✔
1652
                                newNumUpdates uint64
8✔
1653
                                newSatSent    btcutil.Amount
8✔
1654
                                newSatRecv    btcutil.Amount
8✔
1655
                        )
8✔
1656

8✔
1657
                        // Next, we'll run through all the registered links and
8✔
1658
                        // compute their up-to-date forwarding stats.
8✔
1659
                        s.indexMtx.RLock()
8✔
1660
                        for _, link := range s.linkIndex {
18✔
1661
                                // TODO(roasbeef): when links first registered
10✔
1662
                                // stats printed.
10✔
1663
                                updates, sent, recv := link.Stats()
10✔
1664
                                newNumUpdates += updates
10✔
1665
                                newSatSent += sent.ToSatoshis()
10✔
1666
                                newSatRecv += recv.ToSatoshis()
10✔
1667
                        }
10✔
1668
                        s.indexMtx.RUnlock()
8✔
1669

8✔
1670
                        var (
8✔
1671
                                diffNumUpdates uint64
8✔
1672
                                diffSatSent    btcutil.Amount
8✔
1673
                                diffSatRecv    btcutil.Amount
8✔
1674
                        )
8✔
1675

8✔
1676
                        // If this is the first time we're computing these
8✔
1677
                        // stats, then the diff is just the new value. We do
8✔
1678
                        // this in order to avoid integer underflow issues.
8✔
1679
                        if prevNumUpdates == 0 {
16✔
1680
                                diffNumUpdates = newNumUpdates
8✔
1681
                                diffSatSent = newSatSent
8✔
1682
                                diffSatRecv = newSatRecv
8✔
1683
                        } else {
12✔
1684
                                diffNumUpdates = newNumUpdates - prevNumUpdates
4✔
1685
                                diffSatSent = newSatSent - prevSatSent
4✔
1686
                                diffSatRecv = newSatRecv - prevSatRecv
4✔
1687
                        }
4✔
1688

1689
                        // If the diff of num updates is zero, then we haven't
1690
                        // forwarded anything in the last 10 seconds, so we can
1691
                        // skip this update.
1692
                        if diffNumUpdates == 0 {
14✔
1693
                                continue
6✔
1694
                        }
1695

1696
                        // If the diff of num updates is negative, then some
1697
                        // links may have been unregistered from the switch, so
1698
                        // we'll update our stats to only include our registered
1699
                        // links.
1700
                        if int64(diffNumUpdates) < 0 {
10✔
1701
                                totalNumUpdates = newNumUpdates
4✔
1702
                                totalSatSent = newSatSent
4✔
1703
                                totalSatRecv = newSatRecv
4✔
1704
                                continue
4✔
1705
                        }
1706

1707
                        // Otherwise, we'll log this diff, then accumulate the
1708
                        // new stats into the running total.
1709
                        log.Debugf("Sent %d satoshis and received %d satoshis "+
6✔
1710
                                "in the last 10 seconds (%f tx/sec)",
6✔
1711
                                diffSatSent, diffSatRecv,
6✔
1712
                                float64(diffNumUpdates)/10)
6✔
1713

6✔
1714
                        totalNumUpdates += diffNumUpdates
6✔
1715
                        totalSatSent += diffSatSent
6✔
1716
                        totalSatRecv += diffSatRecv
6✔
1717

1718
                // The ack ticker has fired so if we have any settle/fail entries
1719
                // for a forwarding package to ack, we will do so here in a batch
1720
                // db call.
1721
                case <-s.cfg.AckEventTicker.Ticks():
4✔
1722
                        // If the current set is empty, pause the ticker.
4✔
1723
                        if len(s.pendingSettleFails) == 0 {
8✔
1724
                                s.cfg.AckEventTicker.Pause()
4✔
1725
                                continue
4✔
1726
                        }
1727

1728
                        // Batch ack the settle/fail entries.
1729
                        if err := s.ackSettleFail(s.pendingSettleFails...); err != nil {
4✔
1730
                                log.Errorf("Unable to ack batch of settle/fails: %v", err)
×
1731
                                continue
×
1732
                        }
1733

1734
                        log.Tracef("Acked %d settle fails: %v",
4✔
1735
                                len(s.pendingSettleFails),
4✔
1736
                                lnutils.SpewLogClosure(s.pendingSettleFails))
4✔
1737

4✔
1738
                        // Reset the pendingSettleFails buffer while keeping acquired
4✔
1739
                        // memory.
4✔
1740
                        s.pendingSettleFails = s.pendingSettleFails[:0]
4✔
1741

1742
                case <-s.quit:
211✔
1743
                        return
211✔
1744
                }
1745
        }
1746
}
1747

1748
// Start starts all helper goroutines required for the operation of the switch.
1749
func (s *Switch) Start() error {
211✔
1750
        if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
211✔
1751
                log.Warn("Htlc Switch already started")
×
1752
                return errors.New("htlc switch already started")
×
1753
        }
×
1754

1755
        log.Infof("HTLC Switch starting")
211✔
1756

211✔
1757
        blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
211✔
1758
        if err != nil {
211✔
1759
                return err
×
1760
        }
×
1761
        s.blockEpochStream = blockEpochStream
211✔
1762

211✔
1763
        s.wg.Add(1)
211✔
1764
        go s.htlcForwarder()
211✔
1765

211✔
1766
        if err := s.reforwardResponses(); err != nil {
211✔
1767
                s.Stop()
×
1768
                log.Errorf("unable to reforward responses: %v", err)
×
1769
                return err
×
1770
        }
×
1771

1772
        if err := s.reforwardResolutions(); err != nil {
211✔
1773
                // We are already stopping so we can ignore the error.
×
1774
                _ = s.Stop()
×
1775
                log.Errorf("unable to reforward resolutions: %v", err)
×
1776
                return err
×
1777
        }
×
1778

1779
        return nil
211✔
1780
}
1781

1782
// reforwardResolutions fetches the set of resolution messages stored on-disk
1783
// and reforwards them if their circuits are still open. If the circuits have
1784
// been deleted, then we will delete the resolution message from the database.
1785
func (s *Switch) reforwardResolutions() error {
211✔
1786
        // Fetch all stored resolution messages, deleting the ones that are
211✔
1787
        // resolved.
211✔
1788
        resMsgs, err := s.resMsgStore.fetchAllResolutionMsg()
211✔
1789
        if err != nil {
211✔
1790
                return err
×
1791
        }
×
1792

1793
        switchPackets := make([]*htlcPacket, 0, len(resMsgs))
211✔
1794
        for _, resMsg := range resMsgs {
216✔
1795
                // If the open circuit no longer exists, then we can remove the
5✔
1796
                // message from the store.
5✔
1797
                outKey := CircuitKey{
5✔
1798
                        ChanID: resMsg.SourceChan,
5✔
1799
                        HtlcID: resMsg.HtlcIndex,
5✔
1800
                }
5✔
1801

5✔
1802
                if s.circuits.LookupOpenCircuit(outKey) == nil {
10✔
1803
                        // The open circuit doesn't exist.
5✔
1804
                        err := s.resMsgStore.deleteResolutionMsg(&outKey)
5✔
1805
                        if err != nil {
5✔
1806
                                return err
×
1807
                        }
×
1808

1809
                        continue
5✔
1810
                }
1811

1812
                // The circuit is still open, so we can assume that the link or
1813
                // switch (if we are the source) hasn't cleaned it up yet.
1814
                // We rely on our forwarding logic to fill in details that
1815
                // are not currently available to us.
1816
                resPkt := &htlcPacket{
4✔
1817
                        outgoingChanID: resMsg.SourceChan,
4✔
1818
                        outgoingHTLCID: resMsg.HtlcIndex,
4✔
1819
                        isResolution:   true,
4✔
1820
                }
4✔
1821

4✔
1822
                if resMsg.Failure != nil {
8✔
1823
                        resPkt.htlc = &lnwire.UpdateFailHTLC{}
4✔
1824
                } else {
4✔
1825
                        resPkt.htlc = &lnwire.UpdateFulfillHTLC{
×
1826
                                PaymentPreimage: *resMsg.PreImage,
×
1827
                        }
×
1828
                }
×
1829

1830
                switchPackets = append(switchPackets, resPkt)
4✔
1831
        }
1832

1833
        // We'll now dispatch the set of resolution messages to the proper
1834
        // destination. An error is only encountered here if the switch is
1835
        // shutting down.
1836
        if err := s.ForwardPackets(nil, switchPackets...); err != nil {
211✔
1837
                return err
×
1838
        }
×
1839

1840
        return nil
211✔
1841
}
1842

1843
// reforwardResponses for every known, non-pending channel, loads all associated
1844
// forwarding packages and reforwards any Settle or Fail HTLCs found. This is
1845
// used to resurrect the switch's mailboxes after a restart. This also runs for
1846
// waiting close channels since there may be settles or fails that need to be
1847
// reforwarded before they completely close.
1848
func (s *Switch) reforwardResponses() error {
211✔
1849
        openChannels, err := s.cfg.FetchAllChannels()
211✔
1850
        if err != nil {
211✔
1851
                return err
×
1852
        }
×
1853

1854
        for _, openChannel := range openChannels {
344✔
1855
                shortChanID := openChannel.ShortChanID()
133✔
1856

133✔
1857
                // Locally-initiated payments never need reforwarding.
133✔
1858
                if shortChanID == hop.Source {
137✔
1859
                        continue
4✔
1860
                }
1861

1862
                // If the channel is pending, it should have no forwarding
1863
                // packages, and nothing to reforward.
1864
                if openChannel.IsPending {
133✔
1865
                        continue
×
1866
                }
1867

1868
                // Channels in open or waiting-close may still have responses in
1869
                // their forwarding packages. We will continue to reattempt
1870
                // forwarding on startup until the channel is fully-closed.
1871
                //
1872
                // Load this channel's forwarding packages, and deliver them to
1873
                // the switch.
1874
                fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID)
133✔
1875
                if err != nil {
133✔
1876
                        log.Errorf("unable to load forwarding "+
×
1877
                                "packages for %v: %v", shortChanID, err)
×
1878
                        return err
×
1879
                }
×
1880

1881
                s.reforwardSettleFails(fwdPkgs)
133✔
1882
        }
1883

1884
        return nil
211✔
1885
}
1886

1887
// loadChannelFwdPkgs loads all forwarding packages owned by the `source` short
1888
// channel identifier.
1889
func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) {
133✔
1890

133✔
1891
        var fwdPkgs []*channeldb.FwdPkg
133✔
1892
        if err := kvdb.View(s.cfg.DB, func(tx kvdb.RTx) error {
266✔
1893
                var err error
133✔
1894
                fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgs(
133✔
1895
                        tx, source,
133✔
1896
                )
133✔
1897
                return err
133✔
1898
        }, func() {
266✔
1899
                fwdPkgs = nil
133✔
1900
        }); err != nil {
133✔
1901
                return nil, err
×
1902
        }
×
1903

1904
        return fwdPkgs, nil
133✔
1905
}
1906

1907
// reforwardSettleFails parses the Settle and Fail HTLCs from the list of
1908
// forwarding packages, and reforwards those that have not been acknowledged.
1909
// This is intended to occur on startup, in order to recover the switch's
1910
// mailboxes, and to ensure that responses can be propagated in case the
1911
// outgoing link never comes back online.
1912
//
1913
// NOTE: This should mimic the behavior processRemoteSettleFails.
1914
func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) {
133✔
1915
        for _, fwdPkg := range fwdPkgs {
138✔
1916
                switchPackets := make([]*htlcPacket, 0, len(fwdPkg.SettleFails))
5✔
1917
                for i, update := range fwdPkg.SettleFails {
9✔
1918
                        // Skip any settles or fails that have already been
4✔
1919
                        // acknowledged by the incoming link that originated the
4✔
1920
                        // forwarded Add.
4✔
1921
                        if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
8✔
1922
                                continue
4✔
1923
                        }
1924

1925
                        switch msg := update.UpdateMsg.(type) {
4✔
1926
                        // A settle for an HTLC we previously forwarded HTLC has
1927
                        // been received. So we'll forward the HTLC to the
1928
                        // switch which will handle propagating the settle to
1929
                        // the prior hop.
1930
                        case *lnwire.UpdateFulfillHTLC:
4✔
1931
                                destRef := fwdPkg.DestRef(uint16(i))
4✔
1932
                                settlePacket := &htlcPacket{
4✔
1933
                                        outgoingChanID: fwdPkg.Source,
4✔
1934
                                        outgoingHTLCID: msg.ID,
4✔
1935
                                        destRef:        &destRef,
4✔
1936
                                        htlc:           msg,
4✔
1937
                                }
4✔
1938

4✔
1939
                                // Add the packet to the batch to be forwarded, and
4✔
1940
                                // notify the overflow queue that a spare spot has been
4✔
1941
                                // freed up within the commitment state.
4✔
1942
                                switchPackets = append(switchPackets, settlePacket)
4✔
1943

1944
                        // A failureCode message for a previously forwarded HTLC has been
1945
                        // received. As a result a new slot will be freed up in our
1946
                        // commitment state, so we'll forward this to the switch so the
1947
                        // backwards undo can continue.
1948
                        case *lnwire.UpdateFailHTLC:
×
1949
                                // Fetch the reason the HTLC was canceled so
×
1950
                                // we can continue to propagate it. This
×
1951
                                // failure originated from another node, so
×
1952
                                // the linkFailure field is not set on this
×
1953
                                // packet. We rely on the link to fill in
×
1954
                                // additional circuit information for us.
×
1955
                                failPacket := &htlcPacket{
×
1956
                                        outgoingChanID: fwdPkg.Source,
×
1957
                                        outgoingHTLCID: msg.ID,
×
1958
                                        destRef: &channeldb.SettleFailRef{
×
1959
                                                Source: fwdPkg.Source,
×
1960
                                                Height: fwdPkg.Height,
×
1961
                                                Index:  uint16(i),
×
1962
                                        },
×
1963
                                        htlc: msg,
×
1964
                                }
×
1965

×
1966
                                // Add the packet to the batch to be forwarded, and
×
1967
                                // notify the overflow queue that a spare spot has been
×
1968
                                // freed up within the commitment state.
×
1969
                                switchPackets = append(switchPackets, failPacket)
×
1970
                        }
1971
                }
1972

1973
                // Since this send isn't tied to a specific link, we pass a nil
1974
                // link quit channel, meaning the send will fail only if the
1975
                // switch receives a shutdown request.
1976
                if err := s.ForwardPackets(nil, switchPackets...); err != nil {
5✔
1977
                        log.Errorf("Unhandled error while reforwarding packets "+
×
1978
                                "settle/fail over htlcswitch: %v", err)
×
1979
                }
×
1980
        }
1981
}
1982

1983
// Stop gracefully stops all active helper goroutines, then waits until they've
1984
// exited.
1985
func (s *Switch) Stop() error {
443✔
1986
        if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) {
573✔
1987
                log.Warn("Htlc Switch already stopped")
130✔
1988
                return errors.New("htlc switch already shutdown")
130✔
1989
        }
130✔
1990

1991
        log.Info("HTLC Switch shutting down...")
313✔
1992
        defer log.Debug("HTLC Switch shutdown complete")
313✔
1993

313✔
1994
        close(s.quit)
313✔
1995

313✔
1996
        s.wg.Wait()
313✔
1997

313✔
1998
        // Wait until all active goroutines have finished exiting before
313✔
1999
        // stopping the mailboxes, otherwise the mailbox map could still be
313✔
2000
        // accessed and modified.
313✔
2001
        s.mailOrchestrator.Stop()
313✔
2002

313✔
2003
        return nil
313✔
2004
}
2005

2006
// CreateAndAddLink will create a link and then add it to the internal maps
2007
// when given a ChannelLinkConfig and LightningChannel.
2008
func (s *Switch) CreateAndAddLink(linkCfg ChannelLinkConfig,
2009
        lnChan *lnwallet.LightningChannel) error {
4✔
2010

4✔
2011
        link := NewChannelLink(linkCfg, lnChan)
4✔
2012
        return s.AddLink(link)
4✔
2013
}
4✔
2014

2015
// AddLink is used to initiate the handling of the add link command. The
2016
// request will be propagated and handled in the main goroutine.
2017
func (s *Switch) AddLink(link ChannelLink) error {
340✔
2018
        s.indexMtx.Lock()
340✔
2019
        defer s.indexMtx.Unlock()
340✔
2020

340✔
2021
        chanID := link.ChanID()
340✔
2022

340✔
2023
        // First, ensure that this link is not already active in the switch.
340✔
2024
        _, err := s.getLink(chanID)
340✔
2025
        if err == nil {
341✔
2026
                return fmt.Errorf("unable to add ChannelLink(%v), already "+
1✔
2027
                        "active", chanID)
1✔
2028
        }
1✔
2029

2030
        // Get and attach the mailbox for this link, which buffers packets in
2031
        // case there packets that we tried to deliver while this link was
2032
        // offline.
2033
        shortChanID := link.ShortChanID()
339✔
2034
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID)
339✔
2035
        link.AttachMailBox(mailbox)
339✔
2036

339✔
2037
        // Attach the Switch's failAliasUpdate function to the link.
339✔
2038
        link.attachFailAliasUpdate(s.failAliasUpdate)
339✔
2039

339✔
2040
        if err := link.Start(); err != nil {
339✔
2041
                log.Errorf("AddLink failed to start link with chanID=%v: %v",
×
2042
                        chanID, err)
×
2043
                s.removeLink(chanID)
×
2044
                return err
×
2045
        }
×
2046

2047
        if shortChanID == hop.Source {
344✔
2048
                log.Infof("Adding pending link chan_id=%v, short_chan_id=%v",
5✔
2049
                        chanID, shortChanID)
5✔
2050

5✔
2051
                s.pendingLinkIndex[chanID] = link
5✔
2052
        } else {
343✔
2053
                log.Infof("Adding live link chan_id=%v, short_chan_id=%v",
338✔
2054
                        chanID, shortChanID)
338✔
2055

338✔
2056
                s.addLiveLink(link)
338✔
2057
                s.mailOrchestrator.BindLiveShortChanID(
338✔
2058
                        mailbox, chanID, shortChanID,
338✔
2059
                )
338✔
2060
        }
338✔
2061

2062
        return nil
339✔
2063
}
2064

2065
// addLiveLink adds a link to all associated forwarding index, this makes it a
2066
// candidate for forwarding HTLCs.
2067
func (s *Switch) addLiveLink(link ChannelLink) {
338✔
2068
        linkScid := link.ShortChanID()
338✔
2069

338✔
2070
        // We'll add the link to the linkIndex which lets us quickly
338✔
2071
        // look up a channel when we need to close or register it, and
338✔
2072
        // the forwarding index which'll be used when forwarding HTLC's
338✔
2073
        // in the multi-hop setting.
338✔
2074
        s.linkIndex[link.ChanID()] = link
338✔
2075
        s.forwardingIndex[linkScid] = link
338✔
2076

338✔
2077
        // Next we'll add the link to the interface index so we can
338✔
2078
        // quickly look up all the channels for a particular node.
338✔
2079
        peerPub := link.PeerPubKey()
338✔
2080
        if _, ok := s.interfaceIndex[peerPub]; !ok {
671✔
2081
                s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink)
333✔
2082
        }
333✔
2083
        s.interfaceIndex[peerPub][link.ChanID()] = link
338✔
2084

338✔
2085
        s.updateLinkAliases(link)
338✔
2086
}
2087

2088
// UpdateLinkAliases is the externally exposed wrapper for updating link
2089
// aliases. It acquires the indexMtx and calls the internal method.
2090
func (s *Switch) UpdateLinkAliases(link ChannelLink) {
4✔
2091
        s.indexMtx.Lock()
4✔
2092
        defer s.indexMtx.Unlock()
4✔
2093

4✔
2094
        s.updateLinkAliases(link)
4✔
2095
}
4✔
2096

2097
// updateLinkAliases updates the aliases for a given link. This will cause the
2098
// htlcswitch to consult the alias manager on the up to date values of its
2099
// alias maps.
2100
//
2101
// NOTE: this MUST be called with the indexMtx held.
2102
func (s *Switch) updateLinkAliases(link ChannelLink) {
338✔
2103
        linkScid := link.ShortChanID()
338✔
2104

338✔
2105
        aliases := link.getAliases()
338✔
2106
        if link.isZeroConf() {
361✔
2107
                if link.zeroConfConfirmed() {
42✔
2108
                        // Since the zero-conf channel has confirmed, we can
19✔
2109
                        // populate the aliasToReal mapping.
19✔
2110
                        confirmedScid := link.confirmedScid()
19✔
2111

19✔
2112
                        for _, alias := range aliases {
45✔
2113
                                s.aliasToReal[alias] = confirmedScid
26✔
2114
                        }
26✔
2115

2116
                        // Add the confirmed SCID as a key in the baseIndex.
2117
                        s.baseIndex[confirmedScid] = linkScid
19✔
2118
                }
2119

2120
                // Now we populate the baseIndex which will be used to fetch
2121
                // the link given any of the channel's alias SCIDs or the real
2122
                // SCID. The link's SCID is an alias, so we don't need to
2123
                // special-case it like the option-scid-alias feature-bit case
2124
                // further down.
2125
                for _, alias := range aliases {
54✔
2126
                        s.baseIndex[alias] = linkScid
31✔
2127
                }
31✔
2128
        } else if link.negotiatedAliasFeature() {
339✔
2129
                // First, we flush any alias mappings for this link's scid
20✔
2130
                // before we populate the map again, in order to get rid of old
20✔
2131
                // values that no longer exist.
20✔
2132
                for alias, real := range s.aliasToReal {
26✔
2133
                        if real == linkScid {
10✔
2134
                                delete(s.aliasToReal, alias)
4✔
2135
                        }
4✔
2136
                }
2137

2138
                for alias, real := range s.baseIndex {
27✔
2139
                        if real == linkScid {
11✔
2140
                                delete(s.baseIndex, alias)
4✔
2141
                        }
4✔
2142
                }
2143

2144
                // The link's SCID is the confirmed SCID for non-zero-conf
2145
                // option-scid-alias feature bit channels.
2146
                for _, alias := range aliases {
47✔
2147
                        s.aliasToReal[alias] = linkScid
27✔
2148
                        s.baseIndex[alias] = linkScid
27✔
2149
                }
27✔
2150

2151
                // Since the link's SCID is confirmed, it was not included in
2152
                // the baseIndex above as a key. Add it now.
2153
                s.baseIndex[linkScid] = linkScid
20✔
2154
        }
2155
}
2156

2157
// GetLink is used to initiate the handling of the get link command. The
2158
// request will be propagated/handled to/in the main goroutine.
2159
func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelUpdateHandler,
2160
        error) {
3,195✔
2161

3,195✔
2162
        s.indexMtx.RLock()
3,195✔
2163
        defer s.indexMtx.RUnlock()
3,195✔
2164

3,195✔
2165
        return s.getLink(chanID)
3,195✔
2166
}
3,195✔
2167

2168
// getLink returns the link stored in either the pending index or the live
2169
// lindex.
2170
func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) {
3,860✔
2171
        link, ok := s.linkIndex[chanID]
3,860✔
2172
        if !ok {
4,200✔
2173
                link, ok = s.pendingLinkIndex[chanID]
340✔
2174
                if !ok {
679✔
2175
                        return nil, ErrChannelLinkNotFound
339✔
2176
                }
339✔
2177
        }
2178

2179
        return link, nil
3,525✔
2180
}
2181

2182
// GetLinkByShortID attempts to return the link which possesses the target short
2183
// channel ID.
2184
func (s *Switch) GetLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink,
2185
        error) {
4✔
2186

4✔
2187
        s.indexMtx.RLock()
4✔
2188
        defer s.indexMtx.RUnlock()
4✔
2189

4✔
2190
        link, err := s.getLinkByShortID(chanID)
4✔
2191
        if err != nil {
8✔
2192
                // If we failed to find the link under the passed-in SCID, we
4✔
2193
                // consult the Switch's baseIndex map to see if the confirmed
4✔
2194
                // SCID was used for a zero-conf channel.
4✔
2195
                aliasID, ok := s.baseIndex[chanID]
4✔
2196
                if !ok {
8✔
2197
                        return nil, err
4✔
2198
                }
4✔
2199

2200
                // An alias was found, use it to lookup if a link exists.
2201
                return s.getLinkByShortID(aliasID)
4✔
2202
        }
2203

2204
        return link, nil
4✔
2205
}
2206

2207
// getLinkByShortID attempts to return the link which possesses the target
2208
// short channel ID.
2209
//
2210
// NOTE: This MUST be called with the indexMtx held.
2211
func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) {
481✔
2212
        link, ok := s.forwardingIndex[chanID]
481✔
2213
        if !ok {
486✔
2214
                return nil, ErrChannelLinkNotFound
5✔
2215
        }
5✔
2216

2217
        return link, nil
480✔
2218
}
2219

2220
// getLinkByMapping attempts to fetch the link via the htlcPacket's
2221
// outgoingChanID, possibly using a mapping. If it finds the link via mapping,
2222
// the outgoingChanID will be changed so that an error can be properly
2223
// attributed when looping over linkErrs in handlePacketForward.
2224
//
2225
// * If the outgoingChanID is an alias, we'll fetch the link regardless if it's
2226
// public or not.
2227
//
2228
// * If the outgoingChanID is a confirmed SCID, we'll need to do more checks.
2229
//   - If there is no entry found in baseIndex, fetch the link. This channel
2230
//     did not have the option-scid-alias feature negotiated (which includes
2231
//     zero-conf and option-scid-alias channel-types).
2232
//   - If there is an entry found, fetch the link from forwardingIndex and
2233
//     fail if this is a private link.
2234
//
2235
// NOTE: This MUST be called with the indexMtx read lock held.
2236
func (s *Switch) getLinkByMapping(pkt *htlcPacket) (ChannelLink, error) {
84✔
2237
        // Determine if this ShortChannelID is an alias or a confirmed SCID.
84✔
2238
        chanID := pkt.outgoingChanID
84✔
2239
        aliasID := s.cfg.IsAlias(chanID)
84✔
2240

84✔
2241
        // Set the originalOutgoingChanID so the proper channel_update can be
84✔
2242
        // sent back if the option-scid-alias feature bit was negotiated.
84✔
2243
        pkt.originalOutgoingChanID = chanID
84✔
2244

84✔
2245
        if aliasID {
103✔
2246
                // Since outgoingChanID is an alias, we'll fetch the link via
19✔
2247
                // baseIndex.
19✔
2248
                baseScid, ok := s.baseIndex[chanID]
19✔
2249
                if !ok {
19✔
2250
                        // No mapping exists, bail.
×
2251
                        return nil, ErrChannelLinkNotFound
×
2252
                }
×
2253

2254
                // A mapping exists, so use baseScid to find the link in the
2255
                // forwardingIndex.
2256
                link, ok := s.forwardingIndex[baseScid]
19✔
2257
                if !ok {
19✔
2258
                        // Link not found, bail.
×
2259
                        return nil, ErrChannelLinkNotFound
×
2260
                }
×
2261

2262
                // Change the packet's outgoingChanID field so that errors are
2263
                // properly attributed.
2264
                pkt.outgoingChanID = baseScid
19✔
2265

19✔
2266
                // Return the link without checking if it's private or not.
19✔
2267
                return link, nil
19✔
2268
        }
2269

2270
        // The outgoingChanID is a confirmed SCID. Attempt to fetch the base
2271
        // SCID from baseIndex.
2272
        baseScid, ok := s.baseIndex[chanID]
69✔
2273
        if !ok {
131✔
2274
                // outgoingChanID is not a key in base index meaning this
62✔
2275
                // channel did not have the option-scid-alias feature bit
62✔
2276
                // negotiated. We'll fetch the link and return it.
62✔
2277
                link, ok := s.forwardingIndex[chanID]
62✔
2278
                if !ok {
68✔
2279
                        // The link wasn't found, bail out.
6✔
2280
                        return nil, ErrChannelLinkNotFound
6✔
2281
                }
6✔
2282

2283
                return link, nil
60✔
2284
        }
2285

2286
        // Fetch the link whose internal SCID is baseScid.
2287
        link, ok := s.forwardingIndex[baseScid]
11✔
2288
        if !ok {
11✔
2289
                // Link wasn't found, bail out.
×
2290
                return nil, ErrChannelLinkNotFound
×
2291
        }
×
2292

2293
        // If the link is unadvertised, we fail since the real SCID was used to
2294
        // forward over it and this is a channel where the option-scid-alias
2295
        // feature bit was negotiated.
2296
        if link.IsUnadvertised() {
13✔
2297
                return nil, ErrChannelLinkNotFound
2✔
2298
        }
2✔
2299

2300
        // The link is public so the confirmed SCID can be used to forward over
2301
        // it. We'll also replace pkt's outgoingChanID field so errors can
2302
        // properly be attributed in the calling function.
2303
        pkt.outgoingChanID = baseScid
9✔
2304
        return link, nil
9✔
2305
}
2306

2307
// HasActiveLink returns true if the given channel ID has a link in the link
2308
// index AND the link is eligible to forward.
2309
func (s *Switch) HasActiveLink(chanID lnwire.ChannelID) bool {
6✔
2310
        s.indexMtx.RLock()
6✔
2311
        defer s.indexMtx.RUnlock()
6✔
2312

6✔
2313
        if link, ok := s.linkIndex[chanID]; ok {
12✔
2314
                return link.EligibleToForward()
6✔
2315
        }
6✔
2316

2317
        return false
4✔
2318
}
2319

2320
// RemoveLink purges the switch of any link associated with chanID. If a pending
2321
// or active link is not found, this method does nothing. Otherwise, the method
2322
// returns after the link has been completely shutdown.
2323
func (s *Switch) RemoveLink(chanID lnwire.ChannelID) {
22✔
2324
        s.indexMtx.Lock()
22✔
2325
        link, err := s.getLink(chanID)
22✔
2326
        if err != nil {
26✔
2327
                // If err is non-nil, this means that link is also nil. The
4✔
2328
                // link variable cannot be nil without err being non-nil.
4✔
2329
                s.indexMtx.Unlock()
4✔
2330
                log.Tracef("Unable to remove link for ChannelID(%v): %v",
4✔
2331
                        chanID, err)
4✔
2332
                return
4✔
2333
        }
4✔
2334

2335
        // Check if the link is already stopping and grab the stop chan if it
2336
        // is.
2337
        stopChan, ok := s.linkStopIndex[chanID]
22✔
2338
        if !ok {
44✔
2339
                // If the link is non-nil, it is not currently stopping, so
22✔
2340
                // we'll add a stop chan to the linkStopIndex.
22✔
2341
                stopChan = make(chan struct{})
22✔
2342
                s.linkStopIndex[chanID] = stopChan
22✔
2343
        }
22✔
2344
        s.indexMtx.Unlock()
22✔
2345

22✔
2346
        if ok {
22✔
2347
                // If the stop chan exists, we will wait for it to be closed.
×
2348
                // Once it is closed, we will exit.
×
2349
                select {
×
2350
                case <-stopChan:
×
2351
                        return
×
2352
                case <-s.quit:
×
2353
                        return
×
2354
                }
2355
        }
2356

2357
        // Stop the link before removing it from the maps.
2358
        link.Stop()
22✔
2359

22✔
2360
        s.indexMtx.Lock()
22✔
2361
        _ = s.removeLink(chanID)
22✔
2362

22✔
2363
        // Close stopChan and remove this link from the linkStopIndex.
22✔
2364
        // Deleting from the index and removing from the link must be done
22✔
2365
        // in the same block while the mutex is held.
22✔
2366
        close(stopChan)
22✔
2367
        delete(s.linkStopIndex, chanID)
22✔
2368
        s.indexMtx.Unlock()
22✔
2369
}
2370

2371
// removeLink is used to remove and stop the channel link.
2372
//
2373
// NOTE: This MUST be called with the indexMtx held.
2374
func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink {
315✔
2375
        log.Infof("Removing channel link with ChannelID(%v)", chanID)
315✔
2376

315✔
2377
        link, err := s.getLink(chanID)
315✔
2378
        if err != nil {
315✔
2379
                return nil
×
2380
        }
×
2381

2382
        // Remove the channel from live link indexes.
2383
        delete(s.pendingLinkIndex, link.ChanID())
315✔
2384
        delete(s.linkIndex, link.ChanID())
315✔
2385
        delete(s.forwardingIndex, link.ShortChanID())
315✔
2386

315✔
2387
        // If the link has been added to the peer index, then we'll move to
315✔
2388
        // delete the entry within the index.
315✔
2389
        peerPub := link.PeerPubKey()
315✔
2390
        if peerIndex, ok := s.interfaceIndex[peerPub]; ok {
629✔
2391
                delete(peerIndex, link.ChanID())
314✔
2392

314✔
2393
                // If after deletion, there are no longer any links, then we'll
314✔
2394
                // remove the interface map all together.
314✔
2395
                if len(peerIndex) == 0 {
623✔
2396
                        delete(s.interfaceIndex, peerPub)
309✔
2397
                }
309✔
2398
        }
2399

2400
        return link
315✔
2401
}
2402

2403
// UpdateShortChanID locates the link with the passed-in chanID and updates the
2404
// underlying channel state. This is only used in zero-conf channels to allow
2405
// the confirmed SCID to be updated.
2406
func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error {
5✔
2407
        s.indexMtx.Lock()
5✔
2408
        defer s.indexMtx.Unlock()
5✔
2409

5✔
2410
        // Locate the target link in the link index. If no such link exists,
5✔
2411
        // then we will ignore the request.
5✔
2412
        link, ok := s.linkIndex[chanID]
5✔
2413
        if !ok {
5✔
2414
                return fmt.Errorf("link %v not found", chanID)
×
2415
        }
×
2416

2417
        // Try to update the link's underlying channel state, returning early
2418
        // if this update failed.
2419
        _, err := link.UpdateShortChanID()
5✔
2420
        if err != nil {
5✔
2421
                return err
×
2422
        }
×
2423

2424
        // Since the zero-conf channel is confirmed, we should populate the
2425
        // aliasToReal map and update the baseIndex.
2426
        aliases := link.getAliases()
5✔
2427

5✔
2428
        confirmedScid := link.confirmedScid()
5✔
2429

5✔
2430
        for _, alias := range aliases {
11✔
2431
                s.aliasToReal[alias] = confirmedScid
6✔
2432
        }
6✔
2433

2434
        s.baseIndex[confirmedScid] = link.ShortChanID()
5✔
2435

5✔
2436
        return nil
5✔
2437
}
2438

2439
// GetLinksByInterface fetches all the links connected to a particular node
2440
// identified by the serialized compressed form of its public key.
2441
func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelUpdateHandler,
2442
        error) {
4✔
2443

4✔
2444
        s.indexMtx.RLock()
4✔
2445
        defer s.indexMtx.RUnlock()
4✔
2446

4✔
2447
        var handlers []ChannelUpdateHandler
4✔
2448

4✔
2449
        links, err := s.getLinks(hop)
4✔
2450
        if err != nil {
8✔
2451
                return nil, err
4✔
2452
        }
4✔
2453

2454
        // Range over the returned []ChannelLink to convert them into
2455
        // []ChannelUpdateHandler.
2456
        for _, link := range links {
8✔
2457
                handlers = append(handlers, link)
4✔
2458
        }
4✔
2459

2460
        return handlers, nil
4✔
2461
}
2462

2463
// getLinks is function which returns the channel links of the peer by hop
2464
// destination id.
2465
//
2466
// NOTE: This MUST be called with the indexMtx held.
2467
func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
80✔
2468
        links, ok := s.interfaceIndex[destination]
80✔
2469
        if !ok {
84✔
2470
                return nil, ErrNoLinksFound
4✔
2471
        }
4✔
2472

2473
        channelLinks := make([]ChannelLink, 0, len(links))
80✔
2474
        for _, link := range links {
164✔
2475
                channelLinks = append(channelLinks, link)
84✔
2476
        }
84✔
2477

2478
        return channelLinks, nil
80✔
2479
}
2480

2481
// CircuitModifier returns a reference to subset of the interfaces provided by
2482
// the circuit map, to allow links to open and close circuits.
2483
func (s *Switch) CircuitModifier() CircuitModifier {
219✔
2484
        return s.circuits
219✔
2485
}
219✔
2486

2487
// CircuitLookup returns a reference to subset of the interfaces provided by the
2488
// circuit map, to allow looking up circuits.
2489
func (s *Switch) CircuitLookup() CircuitLookup {
4✔
2490
        return s.circuits
4✔
2491
}
4✔
2492

2493
// commitCircuits persistently adds a circuit to the switch's circuit map.
2494
func (s *Switch) commitCircuits(circuits ...*PaymentCircuit) (
2495
        *CircuitFwdActions, error) {
17✔
2496

17✔
2497
        return s.circuits.CommitCircuits(circuits...)
17✔
2498
}
17✔
2499

2500
// FlushForwardingEvents flushes out the set of pending forwarding events to
2501
// the persistent log. This will be used by the switch to periodically flush
2502
// out the set of forwarding events to disk. External callers can also use this
2503
// method to ensure all data is flushed to dis before querying the log.
2504
func (s *Switch) FlushForwardingEvents() error {
213✔
2505
        // First, we'll obtain a copy of the current set of pending forwarding
213✔
2506
        // events.
213✔
2507
        s.fwdEventMtx.Lock()
213✔
2508

213✔
2509
        // If we won't have any forwarding events, then we can exit early.
213✔
2510
        if len(s.pendingFwdingEvents) == 0 {
409✔
2511
                s.fwdEventMtx.Unlock()
196✔
2512
                return nil
196✔
2513
        }
196✔
2514

2515
        events := make([]channeldb.ForwardingEvent, len(s.pendingFwdingEvents))
21✔
2516
        copy(events[:], s.pendingFwdingEvents[:])
21✔
2517

21✔
2518
        // With the copy obtained, we can now clear out the header pointer of
21✔
2519
        // the current slice. This way, we can re-use the underlying storage
21✔
2520
        // allocated for the slice.
21✔
2521
        s.pendingFwdingEvents = s.pendingFwdingEvents[:0]
21✔
2522
        s.fwdEventMtx.Unlock()
21✔
2523

21✔
2524
        // Finally, we'll write out the copied events to the persistent
21✔
2525
        // forwarding log.
21✔
2526
        return s.cfg.FwdingLog.AddForwardingEvents(events)
21✔
2527
}
2528

2529
// BestHeight returns the best height known to the switch.
2530
func (s *Switch) BestHeight() uint32 {
451✔
2531
        return atomic.LoadUint32(&s.bestHeight)
451✔
2532
}
451✔
2533

2534
// dustExceedsFeeThreshold takes in a ChannelLink, HTLC amount, and a boolean
2535
// to determine whether the default fee threshold has been exceeded. This
2536
// heuristic takes into account the trimmed-to-dust mechanism. The sum of the
2537
// commitment's dust with the mailbox's dust with the amount is checked against
2538
// the fee exposure threshold. If incoming is true, then the amount is not
2539
// included in the sum as it was already included in the commitment's dust. A
2540
// boolean is returned telling the caller whether the HTLC should be failed
2541
// back.
2542
func (s *Switch) dustExceedsFeeThreshold(link ChannelLink,
2543
        amount lnwire.MilliSatoshi, incoming bool) bool {
536✔
2544

536✔
2545
        // Retrieve the link's current commitment feerate and dustClosure.
536✔
2546
        feeRate := link.getFeeRate()
536✔
2547
        isDust := link.getDustClosure()
536✔
2548

536✔
2549
        // Evaluate if the HTLC is dust on either sides' commitment.
536✔
2550
        isLocalDust := isDust(
536✔
2551
                feeRate, incoming, lntypes.Local, amount.ToSatoshis(),
536✔
2552
        )
536✔
2553
        isRemoteDust := isDust(
536✔
2554
                feeRate, incoming, lntypes.Remote, amount.ToSatoshis(),
536✔
2555
        )
536✔
2556

536✔
2557
        if !(isLocalDust || isRemoteDust) {
672✔
2558
                // If the HTLC is not dust on either commitment, it's fine to
136✔
2559
                // forward.
136✔
2560
                return false
136✔
2561
        }
136✔
2562

2563
        // Fetch the dust sums currently in the mailbox for this link.
2564
        cid := link.ChanID()
404✔
2565
        sid := link.ShortChanID()
404✔
2566
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(cid, sid)
404✔
2567
        localMailDust, remoteMailDust := mailbox.DustPackets()
404✔
2568

404✔
2569
        // If the htlc is dust on the local commitment, we'll obtain the dust
404✔
2570
        // sum for it.
404✔
2571
        if isLocalDust {
808✔
2572
                localSum := link.getDustSum(
404✔
2573
                        lntypes.Local, fn.None[chainfee.SatPerKWeight](),
404✔
2574
                )
404✔
2575
                localSum += localMailDust
404✔
2576

404✔
2577
                // Optionally include the HTLC amount only for outgoing
404✔
2578
                // HTLCs.
404✔
2579
                if !incoming {
768✔
2580
                        localSum += amount
364✔
2581
                }
364✔
2582

2583
                // Finally check against the defined fee threshold.
2584
                if localSum > s.cfg.MaxFeeExposure {
406✔
2585
                        return true
2✔
2586
                }
2✔
2587
        }
2588

2589
        // Also check if the htlc is dust on the remote commitment, if we've
2590
        // reached this point.
2591
        if isRemoteDust {
804✔
2592
                remoteSum := link.getDustSum(
402✔
2593
                        lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
402✔
2594
                )
402✔
2595
                remoteSum += remoteMailDust
402✔
2596

402✔
2597
                // Optionally include the HTLC amount only for outgoing
402✔
2598
                // HTLCs.
402✔
2599
                if !incoming {
764✔
2600
                        remoteSum += amount
362✔
2601
                }
362✔
2602

2603
                // Finally check against the defined fee threshold.
2604
                if remoteSum > s.cfg.MaxFeeExposure {
402✔
2605
                        return true
×
2606
                }
×
2607
        }
2608

2609
        // If we reached this point, this HTLC is fine to forward.
2610
        return false
402✔
2611
}
2612

2613
// failMailboxUpdate is passed to the mailbox orchestrator which in turn passes
2614
// it to individual mailboxes. It allows the mailboxes to construct a
2615
// FailureMessage when failing back HTLC's due to expiry and may include an
2616
// alias in the ShortChannelID field. The outgoingScid is the SCID originally
2617
// used in the onion. The mailboxScid is the SCID that the mailbox and link
2618
// use. The mailboxScid is only used in the non-alias case, so it is always
2619
// the confirmed SCID.
2620
func (s *Switch) failMailboxUpdate(outgoingScid,
2621
        mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage {
15✔
2622

15✔
2623
        // Try to use the failAliasUpdate function in case this is a channel
15✔
2624
        // that uses aliases. If it returns nil, we'll fallback to the original
15✔
2625
        // pre-alias behavior.
15✔
2626
        update := s.failAliasUpdate(outgoingScid, false)
15✔
2627
        if update == nil {
24✔
2628
                // Execute the fallback behavior.
9✔
2629
                var err error
9✔
2630
                update, err = s.cfg.FetchLastChannelUpdate(mailboxScid)
9✔
2631
                if err != nil {
9✔
2632
                        return &lnwire.FailTemporaryNodeFailure{}
×
2633
                }
×
2634
        }
2635

2636
        return lnwire.NewTemporaryChannelFailure(update)
15✔
2637
}
2638

2639
// failAliasUpdate prepares a ChannelUpdate for a failed incoming or outgoing
2640
// HTLC on a channel where the option-scid-alias feature bit was negotiated. If
2641
// the associated channel is not one of these, this function will return nil
2642
// and the caller is expected to handle this properly. In this case, a return
2643
// to the original non-alias behavior is expected.
2644
func (s *Switch) failAliasUpdate(scid lnwire.ShortChannelID,
2645
        incoming bool) *lnwire.ChannelUpdate1 {
38✔
2646

38✔
2647
        // This function does not defer the unlocking because of the database
38✔
2648
        // lookups for ChannelUpdate.
38✔
2649
        s.indexMtx.RLock()
38✔
2650

38✔
2651
        if s.cfg.IsAlias(scid) {
53✔
2652
                // The alias SCID was used. In the incoming case this means
15✔
2653
                // the channel is zero-conf as the link sets the scid. In the
15✔
2654
                // outgoing case, the sender set the scid to use and may be
15✔
2655
                // either the alias or the confirmed one, if it exists.
15✔
2656
                realScid, ok := s.aliasToReal[scid]
15✔
2657
                if !ok {
15✔
2658
                        // The real, confirmed SCID does not exist yet. Find
×
2659
                        // the "base" SCID that the link uses via the
×
2660
                        // baseIndex. If we can't find it, return nil. This
×
2661
                        // means the channel is zero-conf.
×
2662
                        baseScid, ok := s.baseIndex[scid]
×
2663
                        s.indexMtx.RUnlock()
×
2664
                        if !ok {
×
2665
                                return nil
×
2666
                        }
×
2667

2668
                        update, err := s.cfg.FetchLastChannelUpdate(baseScid)
×
2669
                        if err != nil {
×
2670
                                return nil
×
2671
                        }
×
2672

2673
                        // Replace the baseScid with the passed-in alias.
2674
                        update.ShortChannelID = scid
×
2675
                        sig, err := s.cfg.SignAliasUpdate(update)
×
2676
                        if err != nil {
×
2677
                                return nil
×
2678
                        }
×
2679

2680
                        update.Signature, err = lnwire.NewSigFromSignature(sig)
×
2681
                        if err != nil {
×
2682
                                return nil
×
2683
                        }
×
2684

2685
                        return update
×
2686
                }
2687

2688
                s.indexMtx.RUnlock()
15✔
2689

15✔
2690
                // Fetch the SCID via the confirmed SCID and replace it with
15✔
2691
                // the alias.
15✔
2692
                update, err := s.cfg.FetchLastChannelUpdate(realScid)
15✔
2693
                if err != nil {
19✔
2694
                        return nil
4✔
2695
                }
4✔
2696

2697
                // In the incoming case, we want to ensure that we don't leak
2698
                // the UTXO in case the channel is private. In the outgoing
2699
                // case, since the alias was used, we do the same thing.
2700
                update.ShortChannelID = scid
15✔
2701
                sig, err := s.cfg.SignAliasUpdate(update)
15✔
2702
                if err != nil {
15✔
2703
                        return nil
×
2704
                }
×
2705

2706
                update.Signature, err = lnwire.NewSigFromSignature(sig)
15✔
2707
                if err != nil {
15✔
2708
                        return nil
×
2709
                }
×
2710

2711
                return update
15✔
2712
        }
2713

2714
        // If the confirmed SCID is not in baseIndex, this is not an
2715
        // option-scid-alias or zero-conf channel.
2716
        baseScid, ok := s.baseIndex[scid]
27✔
2717
        if !ok {
49✔
2718
                s.indexMtx.RUnlock()
22✔
2719
                return nil
22✔
2720
        }
22✔
2721

2722
        // Fetch the link so we can get an alias to use in the ShortChannelID
2723
        // of the ChannelUpdate.
2724
        link, ok := s.forwardingIndex[baseScid]
5✔
2725
        s.indexMtx.RUnlock()
5✔
2726
        if !ok {
5✔
2727
                // This should never happen, but if it does for some reason,
×
2728
                // fallback to the old behavior.
×
2729
                return nil
×
2730
        }
×
2731

2732
        aliases := link.getAliases()
5✔
2733
        if len(aliases) == 0 {
5✔
2734
                // This should never happen, but if it does, fallback.
×
2735
                return nil
×
2736
        }
×
2737

2738
        // Fetch the ChannelUpdate via the real, confirmed SCID.
2739
        update, err := s.cfg.FetchLastChannelUpdate(scid)
5✔
2740
        if err != nil {
5✔
2741
                return nil
×
2742
        }
×
2743

2744
        // The incoming case will replace the ShortChannelID in the retrieved
2745
        // ChannelUpdate with the alias to ensure no privacy leak occurs. This
2746
        // would happen if a private non-zero-conf option-scid-alias
2747
        // feature-bit channel leaked its UTXO here rather than supplying an
2748
        // alias. In the outgoing case, the confirmed SCID was actually used
2749
        // for forwarding in the onion, so no replacement is necessary as the
2750
        // sender knows the scid.
2751
        if incoming {
7✔
2752
                // We will replace and sign the update with the first alias.
2✔
2753
                // Since this happens on the incoming side, it's not actually
2✔
2754
                // possible to know what the sender used in the onion.
2✔
2755
                update.ShortChannelID = aliases[0]
2✔
2756
                sig, err := s.cfg.SignAliasUpdate(update)
2✔
2757
                if err != nil {
2✔
2758
                        return nil
×
2759
                }
×
2760

2761
                update.Signature, err = lnwire.NewSigFromSignature(sig)
2✔
2762
                if err != nil {
2✔
2763
                        return nil
×
2764
                }
×
2765
        }
2766

2767
        return update
5✔
2768
}
2769

2770
// AddAliasForLink instructs the Switch to update its in-memory maps to reflect
2771
// that a link has a new alias.
2772
func (s *Switch) AddAliasForLink(chanID lnwire.ChannelID,
2773
        alias lnwire.ShortChannelID) error {
×
2774

×
2775
        // Fetch the link so that we can update the underlying channel's set of
×
2776
        // aliases.
×
2777
        s.indexMtx.RLock()
×
2778
        link, err := s.getLink(chanID)
×
2779
        s.indexMtx.RUnlock()
×
2780
        if err != nil {
×
2781
                return err
×
2782
        }
×
2783

2784
        // If the link is a channel where the option-scid-alias feature bit was
2785
        // not negotiated, we'll return an error.
2786
        if !link.negotiatedAliasFeature() {
×
2787
                return fmt.Errorf("attempted to update non-alias channel")
×
2788
        }
×
2789

2790
        linkScid := link.ShortChanID()
×
2791

×
2792
        // We'll update the maps so the Switch includes this alias in its
×
2793
        // forwarding decisions.
×
2794
        if link.isZeroConf() {
×
2795
                if link.zeroConfConfirmed() {
×
2796
                        // If the channel has confirmed on-chain, we'll
×
2797
                        // add this alias to the aliasToReal map.
×
2798
                        confirmedScid := link.confirmedScid()
×
2799

×
2800
                        s.aliasToReal[alias] = confirmedScid
×
2801
                }
×
2802

2803
                // Add this alias to the baseIndex mapping.
2804
                s.baseIndex[alias] = linkScid
×
2805
        } else if link.negotiatedAliasFeature() {
×
2806
                // The channel is confirmed, so we'll populate the aliasToReal
×
2807
                // and baseIndex maps.
×
2808
                s.aliasToReal[alias] = linkScid
×
2809
                s.baseIndex[alias] = linkScid
×
2810
        }
×
2811

2812
        return nil
×
2813
}
2814

2815
// handlePacketAdd handles forwarding an Add packet.
2816
func (s *Switch) handlePacketAdd(packet *htlcPacket,
2817
        htlc *lnwire.UpdateAddHTLC) error {
86✔
2818

86✔
2819
        // Check if the node is set to reject all onward HTLCs and also make
86✔
2820
        // sure that HTLC is not from the source node.
86✔
2821
        if s.cfg.RejectHTLC {
91✔
2822
                failure := NewDetailedLinkError(
5✔
2823
                        &lnwire.FailChannelDisabled{},
5✔
2824
                        OutgoingFailureForwardsDisabled,
5✔
2825
                )
5✔
2826

5✔
2827
                return s.failAddPacket(packet, failure)
5✔
2828
        }
5✔
2829

2830
        // Before we attempt to find a non-strict forwarding path for this
2831
        // htlc, check whether the htlc is being routed over the same incoming
2832
        // and outgoing channel. If our node does not allow forwards of this
2833
        // nature, we fail the htlc early. This check is in place to disallow
2834
        // inefficiently routed htlcs from locking up our balance. With
2835
        // channels where the option-scid-alias feature was negotiated, we also
2836
        // have to be sure that the IDs aren't the same since one or both could
2837
        // be an alias.
2838
        linkErr := s.checkCircularForward(
85✔
2839
                packet.incomingChanID, packet.outgoingChanID,
85✔
2840
                s.cfg.AllowCircularRoute, htlc.PaymentHash,
85✔
2841
        )
85✔
2842
        if linkErr != nil {
86✔
2843
                return s.failAddPacket(packet, linkErr)
1✔
2844
        }
1✔
2845

2846
        s.indexMtx.RLock()
84✔
2847
        targetLink, err := s.getLinkByMapping(packet)
84✔
2848
        if err != nil {
92✔
2849
                s.indexMtx.RUnlock()
8✔
2850

8✔
2851
                log.Debugf("unable to find link with "+
8✔
2852
                        "destination %v", packet.outgoingChanID)
8✔
2853

8✔
2854
                // If packet was forwarded from another channel link than we
8✔
2855
                // should notify this link that some error occurred.
8✔
2856
                linkError := NewLinkError(
8✔
2857
                        &lnwire.FailUnknownNextPeer{},
8✔
2858
                )
8✔
2859

8✔
2860
                return s.failAddPacket(packet, linkError)
8✔
2861
        }
8✔
2862
        targetPeerKey := targetLink.PeerPubKey()
80✔
2863
        interfaceLinks, _ := s.getLinks(targetPeerKey)
80✔
2864
        s.indexMtx.RUnlock()
80✔
2865

80✔
2866
        // We'll keep track of any HTLC failures during the link selection
80✔
2867
        // process. This way we can return the error for precise link that the
80✔
2868
        // sender selected, while optimistically trying all links to utilize
80✔
2869
        // our available bandwidth.
80✔
2870
        linkErrs := make(map[lnwire.ShortChannelID]*LinkError)
80✔
2871

80✔
2872
        // Find all destination channel links with appropriate bandwidth.
80✔
2873
        var destinations []ChannelLink
80✔
2874
        for _, link := range interfaceLinks {
164✔
2875
                var failure *LinkError
84✔
2876

84✔
2877
                // We'll skip any links that aren't yet eligible for
84✔
2878
                // forwarding.
84✔
2879
                if !link.EligibleToForward() {
88✔
2880
                        failure = NewDetailedLinkError(
4✔
2881
                                &lnwire.FailUnknownNextPeer{},
4✔
2882
                                OutgoingFailureLinkNotEligible,
4✔
2883
                        )
4✔
2884
                } else {
84✔
2885
                        // We'll ensure that the HTLC satisfies the current
80✔
2886
                        // forwarding conditions of this target link.
80✔
2887
                        currentHeight := atomic.LoadUint32(&s.bestHeight)
80✔
2888
                        failure = link.CheckHtlcForward(
80✔
2889
                                htlc.PaymentHash, packet.incomingAmount,
80✔
2890
                                packet.amount, packet.incomingTimeout,
80✔
2891
                                packet.outgoingTimeout, packet.inboundFee,
80✔
2892
                                currentHeight, packet.originalOutgoingChanID,
80✔
2893
                                htlc.CustomRecords,
80✔
2894
                        )
80✔
2895
                }
80✔
2896

2897
                // If this link can forward the htlc, add it to the set of
2898
                // destinations.
2899
                if failure == nil {
148✔
2900
                        destinations = append(destinations, link)
64✔
2901
                        continue
64✔
2902
                }
2903

2904
                linkErrs[link.ShortChanID()] = failure
24✔
2905
        }
2906

2907
        // If we had a forwarding failure due to the HTLC not satisfying the
2908
        // current policy, then we'll send back an error, but ensure we send
2909
        // back the error sourced at the *target* link.
2910
        if len(destinations) == 0 {
100✔
2911
                // At this point, some or all of the links rejected the HTLC so
20✔
2912
                // we couldn't forward it. So we'll try to look up the error
20✔
2913
                // that came from the source.
20✔
2914
                linkErr, ok := linkErrs[packet.outgoingChanID]
20✔
2915
                if !ok {
20✔
2916
                        // If we can't find the error of the source, then we'll
×
2917
                        // return an unknown next peer, though this should
×
2918
                        // never happen.
×
2919
                        linkErr = NewLinkError(
×
2920
                                &lnwire.FailUnknownNextPeer{},
×
2921
                        )
×
2922
                        log.Warnf("unable to find err source for "+
×
2923
                                "outgoing_link=%v, errors=%v",
×
2924
                                packet.outgoingChanID,
×
2925
                                lnutils.SpewLogClosure(linkErrs))
×
2926
                }
×
2927

2928
                log.Tracef("incoming HTLC(%x) violated "+
20✔
2929
                        "target outgoing link (id=%v) policy: %v",
20✔
2930
                        htlc.PaymentHash[:], packet.outgoingChanID,
20✔
2931
                        linkErr)
20✔
2932

20✔
2933
                return s.failAddPacket(packet, linkErr)
20✔
2934
        }
2935

2936
        // Choose a random link out of the set of links that can forward this
2937
        // htlc. The reason for randomization is to evenly distribute the htlc
2938
        // load without making assumptions about what the best channel is.
2939
        //nolint:gosec
2940
        destination := destinations[rand.Intn(len(destinations))]
64✔
2941

64✔
2942
        // Retrieve the incoming link by its ShortChannelID. Note that the
64✔
2943
        // incomingChanID is never set to hop.Source here.
64✔
2944
        s.indexMtx.RLock()
64✔
2945
        incomingLink, err := s.getLinkByShortID(packet.incomingChanID)
64✔
2946
        s.indexMtx.RUnlock()
64✔
2947
        if err != nil {
64✔
2948
                // If we couldn't find the incoming link, we can't evaluate the
×
2949
                // incoming's exposure to dust, so we just fail the HTLC back.
×
2950
                linkErr := NewLinkError(
×
2951
                        &lnwire.FailTemporaryChannelFailure{},
×
2952
                )
×
2953

×
2954
                return s.failAddPacket(packet, linkErr)
×
2955
        }
×
2956

2957
        // Evaluate whether this HTLC would increase our fee exposure over the
2958
        // threshold on the incoming link. If it does, fail it backwards.
2959
        if s.dustExceedsFeeThreshold(
64✔
2960
                incomingLink, packet.incomingAmount, true,
64✔
2961
        ) {
64✔
2962
                // The incoming dust exceeds the threshold, so we fail the add
×
2963
                // back.
×
2964
                linkErr := NewLinkError(
×
2965
                        &lnwire.FailTemporaryChannelFailure{},
×
2966
                )
×
2967

×
2968
                return s.failAddPacket(packet, linkErr)
×
2969
        }
×
2970

2971
        // Also evaluate whether this HTLC would increase our fee exposure over
2972
        // the threshold on the destination link. If it does, fail it back.
2973
        if s.dustExceedsFeeThreshold(
64✔
2974
                destination, packet.amount, false,
64✔
2975
        ) {
65✔
2976
                // The outgoing dust exceeds the threshold, so we fail the add
1✔
2977
                // back.
1✔
2978
                linkErr := NewLinkError(
1✔
2979
                        &lnwire.FailTemporaryChannelFailure{},
1✔
2980
                )
1✔
2981

1✔
2982
                return s.failAddPacket(packet, linkErr)
1✔
2983
        }
1✔
2984

2985
        // Send the packet to the destination channel link which manages the
2986
        // channel.
2987
        packet.outgoingChanID = destination.ShortChanID()
63✔
2988

63✔
2989
        return destination.handleSwitchPacket(packet)
63✔
2990
}
2991

2992
// handlePacketSettle handles forwarding a settle packet.
2993
func (s *Switch) handlePacketSettle(packet *htlcPacket) error {
403✔
2994
        // If the source of this packet has not been set, use the circuit map
403✔
2995
        // to lookup the origin.
403✔
2996
        circuit, err := s.closeCircuit(packet)
403✔
2997
        if err != nil {
407✔
2998
                return err
4✔
2999
        }
4✔
3000

3001
        // closeCircuit returns a nil circuit when a settle packet returns an
3002
        // ErrUnknownCircuit error upon the inner call to CloseCircuit.
3003
        //
3004
        // NOTE: We can only get a nil circuit when it has already been deleted
3005
        // and when `UpdateFulfillHTLC` is received. After which `RevokeAndAck`
3006
        // is received, which invokes `processRemoteSettleFails` in its link.
3007
        if circuit == nil {
597✔
3008
                log.Debugf("Found nil circuit: packet=%v", spew.Sdump(packet))
194✔
3009
                return nil
194✔
3010
        }
194✔
3011

3012
        localHTLC := packet.incomingChanID == hop.Source
213✔
3013

213✔
3014
        // If this is a locally initiated HTLC, we need to handle the packet by
213✔
3015
        // storing the network result.
213✔
3016
        //
213✔
3017
        // A blank IncomingChanID in a circuit indicates that it is a pending
213✔
3018
        // user-initiated payment.
213✔
3019
        //
213✔
3020
        // NOTE: `closeCircuit` modifies the state of `packet`.
213✔
3021
        if localHTLC {
397✔
3022
                // TODO(yy): remove the goroutine and send back the error here.
184✔
3023
                s.wg.Add(1)
184✔
3024
                go s.handleLocalResponse(packet)
184✔
3025

184✔
3026
                // If this is a locally initiated HTLC, there's no need to
184✔
3027
                // forward it so we exit.
184✔
3028
                return nil
184✔
3029
        }
184✔
3030

3031
        // If this is an HTLC settle, and it wasn't from a locally initiated
3032
        // HTLC, then we'll log a forwarding event so we can flush it to disk
3033
        // later.
3034
        if circuit.Outgoing != nil {
66✔
3035
                log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+
33✔
3036
                        "from IncomingChanID(%v) to OutgoingChanID(%v)",
33✔
3037
                        circuit.PaymentHash[:], circuit.OutgoingAmount,
33✔
3038
                        circuit.IncomingAmount-circuit.OutgoingAmount,
33✔
3039
                        circuit.Incoming.ChanID, circuit.Outgoing.ChanID)
33✔
3040

33✔
3041
                s.fwdEventMtx.Lock()
33✔
3042
                s.pendingFwdingEvents = append(
33✔
3043
                        s.pendingFwdingEvents,
33✔
3044
                        channeldb.ForwardingEvent{
33✔
3045
                                Timestamp:      time.Now(),
33✔
3046
                                IncomingChanID: circuit.Incoming.ChanID,
33✔
3047
                                OutgoingChanID: circuit.Outgoing.ChanID,
33✔
3048
                                AmtIn:          circuit.IncomingAmount,
33✔
3049
                                AmtOut:         circuit.OutgoingAmount,
33✔
3050
                        },
33✔
3051
                )
33✔
3052
                s.fwdEventMtx.Unlock()
33✔
3053
        }
33✔
3054

3055
        // Deliver this packet.
3056
        return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
33✔
3057
}
3058

3059
// handlePacketFail handles forwarding a fail packet.
3060
func (s *Switch) handlePacketFail(packet *htlcPacket,
3061
        htlc *lnwire.UpdateFailHTLC) error {
142✔
3062

142✔
3063
        // If the source of this packet has not been set, use the circuit map
142✔
3064
        // to lookup the origin.
142✔
3065
        circuit, err := s.closeCircuit(packet)
142✔
3066
        if err != nil {
146✔
3067
                return err
4✔
3068
        }
4✔
3069

3070
        // If this is a locally initiated HTLC, we need to handle the packet by
3071
        // storing the network result.
3072
        //
3073
        // A blank IncomingChanID in a circuit indicates that it is a pending
3074
        // user-initiated payment.
3075
        //
3076
        // NOTE: `closeCircuit` modifies the state of `packet`.
3077
        if packet.incomingChanID == hop.Source {
270✔
3078
                // TODO(yy): remove the goroutine and send back the error here.
128✔
3079
                s.wg.Add(1)
128✔
3080
                go s.handleLocalResponse(packet)
128✔
3081

128✔
3082
                // If this is a locally initiated HTLC, there's no need to
128✔
3083
                // forward it so we exit.
128✔
3084
                return nil
128✔
3085
        }
128✔
3086

3087
        // Exit early if this hasSource is true. This flag is only set via
3088
        // mailbox's `FailAdd`. This method has two callsites,
3089
        // - the packet has timed out after `MailboxDeliveryTimeout`, defaults
3090
        //   to 1 min.
3091
        // - the HTLC fails the validation in `channel.AddHTLC`.
3092
        // In either case, the `Reason` field is populated. Thus there's no
3093
        // need to proceed and extract the failure reason below.
3094
        if packet.hasSource {
29✔
3095
                // Deliver this packet.
11✔
3096
                return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
11✔
3097
        }
11✔
3098

3099
        // HTLC resolutions and messages restored from disk don't have the
3100
        // obfuscator set from the original htlc add packet - set it here for
3101
        // use in blinded errors.
3102
        packet.obfuscator = circuit.ErrorEncrypter
11✔
3103

11✔
3104
        switch {
11✔
3105
        // No message to encrypt, locally sourced payment.
3106
        case circuit.ErrorEncrypter == nil:
×
3107
                // TODO(yy) further check this case as we shouldn't end up here
3108
                // as `isLocal` is already false.
3109

3110
        // If this is a resolution message, then we'll need to encrypt it as
3111
        // it's actually internally sourced.
3112
        case packet.isResolution:
4✔
3113
                var err error
4✔
3114
                // TODO(roasbeef): don't need to pass actually?
4✔
3115
                failure := &lnwire.FailPermanentChannelFailure{}
4✔
3116
                htlc.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop(
4✔
3117
                        failure,
4✔
3118
                )
4✔
3119
                if err != nil {
4✔
3120
                        err = fmt.Errorf("unable to obfuscate error: %w", err)
×
3121
                        log.Error(err)
×
3122
                }
×
3123

3124
        // Alternatively, if the remote party sends us an
3125
        // UpdateFailMalformedHTLC, then we'll need to convert this into a
3126
        // proper well formatted onion error as there's no HMAC currently.
3127
        case packet.convertedError:
6✔
3128
                log.Infof("Converting malformed HTLC error for circuit for "+
6✔
3129
                        "Circuit(%x: (%s, %d) <-> (%s, %d))",
6✔
3130
                        packet.circuit.PaymentHash,
6✔
3131
                        packet.incomingChanID, packet.incomingHTLCID,
6✔
3132
                        packet.outgoingChanID, packet.outgoingHTLCID)
6✔
3133

6✔
3134
                htlc.Reason = circuit.ErrorEncrypter.EncryptMalformedError(
6✔
3135
                        htlc.Reason,
6✔
3136
                )
6✔
3137

3138
        default:
9✔
3139
                // Otherwise, it's a forwarded error, so we'll perform a
9✔
3140
                // wrapper encryption as normal.
9✔
3141
                htlc.Reason = circuit.ErrorEncrypter.IntermediateEncrypt(
9✔
3142
                        htlc.Reason,
9✔
3143
                )
9✔
3144
        }
3145

3146
        // Deliver this packet.
3147
        return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
11✔
3148
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc