• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11294569257

11 Oct 2024 02:52PM UTC coverage: 49.201% (+0.02%) from 49.179%
11294569257

Pull #9140

github

starius
htlcswitch: fix linter warnings
Pull Request #9140: htlcswitch: use fn.GoroutineManager

27 of 52 new or added lines in 1 file covered. (51.92%)

81 existing lines in 12 files now uncovered.

97421 of 198005 relevant lines covered (49.2%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.9
/htlcswitch/switch.go
1
package htlcswitch
2

3
import (
4
        "bytes"
5
        "context"
6
        "errors"
7
        "fmt"
8
        "math/rand"
9
        "sync"
10
        "sync/atomic"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcec/v2/ecdsa"
14
        "github.com/btcsuite/btcd/btcutil"
15
        "github.com/btcsuite/btcd/wire"
16
        "github.com/davecgh/go-spew/spew"
17
        "github.com/lightningnetwork/lnd/chainntnfs"
18
        "github.com/lightningnetwork/lnd/channeldb"
19
        "github.com/lightningnetwork/lnd/channeldb/models"
20
        "github.com/lightningnetwork/lnd/clock"
21
        "github.com/lightningnetwork/lnd/contractcourt"
22
        "github.com/lightningnetwork/lnd/fn"
23
        "github.com/lightningnetwork/lnd/htlcswitch/hop"
24
        "github.com/lightningnetwork/lnd/kvdb"
25
        "github.com/lightningnetwork/lnd/lntypes"
26
        "github.com/lightningnetwork/lnd/lnutils"
27
        "github.com/lightningnetwork/lnd/lnwallet"
28
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
29
        "github.com/lightningnetwork/lnd/lnwire"
30
        "github.com/lightningnetwork/lnd/ticker"
31
)
32

33
const (
34
        // DefaultFwdEventInterval is the duration between attempts to flush
35
        // pending forwarding events to disk.
36
        DefaultFwdEventInterval = 15 * time.Second
37

38
        // DefaultLogInterval is the duration between attempts to log statistics
39
        // about forwarding events.
40
        DefaultLogInterval = 10 * time.Second
41

42
        // DefaultAckInterval is the duration between attempts to ack any settle
43
        // fails in a forwarding package.
44
        DefaultAckInterval = 15 * time.Second
45

46
        // DefaultMailboxDeliveryTimeout is the duration after which Adds will
47
        // be cancelled if they could not get added to an outgoing commitment.
48
        DefaultMailboxDeliveryTimeout = time.Minute
49
)
50

51
var (
52
        // ErrChannelLinkNotFound is used when channel link hasn't been found.
53
        ErrChannelLinkNotFound = errors.New("channel link not found")
54

55
        // ErrDuplicateAdd signals that the ADD htlc was already forwarded
56
        // through the switch and is locked into another commitment txn.
57
        ErrDuplicateAdd = errors.New("duplicate add HTLC detected")
58

59
        // ErrUnknownErrorDecryptor signals that we were unable to locate the
60
        // error decryptor for this payment. This is likely due to restarting
61
        // the daemon.
62
        ErrUnknownErrorDecryptor = errors.New("unknown error decryptor")
63

64
        // ErrSwitchExiting signaled when the switch has received a shutdown
65
        // request.
66
        ErrSwitchExiting = errors.New("htlcswitch shutting down")
67

68
        // ErrNoLinksFound is an error returned when we attempt to retrieve the
69
        // active links in the switch for a specific destination.
70
        ErrNoLinksFound = errors.New("no channel links found")
71

72
        // ErrUnreadableFailureMessage is returned when the failure message
73
        // cannot be decrypted.
74
        ErrUnreadableFailureMessage = errors.New("unreadable failure message")
75

76
        // ErrLocalAddFailed signals that the ADD htlc for a local payment
77
        // failed to be processed.
78
        ErrLocalAddFailed = errors.New("local add HTLC failed")
79

80
        // errFeeExposureExceeded is only surfaced to callers of SendHTLC and
81
        // signals that sending the HTLC would exceed the outgoing link's fee
82
        // exposure threshold.
83
        errFeeExposureExceeded = errors.New("fee exposure exceeded")
84

85
        // DefaultMaxFeeExposure is the default threshold after which we'll
86
        // fail payments if they increase our fee exposure. This is currently
87
        // set to 500m msats.
88
        DefaultMaxFeeExposure = lnwire.MilliSatoshi(500_000_000)
89
)
90

91
// plexPacket encapsulates switch packet and adds error channel to receive
92
// error from request handler.
93
type plexPacket struct {
94
        pkt *htlcPacket
95
        err chan error
96
}
97

98
// ChanClose represents a request which close a particular channel specified by
99
// its id.
100
type ChanClose struct {
101
        // CloseType is a variable which signals the type of channel closure the
102
        // peer should execute.
103
        CloseType contractcourt.ChannelCloseType
104

105
        // ChanPoint represent the id of the channel which should be closed.
106
        ChanPoint *wire.OutPoint
107

108
        // TargetFeePerKw is the ideal fee that was specified by the caller.
109
        // This value is only utilized if the closure type is CloseRegular.
110
        // This will be the starting offered fee when the fee negotiation
111
        // process for the cooperative closure transaction kicks off.
112
        TargetFeePerKw chainfee.SatPerKWeight
113

114
        // MaxFee is the highest fee the caller is willing to pay.
115
        //
116
        // NOTE: This field is only respected if the caller is the initiator of
117
        // the channel.
118
        MaxFee chainfee.SatPerKWeight
119

120
        // DeliveryScript is an optional delivery script to pay funds out to.
121
        DeliveryScript lnwire.DeliveryAddress
122

123
        // Updates is used by request creator to receive the notifications about
124
        // execution of the close channel request.
125
        Updates chan interface{}
126

127
        // Err is used by request creator to receive request execution error.
128
        Err chan error
129
}
130

131
// Config defines the configuration for the service. ALL elements within the
132
// configuration MUST be non-nil for the service to carry out its duties.
133
type Config struct {
134
        // FwdingLog is an interface that will be used by the switch to log
135
        // forwarding events. A forwarding event happens each time a payment
136
        // circuit is successfully completed. So when we forward an HTLC, and a
137
        // settle is eventually received.
138
        FwdingLog ForwardingLog
139

140
        // LocalChannelClose kicks-off the workflow to execute a cooperative or
141
        // forced unilateral closure of the channel initiated by a local
142
        // subsystem.
143
        LocalChannelClose func(pubKey []byte, request *ChanClose)
144

145
        // DB is the database backend that will be used to back the switch's
146
        // persistent circuit map.
147
        DB kvdb.Backend
148

149
        // FetchAllOpenChannels is a function that fetches all currently open
150
        // channels from the channel database.
151
        FetchAllOpenChannels func() ([]*channeldb.OpenChannel, error)
152

153
        // FetchAllChannels is a function that fetches all pending open, open,
154
        // and waiting close channels from the database.
155
        FetchAllChannels func() ([]*channeldb.OpenChannel, error)
156

157
        // FetchClosedChannels is a function that fetches all closed channels
158
        // from the channel database.
159
        FetchClosedChannels func(
160
                pendingOnly bool) ([]*channeldb.ChannelCloseSummary, error)
161

162
        // SwitchPackager provides access to the forwarding packages of all
163
        // active channels. This gives the switch the ability to read arbitrary
164
        // forwarding packages, and ack settles and fails contained within them.
165
        SwitchPackager channeldb.FwdOperator
166

167
        // ExtractErrorEncrypter is an interface allowing switch to reextract
168
        // error encrypters stored in the circuit map on restarts, since they
169
        // are not stored directly within the database.
170
        ExtractErrorEncrypter hop.ErrorEncrypterExtracter
171

172
        // FetchLastChannelUpdate retrieves the latest routing policy for a
173
        // target channel. This channel will typically be the outgoing channel
174
        // specified when we receive an incoming HTLC.  This will be used to
175
        // provide payment senders our latest policy when sending encrypted
176
        // error messages.
177
        FetchLastChannelUpdate func(lnwire.ShortChannelID) (
178
                *lnwire.ChannelUpdate1, error)
179

180
        // Notifier is an instance of a chain notifier that we'll use to signal
181
        // the switch when a new block has arrived.
182
        Notifier chainntnfs.ChainNotifier
183

184
        // HtlcNotifier is an instance of a htlcNotifier which we will pipe htlc
185
        // events through.
186
        HtlcNotifier htlcNotifier
187

188
        // FwdEventTicker is a signal that instructs the htlcswitch to flush any
189
        // pending forwarding events.
190
        FwdEventTicker ticker.Ticker
191

192
        // LogEventTicker is a signal instructing the htlcswitch to log
193
        // aggregate stats about it's forwarding during the last interval.
194
        LogEventTicker ticker.Ticker
195

196
        // AckEventTicker is a signal instructing the htlcswitch to ack any settle
197
        // fails in forwarding packages.
198
        AckEventTicker ticker.Ticker
199

200
        // AllowCircularRoute is true if the user has configured their node to
201
        // allow forwards that arrive and depart our node over the same channel.
202
        AllowCircularRoute bool
203

204
        // RejectHTLC is a flag that instructs the htlcswitch to reject any
205
        // HTLCs that are not from the source hop.
206
        RejectHTLC bool
207

208
        // Clock is a time source for the switch.
209
        Clock clock.Clock
210

211
        // MailboxDeliveryTimeout is the interval after which Adds will be
212
        // cancelled if they have not been yet been delivered to a link. The
213
        // computed deadline will expiry this long after the Adds are added to
214
        // a mailbox via AddPacket.
215
        MailboxDeliveryTimeout time.Duration
216

217
        // MaxFeeExposure is the threshold in milli-satoshis after which we'll
218
        // fail incoming or outgoing payments for a particular channel.
219
        MaxFeeExposure lnwire.MilliSatoshi
220

221
        // SignAliasUpdate is used when sending FailureMessages backwards for
222
        // option_scid_alias channels. This avoids a potential privacy leak by
223
        // replacing the public, confirmed SCID with the alias in the
224
        // ChannelUpdate.
225
        SignAliasUpdate func(u *lnwire.ChannelUpdate1) (*ecdsa.Signature,
226
                error)
227

228
        // IsAlias returns whether or not a given SCID is an alias.
229
        IsAlias func(scid lnwire.ShortChannelID) bool
230
}
231

232
// Switch is the central messaging bus for all incoming/outgoing HTLCs.
233
// Connected peers with active channels are treated as named interfaces which
234
// refer to active channels as links. A link is the switch's message
235
// communication point with the goroutine that manages an active channel. New
236
// links are registered each time a channel is created, and unregistered once
237
// the channel is closed. The switch manages the hand-off process for multi-hop
238
// HTLCs, forwarding HTLCs initiated from within the daemon, and finally
239
// notifies users local-systems concerning their outstanding payment requests.
240
type Switch struct {
241
        started  int32 // To be used atomically.
242
        shutdown int32 // To be used atomically.
243

244
        // bestHeight is the best known height of the main chain. The links will
245
        // be used this information to govern decisions based on HTLC timeouts.
246
        // This will be retrieved by the registered links atomically.
247
        bestHeight uint32
248

249
        // TODO(yy): remove handleLocalResponseWG, once handleLocalResponse runs
250
        // without a goroutine. Currently we can't run handleLocalResponse in
251
        // gm, since if gm is stopping, the goroutine won't start and it is
252
        // unclear if it safe to skip handleLocalResponse.
253
        handleLocalResponseWG sync.WaitGroup
254

255
        // gm starts and stops tasks in goroutines and waits for them.
256
        gm *fn.GoroutineManager
257

258
        // cfg is a copy of the configuration struct that the htlc switch
259
        // service was initialized with.
260
        cfg *Config
261

262
        // networkResults stores the results of payments initiated by the user.
263
        // The store is used to later look up the payments and notify the
264
        // user of the result when they are complete. Each payment attempt
265
        // should be given a unique integer ID when it is created, otherwise
266
        // results might be overwritten.
267
        networkResults *networkResultStore
268

269
        // circuits is storage for payment circuits which are used to
270
        // forward the settle/fail htlc updates back to the add htlc initiator.
271
        circuits CircuitMap
272

273
        // mailOrchestrator manages the lifecycle of mailboxes used throughout
274
        // the switch, and facilitates delayed delivery of packets to links that
275
        // later come online.
276
        mailOrchestrator *mailOrchestrator
277

278
        // indexMtx is a read/write mutex that protects the set of indexes
279
        // below.
280
        indexMtx sync.RWMutex
281

282
        // pendingLinkIndex holds links that have not had their final, live
283
        // short_chan_id assigned.
284
        pendingLinkIndex map[lnwire.ChannelID]ChannelLink
285

286
        // links is a map of channel id and channel link which manages
287
        // this channel.
288
        linkIndex map[lnwire.ChannelID]ChannelLink
289

290
        // forwardingIndex is an index which is consulted by the switch when it
291
        // needs to locate the next hop to forward an incoming/outgoing HTLC
292
        // update to/from.
293
        //
294
        // TODO(roasbeef): eventually add a NetworkHop mapping before the
295
        // ChannelLink
296
        forwardingIndex map[lnwire.ShortChannelID]ChannelLink
297

298
        // interfaceIndex maps the compressed public key of a peer to all the
299
        // channels that the switch maintains with that peer.
300
        interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink
301

302
        // linkStopIndex stores the currently stopping ChannelLinks,
303
        // represented by their ChannelID. The key is the link's ChannelID and
304
        // the value is a chan that is closed when the link has fully stopped.
305
        // This map is only added to if RemoveLink is called and is not added
306
        // to when the Switch is shutting down and calls Stop() on each link.
307
        //
308
        // MUST be used with the indexMtx.
309
        linkStopIndex map[lnwire.ChannelID]chan struct{}
310

311
        // htlcPlex is the channel which all connected links use to coordinate
312
        // the setup/teardown of Sphinx (onion routing) payment circuits.
313
        // Active links forward any add/settle messages over this channel each
314
        // state transition, sending new adds/settles which are fully locked
315
        // in.
316
        htlcPlex chan *plexPacket
317

318
        // chanCloseRequests is used to transfer the channel close request to
319
        // the channel close handler.
320
        chanCloseRequests chan *ChanClose
321

322
        // resolutionMsgs is the channel that all external contract resolution
323
        // messages will be sent over.
324
        resolutionMsgs chan *resolutionMsg
325

326
        // pendingFwdingEvents is the set of forwarding events which have been
327
        // collected during the current interval, but hasn't yet been written
328
        // to the forwarding log.
329
        fwdEventMtx         sync.Mutex
330
        pendingFwdingEvents []channeldb.ForwardingEvent
331

332
        // blockEpochStream is an active block epoch event stream backed by an
333
        // active ChainNotifier instance. This will be used to retrieve the
334
        // latest height of the chain.
335
        blockEpochStream *chainntnfs.BlockEpochEvent
336

337
        // pendingSettleFails is the set of settle/fail entries that we need to
338
        // ack in the forwarding package of the outgoing link. This was added to
339
        // make pipelining settles more efficient.
340
        pendingSettleFails []channeldb.SettleFailRef
341

342
        // resMsgStore is used to store the set of ResolutionMsg that come from
343
        // contractcourt. This is used so the Switch can properly forward them,
344
        // even on restarts.
345
        resMsgStore *resolutionStore
346

347
        // aliasToReal is a map used for option-scid-alias feature-bit links.
348
        // The alias SCID is the key and the real, confirmed SCID is the value.
349
        // If the channel is unconfirmed, there will not be a mapping for it.
350
        // Since channels can have multiple aliases, this map is essentially a
351
        // N->1 mapping for a channel. This MUST be accessed with the indexMtx.
352
        aliasToReal map[lnwire.ShortChannelID]lnwire.ShortChannelID
353

354
        // baseIndex is a map used for option-scid-alias feature-bit links.
355
        // The value is the SCID of the link's ShortChannelID. This value may
356
        // be an alias for zero-conf channels or a confirmed SCID for
357
        // non-zero-conf channels with the option-scid-alias feature-bit. The
358
        // key includes the value itself and also any other aliases. This MUST
359
        // be accessed with the indexMtx.
360
        baseIndex map[lnwire.ShortChannelID]lnwire.ShortChannelID
361
}
362

363
// New creates the new instance of htlc switch.
364
func New(cfg Config, currentHeight uint32) (*Switch, error) {
3✔
365
        resStore := newResolutionStore(cfg.DB)
3✔
366

3✔
367
        circuitMap, err := NewCircuitMap(&CircuitMapConfig{
3✔
368
                DB:                    cfg.DB,
3✔
369
                FetchAllOpenChannels:  cfg.FetchAllOpenChannels,
3✔
370
                FetchClosedChannels:   cfg.FetchClosedChannels,
3✔
371
                ExtractErrorEncrypter: cfg.ExtractErrorEncrypter,
3✔
372
                CheckResolutionMsg:    resStore.checkResolutionMsg,
3✔
373
        })
3✔
374
        if err != nil {
3✔
375
                return nil, err
×
376
        }
×
377

378
        s := &Switch{
3✔
379
                bestHeight:        currentHeight,
3✔
380
                gm:                fn.NewGoroutineManager(context.Background()),
3✔
381
                cfg:               &cfg,
3✔
382
                circuits:          circuitMap,
3✔
383
                linkIndex:         make(map[lnwire.ChannelID]ChannelLink),
3✔
384
                forwardingIndex:   make(map[lnwire.ShortChannelID]ChannelLink),
3✔
385
                interfaceIndex:    make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
3✔
386
                pendingLinkIndex:  make(map[lnwire.ChannelID]ChannelLink),
3✔
387
                linkStopIndex:     make(map[lnwire.ChannelID]chan struct{}),
3✔
388
                networkResults:    newNetworkResultStore(cfg.DB),
3✔
389
                htlcPlex:          make(chan *plexPacket),
3✔
390
                chanCloseRequests: make(chan *ChanClose),
3✔
391
                resolutionMsgs:    make(chan *resolutionMsg),
3✔
392
                resMsgStore:       resStore,
3✔
393
        }
3✔
394

3✔
395
        s.aliasToReal = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
3✔
396
        s.baseIndex = make(map[lnwire.ShortChannelID]lnwire.ShortChannelID)
3✔
397

3✔
398
        s.mailOrchestrator = newMailOrchestrator(&mailOrchConfig{
3✔
399
                forwardPackets:    s.ForwardPackets,
3✔
400
                clock:             s.cfg.Clock,
3✔
401
                expiry:            s.cfg.MailboxDeliveryTimeout,
3✔
402
                failMailboxUpdate: s.failMailboxUpdate,
3✔
403
        })
3✔
404

3✔
405
        return s, nil
3✔
406
}
407

408
// resolutionMsg is a struct that wraps an existing ResolutionMsg with a done
409
// channel. We'll use this channel to synchronize delivery of the message with
410
// the caller.
411
type resolutionMsg struct {
412
        contractcourt.ResolutionMsg
413

414
        errChan chan error
415
}
416

417
// ProcessContractResolution is called by active contract resolvers once a
418
// contract they are watching over has been fully resolved. The message carries
419
// an external signal that *would* have been sent if the outgoing channel
420
// didn't need to go to the chain in order to fulfill a contract. We'll process
421
// this message just as if it came from an active outgoing channel.
422
func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) error {
3✔
423
        errChan := make(chan error, 1)
3✔
424

3✔
425
        select {
3✔
426
        case s.resolutionMsgs <- &resolutionMsg{
427
                ResolutionMsg: msg,
428
                errChan:       errChan,
429
        }:
3✔
NEW
430
        case <-s.gm.Done():
×
431
                return ErrSwitchExiting
×
432
        }
433

434
        select {
3✔
435
        case err := <-errChan:
3✔
436
                return err
3✔
NEW
437
        case <-s.gm.Done():
×
438
                return ErrSwitchExiting
×
439
        }
440
}
441

442
// HasAttemptResult reads the network result store to fetch the specified
443
// attempt. Returns true if the attempt result exists.
444
func (s *Switch) HasAttemptResult(attemptID uint64) (bool, error) {
3✔
445
        _, err := s.networkResults.getResult(attemptID)
3✔
446
        if err == nil {
3✔
447
                return true, nil
×
448
        }
×
449

450
        if !errors.Is(err, ErrPaymentIDNotFound) {
3✔
451
                return false, err
×
452
        }
×
453

454
        return false, nil
3✔
455
}
456

457
// GetAttemptResult returns the result of the HTLC attempt with the given
458
// attemptID. The paymentHash should be set to the payment's overall hash, or
459
// in case of AMP payments the payment's unique identifier.
460
//
461
// The method returns a channel where the HTLC attempt result will be sent when
462
// available, or an error is encountered during forwarding. When a result is
463
// received on the channel, the HTLC is guaranteed to no longer be in flight.
464
// The switch shutting down is signaled by closing the channel. If the
465
// attemptID is unknown, ErrPaymentIDNotFound will be returned.
466
func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
467
        deobfuscator ErrorDecrypter) (<-chan *PaymentResult, error) {
3✔
468

3✔
469
        var (
3✔
470
                nChan <-chan *networkResult
3✔
471
                err   error
3✔
472
                inKey = CircuitKey{
3✔
473
                        ChanID: hop.Source,
3✔
474
                        HtlcID: attemptID,
3✔
475
                }
3✔
476
        )
3✔
477

3✔
478
        // If the HTLC is not found in the circuit map, check whether a result
3✔
479
        // is already available.
3✔
480
        // Assumption: no one will add this attempt ID other than the caller.
3✔
481
        if s.circuits.LookupCircuit(inKey) == nil {
6✔
482
                res, err := s.networkResults.getResult(attemptID)
3✔
483
                if err != nil {
3✔
484
                        return nil, err
×
485
                }
×
486
                c := make(chan *networkResult, 1)
3✔
487
                c <- res
3✔
488
                nChan = c
3✔
489
        } else {
3✔
490
                // The HTLC was committed to the circuits, subscribe for a
3✔
491
                // result.
3✔
492
                nChan, err = s.networkResults.subscribeResult(attemptID)
3✔
493
                if err != nil {
3✔
494
                        return nil, err
×
495
                }
×
496
        }
497

498
        resultChan := make(chan *PaymentResult, 1)
3✔
499

3✔
500
        // Since the attempt was known, we can start a goroutine that can
3✔
501
        // extract the result when it is available, and pass it on to the
3✔
502
        // caller.
3✔
503
        err = s.gm.Go(func(ctx context.Context) {
6✔
504
                var n *networkResult
3✔
505
                select {
3✔
506
                case n = <-nChan:
3✔
507
                case <-ctx.Done():
3✔
508
                        // We close the result channel to signal a shutdown. We
3✔
509
                        // don't send any result in this case since the HTLC is
3✔
510
                        // still in flight.
3✔
511
                        close(resultChan)
3✔
512
                        return
3✔
513
                }
514

515
                log.Debugf("Received network result %T for attemptID=%v", n.msg,
3✔
516
                        attemptID)
3✔
517

3✔
518
                // Extract the result and pass it to the result channel.
3✔
519
                result, err := s.extractResult(
3✔
520
                        deobfuscator, n, attemptID, paymentHash,
3✔
521
                )
3✔
522
                if err != nil {
3✔
523
                        e := fmt.Errorf("unable to extract result: %w", err)
×
524
                        log.Error(e)
×
525
                        resultChan <- &PaymentResult{
×
526
                                Error: e,
×
527
                        }
×
528
                        return
×
529
                }
×
530
                resultChan <- result
3✔
531
        })
532

533
        // The switch shutting down is signaled by closing the channel.
534
        if err != nil {
3✔
NEW
535
                close(resultChan)
×
NEW
536
        }
×
537

538
        return resultChan, nil
3✔
539
}
540

541
// CleanStore calls the underlying result store, telling it is safe to delete
542
// all entries except the ones in the keepPids map. This should be called
543
// preiodically to let the switch clean up payment results that we have
544
// handled.
545
func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error {
3✔
546
        return s.networkResults.cleanStore(keepPids)
3✔
547
}
3✔
548

549
// SendHTLC is used by other subsystems which aren't belong to htlc switch
550
// package in order to send the htlc update. The attemptID used MUST be unique
551
// for this HTLC, and MUST be used only once, otherwise the switch might reject
552
// it.
553
func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64,
554
        htlc *lnwire.UpdateAddHTLC) error {
3✔
555

3✔
556
        // Generate and send new update packet, if error will be received on
3✔
557
        // this stage it means that packet haven't left boundaries of our
3✔
558
        // system and something wrong happened.
3✔
559
        packet := &htlcPacket{
3✔
560
                incomingChanID: hop.Source,
3✔
561
                incomingHTLCID: attemptID,
3✔
562
                outgoingChanID: firstHop,
3✔
563
                htlc:           htlc,
3✔
564
                amount:         htlc.Amount,
3✔
565
        }
3✔
566

3✔
567
        // Attempt to fetch the target link before creating a circuit so that
3✔
568
        // we don't leave dangling circuits. The getLocalLink method does not
3✔
569
        // require the circuit variable to be set on the *htlcPacket.
3✔
570
        link, linkErr := s.getLocalLink(packet, htlc)
3✔
571
        if linkErr != nil {
6✔
572
                // Notify the htlc notifier of a link failure on our outgoing
3✔
573
                // link. Incoming timelock/amount values are not set because
3✔
574
                // they are not present for local sends.
3✔
575
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
3✔
576
                        newHtlcKey(packet),
3✔
577
                        HtlcInfo{
3✔
578
                                OutgoingTimeLock: htlc.Expiry,
3✔
579
                                OutgoingAmt:      htlc.Amount,
3✔
580
                        },
3✔
581
                        HtlcEventTypeSend,
3✔
582
                        linkErr,
3✔
583
                        false,
3✔
584
                )
3✔
585

3✔
586
                return linkErr
3✔
587
        }
3✔
588

589
        // Evaluate whether this HTLC would bypass our fee exposure. If it
590
        // does, don't send it out and instead return an error.
591
        if s.dustExceedsFeeThreshold(link, htlc.Amount, false) {
3✔
592
                // Notify the htlc notifier of a link failure on our outgoing
×
593
                // link. We use the FailTemporaryChannelFailure in place of a
×
594
                // more descriptive error message.
×
595
                linkErr := NewLinkError(
×
596
                        &lnwire.FailTemporaryChannelFailure{},
×
597
                )
×
598
                s.cfg.HtlcNotifier.NotifyLinkFailEvent(
×
599
                        newHtlcKey(packet),
×
600
                        HtlcInfo{
×
601
                                OutgoingTimeLock: htlc.Expiry,
×
602
                                OutgoingAmt:      htlc.Amount,
×
603
                        },
×
604
                        HtlcEventTypeSend,
×
605
                        linkErr,
×
606
                        false,
×
607
                )
×
608

×
609
                return errFeeExposureExceeded
×
610
        }
×
611

612
        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
3✔
613
        actions, err := s.circuits.CommitCircuits(circuit)
3✔
614
        if err != nil {
3✔
615
                log.Errorf("unable to commit circuit in switch: %v", err)
×
616
                return err
×
617
        }
×
618

619
        // Drop duplicate packet if it has already been seen.
620
        switch {
3✔
621
        case len(actions.Drops) == 1:
×
622
                return ErrDuplicateAdd
×
623

624
        case len(actions.Fails) == 1:
×
625
                return ErrLocalAddFailed
×
626
        }
627

628
        // Give the packet to the link's mailbox so that HTLC's are properly
629
        // canceled back if the mailbox timeout elapses.
630
        packet.circuit = circuit
3✔
631

3✔
632
        return link.handleSwitchPacket(packet)
3✔
633
}
634

635
// UpdateForwardingPolicies sends a message to the switch to update the
636
// forwarding policies for the set of target channels, keyed in chanPolicies.
637
//
638
// NOTE: This function is synchronous and will block until either the
639
// forwarding policies for all links have been updated, or the switch shuts
640
// down.
641
func (s *Switch) UpdateForwardingPolicies(
642
        chanPolicies map[wire.OutPoint]models.ForwardingPolicy) {
3✔
643

3✔
644
        log.Tracef("Updating link policies: %v", lnutils.SpewLogClosure(
3✔
645
                chanPolicies))
3✔
646

3✔
647
        s.indexMtx.RLock()
3✔
648

3✔
649
        // Update each link in chanPolicies.
3✔
650
        for targetLink, policy := range chanPolicies {
6✔
651
                cid := lnwire.NewChanIDFromOutPoint(targetLink)
3✔
652

3✔
653
                link, ok := s.linkIndex[cid]
3✔
654
                if !ok {
3✔
655
                        log.Debugf("Unable to find ChannelPoint(%v) to update "+
×
656
                                "link policy", targetLink)
×
657
                        continue
×
658
                }
659

660
                link.UpdateForwardingPolicy(policy)
3✔
661
        }
662

663
        s.indexMtx.RUnlock()
3✔
664
}
665

666
// IsForwardedHTLC checks for a given channel and htlc index if it is related
667
// to an opened circuit that represents a forwarded payment.
668
func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID,
669
        htlcIndex uint64) bool {
3✔
670

3✔
671
        circuit := s.circuits.LookupOpenCircuit(models.CircuitKey{
3✔
672
                ChanID: chanID,
3✔
673
                HtlcID: htlcIndex,
3✔
674
        })
3✔
675
        return circuit != nil && circuit.Incoming.ChanID != hop.Source
3✔
676
}
3✔
677

678
// ForwardPackets adds a list of packets to the switch for processing. Fails
679
// and settles are added on a first past, simultaneously constructing circuits
680
// for any adds. After persisting the circuits, another pass of the adds is
681
// given to forward them through the router. The sending link's quit channel is
682
// used to prevent deadlocks when the switch stops a link in the midst of
683
// forwarding.
684
func (s *Switch) ForwardPackets(linkQuit chan struct{},
685
        packets ...*htlcPacket) error {
3✔
686

3✔
687
        var (
3✔
688
                // fwdChan is a buffered channel used to receive err msgs from
3✔
689
                // the htlcPlex when forwarding this batch.
3✔
690
                fwdChan = make(chan error, len(packets))
3✔
691

3✔
692
                // numSent keeps a running count of how many packets are
3✔
693
                // forwarded to the switch, which determines how many responses
3✔
694
                // we will wait for on the fwdChan..
3✔
695
                numSent int
3✔
696
        )
3✔
697

3✔
698
        // No packets, nothing to do.
3✔
699
        if len(packets) == 0 {
6✔
700
                return nil
3✔
701
        }
3✔
702

703
        // Setup a barrier to prevent the background tasks from processing
704
        // responses until this function returns to the user.
705
        var wg sync.WaitGroup
3✔
706
        wg.Add(1)
3✔
707
        defer wg.Done()
3✔
708

3✔
709
        // Before spawning the following goroutine to proxy our error responses,
3✔
710
        // check to see if we have already been issued a shutdown request. If
3✔
711
        // so, we exit early to avoid incrementing the switch's waitgroup while
3✔
712
        // it is already in the process of shutting down.
3✔
713
        select {
3✔
714
        case <-linkQuit:
×
715
                return nil
×
716

NEW
717
        case <-s.gm.Done():
×
UNCOV
718
                return nil
×
719

720
        default:
3✔
721
                // Spawn a goroutine to log the errors returned from failed packets.
3✔
722
                err := s.gm.Go(func(ctx context.Context) {
6✔
723
                        s.logFwdErrs(ctx, &numSent, &wg, fwdChan)
3✔
724
                })
3✔
725
                if err != nil {
3✔
NEW
726
                        return nil
×
NEW
727
                }
×
728
        }
729

730
        // Make a first pass over the packets, forwarding any settles or fails.
731
        // As adds are found, we create a circuit and append it to our set of
732
        // circuits to be written to disk.
733
        var circuits []*PaymentCircuit
3✔
734
        var addBatch []*htlcPacket
3✔
735
        for _, packet := range packets {
6✔
736
                switch htlc := packet.htlc.(type) {
3✔
737
                case *lnwire.UpdateAddHTLC:
3✔
738
                        circuit := newPaymentCircuit(&htlc.PaymentHash, packet)
3✔
739
                        packet.circuit = circuit
3✔
740
                        circuits = append(circuits, circuit)
3✔
741
                        addBatch = append(addBatch, packet)
3✔
742
                default:
3✔
743
                        err := s.routeAsync(packet, fwdChan, linkQuit)
3✔
744
                        if err != nil {
3✔
745
                                return fmt.Errorf("failed to forward packet %w",
×
746
                                        err)
×
747
                        }
×
748
                        numSent++
3✔
749
                }
750
        }
751

752
        // If this batch did not contain any circuits to commit, we can return
753
        // early.
754
        if len(circuits) == 0 {
6✔
755
                return nil
3✔
756
        }
3✔
757

758
        // Write any circuits that we found to disk.
759
        actions, err := s.circuits.CommitCircuits(circuits...)
3✔
760
        if err != nil {
3✔
761
                log.Errorf("unable to commit circuits in switch: %v", err)
×
762
        }
×
763

764
        // Split the htlc packets by comparing an in-order seek to the head of
765
        // the added, dropped, or failed circuits.
766
        //
767
        // NOTE: This assumes each list is guaranteed to be a subsequence of the
768
        // circuits, and that the union of the sets results in the original set
769
        // of circuits.
770
        var addedPackets, failedPackets []*htlcPacket
3✔
771
        for _, packet := range addBatch {
6✔
772
                switch {
3✔
773
                case len(actions.Adds) > 0 && packet.circuit == actions.Adds[0]:
3✔
774
                        addedPackets = append(addedPackets, packet)
3✔
775
                        actions.Adds = actions.Adds[1:]
3✔
776

777
                case len(actions.Drops) > 0 && packet.circuit == actions.Drops[0]:
3✔
778
                        actions.Drops = actions.Drops[1:]
3✔
779

780
                case len(actions.Fails) > 0 && packet.circuit == actions.Fails[0]:
×
781
                        failedPackets = append(failedPackets, packet)
×
782
                        actions.Fails = actions.Fails[1:]
×
783
                }
784
        }
785

786
        // Now, forward any packets for circuits that were successfully added to
787
        // the switch's circuit map.
788
        for _, packet := range addedPackets {
6✔
789
                err := s.routeAsync(packet, fwdChan, linkQuit)
3✔
790
                if err != nil {
3✔
791
                        return fmt.Errorf("failed to forward packet %w", err)
×
792
                }
×
793
                numSent++
3✔
794
        }
795

796
        // Lastly, for any packets that failed, this implies that they were
797
        // left in a half added state, which can happen when recovering from
798
        // failures.
799
        if len(failedPackets) > 0 {
3✔
800
                var failure lnwire.FailureMessage
×
801
                incomingID := failedPackets[0].incomingChanID
×
802

×
803
                // If the incoming channel is an option_scid_alias channel,
×
804
                // then we'll need to replace the SCID in the ChannelUpdate.
×
805
                update := s.failAliasUpdate(incomingID, true)
×
806
                if update == nil {
×
807
                        // Fallback to the original non-option behavior.
×
808
                        update, err := s.cfg.FetchLastChannelUpdate(
×
809
                                incomingID,
×
810
                        )
×
811
                        if err != nil {
×
812
                                failure = &lnwire.FailTemporaryNodeFailure{}
×
813
                        } else {
×
814
                                failure = lnwire.NewTemporaryChannelFailure(
×
815
                                        update,
×
816
                                )
×
817
                        }
×
818
                } else {
×
819
                        // This is an option_scid_alias channel.
×
820
                        failure = lnwire.NewTemporaryChannelFailure(update)
×
821
                }
×
822

823
                linkError := NewDetailedLinkError(
×
824
                        failure, OutgoingFailureIncompleteForward,
×
825
                )
×
826

×
827
                for _, packet := range failedPackets {
×
828
                        // We don't handle the error here since this method
×
829
                        // always returns an error.
×
830
                        _ = s.failAddPacket(packet, linkError)
×
831
                }
×
832
        }
833

834
        return nil
3✔
835
}
836

837
// logFwdErrs logs any errors received on `fwdChan`.
838
func (s *Switch) logFwdErrs(ctx context.Context, num *int, wg *sync.WaitGroup,
839
        fwdChan chan error) {
3✔
840

3✔
841
        // Wait here until the outer function has finished persisting
3✔
842
        // and routing the packets. This guarantees we don't read from num until
3✔
843
        // the value is accurate.
3✔
844
        wg.Wait()
3✔
845

3✔
846
        numSent := *num
3✔
847
        for i := 0; i < numSent; i++ {
6✔
848
                select {
3✔
849
                case err := <-fwdChan:
3✔
850
                        if err != nil {
6✔
851
                                log.Errorf("Unhandled error while reforwarding htlc "+
3✔
852
                                        "settle/fail over htlcswitch: %v", err)
3✔
853
                        }
3✔
854

NEW
855
                case <-ctx.Done():
×
856
                        log.Errorf("unable to forward htlc packet " +
×
857
                                "htlc switch was stopped")
×
858
                        return
×
859
                }
860
        }
861
}
862

863
// routeAsync sends a packet through the htlc switch, using the provided err
864
// chan to propagate errors back to the caller. The link's quit channel is
865
// provided so that the send can be canceled if either the link or the switch
866
// receive a shutdown requuest. This method does not wait for a response from
867
// the htlcForwarder before returning.
868
func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error,
869
        linkQuit chan struct{}) error {
3✔
870

3✔
871
        command := &plexPacket{
3✔
872
                pkt: packet,
3✔
873
                err: errChan,
3✔
874
        }
3✔
875

3✔
876
        select {
3✔
877
        case s.htlcPlex <- command:
3✔
878
                return nil
3✔
879
        case <-linkQuit:
×
880
                return ErrLinkShuttingDown
×
NEW
881
        case <-s.gm.Done():
×
882
                return errors.New("htlc switch was stopped")
×
883
        }
884
}
885

886
// getLocalLink handles the addition of a htlc for a send that originates from
887
// our node. It returns the link that the htlc should be forwarded outwards on,
888
// and a link error if the htlc cannot be forwarded.
889
func (s *Switch) getLocalLink(pkt *htlcPacket, htlc *lnwire.UpdateAddHTLC) (
890
        ChannelLink, *LinkError) {
3✔
891

3✔
892
        // Try to find links by node destination.
3✔
893
        s.indexMtx.RLock()
3✔
894
        link, err := s.getLinkByShortID(pkt.outgoingChanID)
3✔
895
        defer s.indexMtx.RUnlock()
3✔
896
        if err != nil {
6✔
897
                // If the link was not found for the outgoingChanID, an outside
3✔
898
                // subsystem may be using the confirmed SCID of a zero-conf
3✔
899
                // channel. In this case, we'll consult the Switch maps to see
3✔
900
                // if an alias exists and use the alias to lookup the link.
3✔
901
                // This extra step is a consequence of not updating the Switch
3✔
902
                // forwardingIndex when a zero-conf channel is confirmed. We
3✔
903
                // don't need to change the outgoingChanID since the link will
3✔
904
                // do that upon receiving the packet.
3✔
905
                baseScid, ok := s.baseIndex[pkt.outgoingChanID]
3✔
906
                if !ok {
6✔
907
                        log.Errorf("Link %v not found", pkt.outgoingChanID)
3✔
908
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
3✔
909
                }
3✔
910

911
                // The base SCID was found, so we'll use that to fetch the
912
                // link.
913
                link, err = s.getLinkByShortID(baseScid)
3✔
914
                if err != nil {
3✔
915
                        log.Errorf("Link %v not found", baseScid)
×
916
                        return nil, NewLinkError(&lnwire.FailUnknownNextPeer{})
×
917
                }
×
918
        }
919

920
        if !link.EligibleToForward() {
3✔
921
                log.Errorf("Link %v is not available to forward",
×
922
                        pkt.outgoingChanID)
×
923

×
924
                // The update does not need to be populated as the error
×
925
                // will be returned back to the router.
×
926
                return nil, NewDetailedLinkError(
×
927
                        lnwire.NewTemporaryChannelFailure(nil),
×
928
                        OutgoingFailureLinkNotEligible,
×
929
                )
×
930
        }
×
931

932
        // Ensure that the htlc satisfies the outgoing channel policy.
933
        currentHeight := atomic.LoadUint32(&s.bestHeight)
3✔
934
        htlcErr := link.CheckHtlcTransit(
3✔
935
                htlc.PaymentHash, htlc.Amount, htlc.Expiry, currentHeight,
3✔
936
        )
3✔
937
        if htlcErr != nil {
5✔
938
                log.Errorf("Link %v policy for local forward not "+
2✔
939
                        "satisfied", pkt.outgoingChanID)
2✔
940
                return nil, htlcErr
2✔
941
        }
2✔
942
        return link, nil
3✔
943
}
944

945
// handleLocalResponse processes a Settle or Fail responding to a
946
// locally-initiated payment. This is handled asynchronously to avoid blocking
947
// the main event loop within the switch, as these operations can require
948
// multiple db transactions. The guarantees of the circuit map are stringent
949
// enough such that we are able to tolerate reordering of these operations
950
// without side effects. The primary operations handled are:
951
//  1. Save the payment result to the pending payment store.
952
//  2. Notify subscribers about the payment result.
953
//  3. Ack settle/fail references, to avoid resending this response internally
954
//  4. Teardown the closing circuit in the circuit map
955
//
956
// NOTE: This method MUST be spawned as a goroutine.
957
func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
3✔
958
        defer s.handleLocalResponseWG.Done()
3✔
959

3✔
960
        attemptID := pkt.incomingHTLCID
3✔
961

3✔
962
        // The error reason will be unencypted in case this a local
3✔
963
        // failure or a converted error.
3✔
964
        unencrypted := pkt.localFailure || pkt.convertedError
3✔
965
        n := &networkResult{
3✔
966
                msg:          pkt.htlc,
3✔
967
                unencrypted:  unencrypted,
3✔
968
                isResolution: pkt.isResolution,
3✔
969
        }
3✔
970

3✔
971
        // Store the result to the db. This will also notify subscribers about
3✔
972
        // the result.
3✔
973
        if err := s.networkResults.storeResult(attemptID, n); err != nil {
3✔
974
                log.Errorf("Unable to store attempt result for pid=%v: %v",
×
975
                        attemptID, err)
×
976
                return
×
977
        }
×
978

979
        // First, we'll clean up any fwdpkg references, circuit entries, and
980
        // mark in our db that the payment for this payment hash has either
981
        // succeeded or failed.
982
        //
983
        // If this response is contained in a forwarding package, we'll start by
984
        // acking the settle/fail so that we don't continue to retransmit the
985
        // HTLC internally.
986
        if pkt.destRef != nil {
6✔
987
                if err := s.ackSettleFail(*pkt.destRef); err != nil {
3✔
988
                        log.Warnf("Unable to ack settle/fail reference: %s: %v",
×
989
                                *pkt.destRef, err)
×
990
                        return
×
991
                }
×
992
        }
993

994
        // Next, we'll remove the circuit since we are about to complete an
995
        // fulfill/fail of this HTLC. Since we've already removed the
996
        // settle/fail fwdpkg reference, the response from the peer cannot be
997
        // replayed internally if this step fails. If this happens, this logic
998
        // will be executed when a provided resolution message comes through.
999
        // This can only happen if the circuit is still open, which is why this
1000
        // ordering is chosen.
1001
        if err := s.teardownCircuit(pkt); err != nil {
3✔
1002
                log.Errorf("Unable to teardown circuit %s: %v",
×
1003
                        pkt.inKey(), err)
×
1004
                return
×
1005
        }
×
1006

1007
        // Finally, notify on the htlc failure or success that has been handled.
1008
        key := newHtlcKey(pkt)
3✔
1009
        eventType := getEventType(pkt)
3✔
1010

3✔
1011
        switch htlc := pkt.htlc.(type) {
3✔
1012
        case *lnwire.UpdateFulfillHTLC:
3✔
1013
                s.cfg.HtlcNotifier.NotifySettleEvent(key, htlc.PaymentPreimage,
3✔
1014
                        eventType)
3✔
1015

1016
        case *lnwire.UpdateFailHTLC:
3✔
1017
                s.cfg.HtlcNotifier.NotifyForwardingFailEvent(key, eventType)
3✔
1018
        }
1019
}
1020

1021
// extractResult uses the given deobfuscator to extract the payment result from
1022
// the given network message.
1023
func (s *Switch) extractResult(deobfuscator ErrorDecrypter, n *networkResult,
1024
        attemptID uint64, paymentHash lntypes.Hash) (*PaymentResult, error) {
3✔
1025

3✔
1026
        switch htlc := n.msg.(type) {
3✔
1027

1028
        // We've received a settle update which means we can finalize the user
1029
        // payment and return successful response.
1030
        case *lnwire.UpdateFulfillHTLC:
3✔
1031
                return &PaymentResult{
3✔
1032
                        Preimage: htlc.PaymentPreimage,
3✔
1033
                }, nil
3✔
1034

1035
        // We've received a fail update which means we can finalize the
1036
        // user payment and return fail response.
1037
        case *lnwire.UpdateFailHTLC:
3✔
1038
                // TODO(yy): construct deobfuscator here to avoid creating it
3✔
1039
                // in paymentLifecycle even for settled HTLCs.
3✔
1040
                paymentErr := s.parseFailedPayment(
3✔
1041
                        deobfuscator, attemptID, paymentHash, n.unencrypted,
3✔
1042
                        n.isResolution, htlc,
3✔
1043
                )
3✔
1044

3✔
1045
                return &PaymentResult{
3✔
1046
                        Error: paymentErr,
3✔
1047
                }, nil
3✔
1048

1049
        default:
×
1050
                return nil, fmt.Errorf("received unknown response type: %T",
×
1051
                        htlc)
×
1052
        }
1053
}
1054

1055
// parseFailedPayment determines the appropriate failure message to return to
1056
// a user initiated payment. The three cases handled are:
1057
//  1. An unencrypted failure, which should already plaintext.
1058
//  2. A resolution from the chain arbitrator, which possibly has no failure
1059
//     reason attached.
1060
//  3. A failure from the remote party, which will need to be decrypted using
1061
//     the payment deobfuscator.
1062
func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter,
1063
        attemptID uint64, paymentHash lntypes.Hash, unencrypted,
1064
        isResolution bool, htlc *lnwire.UpdateFailHTLC) error {
3✔
1065

3✔
1066
        switch {
3✔
1067

1068
        // The payment never cleared the link, so we don't need to
1069
        // decrypt the error, simply decode it them report back to the
1070
        // user.
1071
        case unencrypted:
3✔
1072
                r := bytes.NewReader(htlc.Reason)
3✔
1073
                failureMsg, err := lnwire.DecodeFailure(r, 0)
3✔
1074
                if err != nil {
3✔
1075
                        // If we could not decode the failure reason, return a link
×
1076
                        // error indicating that we failed to decode the onion.
×
1077
                        linkError := NewDetailedLinkError(
×
1078
                                // As this didn't even clear the link, we don't
×
1079
                                // need to apply an update here since it goes
×
1080
                                // directly to the router.
×
1081
                                lnwire.NewTemporaryChannelFailure(nil),
×
1082
                                OutgoingFailureDecodeError,
×
1083
                        )
×
1084

×
1085
                        log.Errorf("%v: (hash=%v, pid=%d): %v",
×
1086
                                linkError.FailureDetail.FailureString(),
×
1087
                                paymentHash, attemptID, err)
×
1088

×
1089
                        return linkError
×
1090
                }
×
1091

1092
                // If we successfully decoded the failure reason, return it.
1093
                return NewLinkError(failureMsg)
3✔
1094

1095
        // A payment had to be timed out on chain before it got past
1096
        // the first hop. In this case, we'll report a permanent
1097
        // channel failure as this means us, or the remote party had to
1098
        // go on chain.
1099
        case isResolution && htlc.Reason == nil:
3✔
1100
                linkError := NewDetailedLinkError(
3✔
1101
                        &lnwire.FailPermanentChannelFailure{},
3✔
1102
                        OutgoingFailureOnChainTimeout,
3✔
1103
                )
3✔
1104

3✔
1105
                log.Infof("%v: hash=%v, pid=%d",
3✔
1106
                        linkError.FailureDetail.FailureString(),
3✔
1107
                        paymentHash, attemptID)
3✔
1108

3✔
1109
                return linkError
3✔
1110

1111
        // A regular multi-hop payment error that we'll need to
1112
        // decrypt.
1113
        default:
3✔
1114
                // We'll attempt to fully decrypt the onion encrypted
3✔
1115
                // error. If we're unable to then we'll bail early.
3✔
1116
                failure, err := deobfuscator.DecryptError(htlc.Reason)
3✔
1117
                if err != nil {
3✔
1118
                        log.Errorf("unable to de-obfuscate onion failure "+
×
1119
                                "(hash=%v, pid=%d): %v",
×
1120
                                paymentHash, attemptID, err)
×
1121

×
1122
                        return ErrUnreadableFailureMessage
×
1123
                }
×
1124

1125
                return failure
3✔
1126
        }
1127
}
1128

1129
// handlePacketForward is used in cases when we need forward the htlc update
1130
// from one channel link to another and be able to propagate the settle/fail
1131
// updates back. This behaviour is achieved by creation of payment circuits.
1132
func (s *Switch) handlePacketForward(packet *htlcPacket) error {
3✔
1133
        switch htlc := packet.htlc.(type) {
3✔
1134
        // Channel link forwarded us a new htlc, therefore we initiate the
1135
        // payment circuit within our internal state so we can properly forward
1136
        // the ultimate settle message back latter.
1137
        case *lnwire.UpdateAddHTLC:
3✔
1138
                return s.handlePacketAdd(packet, htlc)
3✔
1139

1140
        case *lnwire.UpdateFulfillHTLC:
3✔
1141
                return s.handlePacketSettle(packet)
3✔
1142

1143
        // Channel link forwarded us an update_fail_htlc message.
1144
        //
1145
        // NOTE: when the channel link receives an update_fail_malformed_htlc
1146
        // from upstream, it will convert the message into update_fail_htlc and
1147
        // forward it. Thus there's no need to catch `UpdateFailMalformedHTLC`
1148
        // here.
1149
        case *lnwire.UpdateFailHTLC:
3✔
1150
                return s.handlePacketFail(packet, htlc)
3✔
1151

1152
        default:
×
1153
                return fmt.Errorf("wrong update type: %T", htlc)
×
1154
        }
1155
}
1156

1157
// checkCircularForward checks whether a forward is circular (arrives and
1158
// departs on the same link) and returns a link error if the switch is
1159
// configured to disallow this behaviour.
1160
func (s *Switch) checkCircularForward(incoming, outgoing lnwire.ShortChannelID,
1161
        allowCircular bool, paymentHash lntypes.Hash) *LinkError {
3✔
1162

3✔
1163
        // If they are equal, we can skip the alias mapping checks.
3✔
1164
        if incoming == outgoing {
3✔
1165
                // The switch may be configured to allow circular routes, so
×
1166
                // just log and return nil.
×
1167
                if allowCircular {
×
1168
                        log.Debugf("allowing circular route over link: %v "+
×
1169
                                "(payment hash: %x)", incoming, paymentHash)
×
1170
                        return nil
×
1171
                }
×
1172

1173
                // Otherwise, we'll return a temporary channel failure.
1174
                return NewDetailedLinkError(
×
1175
                        lnwire.NewTemporaryChannelFailure(nil),
×
1176
                        OutgoingFailureCircularRoute,
×
1177
                )
×
1178
        }
1179

1180
        // We'll fetch the "base" SCID from the baseIndex for the incoming and
1181
        // outgoing SCIDs. If either one does not have a base SCID, then the
1182
        // two channels are not equal since one will be a channel that does not
1183
        // need a mapping and SCID equality was checked above. If the "base"
1184
        // SCIDs are equal, then this is a circular route. Otherwise, it isn't.
1185
        s.indexMtx.RLock()
3✔
1186
        incomingBaseScid, ok := s.baseIndex[incoming]
3✔
1187
        if !ok {
6✔
1188
                // This channel does not use baseIndex, bail out.
3✔
1189
                s.indexMtx.RUnlock()
3✔
1190
                return nil
3✔
1191
        }
3✔
1192

1193
        outgoingBaseScid, ok := s.baseIndex[outgoing]
3✔
1194
        if !ok {
6✔
1195
                // This channel does not use baseIndex, bail out.
3✔
1196
                s.indexMtx.RUnlock()
3✔
1197
                return nil
3✔
1198
        }
3✔
1199
        s.indexMtx.RUnlock()
3✔
1200

3✔
1201
        // Check base SCID equality.
3✔
1202
        if incomingBaseScid != outgoingBaseScid {
6✔
1203
                // The base SCIDs are not equal so these are not the same
3✔
1204
                // channel.
3✔
1205
                return nil
3✔
1206
        }
3✔
1207

1208
        // If the incoming and outgoing link are equal, the htlc is part of a
1209
        // circular route which may be used to lock up our liquidity. If the
1210
        // switch is configured to allow circular routes, log that we are
1211
        // allowing the route then return nil.
1212
        if allowCircular {
×
1213
                log.Debugf("allowing circular route over link: %v "+
×
1214
                        "(payment hash: %x)", incoming, paymentHash)
×
1215
                return nil
×
1216
        }
×
1217

1218
        // If our node disallows circular routes, return a temporary channel
1219
        // failure. There is nothing wrong with the policy used by the remote
1220
        // node, so we do not include a channel update.
1221
        return NewDetailedLinkError(
×
1222
                lnwire.NewTemporaryChannelFailure(nil),
×
1223
                OutgoingFailureCircularRoute,
×
1224
        )
×
1225
}
1226

1227
// failAddPacket encrypts a fail packet back to an add packet's source.
1228
// The ciphertext will be derived from the failure message proivded by context.
1229
// This method returns the failErr if all other steps complete successfully.
1230
func (s *Switch) failAddPacket(packet *htlcPacket, failure *LinkError) error {
3✔
1231
        // Encrypt the failure so that the sender will be able to read the error
3✔
1232
        // message. Since we failed this packet, we use EncryptFirstHop to
3✔
1233
        // obfuscate the failure for their eyes only.
3✔
1234
        reason, err := packet.obfuscator.EncryptFirstHop(failure.WireMessage())
3✔
1235
        if err != nil {
3✔
1236
                err := fmt.Errorf("unable to obfuscate "+
×
1237
                        "error: %v", err)
×
1238
                log.Error(err)
×
1239
                return err
×
1240
        }
×
1241

1242
        log.Error(failure.Error())
3✔
1243

3✔
1244
        // Create a failure packet for this htlc. The full set of
3✔
1245
        // information about the htlc failure is included so that they can
3✔
1246
        // be included in link failure notifications.
3✔
1247
        failPkt := &htlcPacket{
3✔
1248
                sourceRef:       packet.sourceRef,
3✔
1249
                incomingChanID:  packet.incomingChanID,
3✔
1250
                incomingHTLCID:  packet.incomingHTLCID,
3✔
1251
                outgoingChanID:  packet.outgoingChanID,
3✔
1252
                outgoingHTLCID:  packet.outgoingHTLCID,
3✔
1253
                incomingAmount:  packet.incomingAmount,
3✔
1254
                amount:          packet.amount,
3✔
1255
                incomingTimeout: packet.incomingTimeout,
3✔
1256
                outgoingTimeout: packet.outgoingTimeout,
3✔
1257
                circuit:         packet.circuit,
3✔
1258
                obfuscator:      packet.obfuscator,
3✔
1259
                linkFailure:     failure,
3✔
1260
                htlc: &lnwire.UpdateFailHTLC{
3✔
1261
                        Reason: reason,
3✔
1262
                },
3✔
1263
        }
3✔
1264

3✔
1265
        // Route a fail packet back to the source link.
3✔
1266
        err = s.mailOrchestrator.Deliver(failPkt.incomingChanID, failPkt)
3✔
1267
        if err != nil {
3✔
1268
                err = fmt.Errorf("source chanid=%v unable to "+
×
1269
                        "handle switch packet: %v",
×
1270
                        packet.incomingChanID, err)
×
1271
                log.Error(err)
×
1272
                return err
×
1273
        }
×
1274

1275
        return failure
3✔
1276
}
1277

1278
// closeCircuit accepts a settle or fail htlc and the associated htlc packet and
1279
// attempts to determine the source that forwarded this htlc. This method will
1280
// set the incoming chan and htlc ID of the given packet if the source was
1281
// found, and will properly [re]encrypt any failure messages.
1282
func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
3✔
1283
        // If the packet has its source, that means it was failed locally by
3✔
1284
        // the outgoing link. We fail it here to make sure only one response
3✔
1285
        // makes it through the switch.
3✔
1286
        if pkt.hasSource {
6✔
1287
                circuit, err := s.circuits.FailCircuit(pkt.inKey())
3✔
1288
                switch err {
3✔
1289

1290
                // Circuit successfully closed.
1291
                case nil:
3✔
1292
                        return circuit, nil
3✔
1293

1294
                // Circuit was previously closed, but has not been deleted.
1295
                // We'll just drop this response until the circuit has been
1296
                // fully removed.
1297
                case ErrCircuitClosing:
×
1298
                        return nil, err
×
1299

1300
                // Failed to close circuit because it does not exist. This is
1301
                // likely because the circuit was already successfully closed.
1302
                // Since this packet failed locally, there is no forwarding
1303
                // package entry to acknowledge.
1304
                case ErrUnknownCircuit:
×
1305
                        return nil, err
×
1306

1307
                // Unexpected error.
1308
                default:
×
1309
                        return nil, err
×
1310
                }
1311
        }
1312

1313
        // Otherwise, this is packet was received from the remote party.  Use
1314
        // circuit map to find the incoming link to receive the settle/fail.
1315
        circuit, err := s.circuits.CloseCircuit(pkt.outKey())
3✔
1316
        switch err {
3✔
1317

1318
        // Open circuit successfully closed.
1319
        case nil:
3✔
1320
                pkt.incomingChanID = circuit.Incoming.ChanID
3✔
1321
                pkt.incomingHTLCID = circuit.Incoming.HtlcID
3✔
1322
                pkt.circuit = circuit
3✔
1323
                pkt.sourceRef = &circuit.AddRef
3✔
1324

3✔
1325
                pktType := "SETTLE"
3✔
1326
                if _, ok := pkt.htlc.(*lnwire.UpdateFailHTLC); ok {
6✔
1327
                        pktType = "FAIL"
3✔
1328
                }
3✔
1329

1330
                log.Debugf("Closed completed %s circuit for %x: "+
3✔
1331
                        "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash,
3✔
1332
                        pkt.incomingChanID, pkt.incomingHTLCID,
3✔
1333
                        pkt.outgoingChanID, pkt.outgoingHTLCID)
3✔
1334

3✔
1335
                return circuit, nil
3✔
1336

1337
        // Circuit was previously closed, but has not been deleted. We'll just
1338
        // drop this response until the circuit has been removed.
1339
        case ErrCircuitClosing:
3✔
1340
                return nil, err
3✔
1341

1342
        // Failed to close circuit because it does not exist. This is likely
1343
        // because the circuit was already successfully closed.
1344
        case ErrUnknownCircuit:
3✔
1345
                if pkt.destRef != nil {
6✔
1346
                        // Add this SettleFailRef to the set of pending settle/fail entries
3✔
1347
                        // awaiting acknowledgement.
3✔
1348
                        s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef)
3✔
1349
                }
3✔
1350

1351
                // If this is a settle, we will not log an error message as settles
1352
                // are expected to hit the ErrUnknownCircuit case. The only way fails
1353
                // can hit this case if the link restarts after having just sent a fail
1354
                // to the switch.
1355
                _, isSettle := pkt.htlc.(*lnwire.UpdateFulfillHTLC)
3✔
1356
                if !isSettle {
6✔
1357
                        err := fmt.Errorf("unable to find target channel "+
3✔
1358
                                "for HTLC fail: channel ID = %s, "+
3✔
1359
                                "HTLC ID = %d", pkt.outgoingChanID,
3✔
1360
                                pkt.outgoingHTLCID)
3✔
1361
                        log.Error(err)
3✔
1362

3✔
1363
                        return nil, err
3✔
1364
                }
3✔
1365

1366
                return nil, nil
3✔
1367

1368
        // Unexpected error.
1369
        default:
×
1370
                return nil, err
×
1371
        }
1372
}
1373

1374
// ackSettleFail is used by the switch to ACK any settle/fail entries in the
1375
// forwarding package of the outgoing link for a payment circuit. We do this if
1376
// we're the originator of the payment, so the link stops attempting to
1377
// re-broadcast.
1378
func (s *Switch) ackSettleFail(settleFailRefs ...channeldb.SettleFailRef) error {
3✔
1379
        return kvdb.Batch(s.cfg.DB, func(tx kvdb.RwTx) error {
6✔
1380
                return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRefs...)
3✔
1381
        })
3✔
1382
}
1383

1384
// teardownCircuit removes a pending or open circuit from the switch's circuit
1385
// map and prints useful logging statements regarding the outcome.
1386
func (s *Switch) teardownCircuit(pkt *htlcPacket) error {
3✔
1387
        var pktType string
3✔
1388
        switch htlc := pkt.htlc.(type) {
3✔
1389
        case *lnwire.UpdateFulfillHTLC:
3✔
1390
                pktType = "SETTLE"
3✔
1391
        case *lnwire.UpdateFailHTLC:
3✔
1392
                pktType = "FAIL"
3✔
1393
        default:
×
1394
                return fmt.Errorf("cannot tear down packet of type: %T", htlc)
×
1395
        }
1396

1397
        var paymentHash lntypes.Hash
3✔
1398

3✔
1399
        // Perform a defensive check to make sure we don't try to access a nil
3✔
1400
        // circuit.
3✔
1401
        circuit := pkt.circuit
3✔
1402
        if circuit != nil {
6✔
1403
                copy(paymentHash[:], circuit.PaymentHash[:])
3✔
1404
        }
3✔
1405

1406
        log.Debugf("Tearing down circuit with %s pkt, removing circuit=%v "+
3✔
1407
                "with keystone=%v", pktType, pkt.inKey(), pkt.outKey())
3✔
1408

3✔
1409
        err := s.circuits.DeleteCircuits(pkt.inKey())
3✔
1410
        if err != nil {
3✔
1411
                log.Warnf("Failed to tear down circuit (%s, %d) <-> (%s, %d) "+
×
1412
                        "with payment_hash=%v using %s pkt", pkt.incomingChanID,
×
1413
                        pkt.incomingHTLCID, pkt.outgoingChanID,
×
1414
                        pkt.outgoingHTLCID, pkt.circuit.PaymentHash, pktType)
×
1415

×
1416
                return err
×
1417
        }
×
1418

1419
        log.Debugf("Closed %s circuit for %v: (%s, %d) <-> (%s, %d)", pktType,
3✔
1420
                paymentHash, pkt.incomingChanID, pkt.incomingHTLCID,
3✔
1421
                pkt.outgoingChanID, pkt.outgoingHTLCID)
3✔
1422

3✔
1423
        return nil
3✔
1424
}
1425

1426
// CloseLink creates and sends the close channel command to the target link
1427
// directing the specified closure type. If the closure type is CloseRegular,
1428
// targetFeePerKw parameter should be the ideal fee-per-kw that will be used as
1429
// a starting point for close negotiation. The deliveryScript parameter is an
1430
// optional parameter which sets a user specified script to close out to.
1431
func (s *Switch) CloseLink(chanPoint *wire.OutPoint,
1432
        closeType contractcourt.ChannelCloseType,
1433
        targetFeePerKw, maxFee chainfee.SatPerKWeight,
1434
        deliveryScript lnwire.DeliveryAddress) (chan interface{}, chan error) {
3✔
1435

3✔
1436
        // TODO(roasbeef) abstract out the close updates.
3✔
1437
        updateChan := make(chan interface{}, 2)
3✔
1438
        errChan := make(chan error, 1)
3✔
1439

3✔
1440
        command := &ChanClose{
3✔
1441
                CloseType:      closeType,
3✔
1442
                ChanPoint:      chanPoint,
3✔
1443
                Updates:        updateChan,
3✔
1444
                TargetFeePerKw: targetFeePerKw,
3✔
1445
                MaxFee:         maxFee,
3✔
1446
                DeliveryScript: deliveryScript,
3✔
1447
                Err:            errChan,
3✔
1448
        }
3✔
1449

3✔
1450
        select {
3✔
1451
        case s.chanCloseRequests <- command:
3✔
1452
                return updateChan, errChan
3✔
1453

NEW
1454
        case <-s.gm.Done():
×
1455
                errChan <- ErrSwitchExiting
×
1456
                close(updateChan)
×
1457
                return updateChan, errChan
×
1458
        }
1459
}
1460

1461
// htlcForwarder is responsible for optimally forwarding (and possibly
1462
// fragmenting) incoming/outgoing HTLCs amongst all active interfaces and their
1463
// links. The duties of the forwarder are similar to that of a network switch,
1464
// in that it facilitates multi-hop payments by acting as a central messaging
1465
// bus. The switch communicates will active links to create, manage, and tear
1466
// down active onion routed payments. Each active channel is modeled as
1467
// networked device with metadata such as the available payment bandwidth, and
1468
// total link capacity.
1469
//
1470
// NOTE: This MUST be run as a goroutine.
1471
func (s *Switch) htlcForwarder() {
3✔
1472
        defer func() {
6✔
1473
                s.blockEpochStream.Cancel()
3✔
1474

3✔
1475
                // Remove all links once we've been signalled for shutdown.
3✔
1476
                var linksToStop []ChannelLink
3✔
1477
                s.indexMtx.Lock()
3✔
1478
                for _, link := range s.linkIndex {
6✔
1479
                        activeLink := s.removeLink(link.ChanID())
3✔
1480
                        if activeLink == nil {
3✔
1481
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1482
                                        "on stop", link.ChanID())
×
1483
                                continue
×
1484
                        }
1485
                        linksToStop = append(linksToStop, activeLink)
3✔
1486
                }
1487
                for _, link := range s.pendingLinkIndex {
6✔
1488
                        pendingLink := s.removeLink(link.ChanID())
3✔
1489
                        if pendingLink == nil {
3✔
1490
                                log.Errorf("unable to remove ChannelLink(%v) "+
×
1491
                                        "on stop", link.ChanID())
×
1492
                                continue
×
1493
                        }
1494
                        linksToStop = append(linksToStop, pendingLink)
3✔
1495
                }
1496
                s.indexMtx.Unlock()
3✔
1497

3✔
1498
                // Now that all pending and live links have been removed from
3✔
1499
                // the forwarding indexes, stop each one before shutting down.
3✔
1500
                // We'll shut them down in parallel to make exiting as fast as
3✔
1501
                // possible.
3✔
1502
                var wg sync.WaitGroup
3✔
1503
                for _, link := range linksToStop {
6✔
1504
                        wg.Add(1)
3✔
1505
                        // Here it is ok to start a goroutine directly bypassing
3✔
1506
                        // s.gm, because we want for them to complete here.
3✔
1507
                        go func(l ChannelLink) {
6✔
1508
                                defer wg.Done()
3✔
1509

3✔
1510
                                l.Stop()
3✔
1511
                        }(link)
3✔
1512
                }
1513
                wg.Wait()
3✔
1514

3✔
1515
                // Before we exit fully, we'll attempt to flush out any
3✔
1516
                // forwarding events that may still be lingering since the last
3✔
1517
                // batch flush.
3✔
1518
                if err := s.FlushForwardingEvents(); err != nil {
3✔
1519
                        log.Errorf("unable to flush forwarding events: %v", err)
×
1520
                }
×
1521
        }()
1522

1523
        // TODO(roasbeef): cleared vs settled distinction
1524
        var (
3✔
1525
                totalNumUpdates uint64
3✔
1526
                totalSatSent    btcutil.Amount
3✔
1527
                totalSatRecv    btcutil.Amount
3✔
1528
        )
3✔
1529
        s.cfg.LogEventTicker.Resume()
3✔
1530
        defer s.cfg.LogEventTicker.Stop()
3✔
1531

3✔
1532
        // Every 15 seconds, we'll flush out the forwarding events that
3✔
1533
        // occurred during that period.
3✔
1534
        s.cfg.FwdEventTicker.Resume()
3✔
1535
        defer s.cfg.FwdEventTicker.Stop()
3✔
1536

3✔
1537
        defer s.cfg.AckEventTicker.Stop()
3✔
1538

3✔
1539
out:
3✔
1540
        for {
6✔
1541

3✔
1542
                // If the set of pending settle/fail entries is non-zero,
3✔
1543
                // reinstate the ack ticker so we can batch ack them.
3✔
1544
                if len(s.pendingSettleFails) > 0 {
6✔
1545
                        s.cfg.AckEventTicker.Resume()
3✔
1546
                }
3✔
1547

1548
                select {
3✔
1549
                case blockEpoch, ok := <-s.blockEpochStream.Epochs:
3✔
1550
                        if !ok {
3✔
1551
                                break out
×
1552
                        }
1553

1554
                        atomic.StoreUint32(&s.bestHeight, uint32(blockEpoch.Height))
3✔
1555

1556
                // A local close request has arrived, we'll forward this to the
1557
                // relevant link (if it exists) so the channel can be
1558
                // cooperatively closed (if possible).
1559
                case req := <-s.chanCloseRequests:
3✔
1560
                        chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)
3✔
1561

3✔
1562
                        s.indexMtx.RLock()
3✔
1563
                        link, ok := s.linkIndex[chanID]
3✔
1564
                        if !ok {
6✔
1565
                                s.indexMtx.RUnlock()
3✔
1566

3✔
1567
                                req.Err <- fmt.Errorf("no peer for channel with "+
3✔
1568
                                        "chan_id=%x", chanID[:])
3✔
1569
                                continue
3✔
1570
                        }
1571
                        s.indexMtx.RUnlock()
3✔
1572

3✔
1573
                        peerPub := link.PeerPubKey()
3✔
1574
                        log.Debugf("Requesting local channel close: peer=%v, "+
3✔
1575
                                "chan_id=%x", link.PeerPubKey(), chanID[:])
3✔
1576

3✔
1577
                        go s.cfg.LocalChannelClose(peerPub[:], req)
3✔
1578

1579
                case resolutionMsg := <-s.resolutionMsgs:
3✔
1580
                        // We'll persist the resolution message to the Switch's
3✔
1581
                        // resolution store.
3✔
1582
                        resMsg := resolutionMsg.ResolutionMsg
3✔
1583
                        err := s.resMsgStore.addResolutionMsg(&resMsg)
3✔
1584
                        if err != nil {
3✔
1585
                                // This will only fail if there is a database
×
1586
                                // error or a serialization error. Sending the
×
1587
                                // error prevents the contractcourt from being
×
1588
                                // in a state where it believes the send was
×
1589
                                // successful, when it wasn't.
×
1590
                                log.Errorf("unable to add resolution msg: %v",
×
1591
                                        err)
×
1592
                                resolutionMsg.errChan <- err
×
1593
                                continue
×
1594
                        }
1595

1596
                        // At this point, the resolution message has been
1597
                        // persisted. It is safe to signal success by sending
1598
                        // a nil error since the Switch will re-deliver the
1599
                        // resolution message on restart.
1600
                        resolutionMsg.errChan <- nil
3✔
1601

3✔
1602
                        // Create a htlc packet for this resolution. We do
3✔
1603
                        // not have some of the information that we'll need
3✔
1604
                        // for blinded error handling here , so we'll rely on
3✔
1605
                        // our forwarding logic to fill it in later.
3✔
1606
                        pkt := &htlcPacket{
3✔
1607
                                outgoingChanID: resolutionMsg.SourceChan,
3✔
1608
                                outgoingHTLCID: resolutionMsg.HtlcIndex,
3✔
1609
                                isResolution:   true,
3✔
1610
                        }
3✔
1611

3✔
1612
                        // Resolution messages will either be cancelling
3✔
1613
                        // backwards an existing HTLC, or settling a previously
3✔
1614
                        // outgoing HTLC. Based on this, we'll map the message
3✔
1615
                        // to the proper htlcPacket.
3✔
1616
                        if resolutionMsg.Failure != nil {
6✔
1617
                                pkt.htlc = &lnwire.UpdateFailHTLC{}
3✔
1618
                        } else {
6✔
1619
                                pkt.htlc = &lnwire.UpdateFulfillHTLC{
3✔
1620
                                        PaymentPreimage: *resolutionMsg.PreImage,
3✔
1621
                                }
3✔
1622
                        }
3✔
1623

1624
                        log.Infof("Received outside contract resolution, "+
3✔
1625
                                "mapping to: %v", spew.Sdump(pkt))
3✔
1626

3✔
1627
                        // We don't check the error, as the only failure we can
3✔
1628
                        // encounter is due to the circuit already being
3✔
1629
                        // closed. This is fine, as processing this message is
3✔
1630
                        // meant to be idempotent.
3✔
1631
                        err = s.handlePacketForward(pkt)
3✔
1632
                        if err != nil {
6✔
1633
                                log.Errorf("Unable to forward resolution msg: %v", err)
3✔
1634
                        }
3✔
1635

1636
                // A new packet has arrived for forwarding, we'll interpret the
1637
                // packet concretely, then either forward it along, or
1638
                // interpret a return packet to a locally initialized one.
1639
                case cmd := <-s.htlcPlex:
3✔
1640
                        cmd.err <- s.handlePacketForward(cmd.pkt)
3✔
1641

1642
                // When this time ticks, then it indicates that we should
1643
                // collect all the forwarding events since the last internal,
1644
                // and write them out to our log.
1645
                case <-s.cfg.FwdEventTicker.Ticks():
3✔
1646
                        err := s.gm.Go(func(ctx context.Context) {
6✔
1647
                                err := s.FlushForwardingEvents()
3✔
1648
                                if err != nil {
3✔
1649
                                        log.Errorf("unable to flush "+
×
1650
                                                "forwarding events: %v", err)
×
1651
                                }
×
1652
                        })
1653
                        if err != nil {
3✔
NEW
1654
                                return
×
NEW
1655
                        }
×
1656

1657
                // The log ticker has fired, so we'll calculate some forwarding
1658
                // stats for the last 10 seconds to display within the logs to
1659
                // users.
1660
                case <-s.cfg.LogEventTicker.Ticks():
3✔
1661
                        // First, we'll collate the current running tally of
3✔
1662
                        // our forwarding stats.
3✔
1663
                        prevSatSent := totalSatSent
3✔
1664
                        prevSatRecv := totalSatRecv
3✔
1665
                        prevNumUpdates := totalNumUpdates
3✔
1666

3✔
1667
                        var (
3✔
1668
                                newNumUpdates uint64
3✔
1669
                                newSatSent    btcutil.Amount
3✔
1670
                                newSatRecv    btcutil.Amount
3✔
1671
                        )
3✔
1672

3✔
1673
                        // Next, we'll run through all the registered links and
3✔
1674
                        // compute their up-to-date forwarding stats.
3✔
1675
                        s.indexMtx.RLock()
3✔
1676
                        for _, link := range s.linkIndex {
6✔
1677
                                // TODO(roasbeef): when links first registered
3✔
1678
                                // stats printed.
3✔
1679
                                updates, sent, recv := link.Stats()
3✔
1680
                                newNumUpdates += updates
3✔
1681
                                newSatSent += sent.ToSatoshis()
3✔
1682
                                newSatRecv += recv.ToSatoshis()
3✔
1683
                        }
3✔
1684
                        s.indexMtx.RUnlock()
3✔
1685

3✔
1686
                        var (
3✔
1687
                                diffNumUpdates uint64
3✔
1688
                                diffSatSent    btcutil.Amount
3✔
1689
                                diffSatRecv    btcutil.Amount
3✔
1690
                        )
3✔
1691

3✔
1692
                        // If this is the first time we're computing these
3✔
1693
                        // stats, then the diff is just the new value. We do
3✔
1694
                        // this in order to avoid integer underflow issues.
3✔
1695
                        if prevNumUpdates == 0 {
6✔
1696
                                diffNumUpdates = newNumUpdates
3✔
1697
                                diffSatSent = newSatSent
3✔
1698
                                diffSatRecv = newSatRecv
3✔
1699
                        } else {
6✔
1700
                                diffNumUpdates = newNumUpdates - prevNumUpdates
3✔
1701
                                diffSatSent = newSatSent - prevSatSent
3✔
1702
                                diffSatRecv = newSatRecv - prevSatRecv
3✔
1703
                        }
3✔
1704

1705
                        // If the diff of num updates is zero, then we haven't
1706
                        // forwarded anything in the last 10 seconds, so we can
1707
                        // skip this update.
1708
                        if diffNumUpdates == 0 {
6✔
1709
                                continue
3✔
1710
                        }
1711

1712
                        // If the diff of num updates is negative, then some
1713
                        // links may have been unregistered from the switch, so
1714
                        // we'll update our stats to only include our registered
1715
                        // links.
1716
                        if int64(diffNumUpdates) < 0 {
6✔
1717
                                totalNumUpdates = newNumUpdates
3✔
1718
                                totalSatSent = newSatSent
3✔
1719
                                totalSatRecv = newSatRecv
3✔
1720
                                continue
3✔
1721
                        }
1722

1723
                        // Otherwise, we'll log this diff, then accumulate the
1724
                        // new stats into the running total.
1725
                        log.Debugf("Sent %d satoshis and received %d satoshis "+
3✔
1726
                                "in the last 10 seconds (%f tx/sec)",
3✔
1727
                                diffSatSent, diffSatRecv,
3✔
1728
                                float64(diffNumUpdates)/10)
3✔
1729

3✔
1730
                        totalNumUpdates += diffNumUpdates
3✔
1731
                        totalSatSent += diffSatSent
3✔
1732
                        totalSatRecv += diffSatRecv
3✔
1733

1734
                // The ack ticker has fired so if we have any settle/fail entries
1735
                // for a forwarding package to ack, we will do so here in a batch
1736
                // db call.
1737
                case <-s.cfg.AckEventTicker.Ticks():
3✔
1738
                        // If the current set is empty, pause the ticker.
3✔
1739
                        if len(s.pendingSettleFails) == 0 {
6✔
1740
                                s.cfg.AckEventTicker.Pause()
3✔
1741
                                continue
3✔
1742
                        }
1743

1744
                        // Batch ack the settle/fail entries.
1745
                        if err := s.ackSettleFail(s.pendingSettleFails...); err != nil {
3✔
1746
                                log.Errorf("Unable to ack batch of settle/fails: %v", err)
×
1747
                                continue
×
1748
                        }
1749

1750
                        log.Tracef("Acked %d settle fails: %v",
3✔
1751
                                len(s.pendingSettleFails),
3✔
1752
                                lnutils.SpewLogClosure(s.pendingSettleFails))
3✔
1753

3✔
1754
                        // Reset the pendingSettleFails buffer while keeping acquired
3✔
1755
                        // memory.
3✔
1756
                        s.pendingSettleFails = s.pendingSettleFails[:0]
3✔
1757

1758
                case <-s.gm.Done():
3✔
1759
                        return
3✔
1760
                }
1761
        }
1762
}
1763

1764
// Start starts all helper goroutines required for the operation of the switch.
1765
func (s *Switch) Start() error {
3✔
1766
        if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
3✔
1767
                log.Warn("Htlc Switch already started")
×
NEW
1768

×
1769
                return errors.New("htlc switch already started")
×
1770
        }
×
1771

1772
        log.Infof("HTLC Switch starting")
3✔
1773

3✔
1774
        blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
3✔
1775
        if err != nil {
3✔
1776
                return err
×
1777
        }
×
1778
        s.blockEpochStream = blockEpochStream
3✔
1779

3✔
1780
        err = s.gm.Go(func(ctx context.Context) {
6✔
1781
                s.htlcForwarder()
3✔
1782
        })
3✔
1783
        if err != nil {
3✔
NEW
1784
                // We are already stopping so we can ignore the error.
×
NEW
1785
                _ = s.Stop()
×
NEW
1786
                err = fmt.Errorf("unable to start htlc forwarder: %w", err)
×
NEW
1787
                log.Errorf("%v", err)
×
NEW
1788

×
NEW
1789
                return err
×
NEW
1790
        }
×
1791

1792
        if err := s.reforwardResponses(); err != nil {
3✔
NEW
1793
                // We are already stopping so we can ignore the error.
×
NEW
1794
                _ = s.Stop()
×
1795
                log.Errorf("unable to reforward responses: %v", err)
×
NEW
1796

×
1797
                return err
×
1798
        }
×
1799

1800
        if err := s.reforwardResolutions(); err != nil {
3✔
1801
                // We are already stopping so we can ignore the error.
×
1802
                _ = s.Stop()
×
1803
                log.Errorf("unable to reforward resolutions: %v", err)
×
NEW
1804

×
1805
                return err
×
1806
        }
×
1807

1808
        return nil
3✔
1809
}
1810

1811
// reforwardResolutions fetches the set of resolution messages stored on-disk
1812
// and reforwards them if their circuits are still open. If the circuits have
1813
// been deleted, then we will delete the resolution message from the database.
1814
func (s *Switch) reforwardResolutions() error {
3✔
1815
        // Fetch all stored resolution messages, deleting the ones that are
3✔
1816
        // resolved.
3✔
1817
        resMsgs, err := s.resMsgStore.fetchAllResolutionMsg()
3✔
1818
        if err != nil {
3✔
1819
                return err
×
1820
        }
×
1821

1822
        switchPackets := make([]*htlcPacket, 0, len(resMsgs))
3✔
1823
        for _, resMsg := range resMsgs {
6✔
1824
                // If the open circuit no longer exists, then we can remove the
3✔
1825
                // message from the store.
3✔
1826
                outKey := CircuitKey{
3✔
1827
                        ChanID: resMsg.SourceChan,
3✔
1828
                        HtlcID: resMsg.HtlcIndex,
3✔
1829
                }
3✔
1830

3✔
1831
                if s.circuits.LookupOpenCircuit(outKey) == nil {
6✔
1832
                        // The open circuit doesn't exist.
3✔
1833
                        err := s.resMsgStore.deleteResolutionMsg(&outKey)
3✔
1834
                        if err != nil {
3✔
1835
                                return err
×
1836
                        }
×
1837

1838
                        continue
3✔
1839
                }
1840

1841
                // The circuit is still open, so we can assume that the link or
1842
                // switch (if we are the source) hasn't cleaned it up yet.
1843
                // We rely on our forwarding logic to fill in details that
1844
                // are not currently available to us.
1845
                resPkt := &htlcPacket{
3✔
1846
                        outgoingChanID: resMsg.SourceChan,
3✔
1847
                        outgoingHTLCID: resMsg.HtlcIndex,
3✔
1848
                        isResolution:   true,
3✔
1849
                }
3✔
1850

3✔
1851
                if resMsg.Failure != nil {
6✔
1852
                        resPkt.htlc = &lnwire.UpdateFailHTLC{}
3✔
1853
                } else {
3✔
1854
                        resPkt.htlc = &lnwire.UpdateFulfillHTLC{
×
1855
                                PaymentPreimage: *resMsg.PreImage,
×
1856
                        }
×
1857
                }
×
1858

1859
                switchPackets = append(switchPackets, resPkt)
3✔
1860
        }
1861

1862
        // We'll now dispatch the set of resolution messages to the proper
1863
        // destination. An error is only encountered here if the switch is
1864
        // shutting down.
1865
        if err := s.ForwardPackets(nil, switchPackets...); err != nil {
3✔
1866
                return err
×
1867
        }
×
1868

1869
        return nil
3✔
1870
}
1871

1872
// reforwardResponses for every known, non-pending channel, loads all associated
1873
// forwarding packages and reforwards any Settle or Fail HTLCs found. This is
1874
// used to resurrect the switch's mailboxes after a restart. This also runs for
1875
// waiting close channels since there may be settles or fails that need to be
1876
// reforwarded before they completely close.
1877
func (s *Switch) reforwardResponses() error {
3✔
1878
        openChannels, err := s.cfg.FetchAllChannels()
3✔
1879
        if err != nil {
3✔
1880
                return err
×
1881
        }
×
1882

1883
        for _, openChannel := range openChannels {
6✔
1884
                shortChanID := openChannel.ShortChanID()
3✔
1885

3✔
1886
                // Locally-initiated payments never need reforwarding.
3✔
1887
                if shortChanID == hop.Source {
6✔
1888
                        continue
3✔
1889
                }
1890

1891
                // If the channel is pending, it should have no forwarding
1892
                // packages, and nothing to reforward.
1893
                if openChannel.IsPending {
3✔
1894
                        continue
×
1895
                }
1896

1897
                // Channels in open or waiting-close may still have responses in
1898
                // their forwarding packages. We will continue to reattempt
1899
                // forwarding on startup until the channel is fully-closed.
1900
                //
1901
                // Load this channel's forwarding packages, and deliver them to
1902
                // the switch.
1903
                fwdPkgs, err := s.loadChannelFwdPkgs(shortChanID)
3✔
1904
                if err != nil {
3✔
1905
                        log.Errorf("unable to load forwarding "+
×
1906
                                "packages for %v: %v", shortChanID, err)
×
1907
                        return err
×
1908
                }
×
1909

1910
                s.reforwardSettleFails(fwdPkgs)
3✔
1911
        }
1912

1913
        return nil
3✔
1914
}
1915

1916
// loadChannelFwdPkgs loads all forwarding packages owned by the `source` short
1917
// channel identifier.
1918
func (s *Switch) loadChannelFwdPkgs(source lnwire.ShortChannelID) ([]*channeldb.FwdPkg, error) {
3✔
1919

3✔
1920
        var fwdPkgs []*channeldb.FwdPkg
3✔
1921
        if err := kvdb.View(s.cfg.DB, func(tx kvdb.RTx) error {
6✔
1922
                var err error
3✔
1923
                fwdPkgs, err = s.cfg.SwitchPackager.LoadChannelFwdPkgs(
3✔
1924
                        tx, source,
3✔
1925
                )
3✔
1926
                return err
3✔
1927
        }, func() {
6✔
1928
                fwdPkgs = nil
3✔
1929
        }); err != nil {
3✔
1930
                return nil, err
×
1931
        }
×
1932

1933
        return fwdPkgs, nil
3✔
1934
}
1935

1936
// reforwardSettleFails parses the Settle and Fail HTLCs from the list of
1937
// forwarding packages, and reforwards those that have not been acknowledged.
1938
// This is intended to occur on startup, in order to recover the switch's
1939
// mailboxes, and to ensure that responses can be propagated in case the
1940
// outgoing link never comes back online.
1941
//
1942
// NOTE: This should mimic the behavior processRemoteSettleFails.
1943
func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) {
3✔
1944
        for _, fwdPkg := range fwdPkgs {
6✔
1945
                switchPackets := make([]*htlcPacket, 0, len(fwdPkg.SettleFails))
3✔
1946
                for i, update := range fwdPkg.SettleFails {
6✔
1947
                        // Skip any settles or fails that have already been
3✔
1948
                        // acknowledged by the incoming link that originated the
3✔
1949
                        // forwarded Add.
3✔
1950
                        if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
6✔
1951
                                continue
3✔
1952
                        }
1953

1954
                        switch msg := update.UpdateMsg.(type) {
3✔
1955
                        // A settle for an HTLC we previously forwarded HTLC has
1956
                        // been received. So we'll forward the HTLC to the
1957
                        // switch which will handle propagating the settle to
1958
                        // the prior hop.
1959
                        case *lnwire.UpdateFulfillHTLC:
3✔
1960
                                destRef := fwdPkg.DestRef(uint16(i))
3✔
1961
                                settlePacket := &htlcPacket{
3✔
1962
                                        outgoingChanID: fwdPkg.Source,
3✔
1963
                                        outgoingHTLCID: msg.ID,
3✔
1964
                                        destRef:        &destRef,
3✔
1965
                                        htlc:           msg,
3✔
1966
                                }
3✔
1967

3✔
1968
                                // Add the packet to the batch to be forwarded, and
3✔
1969
                                // notify the overflow queue that a spare spot has been
3✔
1970
                                // freed up within the commitment state.
3✔
1971
                                switchPackets = append(switchPackets, settlePacket)
3✔
1972

1973
                        // A failureCode message for a previously forwarded HTLC has been
1974
                        // received. As a result a new slot will be freed up in our
1975
                        // commitment state, so we'll forward this to the switch so the
1976
                        // backwards undo can continue.
1977
                        case *lnwire.UpdateFailHTLC:
×
1978
                                // Fetch the reason the HTLC was canceled so
×
1979
                                // we can continue to propagate it. This
×
1980
                                // failure originated from another node, so
×
1981
                                // the linkFailure field is not set on this
×
1982
                                // packet. We rely on the link to fill in
×
1983
                                // additional circuit information for us.
×
1984
                                failPacket := &htlcPacket{
×
1985
                                        outgoingChanID: fwdPkg.Source,
×
1986
                                        outgoingHTLCID: msg.ID,
×
1987
                                        destRef: &channeldb.SettleFailRef{
×
1988
                                                Source: fwdPkg.Source,
×
1989
                                                Height: fwdPkg.Height,
×
1990
                                                Index:  uint16(i),
×
1991
                                        },
×
1992
                                        htlc: msg,
×
1993
                                }
×
1994

×
1995
                                // Add the packet to the batch to be forwarded, and
×
1996
                                // notify the overflow queue that a spare spot has been
×
1997
                                // freed up within the commitment state.
×
1998
                                switchPackets = append(switchPackets, failPacket)
×
1999
                        }
2000
                }
2001

2002
                // Since this send isn't tied to a specific link, we pass a nil
2003
                // link quit channel, meaning the send will fail only if the
2004
                // switch receives a shutdown request.
2005
                if err := s.ForwardPackets(nil, switchPackets...); err != nil {
3✔
2006
                        log.Errorf("Unhandled error while reforwarding packets "+
×
2007
                                "settle/fail over htlcswitch: %v", err)
×
2008
                }
×
2009
        }
2010
}
2011

2012
// Stop gracefully stops all active helper goroutines, then waits until they've
2013
// exited.
2014
func (s *Switch) Stop() error {
3✔
2015
        if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) {
3✔
2016
                log.Warn("Htlc Switch already stopped")
×
2017
                return errors.New("htlc switch already shutdown")
×
2018
        }
×
2019

2020
        log.Info("HTLC Switch shutting down...")
3✔
2021
        defer log.Debug("HTLC Switch shutdown complete")
3✔
2022

3✔
2023
        // Ask running goroutines to stop and wait for them.
3✔
2024
        s.gm.Stop()
3✔
2025

3✔
2026
        // TODO(yy): remove this, when s.handleLocalResponseWG is removed.
3✔
2027
        s.handleLocalResponseWG.Wait()
3✔
2028

3✔
2029
        // Wait until all active goroutines have finished exiting before
3✔
2030
        // stopping the mailboxes, otherwise the mailbox map could still be
3✔
2031
        // accessed and modified.
3✔
2032
        s.mailOrchestrator.Stop()
3✔
2033

3✔
2034
        return nil
3✔
2035
}
2036

2037
// CreateAndAddLink will create a link and then add it to the internal maps
2038
// when given a ChannelLinkConfig and LightningChannel.
2039
func (s *Switch) CreateAndAddLink(linkCfg ChannelLinkConfig,
2040
        lnChan *lnwallet.LightningChannel) error {
3✔
2041

3✔
2042
        link := NewChannelLink(linkCfg, lnChan)
3✔
2043
        return s.AddLink(link)
3✔
2044
}
3✔
2045

2046
// AddLink is used to initiate the handling of the add link command. The
2047
// request will be propagated and handled in the main goroutine.
2048
func (s *Switch) AddLink(link ChannelLink) error {
3✔
2049
        s.indexMtx.Lock()
3✔
2050
        defer s.indexMtx.Unlock()
3✔
2051

3✔
2052
        chanID := link.ChanID()
3✔
2053

3✔
2054
        // First, ensure that this link is not already active in the switch.
3✔
2055
        _, err := s.getLink(chanID)
3✔
2056
        if err == nil {
3✔
2057
                return fmt.Errorf("unable to add ChannelLink(%v), already "+
×
2058
                        "active", chanID)
×
2059
        }
×
2060

2061
        // Get and attach the mailbox for this link, which buffers packets in
2062
        // case there packets that we tried to deliver while this link was
2063
        // offline.
2064
        shortChanID := link.ShortChanID()
3✔
2065
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID, shortChanID)
3✔
2066
        link.AttachMailBox(mailbox)
3✔
2067

3✔
2068
        // Attach the Switch's failAliasUpdate function to the link.
3✔
2069
        link.attachFailAliasUpdate(s.failAliasUpdate)
3✔
2070

3✔
2071
        if err := link.Start(); err != nil {
3✔
2072
                log.Errorf("AddLink failed to start link with chanID=%v: %v",
×
2073
                        chanID, err)
×
2074
                s.removeLink(chanID)
×
2075
                return err
×
2076
        }
×
2077

2078
        if shortChanID == hop.Source {
6✔
2079
                log.Infof("Adding pending link chan_id=%v, short_chan_id=%v",
3✔
2080
                        chanID, shortChanID)
3✔
2081

3✔
2082
                s.pendingLinkIndex[chanID] = link
3✔
2083
        } else {
6✔
2084
                log.Infof("Adding live link chan_id=%v, short_chan_id=%v",
3✔
2085
                        chanID, shortChanID)
3✔
2086

3✔
2087
                s.addLiveLink(link)
3✔
2088
                s.mailOrchestrator.BindLiveShortChanID(
3✔
2089
                        mailbox, chanID, shortChanID,
3✔
2090
                )
3✔
2091
        }
3✔
2092

2093
        return nil
3✔
2094
}
2095

2096
// addLiveLink adds a link to all associated forwarding index, this makes it a
2097
// candidate for forwarding HTLCs.
2098
func (s *Switch) addLiveLink(link ChannelLink) {
3✔
2099
        linkScid := link.ShortChanID()
3✔
2100

3✔
2101
        // We'll add the link to the linkIndex which lets us quickly
3✔
2102
        // look up a channel when we need to close or register it, and
3✔
2103
        // the forwarding index which'll be used when forwarding HTLC's
3✔
2104
        // in the multi-hop setting.
3✔
2105
        s.linkIndex[link.ChanID()] = link
3✔
2106
        s.forwardingIndex[linkScid] = link
3✔
2107

3✔
2108
        // Next we'll add the link to the interface index so we can
3✔
2109
        // quickly look up all the channels for a particular node.
3✔
2110
        peerPub := link.PeerPubKey()
3✔
2111
        if _, ok := s.interfaceIndex[peerPub]; !ok {
6✔
2112
                s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink)
3✔
2113
        }
3✔
2114
        s.interfaceIndex[peerPub][link.ChanID()] = link
3✔
2115

3✔
2116
        s.updateLinkAliases(link)
3✔
2117
}
2118

2119
// UpdateLinkAliases is the externally exposed wrapper for updating link
2120
// aliases. It acquires the indexMtx and calls the internal method.
2121
func (s *Switch) UpdateLinkAliases(link ChannelLink) {
3✔
2122
        s.indexMtx.Lock()
3✔
2123
        defer s.indexMtx.Unlock()
3✔
2124

3✔
2125
        s.updateLinkAliases(link)
3✔
2126
}
3✔
2127

2128
// updateLinkAliases updates the aliases for a given link. This will cause the
2129
// htlcswitch to consult the alias manager on the up to date values of its
2130
// alias maps.
2131
//
2132
// NOTE: this MUST be called with the indexMtx held.
2133
func (s *Switch) updateLinkAliases(link ChannelLink) {
3✔
2134
        linkScid := link.ShortChanID()
3✔
2135

3✔
2136
        aliases := link.getAliases()
3✔
2137
        if link.isZeroConf() {
6✔
2138
                if link.zeroConfConfirmed() {
6✔
2139
                        // Since the zero-conf channel has confirmed, we can
3✔
2140
                        // populate the aliasToReal mapping.
3✔
2141
                        confirmedScid := link.confirmedScid()
3✔
2142

3✔
2143
                        for _, alias := range aliases {
6✔
2144
                                s.aliasToReal[alias] = confirmedScid
3✔
2145
                        }
3✔
2146

2147
                        // Add the confirmed SCID as a key in the baseIndex.
2148
                        s.baseIndex[confirmedScid] = linkScid
3✔
2149
                }
2150

2151
                // Now we populate the baseIndex which will be used to fetch
2152
                // the link given any of the channel's alias SCIDs or the real
2153
                // SCID. The link's SCID is an alias, so we don't need to
2154
                // special-case it like the option-scid-alias feature-bit case
2155
                // further down.
2156
                for _, alias := range aliases {
6✔
2157
                        s.baseIndex[alias] = linkScid
3✔
2158
                }
3✔
2159
        } else if link.negotiatedAliasFeature() {
6✔
2160
                // First, we flush any alias mappings for this link's scid
3✔
2161
                // before we populate the map again, in order to get rid of old
3✔
2162
                // values that no longer exist.
3✔
2163
                for alias, real := range s.aliasToReal {
6✔
2164
                        if real == linkScid {
6✔
2165
                                delete(s.aliasToReal, alias)
3✔
2166
                        }
3✔
2167
                }
2168

2169
                for alias, real := range s.baseIndex {
6✔
2170
                        if real == linkScid {
6✔
2171
                                delete(s.baseIndex, alias)
3✔
2172
                        }
3✔
2173
                }
2174

2175
                // The link's SCID is the confirmed SCID for non-zero-conf
2176
                // option-scid-alias feature bit channels.
2177
                for _, alias := range aliases {
6✔
2178
                        s.aliasToReal[alias] = linkScid
3✔
2179
                        s.baseIndex[alias] = linkScid
3✔
2180
                }
3✔
2181

2182
                // Since the link's SCID is confirmed, it was not included in
2183
                // the baseIndex above as a key. Add it now.
2184
                s.baseIndex[linkScid] = linkScid
3✔
2185
        }
2186
}
2187

2188
// GetLink is used to initiate the handling of the get link command. The
2189
// request will be propagated/handled to/in the main goroutine.
2190
func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelUpdateHandler,
2191
        error) {
3✔
2192

3✔
2193
        s.indexMtx.RLock()
3✔
2194
        defer s.indexMtx.RUnlock()
3✔
2195

3✔
2196
        return s.getLink(chanID)
3✔
2197
}
3✔
2198

2199
// getLink returns the link stored in either the pending index or the live
2200
// lindex.
2201
func (s *Switch) getLink(chanID lnwire.ChannelID) (ChannelLink, error) {
3✔
2202
        link, ok := s.linkIndex[chanID]
3✔
2203
        if !ok {
6✔
2204
                link, ok = s.pendingLinkIndex[chanID]
3✔
2205
                if !ok {
6✔
2206
                        return nil, ErrChannelLinkNotFound
3✔
2207
                }
3✔
2208
        }
2209

2210
        return link, nil
3✔
2211
}
2212

2213
// GetLinkByShortID attempts to return the link which possesses the target short
2214
// channel ID.
2215
func (s *Switch) GetLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink,
2216
        error) {
3✔
2217

3✔
2218
        s.indexMtx.RLock()
3✔
2219
        defer s.indexMtx.RUnlock()
3✔
2220

3✔
2221
        link, err := s.getLinkByShortID(chanID)
3✔
2222
        if err != nil {
6✔
2223
                // If we failed to find the link under the passed-in SCID, we
3✔
2224
                // consult the Switch's baseIndex map to see if the confirmed
3✔
2225
                // SCID was used for a zero-conf channel.
3✔
2226
                aliasID, ok := s.baseIndex[chanID]
3✔
2227
                if !ok {
6✔
2228
                        return nil, err
3✔
2229
                }
3✔
2230

2231
                // An alias was found, use it to lookup if a link exists.
2232
                return s.getLinkByShortID(aliasID)
3✔
2233
        }
2234

2235
        return link, nil
3✔
2236
}
2237

2238
// getLinkByShortID attempts to return the link which possesses the target
2239
// short channel ID.
2240
//
2241
// NOTE: This MUST be called with the indexMtx held.
2242
func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, error) {
3✔
2243
        link, ok := s.forwardingIndex[chanID]
3✔
2244
        if !ok {
6✔
2245
                return nil, ErrChannelLinkNotFound
3✔
2246
        }
3✔
2247

2248
        return link, nil
3✔
2249
}
2250

2251
// getLinkByMapping attempts to fetch the link via the htlcPacket's
2252
// outgoingChanID, possibly using a mapping. If it finds the link via mapping,
2253
// the outgoingChanID will be changed so that an error can be properly
2254
// attributed when looping over linkErrs in handlePacketForward.
2255
//
2256
// * If the outgoingChanID is an alias, we'll fetch the link regardless if it's
2257
// public or not.
2258
//
2259
// * If the outgoingChanID is a confirmed SCID, we'll need to do more checks.
2260
//   - If there is no entry found in baseIndex, fetch the link. This channel
2261
//     did not have the option-scid-alias feature negotiated (which includes
2262
//     zero-conf and option-scid-alias channel-types).
2263
//   - If there is an entry found, fetch the link from forwardingIndex and
2264
//     fail if this is a private link.
2265
//
2266
// NOTE: This MUST be called with the indexMtx read lock held.
2267
func (s *Switch) getLinkByMapping(pkt *htlcPacket) (ChannelLink, error) {
3✔
2268
        // Determine if this ShortChannelID is an alias or a confirmed SCID.
3✔
2269
        chanID := pkt.outgoingChanID
3✔
2270
        aliasID := s.cfg.IsAlias(chanID)
3✔
2271

3✔
2272
        // Set the originalOutgoingChanID so the proper channel_update can be
3✔
2273
        // sent back if the option-scid-alias feature bit was negotiated.
3✔
2274
        pkt.originalOutgoingChanID = chanID
3✔
2275

3✔
2276
        if aliasID {
6✔
2277
                // Since outgoingChanID is an alias, we'll fetch the link via
3✔
2278
                // baseIndex.
3✔
2279
                baseScid, ok := s.baseIndex[chanID]
3✔
2280
                if !ok {
3✔
2281
                        // No mapping exists, bail.
×
2282
                        return nil, ErrChannelLinkNotFound
×
2283
                }
×
2284

2285
                // A mapping exists, so use baseScid to find the link in the
2286
                // forwardingIndex.
2287
                link, ok := s.forwardingIndex[baseScid]
3✔
2288
                if !ok {
3✔
2289
                        // Link not found, bail.
×
2290
                        return nil, ErrChannelLinkNotFound
×
2291
                }
×
2292

2293
                // Change the packet's outgoingChanID field so that errors are
2294
                // properly attributed.
2295
                pkt.outgoingChanID = baseScid
3✔
2296

3✔
2297
                // Return the link without checking if it's private or not.
3✔
2298
                return link, nil
3✔
2299
        }
2300

2301
        // The outgoingChanID is a confirmed SCID. Attempt to fetch the base
2302
        // SCID from baseIndex.
2303
        baseScid, ok := s.baseIndex[chanID]
3✔
2304
        if !ok {
6✔
2305
                // outgoingChanID is not a key in base index meaning this
3✔
2306
                // channel did not have the option-scid-alias feature bit
3✔
2307
                // negotiated. We'll fetch the link and return it.
3✔
2308
                link, ok := s.forwardingIndex[chanID]
3✔
2309
                if !ok {
6✔
2310
                        // The link wasn't found, bail out.
3✔
2311
                        return nil, ErrChannelLinkNotFound
3✔
2312
                }
3✔
2313

2314
                return link, nil
3✔
2315
        }
2316

2317
        // Fetch the link whose internal SCID is baseScid.
2318
        link, ok := s.forwardingIndex[baseScid]
3✔
2319
        if !ok {
3✔
2320
                // Link wasn't found, bail out.
×
2321
                return nil, ErrChannelLinkNotFound
×
2322
        }
×
2323

2324
        // If the link is unadvertised, we fail since the real SCID was used to
2325
        // forward over it and this is a channel where the option-scid-alias
2326
        // feature bit was negotiated.
2327
        if link.IsUnadvertised() {
3✔
2328
                return nil, ErrChannelLinkNotFound
×
2329
        }
×
2330

2331
        // The link is public so the confirmed SCID can be used to forward over
2332
        // it. We'll also replace pkt's outgoingChanID field so errors can
2333
        // properly be attributed in the calling function.
2334
        pkt.outgoingChanID = baseScid
3✔
2335
        return link, nil
3✔
2336
}
2337

2338
// HasActiveLink returns true if the given channel ID has a link in the link
2339
// index AND the link is eligible to forward.
2340
func (s *Switch) HasActiveLink(chanID lnwire.ChannelID) bool {
3✔
2341
        s.indexMtx.RLock()
3✔
2342
        defer s.indexMtx.RUnlock()
3✔
2343

3✔
2344
        if link, ok := s.linkIndex[chanID]; ok {
6✔
2345
                return link.EligibleToForward()
3✔
2346
        }
3✔
2347

2348
        return false
3✔
2349
}
2350

2351
// RemoveLink purges the switch of any link associated with chanID. If a pending
2352
// or active link is not found, this method does nothing. Otherwise, the method
2353
// returns after the link has been completely shutdown.
2354
func (s *Switch) RemoveLink(chanID lnwire.ChannelID) {
3✔
2355
        s.indexMtx.Lock()
3✔
2356
        link, err := s.getLink(chanID)
3✔
2357
        if err != nil {
6✔
2358
                // If err is non-nil, this means that link is also nil. The
3✔
2359
                // link variable cannot be nil without err being non-nil.
3✔
2360
                s.indexMtx.Unlock()
3✔
2361
                log.Tracef("Unable to remove link for ChannelID(%v): %v",
3✔
2362
                        chanID, err)
3✔
2363
                return
3✔
2364
        }
3✔
2365

2366
        // Check if the link is already stopping and grab the stop chan if it
2367
        // is.
2368
        stopChan, ok := s.linkStopIndex[chanID]
3✔
2369
        if !ok {
6✔
2370
                // If the link is non-nil, it is not currently stopping, so
3✔
2371
                // we'll add a stop chan to the linkStopIndex.
3✔
2372
                stopChan = make(chan struct{})
3✔
2373
                s.linkStopIndex[chanID] = stopChan
3✔
2374
        }
3✔
2375
        s.indexMtx.Unlock()
3✔
2376

3✔
2377
        if ok {
3✔
2378
                // If the stop chan exists, we will wait for it to be closed.
×
2379
                // Once it is closed, we will exit.
×
2380
                select {
×
2381
                case <-stopChan:
×
2382
                        return
×
NEW
2383
                case <-s.gm.Done():
×
2384
                        return
×
2385
                }
2386
        }
2387

2388
        // Stop the link before removing it from the maps.
2389
        link.Stop()
3✔
2390

3✔
2391
        s.indexMtx.Lock()
3✔
2392
        _ = s.removeLink(chanID)
3✔
2393

3✔
2394
        // Close stopChan and remove this link from the linkStopIndex.
3✔
2395
        // Deleting from the index and removing from the link must be done
3✔
2396
        // in the same block while the mutex is held.
3✔
2397
        close(stopChan)
3✔
2398
        delete(s.linkStopIndex, chanID)
3✔
2399
        s.indexMtx.Unlock()
3✔
2400
}
2401

2402
// removeLink is used to remove and stop the channel link.
2403
//
2404
// NOTE: This MUST be called with the indexMtx held.
2405
func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink {
3✔
2406
        log.Infof("Removing channel link with ChannelID(%v)", chanID)
3✔
2407

3✔
2408
        link, err := s.getLink(chanID)
3✔
2409
        if err != nil {
4✔
2410
                return nil
1✔
2411
        }
1✔
2412

2413
        // Remove the channel from live link indexes.
2414
        delete(s.pendingLinkIndex, link.ChanID())
3✔
2415
        delete(s.linkIndex, link.ChanID())
3✔
2416
        delete(s.forwardingIndex, link.ShortChanID())
3✔
2417

3✔
2418
        // If the link has been added to the peer index, then we'll move to
3✔
2419
        // delete the entry within the index.
3✔
2420
        peerPub := link.PeerPubKey()
3✔
2421
        if peerIndex, ok := s.interfaceIndex[peerPub]; ok {
6✔
2422
                delete(peerIndex, link.ChanID())
3✔
2423

3✔
2424
                // If after deletion, there are no longer any links, then we'll
3✔
2425
                // remove the interface map all together.
3✔
2426
                if len(peerIndex) == 0 {
6✔
2427
                        delete(s.interfaceIndex, peerPub)
3✔
2428
                }
3✔
2429
        }
2430

2431
        return link
3✔
2432
}
2433

2434
// UpdateShortChanID locates the link with the passed-in chanID and updates the
2435
// underlying channel state. This is only used in zero-conf channels to allow
2436
// the confirmed SCID to be updated.
2437
func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error {
3✔
2438
        s.indexMtx.Lock()
3✔
2439
        defer s.indexMtx.Unlock()
3✔
2440

3✔
2441
        // Locate the target link in the link index. If no such link exists,
3✔
2442
        // then we will ignore the request.
3✔
2443
        link, ok := s.linkIndex[chanID]
3✔
2444
        if !ok {
3✔
2445
                return fmt.Errorf("link %v not found", chanID)
×
2446
        }
×
2447

2448
        // Try to update the link's underlying channel state, returning early
2449
        // if this update failed.
2450
        _, err := link.UpdateShortChanID()
3✔
2451
        if err != nil {
3✔
2452
                return err
×
2453
        }
×
2454

2455
        // Since the zero-conf channel is confirmed, we should populate the
2456
        // aliasToReal map and update the baseIndex.
2457
        aliases := link.getAliases()
3✔
2458

3✔
2459
        confirmedScid := link.confirmedScid()
3✔
2460

3✔
2461
        for _, alias := range aliases {
6✔
2462
                s.aliasToReal[alias] = confirmedScid
3✔
2463
        }
3✔
2464

2465
        s.baseIndex[confirmedScid] = link.ShortChanID()
3✔
2466

3✔
2467
        return nil
3✔
2468
}
2469

2470
// GetLinksByInterface fetches all the links connected to a particular node
2471
// identified by the serialized compressed form of its public key.
2472
func (s *Switch) GetLinksByInterface(hop [33]byte) ([]ChannelUpdateHandler,
2473
        error) {
3✔
2474

3✔
2475
        s.indexMtx.RLock()
3✔
2476
        defer s.indexMtx.RUnlock()
3✔
2477

3✔
2478
        var handlers []ChannelUpdateHandler
3✔
2479

3✔
2480
        links, err := s.getLinks(hop)
3✔
2481
        if err != nil {
6✔
2482
                return nil, err
3✔
2483
        }
3✔
2484

2485
        // Range over the returned []ChannelLink to convert them into
2486
        // []ChannelUpdateHandler.
2487
        for _, link := range links {
6✔
2488
                handlers = append(handlers, link)
3✔
2489
        }
3✔
2490

2491
        return handlers, nil
3✔
2492
}
2493

2494
// getLinks is function which returns the channel links of the peer by hop
2495
// destination id.
2496
//
2497
// NOTE: This MUST be called with the indexMtx held.
2498
func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
3✔
2499
        links, ok := s.interfaceIndex[destination]
3✔
2500
        if !ok {
6✔
2501
                return nil, ErrNoLinksFound
3✔
2502
        }
3✔
2503

2504
        channelLinks := make([]ChannelLink, 0, len(links))
3✔
2505
        for _, link := range links {
6✔
2506
                channelLinks = append(channelLinks, link)
3✔
2507
        }
3✔
2508

2509
        return channelLinks, nil
3✔
2510
}
2511

2512
// CircuitModifier returns a reference to subset of the interfaces provided by
2513
// the circuit map, to allow links to open and close circuits.
2514
func (s *Switch) CircuitModifier() CircuitModifier {
3✔
2515
        return s.circuits
3✔
2516
}
3✔
2517

2518
// CircuitLookup returns a reference to subset of the interfaces provided by the
2519
// circuit map, to allow looking up circuits.
2520
func (s *Switch) CircuitLookup() CircuitLookup {
3✔
2521
        return s.circuits
3✔
2522
}
3✔
2523

2524
// commitCircuits persistently adds a circuit to the switch's circuit map.
2525
func (s *Switch) commitCircuits(circuits ...*PaymentCircuit) (
2526
        *CircuitFwdActions, error) {
×
2527

×
2528
        return s.circuits.CommitCircuits(circuits...)
×
2529
}
×
2530

2531
// FlushForwardingEvents flushes out the set of pending forwarding events to
2532
// the persistent log. This will be used by the switch to periodically flush
2533
// out the set of forwarding events to disk. External callers can also use this
2534
// method to ensure all data is flushed to dis before querying the log.
2535
func (s *Switch) FlushForwardingEvents() error {
3✔
2536
        // First, we'll obtain a copy of the current set of pending forwarding
3✔
2537
        // events.
3✔
2538
        s.fwdEventMtx.Lock()
3✔
2539

3✔
2540
        // If we won't have any forwarding events, then we can exit early.
3✔
2541
        if len(s.pendingFwdingEvents) == 0 {
6✔
2542
                s.fwdEventMtx.Unlock()
3✔
2543
                return nil
3✔
2544
        }
3✔
2545

2546
        events := make([]channeldb.ForwardingEvent, len(s.pendingFwdingEvents))
3✔
2547
        copy(events[:], s.pendingFwdingEvents[:])
3✔
2548

3✔
2549
        // With the copy obtained, we can now clear out the header pointer of
3✔
2550
        // the current slice. This way, we can re-use the underlying storage
3✔
2551
        // allocated for the slice.
3✔
2552
        s.pendingFwdingEvents = s.pendingFwdingEvents[:0]
3✔
2553
        s.fwdEventMtx.Unlock()
3✔
2554

3✔
2555
        // Finally, we'll write out the copied events to the persistent
3✔
2556
        // forwarding log.
3✔
2557
        return s.cfg.FwdingLog.AddForwardingEvents(events)
3✔
2558
}
2559

2560
// BestHeight returns the best height known to the switch.
2561
func (s *Switch) BestHeight() uint32 {
3✔
2562
        return atomic.LoadUint32(&s.bestHeight)
3✔
2563
}
3✔
2564

2565
// dustExceedsFeeThreshold takes in a ChannelLink, HTLC amount, and a boolean
2566
// to determine whether the default fee threshold has been exceeded. This
2567
// heuristic takes into account the trimmed-to-dust mechanism. The sum of the
2568
// commitment's dust with the mailbox's dust with the amount is checked against
2569
// the fee exposure threshold. If incoming is true, then the amount is not
2570
// included in the sum as it was already included in the commitment's dust. A
2571
// boolean is returned telling the caller whether the HTLC should be failed
2572
// back.
2573
func (s *Switch) dustExceedsFeeThreshold(link ChannelLink,
2574
        amount lnwire.MilliSatoshi, incoming bool) bool {
3✔
2575

3✔
2576
        // Retrieve the link's current commitment feerate and dustClosure.
3✔
2577
        feeRate := link.getFeeRate()
3✔
2578
        isDust := link.getDustClosure()
3✔
2579

3✔
2580
        // Evaluate if the HTLC is dust on either sides' commitment.
3✔
2581
        isLocalDust := isDust(
3✔
2582
                feeRate, incoming, lntypes.Local, amount.ToSatoshis(),
3✔
2583
        )
3✔
2584
        isRemoteDust := isDust(
3✔
2585
                feeRate, incoming, lntypes.Remote, amount.ToSatoshis(),
3✔
2586
        )
3✔
2587

3✔
2588
        if !(isLocalDust || isRemoteDust) {
6✔
2589
                // If the HTLC is not dust on either commitment, it's fine to
3✔
2590
                // forward.
3✔
2591
                return false
3✔
2592
        }
3✔
2593

2594
        // Fetch the dust sums currently in the mailbox for this link.
2595
        cid := link.ChanID()
3✔
2596
        sid := link.ShortChanID()
3✔
2597
        mailbox := s.mailOrchestrator.GetOrCreateMailBox(cid, sid)
3✔
2598
        localMailDust, remoteMailDust := mailbox.DustPackets()
3✔
2599

3✔
2600
        // If the htlc is dust on the local commitment, we'll obtain the dust
3✔
2601
        // sum for it.
3✔
2602
        if isLocalDust {
6✔
2603
                localSum := link.getDustSum(
3✔
2604
                        lntypes.Local, fn.None[chainfee.SatPerKWeight](),
3✔
2605
                )
3✔
2606
                localSum += localMailDust
3✔
2607

3✔
2608
                // Optionally include the HTLC amount only for outgoing
3✔
2609
                // HTLCs.
3✔
2610
                if !incoming {
6✔
2611
                        localSum += amount
3✔
2612
                }
3✔
2613

2614
                // Finally check against the defined fee threshold.
2615
                if localSum > s.cfg.MaxFeeExposure {
3✔
2616
                        return true
×
2617
                }
×
2618
        }
2619

2620
        // Also check if the htlc is dust on the remote commitment, if we've
2621
        // reached this point.
2622
        if isRemoteDust {
6✔
2623
                remoteSum := link.getDustSum(
3✔
2624
                        lntypes.Remote, fn.None[chainfee.SatPerKWeight](),
3✔
2625
                )
3✔
2626
                remoteSum += remoteMailDust
3✔
2627

3✔
2628
                // Optionally include the HTLC amount only for outgoing
3✔
2629
                // HTLCs.
3✔
2630
                if !incoming {
6✔
2631
                        remoteSum += amount
3✔
2632
                }
3✔
2633

2634
                // Finally check against the defined fee threshold.
2635
                if remoteSum > s.cfg.MaxFeeExposure {
3✔
2636
                        return true
×
2637
                }
×
2638
        }
2639

2640
        // If we reached this point, this HTLC is fine to forward.
2641
        return false
3✔
2642
}
2643

2644
// failMailboxUpdate is passed to the mailbox orchestrator which in turn passes
2645
// it to individual mailboxes. It allows the mailboxes to construct a
2646
// FailureMessage when failing back HTLC's due to expiry and may include an
2647
// alias in the ShortChannelID field. The outgoingScid is the SCID originally
2648
// used in the onion. The mailboxScid is the SCID that the mailbox and link
2649
// use. The mailboxScid is only used in the non-alias case, so it is always
2650
// the confirmed SCID.
2651
func (s *Switch) failMailboxUpdate(outgoingScid,
2652
        mailboxScid lnwire.ShortChannelID) lnwire.FailureMessage {
3✔
2653

3✔
2654
        // Try to use the failAliasUpdate function in case this is a channel
3✔
2655
        // that uses aliases. If it returns nil, we'll fallback to the original
3✔
2656
        // pre-alias behavior.
3✔
2657
        update := s.failAliasUpdate(outgoingScid, false)
3✔
2658
        if update == nil {
6✔
2659
                // Execute the fallback behavior.
3✔
2660
                var err error
3✔
2661
                update, err = s.cfg.FetchLastChannelUpdate(mailboxScid)
3✔
2662
                if err != nil {
3✔
2663
                        return &lnwire.FailTemporaryNodeFailure{}
×
2664
                }
×
2665
        }
2666

2667
        return lnwire.NewTemporaryChannelFailure(update)
3✔
2668
}
2669

2670
// failAliasUpdate prepares a ChannelUpdate for a failed incoming or outgoing
2671
// HTLC on a channel where the option-scid-alias feature bit was negotiated. If
2672
// the associated channel is not one of these, this function will return nil
2673
// and the caller is expected to handle this properly. In this case, a return
2674
// to the original non-alias behavior is expected.
2675
func (s *Switch) failAliasUpdate(scid lnwire.ShortChannelID,
2676
        incoming bool) *lnwire.ChannelUpdate1 {
3✔
2677

3✔
2678
        // This function does not defer the unlocking because of the database
3✔
2679
        // lookups for ChannelUpdate.
3✔
2680
        s.indexMtx.RLock()
3✔
2681

3✔
2682
        if s.cfg.IsAlias(scid) {
6✔
2683
                // The alias SCID was used. In the incoming case this means
3✔
2684
                // the channel is zero-conf as the link sets the scid. In the
3✔
2685
                // outgoing case, the sender set the scid to use and may be
3✔
2686
                // either the alias or the confirmed one, if it exists.
3✔
2687
                realScid, ok := s.aliasToReal[scid]
3✔
2688
                if !ok {
3✔
2689
                        // The real, confirmed SCID does not exist yet. Find
×
2690
                        // the "base" SCID that the link uses via the
×
2691
                        // baseIndex. If we can't find it, return nil. This
×
2692
                        // means the channel is zero-conf.
×
2693
                        baseScid, ok := s.baseIndex[scid]
×
2694
                        s.indexMtx.RUnlock()
×
2695
                        if !ok {
×
2696
                                return nil
×
2697
                        }
×
2698

2699
                        update, err := s.cfg.FetchLastChannelUpdate(baseScid)
×
2700
                        if err != nil {
×
2701
                                return nil
×
2702
                        }
×
2703

2704
                        // Replace the baseScid with the passed-in alias.
2705
                        update.ShortChannelID = scid
×
2706
                        sig, err := s.cfg.SignAliasUpdate(update)
×
2707
                        if err != nil {
×
2708
                                return nil
×
2709
                        }
×
2710

2711
                        update.Signature, err = lnwire.NewSigFromSignature(sig)
×
2712
                        if err != nil {
×
2713
                                return nil
×
2714
                        }
×
2715

2716
                        return update
×
2717
                }
2718

2719
                s.indexMtx.RUnlock()
3✔
2720

3✔
2721
                // Fetch the SCID via the confirmed SCID and replace it with
3✔
2722
                // the alias.
3✔
2723
                update, err := s.cfg.FetchLastChannelUpdate(realScid)
3✔
2724
                if err != nil {
6✔
2725
                        return nil
3✔
2726
                }
3✔
2727

2728
                // In the incoming case, we want to ensure that we don't leak
2729
                // the UTXO in case the channel is private. In the outgoing
2730
                // case, since the alias was used, we do the same thing.
2731
                update.ShortChannelID = scid
3✔
2732
                sig, err := s.cfg.SignAliasUpdate(update)
3✔
2733
                if err != nil {
3✔
2734
                        return nil
×
2735
                }
×
2736

2737
                update.Signature, err = lnwire.NewSigFromSignature(sig)
3✔
2738
                if err != nil {
3✔
2739
                        return nil
×
2740
                }
×
2741

2742
                return update
3✔
2743
        }
2744

2745
        // If the confirmed SCID is not in baseIndex, this is not an
2746
        // option-scid-alias or zero-conf channel.
2747
        baseScid, ok := s.baseIndex[scid]
3✔
2748
        if !ok {
6✔
2749
                s.indexMtx.RUnlock()
3✔
2750
                return nil
3✔
2751
        }
3✔
2752

2753
        // Fetch the link so we can get an alias to use in the ShortChannelID
2754
        // of the ChannelUpdate.
2755
        link, ok := s.forwardingIndex[baseScid]
×
2756
        s.indexMtx.RUnlock()
×
2757
        if !ok {
×
2758
                // This should never happen, but if it does for some reason,
×
2759
                // fallback to the old behavior.
×
2760
                return nil
×
2761
        }
×
2762

2763
        aliases := link.getAliases()
×
2764
        if len(aliases) == 0 {
×
2765
                // This should never happen, but if it does, fallback.
×
2766
                return nil
×
2767
        }
×
2768

2769
        // Fetch the ChannelUpdate via the real, confirmed SCID.
2770
        update, err := s.cfg.FetchLastChannelUpdate(scid)
×
2771
        if err != nil {
×
2772
                return nil
×
2773
        }
×
2774

2775
        // The incoming case will replace the ShortChannelID in the retrieved
2776
        // ChannelUpdate with the alias to ensure no privacy leak occurs. This
2777
        // would happen if a private non-zero-conf option-scid-alias
2778
        // feature-bit channel leaked its UTXO here rather than supplying an
2779
        // alias. In the outgoing case, the confirmed SCID was actually used
2780
        // for forwarding in the onion, so no replacement is necessary as the
2781
        // sender knows the scid.
2782
        if incoming {
×
2783
                // We will replace and sign the update with the first alias.
×
2784
                // Since this happens on the incoming side, it's not actually
×
2785
                // possible to know what the sender used in the onion.
×
2786
                update.ShortChannelID = aliases[0]
×
2787
                sig, err := s.cfg.SignAliasUpdate(update)
×
2788
                if err != nil {
×
2789
                        return nil
×
2790
                }
×
2791

2792
                update.Signature, err = lnwire.NewSigFromSignature(sig)
×
2793
                if err != nil {
×
2794
                        return nil
×
2795
                }
×
2796
        }
2797

2798
        return update
×
2799
}
2800

2801
// AddAliasForLink instructs the Switch to update its in-memory maps to reflect
2802
// that a link has a new alias.
2803
func (s *Switch) AddAliasForLink(chanID lnwire.ChannelID,
2804
        alias lnwire.ShortChannelID) error {
×
2805

×
2806
        // Fetch the link so that we can update the underlying channel's set of
×
2807
        // aliases.
×
2808
        s.indexMtx.RLock()
×
2809
        link, err := s.getLink(chanID)
×
2810
        s.indexMtx.RUnlock()
×
2811
        if err != nil {
×
2812
                return err
×
2813
        }
×
2814

2815
        // If the link is a channel where the option-scid-alias feature bit was
2816
        // not negotiated, we'll return an error.
2817
        if !link.negotiatedAliasFeature() {
×
2818
                return fmt.Errorf("attempted to update non-alias channel")
×
2819
        }
×
2820

2821
        linkScid := link.ShortChanID()
×
2822

×
2823
        // We'll update the maps so the Switch includes this alias in its
×
2824
        // forwarding decisions.
×
2825
        if link.isZeroConf() {
×
2826
                if link.zeroConfConfirmed() {
×
2827
                        // If the channel has confirmed on-chain, we'll
×
2828
                        // add this alias to the aliasToReal map.
×
2829
                        confirmedScid := link.confirmedScid()
×
2830

×
2831
                        s.aliasToReal[alias] = confirmedScid
×
2832
                }
×
2833

2834
                // Add this alias to the baseIndex mapping.
2835
                s.baseIndex[alias] = linkScid
×
2836
        } else if link.negotiatedAliasFeature() {
×
2837
                // The channel is confirmed, so we'll populate the aliasToReal
×
2838
                // and baseIndex maps.
×
2839
                s.aliasToReal[alias] = linkScid
×
2840
                s.baseIndex[alias] = linkScid
×
2841
        }
×
2842

2843
        return nil
×
2844
}
2845

2846
// handlePacketAdd handles forwarding an Add packet.
2847
func (s *Switch) handlePacketAdd(packet *htlcPacket,
2848
        htlc *lnwire.UpdateAddHTLC) error {
3✔
2849

3✔
2850
        // Check if the node is set to reject all onward HTLCs and also make
3✔
2851
        // sure that HTLC is not from the source node.
3✔
2852
        if s.cfg.RejectHTLC {
6✔
2853
                failure := NewDetailedLinkError(
3✔
2854
                        &lnwire.FailChannelDisabled{},
3✔
2855
                        OutgoingFailureForwardsDisabled,
3✔
2856
                )
3✔
2857

3✔
2858
                return s.failAddPacket(packet, failure)
3✔
2859
        }
3✔
2860

2861
        // Before we attempt to find a non-strict forwarding path for this
2862
        // htlc, check whether the htlc is being routed over the same incoming
2863
        // and outgoing channel. If our node does not allow forwards of this
2864
        // nature, we fail the htlc early. This check is in place to disallow
2865
        // inefficiently routed htlcs from locking up our balance. With
2866
        // channels where the option-scid-alias feature was negotiated, we also
2867
        // have to be sure that the IDs aren't the same since one or both could
2868
        // be an alias.
2869
        linkErr := s.checkCircularForward(
3✔
2870
                packet.incomingChanID, packet.outgoingChanID,
3✔
2871
                s.cfg.AllowCircularRoute, htlc.PaymentHash,
3✔
2872
        )
3✔
2873
        if linkErr != nil {
3✔
2874
                return s.failAddPacket(packet, linkErr)
×
2875
        }
×
2876

2877
        s.indexMtx.RLock()
3✔
2878
        targetLink, err := s.getLinkByMapping(packet)
3✔
2879
        if err != nil {
6✔
2880
                s.indexMtx.RUnlock()
3✔
2881

3✔
2882
                log.Debugf("unable to find link with "+
3✔
2883
                        "destination %v", packet.outgoingChanID)
3✔
2884

3✔
2885
                // If packet was forwarded from another channel link than we
3✔
2886
                // should notify this link that some error occurred.
3✔
2887
                linkError := NewLinkError(
3✔
2888
                        &lnwire.FailUnknownNextPeer{},
3✔
2889
                )
3✔
2890

3✔
2891
                return s.failAddPacket(packet, linkError)
3✔
2892
        }
3✔
2893
        targetPeerKey := targetLink.PeerPubKey()
3✔
2894
        interfaceLinks, _ := s.getLinks(targetPeerKey)
3✔
2895
        s.indexMtx.RUnlock()
3✔
2896

3✔
2897
        // We'll keep track of any HTLC failures during the link selection
3✔
2898
        // process. This way we can return the error for precise link that the
3✔
2899
        // sender selected, while optimistically trying all links to utilize
3✔
2900
        // our available bandwidth.
3✔
2901
        linkErrs := make(map[lnwire.ShortChannelID]*LinkError)
3✔
2902

3✔
2903
        // Find all destination channel links with appropriate bandwidth.
3✔
2904
        var destinations []ChannelLink
3✔
2905
        for _, link := range interfaceLinks {
6✔
2906
                var failure *LinkError
3✔
2907

3✔
2908
                // We'll skip any links that aren't yet eligible for
3✔
2909
                // forwarding.
3✔
2910
                if !link.EligibleToForward() {
3✔
2911
                        failure = NewDetailedLinkError(
×
2912
                                &lnwire.FailUnknownNextPeer{},
×
2913
                                OutgoingFailureLinkNotEligible,
×
2914
                        )
×
2915
                } else {
3✔
2916
                        // We'll ensure that the HTLC satisfies the current
3✔
2917
                        // forwarding conditions of this target link.
3✔
2918
                        currentHeight := atomic.LoadUint32(&s.bestHeight)
3✔
2919
                        failure = link.CheckHtlcForward(
3✔
2920
                                htlc.PaymentHash, packet.incomingAmount,
3✔
2921
                                packet.amount, packet.incomingTimeout,
3✔
2922
                                packet.outgoingTimeout,
3✔
2923
                                packet.inboundFee,
3✔
2924
                                currentHeight,
3✔
2925
                                packet.originalOutgoingChanID,
3✔
2926
                        )
3✔
2927
                }
3✔
2928

2929
                // If this link can forward the htlc, add it to the set of
2930
                // destinations.
2931
                if failure == nil {
6✔
2932
                        destinations = append(destinations, link)
3✔
2933
                        continue
3✔
2934
                }
2935

2936
                linkErrs[link.ShortChanID()] = failure
3✔
2937
        }
2938

2939
        // If we had a forwarding failure due to the HTLC not satisfying the
2940
        // current policy, then we'll send back an error, but ensure we send
2941
        // back the error sourced at the *target* link.
2942
        if len(destinations) == 0 {
6✔
2943
                // At this point, some or all of the links rejected the HTLC so
3✔
2944
                // we couldn't forward it. So we'll try to look up the error
3✔
2945
                // that came from the source.
3✔
2946
                linkErr, ok := linkErrs[packet.outgoingChanID]
3✔
2947
                if !ok {
3✔
2948
                        // If we can't find the error of the source, then we'll
×
2949
                        // return an unknown next peer, though this should
×
2950
                        // never happen.
×
2951
                        linkErr = NewLinkError(
×
2952
                                &lnwire.FailUnknownNextPeer{},
×
2953
                        )
×
2954
                        log.Warnf("unable to find err source for "+
×
2955
                                "outgoing_link=%v, errors=%v",
×
2956
                                packet.outgoingChanID,
×
2957
                                lnutils.SpewLogClosure(linkErrs))
×
2958
                }
×
2959

2960
                log.Tracef("incoming HTLC(%x) violated "+
3✔
2961
                        "target outgoing link (id=%v) policy: %v",
3✔
2962
                        htlc.PaymentHash[:], packet.outgoingChanID,
3✔
2963
                        linkErr)
3✔
2964

3✔
2965
                return s.failAddPacket(packet, linkErr)
3✔
2966
        }
2967

2968
        // Choose a random link out of the set of links that can forward this
2969
        // htlc. The reason for randomization is to evenly distribute the htlc
2970
        // load without making assumptions about what the best channel is.
2971
        //nolint:gosec
2972
        destination := destinations[rand.Intn(len(destinations))]
3✔
2973

3✔
2974
        // Retrieve the incoming link by its ShortChannelID. Note that the
3✔
2975
        // incomingChanID is never set to hop.Source here.
3✔
2976
        s.indexMtx.RLock()
3✔
2977
        incomingLink, err := s.getLinkByShortID(packet.incomingChanID)
3✔
2978
        s.indexMtx.RUnlock()
3✔
2979
        if err != nil {
3✔
2980
                // If we couldn't find the incoming link, we can't evaluate the
×
2981
                // incoming's exposure to dust, so we just fail the HTLC back.
×
2982
                linkErr := NewLinkError(
×
2983
                        &lnwire.FailTemporaryChannelFailure{},
×
2984
                )
×
2985

×
2986
                return s.failAddPacket(packet, linkErr)
×
2987
        }
×
2988

2989
        // Evaluate whether this HTLC would increase our fee exposure over the
2990
        // threshold on the incoming link. If it does, fail it backwards.
2991
        if s.dustExceedsFeeThreshold(
3✔
2992
                incomingLink, packet.incomingAmount, true,
3✔
2993
        ) {
3✔
2994
                // The incoming dust exceeds the threshold, so we fail the add
×
2995
                // back.
×
2996
                linkErr := NewLinkError(
×
2997
                        &lnwire.FailTemporaryChannelFailure{},
×
2998
                )
×
2999

×
3000
                return s.failAddPacket(packet, linkErr)
×
3001
        }
×
3002

3003
        // Also evaluate whether this HTLC would increase our fee exposure over
3004
        // the threshold on the destination link. If it does, fail it back.
3005
        if s.dustExceedsFeeThreshold(
3✔
3006
                destination, packet.amount, false,
3✔
3007
        ) {
3✔
3008
                // The outgoing dust exceeds the threshold, so we fail the add
×
3009
                // back.
×
3010
                linkErr := NewLinkError(
×
3011
                        &lnwire.FailTemporaryChannelFailure{},
×
3012
                )
×
3013

×
3014
                return s.failAddPacket(packet, linkErr)
×
3015
        }
×
3016

3017
        // Send the packet to the destination channel link which manages the
3018
        // channel.
3019
        packet.outgoingChanID = destination.ShortChanID()
3✔
3020

3✔
3021
        return destination.handleSwitchPacket(packet)
3✔
3022
}
3023

3024
// handlePacketSettle handles forwarding a settle packet.
3025
func (s *Switch) handlePacketSettle(packet *htlcPacket) error {
3✔
3026
        // If the source of this packet has not been set, use the circuit map
3✔
3027
        // to lookup the origin.
3✔
3028
        circuit, err := s.closeCircuit(packet)
3✔
3029
        if err != nil {
6✔
3030
                return err
3✔
3031
        }
3✔
3032

3033
        // closeCircuit returns a nil circuit when a settle packet returns an
3034
        // ErrUnknownCircuit error upon the inner call to CloseCircuit.
3035
        //
3036
        // NOTE: We can only get a nil circuit when it has already been deleted
3037
        // and when `UpdateFulfillHTLC` is received. After which `RevokeAndAck`
3038
        // is received, which invokes `processRemoteSettleFails` in its link.
3039
        if circuit == nil {
6✔
3040
                log.Debugf("Found nil circuit: packet=%v", spew.Sdump(packet))
3✔
3041
                return nil
3✔
3042
        }
3✔
3043

3044
        localHTLC := packet.incomingChanID == hop.Source
3✔
3045

3✔
3046
        // If this is a locally initiated HTLC, we need to handle the packet by
3✔
3047
        // storing the network result.
3✔
3048
        //
3✔
3049
        // A blank IncomingChanID in a circuit indicates that it is a pending
3✔
3050
        // user-initiated payment.
3✔
3051
        //
3✔
3052
        // NOTE: `closeCircuit` modifies the state of `packet`.
3✔
3053
        if localHTLC {
6✔
3054
                // TODO(yy): remove the goroutine and send back the error here.
3✔
3055
                s.handleLocalResponseWG.Add(1)
3✔
3056
                go s.handleLocalResponse(packet)
3✔
3057

3✔
3058
                // If this is a locally initiated HTLC, there's no need to
3✔
3059
                // forward it so we exit.
3✔
3060
                return nil
3✔
3061
        }
3✔
3062

3063
        // If this is an HTLC settle, and it wasn't from a locally initiated
3064
        // HTLC, then we'll log a forwarding event so we can flush it to disk
3065
        // later.
3066
        if circuit.Outgoing != nil {
6✔
3067
                log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+
3✔
3068
                        "from IncomingChanID(%v) to OutgoingChanID(%v)",
3✔
3069
                        circuit.PaymentHash[:], circuit.OutgoingAmount,
3✔
3070
                        circuit.IncomingAmount-circuit.OutgoingAmount,
3✔
3071
                        circuit.Incoming.ChanID, circuit.Outgoing.ChanID)
3✔
3072

3✔
3073
                s.fwdEventMtx.Lock()
3✔
3074
                s.pendingFwdingEvents = append(
3✔
3075
                        s.pendingFwdingEvents,
3✔
3076
                        channeldb.ForwardingEvent{
3✔
3077
                                Timestamp:      time.Now(),
3✔
3078
                                IncomingChanID: circuit.Incoming.ChanID,
3✔
3079
                                OutgoingChanID: circuit.Outgoing.ChanID,
3✔
3080
                                AmtIn:          circuit.IncomingAmount,
3✔
3081
                                AmtOut:         circuit.OutgoingAmount,
3✔
3082
                        },
3✔
3083
                )
3✔
3084
                s.fwdEventMtx.Unlock()
3✔
3085
        }
3✔
3086

3087
        // Deliver this packet.
3088
        return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
3✔
3089
}
3090

3091
// handlePacketFail handles forwarding a fail packet.
3092
func (s *Switch) handlePacketFail(packet *htlcPacket,
3093
        htlc *lnwire.UpdateFailHTLC) error {
3✔
3094

3✔
3095
        // If the source of this packet has not been set, use the circuit map
3✔
3096
        // to lookup the origin.
3✔
3097
        circuit, err := s.closeCircuit(packet)
3✔
3098
        if err != nil {
6✔
3099
                return err
3✔
3100
        }
3✔
3101

3102
        // If this is a locally initiated HTLC, we need to handle the packet by
3103
        // storing the network result.
3104
        //
3105
        // A blank IncomingChanID in a circuit indicates that it is a pending
3106
        // user-initiated payment.
3107
        //
3108
        // NOTE: `closeCircuit` modifies the state of `packet`.
3109
        if packet.incomingChanID == hop.Source {
6✔
3110
                // TODO(yy): remove the goroutine and send back the error here.
3✔
3111
                s.handleLocalResponseWG.Add(1)
3✔
3112
                go s.handleLocalResponse(packet)
3✔
3113

3✔
3114
                // If this is a locally initiated HTLC, there's no need to
3✔
3115
                // forward it so we exit.
3✔
3116
                return nil
3✔
3117
        }
3✔
3118

3119
        // Exit early if this hasSource is true. This flag is only set via
3120
        // mailbox's `FailAdd`. This method has two callsites,
3121
        // - the packet has timed out after `MailboxDeliveryTimeout`, defaults
3122
        //   to 1 min.
3123
        // - the HTLC fails the validation in `channel.AddHTLC`.
3124
        // In either case, the `Reason` field is populated. Thus there's no
3125
        // need to proceed and extract the failure reason below.
3126
        if packet.hasSource {
6✔
3127
                // Deliver this packet.
3✔
3128
                return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
3✔
3129
        }
3✔
3130

3131
        // HTLC resolutions and messages restored from disk don't have the
3132
        // obfuscator set from the original htlc add packet - set it here for
3133
        // use in blinded errors.
3134
        packet.obfuscator = circuit.ErrorEncrypter
3✔
3135

3✔
3136
        switch {
3✔
3137
        // No message to encrypt, locally sourced payment.
3138
        case circuit.ErrorEncrypter == nil:
×
3139
                // TODO(yy) further check this case as we shouldn't end up here
3140
                // as `isLocal` is already false.
3141

3142
        // If this is a resolution message, then we'll need to encrypt it as
3143
        // it's actually internally sourced.
3144
        case packet.isResolution:
3✔
3145
                var err error
3✔
3146
                // TODO(roasbeef): don't need to pass actually?
3✔
3147
                failure := &lnwire.FailPermanentChannelFailure{}
3✔
3148
                htlc.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop(
3✔
3149
                        failure,
3✔
3150
                )
3✔
3151
                if err != nil {
3✔
3152
                        err = fmt.Errorf("unable to obfuscate error: %w", err)
×
3153
                        log.Error(err)
×
3154
                }
×
3155

3156
        // Alternatively, if the remote party sends us an
3157
        // UpdateFailMalformedHTLC, then we'll need to convert this into a
3158
        // proper well formatted onion error as there's no HMAC currently.
3159
        case packet.convertedError:
3✔
3160
                log.Infof("Converting malformed HTLC error for circuit for "+
3✔
3161
                        "Circuit(%x: (%s, %d) <-> (%s, %d))",
3✔
3162
                        packet.circuit.PaymentHash,
3✔
3163
                        packet.incomingChanID, packet.incomingHTLCID,
3✔
3164
                        packet.outgoingChanID, packet.outgoingHTLCID)
3✔
3165

3✔
3166
                htlc.Reason = circuit.ErrorEncrypter.EncryptMalformedError(
3✔
3167
                        htlc.Reason,
3✔
3168
                )
3✔
3169

3170
        default:
3✔
3171
                // Otherwise, it's a forwarded error, so we'll perform a
3✔
3172
                // wrapper encryption as normal.
3✔
3173
                htlc.Reason = circuit.ErrorEncrypter.IntermediateEncrypt(
3✔
3174
                        htlc.Reason,
3✔
3175
                )
3✔
3176
        }
3177

3178
        // Deliver this packet.
3179
        return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
3✔
3180
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc