• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12583319996

02 Jan 2025 01:38PM UTC coverage: 57.522% (-1.1%) from 58.598%
12583319996

Pull #9361

github

starius
fn/ContextGuard: use context.AfterFunc to wait

Simplifies context cancellation handling by using context.AfterFunc instead of a
goroutine to wait for context cancellation. This approach avoids the overhead of
a goroutine during the waiting period.

For ctxQuitUnsafe, since g.quit is closed only in the Quit method (which also
cancels all associated contexts), waiting on context cancellation ensures the
same behavior without unnecessary dependency on g.quit.

Added a test to ensure that the Create method does not launch any goroutines.
Pull Request #9361: fn: optimize context guard

102587 of 178344 relevant lines covered (57.52%)

24734.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.9
/sweep/sweeper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/davecgh/go-spew/spew"
13
        "github.com/lightningnetwork/lnd/chainio"
14
        "github.com/lightningnetwork/lnd/chainntnfs"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/input"
17
        "github.com/lightningnetwork/lnd/lnutils"
18
        "github.com/lightningnetwork/lnd/lnwallet"
19
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
20
)
21

22
var (
23
        // ErrRemoteSpend is returned in case an output that we try to sweep is
24
        // confirmed in a tx of the remote party.
25
        ErrRemoteSpend = errors.New("remote party swept utxo")
26

27
        // ErrFeePreferenceTooLow is returned when the fee preference gives a
28
        // fee rate that's below the relay fee rate.
29
        ErrFeePreferenceTooLow = errors.New("fee preference too low")
30

31
        // ErrExclusiveGroupSpend is returned in case a different input of the
32
        // same exclusive group was spent.
33
        ErrExclusiveGroupSpend = errors.New("other member of exclusive group " +
34
                "was spent")
35

36
        // ErrSweeperShuttingDown is an error returned when a client attempts to
37
        // make a request to the UtxoSweeper, but it is unable to handle it as
38
        // it is/has already been stopped.
39
        ErrSweeperShuttingDown = errors.New("utxo sweeper shutting down")
40

41
        // DefaultDeadlineDelta defines a default deadline delta (1 week) to be
42
        // used when sweeping inputs with no deadline pressure.
43
        DefaultDeadlineDelta = int32(1008)
44
)
45

46
// Params contains the parameters that control the sweeping process.
47
type Params struct {
48
        // ExclusiveGroup is an identifier that, if set, prevents other inputs
49
        // with the same identifier from being batched together.
50
        ExclusiveGroup *uint64
51

52
        // DeadlineHeight specifies an absolute block height that this input
53
        // should be confirmed by. This value is used by the fee bumper to
54
        // decide its urgency and adjust its feerate used.
55
        DeadlineHeight fn.Option[int32]
56

57
        // Budget specifies the maximum amount of satoshis that can be spent on
58
        // fees for this sweep.
59
        Budget btcutil.Amount
60

61
        // Immediate indicates that the input should be swept immediately
62
        // without waiting for blocks to come to trigger the sweeping of
63
        // inputs.
64
        Immediate bool
65

66
        // StartingFeeRate is an optional parameter that can be used to specify
67
        // the initial fee rate to use for the fee function.
68
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
69
}
70

71
// String returns a human readable interpretation of the sweep parameters.
72
func (p Params) String() string {
×
73
        deadline := "none"
×
74
        p.DeadlineHeight.WhenSome(func(d int32) {
×
75
                deadline = fmt.Sprintf("%d", d)
×
76
        })
×
77

78
        exclusiveGroup := "none"
×
79
        if p.ExclusiveGroup != nil {
×
80
                exclusiveGroup = fmt.Sprintf("%d", *p.ExclusiveGroup)
×
81
        }
×
82

83
        return fmt.Sprintf("startingFeeRate=%v, immediate=%v, "+
×
84
                "exclusive_group=%v, budget=%v, deadline=%v", p.StartingFeeRate,
×
85
                p.Immediate, exclusiveGroup, p.Budget, deadline)
×
86
}
87

88
// SweepState represents the current state of a pending input.
89
//
90
//nolint:revive
91
type SweepState uint8
92

93
const (
94
        // Init is the initial state of a pending input. This is set when a new
95
        // sweeping request for a given input is made.
96
        Init SweepState = iota
97

98
        // PendingPublish specifies an input's state where it's already been
99
        // included in a sweeping tx but the tx is not published yet.  Inputs
100
        // in this state should not be used for grouping again.
101
        PendingPublish
102

103
        // Published is the state where the input's sweeping tx has
104
        // successfully been published. Inputs in this state can only be
105
        // updated via RBF.
106
        Published
107

108
        // PublishFailed is the state when an error is returned from publishing
109
        // the sweeping tx. Inputs in this state can be re-grouped in to a new
110
        // sweeping tx.
111
        PublishFailed
112

113
        // Swept is the final state of a pending input. This is set when the
114
        // input has been successfully swept.
115
        Swept
116

117
        // Excluded is the state of a pending input that has been excluded and
118
        // can no longer be swept. For instance, when one of the three anchor
119
        // sweeping transactions confirmed, the remaining two will be excluded.
120
        Excluded
121

122
        // Failed is the state when a pending input has too many failed publish
123
        // atttempts or unknown broadcast error is returned.
124
        Failed
125
)
126

127
// String gives a human readable text for the sweep states.
128
func (s SweepState) String() string {
×
129
        switch s {
×
130
        case Init:
×
131
                return "Init"
×
132

133
        case PendingPublish:
×
134
                return "PendingPublish"
×
135

136
        case Published:
×
137
                return "Published"
×
138

139
        case PublishFailed:
×
140
                return "PublishFailed"
×
141

142
        case Swept:
×
143
                return "Swept"
×
144

145
        case Excluded:
×
146
                return "Excluded"
×
147

148
        case Failed:
×
149
                return "Failed"
×
150

151
        default:
×
152
                return "Unknown"
×
153
        }
154
}
155

156
// RBFInfo stores the information required to perform a RBF bump on a pending
157
// sweeping tx.
158
type RBFInfo struct {
159
        // Txid is the txid of the sweeping tx.
160
        Txid chainhash.Hash
161

162
        // FeeRate is the fee rate of the sweeping tx.
163
        FeeRate chainfee.SatPerKWeight
164

165
        // Fee is the total fee of the sweeping tx.
166
        Fee btcutil.Amount
167
}
168

169
// SweeperInput is created when an input reaches the main loop for the first
170
// time. It wraps the input and tracks all relevant state that is needed for
171
// sweeping.
172
type SweeperInput struct {
173
        input.Input
174

175
        // state tracks the current state of the input.
176
        state SweepState
177

178
        // listeners is a list of channels over which the final outcome of the
179
        // sweep needs to be broadcasted.
180
        listeners []chan Result
181

182
        // ntfnRegCancel is populated with a function that cancels the chain
183
        // notifier spend registration.
184
        ntfnRegCancel func()
185

186
        // publishAttempts records the number of attempts that have already been
187
        // made to sweep this tx.
188
        publishAttempts int
189

190
        // params contains the parameters that control the sweeping process.
191
        params Params
192

193
        // lastFeeRate is the most recent fee rate used for this input within a
194
        // transaction broadcast to the network.
195
        lastFeeRate chainfee.SatPerKWeight
196

197
        // rbf records the RBF constraints.
198
        rbf fn.Option[RBFInfo]
199

200
        // DeadlineHeight is the deadline height for this input. This is
201
        // different from the DeadlineHeight in its params as it's an actual
202
        // value than an option.
203
        DeadlineHeight int32
204
}
205

206
// String returns a human readable interpretation of the pending input.
207
func (p *SweeperInput) String() string {
26✔
208
        return fmt.Sprintf("%v (%v)", p.Input.OutPoint(), p.Input.WitnessType())
26✔
209
}
26✔
210

211
// terminated returns a boolean indicating whether the input has reached a
212
// final state.
213
func (p *SweeperInput) terminated() bool {
22✔
214
        switch p.state {
22✔
215
        // If the input has reached a final state, that it's either
216
        // been swept, or failed, or excluded, we will remove it from
217
        // our sweeper.
218
        case Failed, Swept, Excluded:
8✔
219
                return true
8✔
220

221
        default:
14✔
222
                return false
14✔
223
        }
224
}
225

226
// isMature returns a boolean indicating whether the input has a timelock that
227
// has been reached or not. The locktime found is also returned.
228
func (p *SweeperInput) isMature(currentHeight uint32) (bool, uint32) {
4✔
229
        locktime, _ := p.RequiredLockTime()
4✔
230
        if currentHeight < locktime {
5✔
231
                log.Debugf("Input %v has locktime=%v, current height is %v",
1✔
232
                        p, locktime, currentHeight)
1✔
233

1✔
234
                return false, locktime
1✔
235
        }
1✔
236

237
        // If the input has a CSV that's not yet reached, we will skip
238
        // this input and wait for the expiry.
239
        //
240
        // NOTE: We need to consider whether this input can be included in the
241
        // next block or not, which means the CSV will be checked against the
242
        // currentHeight plus one.
243
        locktime = p.BlocksToMaturity() + p.HeightHint()
3✔
244
        if currentHeight+1 < locktime {
4✔
245
                log.Debugf("Input %v has CSV expiry=%v, current height is %v, "+
1✔
246
                        "skipped sweeping", p, locktime, currentHeight)
1✔
247

1✔
248
                return false, locktime
1✔
249
        }
1✔
250

251
        return true, locktime
2✔
252
}
253

254
// InputsMap is a type alias for a set of pending inputs.
255
type InputsMap = map[wire.OutPoint]*SweeperInput
256

257
// inputsMapToString returns a human readable interpretation of the pending
258
// inputs.
259
func inputsMapToString(inputs InputsMap) string {
×
260
        if len(inputs) == 0 {
×
261
                return ""
×
262
        }
×
263

264
        inps := make([]input.Input, 0, len(inputs))
×
265
        for _, in := range inputs {
×
266
                inps = append(inps, in)
×
267
        }
×
268

269
        return "\n" + inputTypeSummary(inps)
×
270
}
271

272
// pendingSweepsReq is an internal message we'll use to represent an external
273
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
274
// attempting to sweep.
275
type pendingSweepsReq struct {
276
        respChan chan map[wire.OutPoint]*PendingInputResponse
277
        errChan  chan error
278
}
279

280
// PendingInputResponse contains information about an input that is currently
281
// being swept by the UtxoSweeper.
282
type PendingInputResponse struct {
283
        // OutPoint is the identify outpoint of the input being swept.
284
        OutPoint wire.OutPoint
285

286
        // WitnessType is the witness type of the input being swept.
287
        WitnessType input.WitnessType
288

289
        // Amount is the amount of the input being swept.
290
        Amount btcutil.Amount
291

292
        // LastFeeRate is the most recent fee rate used for the input being
293
        // swept within a transaction broadcast to the network.
294
        LastFeeRate chainfee.SatPerKWeight
295

296
        // BroadcastAttempts is the number of attempts we've made to sweept the
297
        // input.
298
        BroadcastAttempts int
299

300
        // Params contains the sweep parameters for this pending request.
301
        Params Params
302

303
        // DeadlineHeight records the deadline height of this input.
304
        DeadlineHeight uint32
305
}
306

307
// updateReq is an internal message we'll use to represent an external caller's
308
// intent to update the sweep parameters of a given input.
309
type updateReq struct {
310
        input        wire.OutPoint
311
        params       Params
312
        responseChan chan *updateResp
313
}
314

315
// updateResp is an internal message we'll use to hand off the response of a
316
// updateReq from the UtxoSweeper's main event loop back to the caller.
317
type updateResp struct {
318
        resultChan chan Result
319
        err        error
320
}
321

322
// UtxoSweeper is responsible for sweeping outputs back into the wallet
323
type UtxoSweeper struct {
324
        started uint32 // To be used atomically.
325
        stopped uint32 // To be used atomically.
326

327
        // Embed the blockbeat consumer struct to get access to the method
328
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
329
        chainio.BeatConsumer
330

331
        cfg *UtxoSweeperConfig
332

333
        newInputs chan *sweepInputMessage
334
        spendChan chan *chainntnfs.SpendDetail
335

336
        // pendingSweepsReq is a channel that will be sent requests by external
337
        // callers in order to retrieve the set of pending inputs the
338
        // UtxoSweeper is attempting to sweep.
339
        pendingSweepsReqs chan *pendingSweepsReq
340

341
        // updateReqs is a channel that will be sent requests by external
342
        // callers who wish to bump the fee rate of a given input.
343
        updateReqs chan *updateReq
344

345
        // inputs is the total set of inputs the UtxoSweeper has been requested
346
        // to sweep.
347
        inputs InputsMap
348

349
        currentOutputScript fn.Option[lnwallet.AddrWithKey]
350

351
        relayFeeRate chainfee.SatPerKWeight
352

353
        quit chan struct{}
354
        wg   sync.WaitGroup
355

356
        // currentHeight is the best known height of the main chain. This is
357
        // updated whenever a new block epoch is received.
358
        currentHeight int32
359

360
        // bumpRespChan is a channel that receives broadcast results from the
361
        // TxPublisher.
362
        bumpRespChan chan *bumpResp
363
}
364

365
// Compile-time check for the chainio.Consumer interface.
366
var _ chainio.Consumer = (*UtxoSweeper)(nil)
367

368
// UtxoSweeperConfig contains dependencies of UtxoSweeper.
369
type UtxoSweeperConfig struct {
370
        // GenSweepScript generates a P2WKH script belonging to the wallet where
371
        // funds can be swept.
372
        GenSweepScript func() fn.Result[lnwallet.AddrWithKey]
373

374
        // FeeEstimator is used when crafting sweep transactions to estimate
375
        // the necessary fee relative to the expected size of the sweep
376
        // transaction.
377
        FeeEstimator chainfee.Estimator
378

379
        // Wallet contains the wallet functions that sweeper requires.
380
        Wallet Wallet
381

382
        // Notifier is an instance of a chain notifier we'll use to watch for
383
        // certain on-chain events.
384
        Notifier chainntnfs.ChainNotifier
385

386
        // Mempool is the mempool watcher that will be used to query whether a
387
        // given input is already being spent by a transaction in the mempool.
388
        Mempool chainntnfs.MempoolWatcher
389

390
        // Store stores the published sweeper txes.
391
        Store SweeperStore
392

393
        // Signer is used by the sweeper to generate valid witnesses at the
394
        // time the incubated outputs need to be spent.
395
        Signer input.Signer
396

397
        // MaxInputsPerTx specifies the default maximum number of inputs allowed
398
        // in a single sweep tx. If more need to be swept, multiple txes are
399
        // created and published.
400
        MaxInputsPerTx uint32
401

402
        // MaxFeeRate is the maximum fee rate allowed within the UtxoSweeper.
403
        MaxFeeRate chainfee.SatPerVByte
404

405
        // Aggregator is used to group inputs into clusters based on its
406
        // implemention-specific strategy.
407
        Aggregator UtxoAggregator
408

409
        // Publisher is used to publish the sweep tx crafted here and monitors
410
        // it for potential fee bumps.
411
        Publisher Bumper
412

413
        // NoDeadlineConfTarget is the conf target to use when sweeping
414
        // non-time-sensitive outputs.
415
        NoDeadlineConfTarget uint32
416
}
417

418
// Result is the struct that is pushed through the result channel. Callers can
419
// use this to be informed of the final sweep result. In case of a remote
420
// spend, Err will be ErrRemoteSpend.
421
type Result struct {
422
        // Err is the final result of the sweep. It is nil when the input is
423
        // swept successfully by us. ErrRemoteSpend is returned when another
424
        // party took the input.
425
        Err error
426

427
        // Tx is the transaction that spent the input.
428
        Tx *wire.MsgTx
429
}
430

431
// sweepInputMessage structs are used in the internal channel between the
432
// SweepInput call and the sweeper main loop.
433
type sweepInputMessage struct {
434
        input      input.Input
435
        params     Params
436
        resultChan chan Result
437
}
438

439
// New returns a new Sweeper instance.
440
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
16✔
441
        s := &UtxoSweeper{
16✔
442
                cfg:               cfg,
16✔
443
                newInputs:         make(chan *sweepInputMessage),
16✔
444
                spendChan:         make(chan *chainntnfs.SpendDetail),
16✔
445
                updateReqs:        make(chan *updateReq),
16✔
446
                pendingSweepsReqs: make(chan *pendingSweepsReq),
16✔
447
                quit:              make(chan struct{}),
16✔
448
                inputs:            make(InputsMap),
16✔
449
                bumpRespChan:      make(chan *bumpResp, 100),
16✔
450
        }
16✔
451

16✔
452
        // Mount the block consumer.
16✔
453
        s.BeatConsumer = chainio.NewBeatConsumer(s.quit, s.Name())
16✔
454

16✔
455
        return s
16✔
456
}
16✔
457

458
// Start starts the process of constructing and publish sweep txes.
459
func (s *UtxoSweeper) Start(beat chainio.Blockbeat) error {
×
460
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
×
461
                return nil
×
462
        }
×
463

464
        log.Info("Sweeper starting")
×
465

×
466
        // Retrieve relay fee for dust limit calculation. Assume that this will
×
467
        // not change from here on.
×
468
        s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
×
469

×
470
        // Set the current height.
×
471
        s.currentHeight = beat.Height()
×
472

×
473
        // Start sweeper main loop.
×
474
        s.wg.Add(1)
×
475
        go s.collector()
×
476

×
477
        return nil
×
478
}
479

480
// RelayFeePerKW returns the minimum fee rate required for transactions to be
481
// relayed.
482
func (s *UtxoSweeper) RelayFeePerKW() chainfee.SatPerKWeight {
×
483
        return s.relayFeeRate
×
484
}
×
485

486
// Stop stops sweeper from listening to block epochs and constructing sweep
487
// txes.
488
func (s *UtxoSweeper) Stop() error {
×
489
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
×
490
                return nil
×
491
        }
×
492

493
        log.Info("Sweeper shutting down...")
×
494
        defer log.Debug("Sweeper shutdown complete")
×
495

×
496
        close(s.quit)
×
497
        s.wg.Wait()
×
498

×
499
        return nil
×
500
}
501

502
// NOTE: part of the `chainio.Consumer` interface.
503
func (s *UtxoSweeper) Name() string {
16✔
504
        return "UtxoSweeper"
16✔
505
}
16✔
506

507
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
508
// swept after the batch time window ends. A custom fee preference can be
509
// provided to determine what fee rate should be used for the input. Note that
510
// the input may not always be swept with this exact value, as its possible for
511
// it to be batched under the same transaction with other similar fee rate
512
// inputs.
513
//
514
// NOTE: Extreme care needs to be taken that input isn't changed externally.
515
// Because it is an interface and we don't know what is exactly behind it, we
516
// cannot make a local copy in sweeper.
517
//
518
// TODO(yy): make sure the caller is using the Result chan.
519
func (s *UtxoSweeper) SweepInput(inp input.Input,
520
        params Params) (chan Result, error) {
×
521

×
522
        if inp == nil || inp.OutPoint() == input.EmptyOutPoint ||
×
523
                inp.SignDesc() == nil {
×
524

×
525
                return nil, errors.New("nil input received")
×
526
        }
×
527

528
        absoluteTimeLock, _ := inp.RequiredLockTime()
×
529
        log.Debugf("Sweep request received: out_point=%v, witness_type=%v, "+
×
530
                "relative_time_lock=%v, absolute_time_lock=%v, amount=%v, "+
×
531
                "parent=(%v), params=(%v)", inp.OutPoint(), inp.WitnessType(),
×
532
                inp.BlocksToMaturity(), absoluteTimeLock,
×
533
                btcutil.Amount(inp.SignDesc().Output.Value),
×
534
                inp.UnconfParent(), params)
×
535

×
536
        sweeperInput := &sweepInputMessage{
×
537
                input:      inp,
×
538
                params:     params,
×
539
                resultChan: make(chan Result, 1),
×
540
        }
×
541

×
542
        // Deliver input to the main event loop.
×
543
        select {
×
544
        case s.newInputs <- sweeperInput:
×
545
        case <-s.quit:
×
546
                return nil, ErrSweeperShuttingDown
×
547
        }
548

549
        return sweeperInput.resultChan, nil
×
550
}
551

552
// removeConflictSweepDescendants removes any transactions from the wallet that
553
// spend outputs included in the passed outpoint set. This needs to be done in
554
// cases where we're not the only ones that can sweep an output, but there may
555
// exist unconfirmed spends that spend outputs created by a sweep transaction.
556
// The most common case for this is when someone sweeps our anchor outputs
557
// after 16 blocks. Moreover this is also needed for wallets which use neutrino
558
// as a backend when a channel is force closed and anchor cpfp txns are
559
// created to bump the initial commitment transaction. In this case an anchor
560
// cpfp is broadcasted for up to 3 commitment transactions (local,
561
// remote-dangling, remote). Using neutrino all of those transactions will be
562
// accepted (the commitment tx will be different in all of those cases) and have
563
// to be removed as soon as one of them confirmes (they do have the same
564
// ExclusiveGroup). For neutrino backends the corresponding BIP 157 serving full
565
// nodes do not signal invalid transactions anymore.
566
func (s *UtxoSweeper) removeConflictSweepDescendants(
567
        outpoints map[wire.OutPoint]struct{}) error {
×
568

×
569
        // Obtain all the past sweeps that we've done so far. We'll need these
×
570
        // to ensure that if the spendingTx spends any of the same inputs, then
×
571
        // we remove any transaction that may be spending those inputs from the
×
572
        // wallet.
×
573
        //
×
574
        // TODO(roasbeef): can be last sweep here if we remove anything confirmed
×
575
        // from the store?
×
576
        pastSweepHashes, err := s.cfg.Store.ListSweeps()
×
577
        if err != nil {
×
578
                return err
×
579
        }
×
580

581
        // We'll now go through each past transaction we published during this
582
        // epoch and cross reference the spent inputs. If there're any inputs
583
        // in common with the inputs the spendingTx spent, then we'll remove
584
        // those.
585
        //
586
        // TODO(roasbeef): need to start to remove all transaction hashes after
587
        // every N blocks (assumed point of no return)
588
        for _, sweepHash := range pastSweepHashes {
×
589
                sweepTx, err := s.cfg.Wallet.FetchTx(sweepHash)
×
590
                if err != nil {
×
591
                        return err
×
592
                }
×
593

594
                // Transaction wasn't found in the wallet, may have already
595
                // been replaced/removed.
596
                if sweepTx == nil {
×
597
                        // If it was removed, then we'll play it safe and mark
×
598
                        // it as no longer need to be rebroadcasted.
×
599
                        s.cfg.Wallet.CancelRebroadcast(sweepHash)
×
600
                        continue
×
601
                }
602

603
                // Check to see if this past sweep transaction spent any of the
604
                // same inputs as spendingTx.
605
                var isConflicting bool
×
606
                for _, txIn := range sweepTx.TxIn {
×
607
                        if _, ok := outpoints[txIn.PreviousOutPoint]; ok {
×
608
                                isConflicting = true
×
609
                                break
×
610
                        }
611
                }
612

613
                if !isConflicting {
×
614
                        continue
×
615
                }
616

617
                // If it is conflicting, then we'll signal the wallet to remove
618
                // all the transactions that are descendants of outputs created
619
                // by the sweepTx and the sweepTx itself.
620
                log.Debugf("Removing sweep txid=%v from wallet: %v",
×
621
                        sweepTx.TxHash(), spew.Sdump(sweepTx))
×
622

×
623
                err = s.cfg.Wallet.RemoveDescendants(sweepTx)
×
624
                if err != nil {
×
625
                        log.Warnf("Unable to remove descendants: %v", err)
×
626
                }
×
627

628
                // If this transaction was conflicting, then we'll stop
629
                // rebroadcasting it in the background.
630
                s.cfg.Wallet.CancelRebroadcast(sweepHash)
×
631
        }
632

633
        return nil
×
634
}
635

636
// collector is the sweeper main loop. It processes new inputs, spend
637
// notifications and counts down to publication of the sweep tx.
638
func (s *UtxoSweeper) collector() {
×
639
        defer s.wg.Done()
×
640

×
641
        for {
×
642
                // Clean inputs, which will remove inputs that are swept,
×
643
                // failed, or excluded from the sweeper and return inputs that
×
644
                // are either new or has been published but failed back, which
×
645
                // will be retried again here.
×
646
                s.updateSweeperInputs()
×
647

×
648
                select {
×
649
                // A new inputs is offered to the sweeper. We check to see if
650
                // we are already trying to sweep this input and if not, set up
651
                // a listener to spend and schedule a sweep.
652
                case input := <-s.newInputs:
×
653
                        err := s.handleNewInput(input)
×
654
                        if err != nil {
×
655
                                log.Criticalf("Unable to handle new input: %v",
×
656
                                        err)
×
657

×
658
                                return
×
659
                        }
×
660

661
                        // If this input is forced, we perform an sweep
662
                        // immediately.
663
                        //
664
                        // TODO(ziggie): Make sure when `immediate` is selected
665
                        // as a parameter that we only trigger the sweeping of
666
                        // this specific input rather than triggering the sweeps
667
                        // of all current pending inputs registered with the
668
                        // sweeper.
669
                        if input.params.Immediate {
×
670
                                inputs := s.updateSweeperInputs()
×
671
                                s.sweepPendingInputs(inputs)
×
672
                        }
×
673

674
                // A spend of one of our inputs is detected. Signal sweep
675
                // results to the caller(s).
676
                case spend := <-s.spendChan:
×
677
                        s.handleInputSpent(spend)
×
678

679
                // A new external request has been received to retrieve all of
680
                // the inputs we're currently attempting to sweep.
681
                case req := <-s.pendingSweepsReqs:
×
682
                        s.handlePendingSweepsReq(req)
×
683

684
                // A new external request has been received to bump the fee rate
685
                // of a given input.
686
                case req := <-s.updateReqs:
×
687
                        resultChan, err := s.handleUpdateReq(req)
×
688
                        req.responseChan <- &updateResp{
×
689
                                resultChan: resultChan,
×
690
                                err:        err,
×
691
                        }
×
692

×
693
                        // Perform an sweep immediately if asked.
×
694
                        if req.params.Immediate {
×
695
                                inputs := s.updateSweeperInputs()
×
696
                                s.sweepPendingInputs(inputs)
×
697
                        }
×
698

699
                case resp := <-s.bumpRespChan:
×
700
                        // Handle the bump event.
×
701
                        err := s.handleBumpEvent(resp)
×
702
                        if err != nil {
×
703
                                log.Errorf("Failed to handle bump event: %v",
×
704
                                        err)
×
705
                        }
×
706

707
                // A new block comes in, update the bestHeight, perform a check
708
                // over all pending inputs and publish sweeping txns if needed.
709
                case beat := <-s.BlockbeatChan:
×
710
                        // Update the sweeper to the best height.
×
711
                        s.currentHeight = beat.Height()
×
712

×
713
                        // Update the inputs with the latest height.
×
714
                        inputs := s.updateSweeperInputs()
×
715

×
716
                        log.Debugf("Received new block: height=%v, attempt "+
×
717
                                "sweeping %d inputs:%s", s.currentHeight,
×
718
                                len(inputs),
×
719
                                lnutils.NewLogClosure(func() string {
×
720
                                        return inputsMapToString(inputs)
×
721
                                }))
×
722

723
                        // Attempt to sweep any pending inputs.
724
                        s.sweepPendingInputs(inputs)
×
725

×
726
                        // Notify we've processed the block.
×
727
                        s.NotifyBlockProcessed(beat, nil)
×
728

729
                case <-s.quit:
×
730
                        return
×
731
                }
732
        }
733
}
734

735
// removeExclusiveGroup removes all inputs in the given exclusive group. This
736
// function is called when one of the exclusive group inputs has been spent. The
737
// other inputs won't ever be spendable and can be removed. This also prevents
738
// them from being part of future sweep transactions that would fail. In
739
// addition sweep transactions of those inputs will be removed from the wallet.
740
func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
×
741
        for outpoint, input := range s.inputs {
×
742
                outpoint := outpoint
×
743

×
744
                // Skip inputs that aren't exclusive.
×
745
                if input.params.ExclusiveGroup == nil {
×
746
                        continue
×
747
                }
748

749
                // Skip inputs from other exclusive groups.
750
                if *input.params.ExclusiveGroup != group {
×
751
                        continue
×
752
                }
753

754
                // Skip inputs that are already terminated.
755
                if input.terminated() {
×
756
                        log.Tracef("Skipped sending error result for "+
×
757
                                "input %v, state=%v", outpoint, input.state)
×
758

×
759
                        continue
×
760
                }
761

762
                // Signal result channels.
763
                s.signalResult(input, Result{
×
764
                        Err: ErrExclusiveGroupSpend,
×
765
                })
×
766

×
767
                // Update the input's state as it can no longer be swept.
×
768
                input.state = Excluded
×
769

×
770
                // Remove all unconfirmed transactions from the wallet which
×
771
                // spend the passed outpoint of the same exclusive group.
×
772
                outpoints := map[wire.OutPoint]struct{}{
×
773
                        outpoint: {},
×
774
                }
×
775
                err := s.removeConflictSweepDescendants(outpoints)
×
776
                if err != nil {
×
777
                        log.Warnf("Unable to remove conflicting sweep tx from "+
×
778
                                "wallet for outpoint %v : %v", outpoint, err)
×
779
                }
×
780
        }
781
}
782

783
// signalResult notifies the listeners of the final result of the input sweep.
784
// It also cancels any pending spend notification.
785
func (s *UtxoSweeper) signalResult(pi *SweeperInput, result Result) {
7✔
786
        op := pi.OutPoint()
7✔
787
        listeners := pi.listeners
7✔
788

7✔
789
        if result.Err == nil {
9✔
790
                log.Tracef("Dispatching sweep success for %v to %v listeners",
2✔
791
                        op, len(listeners),
2✔
792
                )
2✔
793
        } else {
7✔
794
                log.Tracef("Dispatching sweep error for %v to %v listeners: %v",
5✔
795
                        op, len(listeners), result.Err,
5✔
796
                )
5✔
797
        }
5✔
798

799
        // Signal all listeners. Channel is buffered. Because we only send once
800
        // on every channel, it should never block.
801
        for _, resultChan := range listeners {
7✔
802
                resultChan <- result
×
803
        }
×
804

805
        // Cancel spend notification with chain notifier. This is not necessary
806
        // in case of a success, except for that a reorg could still happen.
807
        if pi.ntfnRegCancel != nil {
7✔
808
                log.Debugf("Canceling spend ntfn for %v", op)
×
809

×
810
                pi.ntfnRegCancel()
×
811
        }
×
812
}
813

814
// sweep takes a set of preselected inputs, creates a sweep tx and publishes
815
// the tx. The output address is only marked as used if the publish succeeds.
816
func (s *UtxoSweeper) sweep(set InputSet) error {
2✔
817
        // Generate an output script if there isn't an unused script available.
2✔
818
        if s.currentOutputScript.IsNone() {
3✔
819
                addr, err := s.cfg.GenSweepScript().Unpack()
1✔
820
                if err != nil {
1✔
821
                        return fmt.Errorf("gen sweep script: %w", err)
×
822
                }
×
823
                s.currentOutputScript = fn.Some(addr)
1✔
824
        }
825

826
        sweepAddr, err := s.currentOutputScript.UnwrapOrErr(
2✔
827
                fmt.Errorf("none sweep script"),
2✔
828
        )
2✔
829
        if err != nil {
2✔
830
                return err
×
831
        }
×
832

833
        // Create a fee bump request and ask the publisher to broadcast it. The
834
        // publisher will then take over and start monitoring the tx for
835
        // potential fee bump.
836
        req := &BumpRequest{
2✔
837
                Inputs:          set.Inputs(),
2✔
838
                Budget:          set.Budget(),
2✔
839
                DeadlineHeight:  set.DeadlineHeight(),
2✔
840
                DeliveryAddress: sweepAddr,
2✔
841
                MaxFeeRate:      s.cfg.MaxFeeRate.FeePerKWeight(),
2✔
842
                StartingFeeRate: set.StartingFeeRate(),
2✔
843
                Immediate:       set.Immediate(),
2✔
844
                // TODO(yy): pass the strategy here.
2✔
845
        }
2✔
846

2✔
847
        // Reschedule the inputs that we just tried to sweep. This is done in
2✔
848
        // case the following publish fails, we'd like to update the inputs'
2✔
849
        // publish attempts and rescue them in the next sweep.
2✔
850
        s.markInputsPendingPublish(set)
2✔
851

2✔
852
        // Broadcast will return a read-only chan that we will listen to for
2✔
853
        // this publish result and future RBF attempt.
2✔
854
        resp := s.cfg.Publisher.Broadcast(req)
2✔
855

2✔
856
        // Successfully sent the broadcast attempt, we now handle the result by
2✔
857
        // subscribing to the result chan and listen for future updates about
2✔
858
        // this tx.
2✔
859
        s.wg.Add(1)
2✔
860
        go s.monitorFeeBumpResult(set, resp)
2✔
861

2✔
862
        return nil
2✔
863
}
864

865
// markInputsPendingPublish updates the pending inputs with the given tx
866
// inputs. It also increments the `publishAttempts`.
867
func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) {
3✔
868
        // Reschedule sweep.
3✔
869
        for _, input := range set.Inputs() {
6✔
870
                op := input.OutPoint()
3✔
871
                pi, ok := s.inputs[op]
3✔
872
                if !ok {
3✔
873
                        // It could be that this input is an additional wallet
×
874
                        // input that was attached. In that case there also
×
875
                        // isn't a pending input to update.
×
876
                        log.Tracef("Skipped marking input as pending "+
×
877
                                "published: %v not found in pending inputs", op)
×
878

×
879
                        continue
×
880
                }
881

882
                // If this input has already terminated, there's clearly
883
                // something wrong as it would have been removed. In this case
884
                // we log an error and skip marking this input as pending
885
                // publish.
886
                if pi.terminated() {
4✔
887
                        log.Errorf("Expect input %v to not have terminated "+
1✔
888
                                "state, instead it has %v", op, pi.state)
1✔
889

1✔
890
                        continue
1✔
891
                }
892

893
                // Update the input's state.
894
                pi.state = PendingPublish
2✔
895

2✔
896
                // Record another publish attempt.
2✔
897
                pi.publishAttempts++
2✔
898
        }
899
}
900

901
// markInputsPublished updates the sweeping tx in db and marks the list of
902
// inputs as published.
903
func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, set InputSet) error {
4✔
904
        // Mark this tx in db once successfully published.
4✔
905
        //
4✔
906
        // NOTE: this will behave as an overwrite, which is fine as the record
4✔
907
        // is small.
4✔
908
        tr.Published = true
4✔
909
        err := s.cfg.Store.StoreTx(tr)
4✔
910
        if err != nil {
5✔
911
                return fmt.Errorf("store tx: %w", err)
1✔
912
        }
1✔
913

914
        // Reschedule sweep.
915
        for _, input := range set.Inputs() {
7✔
916
                op := input.OutPoint()
4✔
917
                pi, ok := s.inputs[op]
4✔
918
                if !ok {
4✔
919
                        // It could be that this input is an additional wallet
×
920
                        // input that was attached. In that case there also
×
921
                        // isn't a pending input to update.
×
922
                        log.Tracef("Skipped marking input as published: %v "+
×
923
                                "not found in pending inputs", op)
×
924

×
925
                        continue
×
926
                }
927

928
                // Valdiate that the input is in an expected state.
929
                if pi.state != PendingPublish {
5✔
930
                        // We may get a Published if this is a replacement tx.
1✔
931
                        log.Debugf("Expect input %v to have %v, instead it "+
1✔
932
                                "has %v", op, PendingPublish, pi.state)
1✔
933

1✔
934
                        continue
1✔
935
                }
936

937
                // Update the input's state.
938
                pi.state = Published
3✔
939

3✔
940
                // Update the input's latest fee rate.
3✔
941
                pi.lastFeeRate = chainfee.SatPerKWeight(tr.FeeRate)
3✔
942
        }
943

944
        return nil
3✔
945
}
946

947
// markInputsPublishFailed marks the list of inputs as failed to be published.
948
func (s *UtxoSweeper) markInputsPublishFailed(set InputSet) {
2✔
949
        // Reschedule sweep.
2✔
950
        for _, inp := range set.Inputs() {
12✔
951
                op := inp.OutPoint()
10✔
952
                pi, ok := s.inputs[op]
10✔
953
                if !ok {
10✔
954
                        // It could be that this input is an additional wallet
×
955
                        // input that was attached. In that case there also
×
956
                        // isn't a pending input to update.
×
957
                        log.Tracef("Skipped marking input as publish failed: "+
×
958
                                "%v not found in pending inputs", op)
×
959

×
960
                        continue
×
961
                }
962

963
                // Valdiate that the input is in an expected state.
964
                if pi.state != PendingPublish && pi.state != Published {
15✔
965
                        log.Debugf("Expect input %v to have %v, instead it "+
5✔
966
                                "has %v", op, PendingPublish, pi.state)
5✔
967

5✔
968
                        continue
5✔
969
                }
970

971
                log.Warnf("Failed to publish input %v", op)
5✔
972

5✔
973
                // Update the input's state.
5✔
974
                pi.state = PublishFailed
5✔
975
        }
976
}
977

978
// monitorSpend registers a spend notification with the chain notifier. It
979
// returns a cancel function that can be used to cancel the registration.
980
func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint,
981
        script []byte, heightHint uint32) (func(), error) {
×
982

×
983
        log.Tracef("Wait for spend of %v at heightHint=%v",
×
984
                outpoint, heightHint)
×
985

×
986
        spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn(
×
987
                &outpoint, script, heightHint,
×
988
        )
×
989
        if err != nil {
×
990
                return nil, fmt.Errorf("register spend ntfn: %w", err)
×
991
        }
×
992

993
        s.wg.Add(1)
×
994
        go func() {
×
995
                defer s.wg.Done()
×
996

×
997
                select {
×
998
                case spend, ok := <-spendEvent.Spend:
×
999
                        if !ok {
×
1000
                                log.Debugf("Spend ntfn for %v canceled",
×
1001
                                        outpoint)
×
1002
                                return
×
1003
                        }
×
1004

1005
                        log.Debugf("Delivering spend ntfn for %v", outpoint)
×
1006

×
1007
                        select {
×
1008
                        case s.spendChan <- spend:
×
1009
                                log.Debugf("Delivered spend ntfn for %v",
×
1010
                                        outpoint)
×
1011

1012
                        case <-s.quit:
×
1013
                        }
1014
                case <-s.quit:
×
1015
                }
1016
        }()
1017

1018
        return spendEvent.Cancel, nil
×
1019
}
1020

1021
// PendingInputs returns the set of inputs that the UtxoSweeper is currently
1022
// attempting to sweep.
1023
func (s *UtxoSweeper) PendingInputs() (
1024
        map[wire.OutPoint]*PendingInputResponse, error) {
×
1025

×
1026
        respChan := make(chan map[wire.OutPoint]*PendingInputResponse, 1)
×
1027
        errChan := make(chan error, 1)
×
1028
        select {
×
1029
        case s.pendingSweepsReqs <- &pendingSweepsReq{
1030
                respChan: respChan,
1031
                errChan:  errChan,
1032
        }:
×
1033
        case <-s.quit:
×
1034
                return nil, ErrSweeperShuttingDown
×
1035
        }
1036

1037
        select {
×
1038
        case pendingSweeps := <-respChan:
×
1039
                return pendingSweeps, nil
×
1040
        case err := <-errChan:
×
1041
                return nil, err
×
1042
        case <-s.quit:
×
1043
                return nil, ErrSweeperShuttingDown
×
1044
        }
1045
}
1046

1047
// handlePendingSweepsReq handles a request to retrieve all pending inputs the
1048
// UtxoSweeper is attempting to sweep.
1049
func (s *UtxoSweeper) handlePendingSweepsReq(
1050
        req *pendingSweepsReq) map[wire.OutPoint]*PendingInputResponse {
×
1051

×
1052
        resps := make(map[wire.OutPoint]*PendingInputResponse, len(s.inputs))
×
1053
        for _, inp := range s.inputs {
×
1054
                // Skip immature inputs for compatibility.
×
1055
                mature, _ := inp.isMature(uint32(s.currentHeight))
×
1056
                if !mature {
×
1057
                        continue
×
1058
                }
1059

1060
                // Only the exported fields are set, as we expect the response
1061
                // to only be consumed externally.
1062
                op := inp.OutPoint()
×
1063
                resps[op] = &PendingInputResponse{
×
1064
                        OutPoint:    op,
×
1065
                        WitnessType: inp.WitnessType(),
×
1066
                        Amount: btcutil.Amount(
×
1067
                                inp.SignDesc().Output.Value,
×
1068
                        ),
×
1069
                        LastFeeRate:       inp.lastFeeRate,
×
1070
                        BroadcastAttempts: inp.publishAttempts,
×
1071
                        Params:            inp.params,
×
1072
                        DeadlineHeight:    uint32(inp.DeadlineHeight),
×
1073
                }
×
1074
        }
1075

1076
        select {
×
1077
        case req.respChan <- resps:
×
1078
        case <-s.quit:
×
1079
                log.Debug("Skipped sending pending sweep response due to " +
×
1080
                        "UtxoSweeper shutting down")
×
1081
        }
1082

1083
        return resps
×
1084
}
1085

1086
// UpdateParams allows updating the sweep parameters of a pending input in the
1087
// UtxoSweeper. This function can be used to provide an updated fee preference
1088
// and force flag that will be used for a new sweep transaction of the input
1089
// that will act as a replacement transaction (RBF) of the original sweeping
1090
// transaction, if any. The exclusive group is left unchanged.
1091
//
1092
// NOTE: This currently doesn't do any fee rate validation to ensure that a bump
1093
// is actually successful. The responsibility of doing so should be handled by
1094
// the caller.
1095
func (s *UtxoSweeper) UpdateParams(input wire.OutPoint,
1096
        params Params) (chan Result, error) {
×
1097

×
1098
        responseChan := make(chan *updateResp, 1)
×
1099
        select {
×
1100
        case s.updateReqs <- &updateReq{
1101
                input:        input,
1102
                params:       params,
1103
                responseChan: responseChan,
1104
        }:
×
1105
        case <-s.quit:
×
1106
                return nil, ErrSweeperShuttingDown
×
1107
        }
1108

1109
        select {
×
1110
        case response := <-responseChan:
×
1111
                return response.resultChan, response.err
×
1112
        case <-s.quit:
×
1113
                return nil, ErrSweeperShuttingDown
×
1114
        }
1115
}
1116

1117
// handleUpdateReq handles an update request by simply updating the sweep
1118
// parameters of the pending input. Currently, no validation is done on the new
1119
// fee preference to ensure it will properly create a replacement transaction.
1120
//
1121
// TODO(wilmer):
1122
//   - Validate fee preference to ensure we'll create a valid replacement
1123
//     transaction to allow the new fee rate to propagate throughout the
1124
//     network.
1125
//   - Ensure we don't combine this input with any other unconfirmed inputs that
1126
//     did not exist in the original sweep transaction, resulting in an invalid
1127
//     replacement transaction.
1128
func (s *UtxoSweeper) handleUpdateReq(req *updateReq) (
1129
        chan Result, error) {
×
1130

×
1131
        // If the UtxoSweeper is already trying to sweep this input, then we can
×
1132
        // simply just increase its fee rate. This will allow the input to be
×
1133
        // batched with others which also have a similar fee rate, creating a
×
1134
        // higher fee rate transaction that replaces the original input's
×
1135
        // sweeping transaction.
×
1136
        sweeperInput, ok := s.inputs[req.input]
×
1137
        if !ok {
×
1138
                return nil, lnwallet.ErrNotMine
×
1139
        }
×
1140

1141
        // Create the updated parameters struct. Leave the exclusive group
1142
        // unchanged.
1143
        newParams := Params{
×
1144
                StartingFeeRate: req.params.StartingFeeRate,
×
1145
                Immediate:       req.params.Immediate,
×
1146
                Budget:          req.params.Budget,
×
1147
                DeadlineHeight:  req.params.DeadlineHeight,
×
1148
                ExclusiveGroup:  sweeperInput.params.ExclusiveGroup,
×
1149
        }
×
1150

×
1151
        log.Debugf("Updating parameters for %v(state=%v) from (%v) to (%v)",
×
1152
                req.input, sweeperInput.state, sweeperInput.params, newParams)
×
1153

×
1154
        sweeperInput.params = newParams
×
1155

×
1156
        // We need to reset the state so this input will be attempted again by
×
1157
        // our sweeper.
×
1158
        //
×
1159
        // TODO(yy): a dedicated state?
×
1160
        sweeperInput.state = Init
×
1161

×
1162
        // If the new input specifies a deadline, update the deadline height.
×
1163
        sweeperInput.DeadlineHeight = req.params.DeadlineHeight.UnwrapOr(
×
1164
                sweeperInput.DeadlineHeight,
×
1165
        )
×
1166

×
1167
        resultChan := make(chan Result, 1)
×
1168
        sweeperInput.listeners = append(sweeperInput.listeners, resultChan)
×
1169

×
1170
        return resultChan, nil
×
1171
}
1172

1173
// ListSweeps returns a list of the sweeps recorded by the sweep store.
1174
func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
×
1175
        return s.cfg.Store.ListSweeps()
×
1176
}
×
1177

1178
// mempoolLookup takes an input's outpoint and queries the mempool to see
1179
// whether it's already been spent in a transaction found in the mempool.
1180
// Returns the transaction if found.
1181
func (s *UtxoSweeper) mempoolLookup(op wire.OutPoint) fn.Option[wire.MsgTx] {
7✔
1182
        // For neutrino backend, there's no mempool available, so we exit
7✔
1183
        // early.
7✔
1184
        if s.cfg.Mempool == nil {
8✔
1185
                log.Debugf("Skipping mempool lookup for %v, no mempool ", op)
1✔
1186

1✔
1187
                return fn.None[wire.MsgTx]()
1✔
1188
        }
1✔
1189

1190
        // Query this input in the mempool. If this outpoint is already spent
1191
        // in mempool, we should get a spending event back immediately.
1192
        return s.cfg.Mempool.LookupInputMempoolSpend(op)
6✔
1193
}
1194

1195
// calculateDefaultDeadline calculates the default deadline height for a sweep
1196
// request that has no deadline height specified.
1197
func (s *UtxoSweeper) calculateDefaultDeadline(pi *SweeperInput) int32 {
×
1198
        // Create a default deadline height, which will be used when there's no
×
1199
        // DeadlineHeight specified for a given input.
×
1200
        defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget)
×
1201

×
1202
        // If the input is immature and has a locktime, we'll use the locktime
×
1203
        // height as the starting height.
×
1204
        matured, locktime := pi.isMature(uint32(s.currentHeight))
×
1205
        if !matured {
×
1206
                defaultDeadline = int32(locktime + s.cfg.NoDeadlineConfTarget)
×
1207
                log.Debugf("Input %v is immature, using locktime=%v instead "+
×
1208
                        "of current height=%d as starting height",
×
1209
                        pi.OutPoint(), locktime, s.currentHeight)
×
1210
        }
×
1211

1212
        return defaultDeadline
×
1213
}
1214

1215
// handleNewInput processes a new input by registering spend notification and
1216
// scheduling sweeping for it.
1217
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
×
1218
        outpoint := input.input.OutPoint()
×
1219
        pi, pending := s.inputs[outpoint]
×
1220
        if pending {
×
1221
                log.Infof("Already has pending input %v received, old params: "+
×
1222
                        "%v, new params %v", outpoint, pi.params, input.params)
×
1223

×
1224
                s.handleExistingInput(input, pi)
×
1225

×
1226
                return nil
×
1227
        }
×
1228

1229
        // This is a new input, and we want to query the mempool to see if this
1230
        // input has already been spent. If so, we'll start the input with
1231
        // state Published and attach the RBFInfo.
1232
        state, rbfInfo := s.decideStateAndRBFInfo(input.input.OutPoint())
×
1233

×
1234
        // Create a new pendingInput and initialize the listeners slice with
×
1235
        // the passed in result channel. If this input is offered for sweep
×
1236
        // again, the result channel will be appended to this slice.
×
1237
        pi = &SweeperInput{
×
1238
                state:     state,
×
1239
                listeners: []chan Result{input.resultChan},
×
1240
                Input:     input.input,
×
1241
                params:    input.params,
×
1242
                rbf:       rbfInfo,
×
1243
        }
×
1244

×
1245
        // Set the acutal deadline height.
×
1246
        pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
×
1247
                s.calculateDefaultDeadline(pi),
×
1248
        )
×
1249

×
1250
        s.inputs[outpoint] = pi
×
1251
        log.Tracef("input %v, state=%v, added to inputs", outpoint, pi.state)
×
1252

×
1253
        log.Infof("Registered sweep request at block %d: out_point=%v, "+
×
1254
                "witness_type=%v, amount=%v, deadline=%d, state=%v, "+
×
1255
                "params=(%v)", s.currentHeight, pi.OutPoint(), pi.WitnessType(),
×
1256
                btcutil.Amount(pi.SignDesc().Output.Value), pi.DeadlineHeight,
×
1257
                pi.state, pi.params)
×
1258

×
1259
        // Start watching for spend of this input, either by us or the remote
×
1260
        // party.
×
1261
        cancel, err := s.monitorSpend(
×
1262
                outpoint, input.input.SignDesc().Output.PkScript,
×
1263
                input.input.HeightHint(),
×
1264
        )
×
1265
        if err != nil {
×
1266
                err := fmt.Errorf("wait for spend: %w", err)
×
1267
                s.markInputFailed(pi, err)
×
1268

×
1269
                return err
×
1270
        }
×
1271

1272
        pi.ntfnRegCancel = cancel
×
1273

×
1274
        return nil
×
1275
}
1276

1277
// decideStateAndRBFInfo queries the mempool to see whether the given input has
1278
// already been spent. If so, the state Published will be returned, otherwise
1279
// state Init. When spent, it will query the sweeper store to fetch the fee
1280
// info of the spending transction, and construct an RBFInfo based on it.
1281
// Suppose an error occurs, fn.None is returned.
1282
func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
1283
        SweepState, fn.Option[RBFInfo]) {
4✔
1284

4✔
1285
        // Check if we can find the spending tx of this input in mempool.
4✔
1286
        txOption := s.mempoolLookup(op)
4✔
1287

4✔
1288
        // Extract the spending tx from the option.
4✔
1289
        var tx *wire.MsgTx
4✔
1290
        txOption.WhenSome(func(t wire.MsgTx) {
7✔
1291
                tx = &t
3✔
1292
        })
3✔
1293

1294
        // Exit early if it's not found.
1295
        //
1296
        // NOTE: this is not accurate for backends that don't support mempool
1297
        // lookup:
1298
        // - for neutrino we don't have a mempool.
1299
        // - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
1300
        if tx == nil {
5✔
1301
                return Init, fn.None[RBFInfo]()
1✔
1302
        }
1✔
1303

1304
        // Otherwise the input is already spent in the mempool, so eventually
1305
        // we will return Published.
1306
        //
1307
        // We also need to update the RBF info for this input. If the sweeping
1308
        // transaction is broadcast by us, we can find the fee info in the
1309
        // sweeper store.
1310
        txid := tx.TxHash()
3✔
1311
        tr, err := s.cfg.Store.GetTx(txid)
3✔
1312

3✔
1313
        // If the tx is not found in the store, it means it's not broadcast by
3✔
1314
        // us, hence we can't find the fee info. This is fine as, later on when
3✔
1315
        // this tx is confirmed, we will remove the input from our inputs.
3✔
1316
        if errors.Is(err, ErrTxNotFound) {
4✔
1317
                log.Warnf("Spending tx %v not found in sweeper store", txid)
1✔
1318
                return Published, fn.None[RBFInfo]()
1✔
1319
        }
1✔
1320

1321
        // Exit if we get an db error.
1322
        if err != nil {
3✔
1323
                log.Errorf("Unable to get tx %v from sweeper store: %v",
1✔
1324
                        txid, err)
1✔
1325

1✔
1326
                return Published, fn.None[RBFInfo]()
1✔
1327
        }
1✔
1328

1329
        // Prepare the fee info and return it.
1330
        rbf := fn.Some(RBFInfo{
1✔
1331
                Txid:    txid,
1✔
1332
                Fee:     btcutil.Amount(tr.Fee),
1✔
1333
                FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
1✔
1334
        })
1✔
1335

1✔
1336
        return Published, rbf
1✔
1337
}
1338

1339
// handleExistingInput processes an input that is already known to the sweeper.
1340
// It will overwrite the params of the old input with the new ones.
1341
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
1342
        oldInput *SweeperInput) {
×
1343

×
1344
        // Before updating the input details, check if an exclusive group was
×
1345
        // set. In case the same input is registered again without an exclusive
×
1346
        // group set, the previous input and its sweep parameters are outdated
×
1347
        // hence need to be replaced. This scenario currently only happens for
×
1348
        // anchor outputs. When a channel is force closed, in the worst case 3
×
1349
        // different sweeps with the same exclusive group are registered with
×
1350
        // the sweeper to bump the closing transaction (cpfp) when its time
×
1351
        // critical. Receiving an input which was already registered with the
×
1352
        // sweeper but now without an exclusive group means non of the previous
×
1353
        // inputs were used as CPFP, so we need to make sure we update the
×
1354
        // sweep parameters but also remove all inputs with the same exclusive
×
1355
        // group because the are outdated too.
×
1356
        var prevExclGroup *uint64
×
1357
        if oldInput.params.ExclusiveGroup != nil &&
×
1358
                input.params.ExclusiveGroup == nil {
×
1359

×
1360
                prevExclGroup = new(uint64)
×
1361
                *prevExclGroup = *oldInput.params.ExclusiveGroup
×
1362
        }
×
1363

1364
        // Update input details and sweep parameters. The re-offered input
1365
        // details may contain a change to the unconfirmed parent tx info.
1366
        oldInput.params = input.params
×
1367
        oldInput.Input = input.input
×
1368

×
1369
        // If the new input specifies a deadline, update the deadline height.
×
1370
        oldInput.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
×
1371
                oldInput.DeadlineHeight,
×
1372
        )
×
1373

×
1374
        // Add additional result channel to signal spend of this input.
×
1375
        oldInput.listeners = append(oldInput.listeners, input.resultChan)
×
1376

×
1377
        if prevExclGroup != nil {
×
1378
                s.removeExclusiveGroup(*prevExclGroup)
×
1379
        }
×
1380
}
1381

1382
// handleInputSpent takes a spend event of our input and updates the sweeper's
1383
// internal state to remove the input.
1384
func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
×
1385
        // Query store to find out if we ever published this tx.
×
1386
        spendHash := *spend.SpenderTxHash
×
1387
        isOurTx, err := s.cfg.Store.IsOurTx(spendHash)
×
1388
        if err != nil {
×
1389
                log.Errorf("cannot determine if tx %v is ours: %v",
×
1390
                        spendHash, err)
×
1391
                return
×
1392
        }
×
1393

1394
        // If this isn't our transaction, it means someone else swept outputs
1395
        // that we were attempting to sweep. This can happen for anchor outputs
1396
        // as well as justice transactions. In this case, we'll notify the
1397
        // wallet to remove any spends that descent from this output.
1398
        if !isOurTx {
×
1399
                // Construct a map of the inputs this transaction spends.
×
1400
                spendingTx := spend.SpendingTx
×
1401
                inputsSpent := make(
×
1402
                        map[wire.OutPoint]struct{}, len(spendingTx.TxIn),
×
1403
                )
×
1404
                for _, txIn := range spendingTx.TxIn {
×
1405
                        inputsSpent[txIn.PreviousOutPoint] = struct{}{}
×
1406
                }
×
1407

1408
                log.Debugf("Attempting to remove descendant txns invalidated "+
×
1409
                        "by (txid=%v): %v", spendingTx.TxHash(),
×
1410
                        spew.Sdump(spendingTx))
×
1411

×
1412
                err := s.removeConflictSweepDescendants(inputsSpent)
×
1413
                if err != nil {
×
1414
                        log.Warnf("unable to remove descendant transactions "+
×
1415
                                "due to tx %v: ", spendHash)
×
1416
                }
×
1417

1418
                log.Debugf("Detected third party spend related to in flight "+
×
1419
                        "inputs (is_ours=%v): %v", isOurTx,
×
1420
                        lnutils.SpewLogClosure(spend.SpendingTx))
×
1421
        }
1422

1423
        // We now use the spending tx to update the state of the inputs.
1424
        s.markInputsSwept(spend.SpendingTx, isOurTx)
×
1425
}
1426

1427
// markInputsSwept marks all inputs swept by the spending transaction as swept.
1428
// It will also notify all the subscribers of this input.
1429
func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) {
1✔
1430
        for _, txIn := range tx.TxIn {
5✔
1431
                outpoint := txIn.PreviousOutPoint
4✔
1432

4✔
1433
                // Check if this input is known to us. It could probably be
4✔
1434
                // unknown if we canceled the registration, deleted from inputs
4✔
1435
                // map but the ntfn was in-flight already. Or this could be not
4✔
1436
                // one of our inputs.
4✔
1437
                input, ok := s.inputs[outpoint]
4✔
1438
                if !ok {
5✔
1439
                        // It's very likely that a spending tx contains inputs
1✔
1440
                        // that we don't know.
1✔
1441
                        log.Tracef("Skipped marking input as swept: %v not "+
1✔
1442
                                "found in pending inputs", outpoint)
1✔
1443

1✔
1444
                        continue
1✔
1445
                }
1446

1447
                // This input may already been marked as swept by a previous
1448
                // spend notification, which is likely to happen as one sweep
1449
                // transaction usually sweeps multiple inputs.
1450
                if input.terminated() {
4✔
1451
                        log.Debugf("Skipped marking input as swept: %v "+
1✔
1452
                                "state=%v", outpoint, input.state)
1✔
1453

1✔
1454
                        continue
1✔
1455
                }
1456

1457
                input.state = Swept
2✔
1458

2✔
1459
                // Return either a nil or a remote spend result.
2✔
1460
                var err error
2✔
1461
                if !isOurTx {
2✔
1462
                        log.Warnf("Input=%v was spent by remote or third "+
×
1463
                                "party in tx=%v", outpoint, tx.TxHash())
×
1464
                        err = ErrRemoteSpend
×
1465
                }
×
1466

1467
                // Signal result channels.
1468
                s.signalResult(input, Result{
2✔
1469
                        Tx:  tx,
2✔
1470
                        Err: err,
2✔
1471
                })
2✔
1472

2✔
1473
                // Remove all other inputs in this exclusive group.
2✔
1474
                if input.params.ExclusiveGroup != nil {
2✔
1475
                        s.removeExclusiveGroup(*input.params.ExclusiveGroup)
×
1476
                }
×
1477
        }
1478
}
1479

1480
// markInputFailed marks the given input as failed and won't be retried. It
1481
// will also notify all the subscribers of this input.
1482
func (s *UtxoSweeper) markInputFailed(pi *SweeperInput, err error) {
5✔
1483
        log.Errorf("Failed to sweep input: %v, error: %v", pi, err)
5✔
1484

5✔
1485
        pi.state = Failed
5✔
1486

5✔
1487
        s.signalResult(pi, Result{Err: err})
5✔
1488
}
5✔
1489

1490
// updateSweeperInputs updates the sweeper's internal state and returns a map
1491
// of inputs to be swept. It will remove the inputs that are in final states,
1492
// and returns a map of inputs that have either state Init or PublishFailed.
1493
func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
1✔
1494
        // Create a map of inputs to be swept.
1✔
1495
        inputs := make(InputsMap)
1✔
1496

1✔
1497
        // Iterate the pending inputs and update the sweeper's state.
1✔
1498
        //
1✔
1499
        // TODO(yy): sweeper is made to communicate via go channels, so no
1✔
1500
        // locks are needed to access the map. However, it'd be safer if we
1✔
1501
        // turn this inputs map into a SyncMap in case we wanna add concurrent
1✔
1502
        // access to the map in the future.
1✔
1503
        for op, input := range s.inputs {
10✔
1504
                log.Tracef("Checking input: %s, state=%v", input, input.state)
9✔
1505

9✔
1506
                // If the input has reached a final state, that it's either
9✔
1507
                // been swept, or failed, or excluded, we will remove it from
9✔
1508
                // our sweeper.
9✔
1509
                if input.terminated() {
12✔
1510
                        log.Debugf("Removing input(State=%v) %v from sweeper",
3✔
1511
                                input.state, op)
3✔
1512

3✔
1513
                        delete(s.inputs, op)
3✔
1514

3✔
1515
                        continue
3✔
1516
                }
1517

1518
                // If this input has been included in a sweep tx that's not
1519
                // published yet, we'd skip this input and wait for the sweep
1520
                // tx to be published.
1521
                if input.state == PendingPublish {
7✔
1522
                        continue
1✔
1523
                }
1524

1525
                // If this input has already been published, we will need to
1526
                // check the RBF condition before attempting another sweeping.
1527
                if input.state == Published {
6✔
1528
                        continue
1✔
1529
                }
1530

1531
                // If the input has a locktime that's not yet reached, we will
1532
                // skip this input and wait for the locktime to be reached.
1533
                mature, _ := input.isMature(uint32(s.currentHeight))
4✔
1534
                if !mature {
6✔
1535
                        continue
2✔
1536
                }
1537

1538
                // If this input is new or has been failed to be published,
1539
                // we'd retry it. The assumption here is that when an error is
1540
                // returned from `PublishTransaction`, it means the tx has
1541
                // failed to meet the policy, hence it's not in the mempool.
1542
                inputs[op] = input
2✔
1543
        }
1544

1545
        return inputs
1✔
1546
}
1547

1548
// sweepPendingInputs is called when the ticker fires. It will create clusters
1549
// and attempt to create and publish the sweeping transactions.
1550
func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) {
1✔
1551
        log.Debugf("Sweeping %v inputs", len(inputs))
1✔
1552

1✔
1553
        // Cluster all of our inputs based on the specific Aggregator.
1✔
1554
        sets := s.cfg.Aggregator.ClusterInputs(inputs)
1✔
1555

1✔
1556
        // sweepWithLock is a helper closure that executes the sweep within a
1✔
1557
        // coin select lock to prevent the coins being selected for other
1✔
1558
        // transactions like funding of a channel.
1✔
1559
        sweepWithLock := func(set InputSet) error {
2✔
1560
                return s.cfg.Wallet.WithCoinSelectLock(func() error {
2✔
1561
                        // Try to add inputs from our wallet.
1✔
1562
                        err := set.AddWalletInputs(s.cfg.Wallet)
1✔
1563
                        if err != nil {
1✔
1564
                                return err
×
1565
                        }
×
1566

1567
                        // Create sweeping transaction for each set.
1568
                        err = s.sweep(set)
1✔
1569
                        if err != nil {
1✔
1570
                                return err
×
1571
                        }
×
1572

1573
                        return nil
1✔
1574
                })
1575
        }
1576

1577
        for _, set := range sets {
3✔
1578
                var err error
2✔
1579
                if set.NeedWalletInput() {
3✔
1580
                        // Sweep the set of inputs that need the wallet inputs.
1✔
1581
                        err = sweepWithLock(set)
1✔
1582
                } else {
2✔
1583
                        // Sweep the set of inputs that don't need the wallet
1✔
1584
                        // inputs.
1✔
1585
                        err = s.sweep(set)
1✔
1586
                }
1✔
1587

1588
                if err != nil {
2✔
1589
                        log.Errorf("Failed to sweep %v: %v", set, err)
×
1590
                }
×
1591
        }
1592
}
1593

1594
// bumpResp wraps the result of a bump attempt returned from the fee bumper and
1595
// the inputs being used.
1596
type bumpResp struct {
1597
        // result is the result of the bump attempt returned from the fee
1598
        // bumper.
1599
        result *BumpResult
1600

1601
        // set is the input set that was used in the bump attempt.
1602
        set InputSet
1603
}
1604

1605
// monitorFeeBumpResult subscribes to the passed result chan to listen for
1606
// future updates about the sweeping tx.
1607
//
1608
// NOTE: must run as a goroutine.
1609
func (s *UtxoSweeper) monitorFeeBumpResult(set InputSet,
1610
        resultChan <-chan *BumpResult) {
6✔
1611

6✔
1612
        defer s.wg.Done()
6✔
1613

6✔
1614
        for {
13✔
1615
                select {
7✔
1616
                case r := <-resultChan:
3✔
1617
                        // Validate the result is valid.
3✔
1618
                        if err := r.Validate(); err != nil {
3✔
1619
                                log.Errorf("Received invalid result: %v", err)
×
1620
                                continue
×
1621
                        }
1622

1623
                        resp := &bumpResp{
3✔
1624
                                result: r,
3✔
1625
                                set:    set,
3✔
1626
                        }
3✔
1627

3✔
1628
                        // Send the result back to the main event loop.
3✔
1629
                        select {
3✔
1630
                        case s.bumpRespChan <- resp:
3✔
1631
                        case <-s.quit:
×
1632
                                log.Debug("Sweeper shutting down, skip " +
×
1633
                                        "sending bump result")
×
1634

×
1635
                                return
×
1636
                        }
1637

1638
                        // The sweeping tx has been confirmed, we can exit the
1639
                        // monitor now.
1640
                        //
1641
                        // TODO(yy): can instead remove the spend subscription
1642
                        // in sweeper and rely solely on this event to mark
1643
                        // inputs as Swept?
1644
                        if r.Event == TxConfirmed || r.Event == TxFailed {
5✔
1645
                                // Exit if the tx is failed to be created.
2✔
1646
                                if r.Tx == nil {
2✔
1647
                                        log.Debugf("Received %v for nil tx, "+
×
1648
                                                "exit monitor", r.Event)
×
1649

×
1650
                                        return
×
1651
                                }
×
1652

1653
                                log.Debugf("Received %v for sweep tx %v, exit "+
2✔
1654
                                        "fee bump monitor", r.Event,
2✔
1655
                                        r.Tx.TxHash())
2✔
1656

2✔
1657
                                // Cancel the rebroadcasting of the failed tx.
2✔
1658
                                s.cfg.Wallet.CancelRebroadcast(r.Tx.TxHash())
2✔
1659

2✔
1660
                                return
2✔
1661
                        }
1662

1663
                case <-s.quit:
2✔
1664
                        log.Debugf("Sweeper shutting down, exit fee " +
2✔
1665
                                "bump handler")
2✔
1666

2✔
1667
                        return
2✔
1668
                }
1669
        }
1670
}
1671

1672
// handleBumpEventTxFailed handles the case where the tx has been failed to
1673
// publish.
1674
func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) {
1✔
1675
        r := resp.result
1✔
1676
        tx, err := r.Tx, r.Err
1✔
1677

1✔
1678
        if tx != nil {
2✔
1679
                log.Warnf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(),
1✔
1680
                        err)
1✔
1681
        }
1✔
1682

1683
        // NOTE: When marking the inputs as failed, we are using the input set
1684
        // instead of the inputs found in the tx. This is fine for current
1685
        // version of the sweeper because we always create a tx using ALL of
1686
        // the inputs specified by the set.
1687
        //
1688
        // TODO(yy): should we also remove the failed tx from db?
1689
        s.markInputsPublishFailed(resp.set)
1✔
1690
}
1691

1692
// handleBumpEventTxReplaced handles the case where the sweeping tx has been
1693
// replaced by a new one.
1694
func (s *UtxoSweeper) handleBumpEventTxReplaced(resp *bumpResp) error {
3✔
1695
        r := resp.result
3✔
1696
        oldTx := r.ReplacedTx
3✔
1697
        newTx := r.Tx
3✔
1698

3✔
1699
        // Prepare a new record to replace the old one.
3✔
1700
        tr := &TxRecord{
3✔
1701
                Txid:    newTx.TxHash(),
3✔
1702
                FeeRate: uint64(r.FeeRate),
3✔
1703
                Fee:     uint64(r.Fee),
3✔
1704
        }
3✔
1705

3✔
1706
        // Get the old record for logging purpose.
3✔
1707
        oldTxid := oldTx.TxHash()
3✔
1708
        record, err := s.cfg.Store.GetTx(oldTxid)
3✔
1709
        if err != nil {
4✔
1710
                log.Errorf("Fetch tx record for %v: %v", oldTxid, err)
1✔
1711
                return err
1✔
1712
        }
1✔
1713

1714
        // Cancel the rebroadcasting of the replaced tx.
1715
        s.cfg.Wallet.CancelRebroadcast(oldTxid)
2✔
1716

2✔
1717
        log.Infof("RBFed tx=%v(fee=%v sats, feerate=%v sats/kw) with new "+
2✔
1718
                "tx=%v(fee=%v, "+"feerate=%v)", record.Txid, record.Fee,
2✔
1719
                record.FeeRate, tr.Txid, tr.Fee, tr.FeeRate)
2✔
1720

2✔
1721
        // The old sweeping tx has been replaced by a new one, we will update
2✔
1722
        // the tx record in the sweeper db.
2✔
1723
        //
2✔
1724
        // TODO(yy): we may also need to update the inputs in this tx to a new
2✔
1725
        // state. Suppose a replacing tx only spends a subset of the inputs
2✔
1726
        // here, we'd end up with the rest being marked as `Published` and
2✔
1727
        // won't be aggregated in the next sweep. Atm it's fine as we always
2✔
1728
        // RBF the same input set.
2✔
1729
        if err := s.cfg.Store.DeleteTx(oldTxid); err != nil {
3✔
1730
                log.Errorf("Delete tx record for %v: %v", oldTxid, err)
1✔
1731
                return err
1✔
1732
        }
1✔
1733

1734
        // Mark the inputs as published using the replacing tx.
1735
        return s.markInputsPublished(tr, resp.set)
1✔
1736
}
1737

1738
// handleBumpEventTxPublished handles the case where the sweeping tx has been
1739
// successfully published.
1740
func (s *UtxoSweeper) handleBumpEventTxPublished(resp *bumpResp) error {
1✔
1741
        r := resp.result
1✔
1742
        tx := r.Tx
1✔
1743
        tr := &TxRecord{
1✔
1744
                Txid:    tx.TxHash(),
1✔
1745
                FeeRate: uint64(r.FeeRate),
1✔
1746
                Fee:     uint64(r.Fee),
1✔
1747
        }
1✔
1748

1✔
1749
        // Inputs have been successfully published so we update their
1✔
1750
        // states.
1✔
1751
        err := s.markInputsPublished(tr, resp.set)
1✔
1752
        if err != nil {
1✔
1753
                return err
×
1754
        }
×
1755

1756
        log.Debugf("Published sweep tx %v, num_inputs=%v, height=%v",
1✔
1757
                tx.TxHash(), len(tx.TxIn), s.currentHeight)
1✔
1758

1✔
1759
        // If there's no error, remove the output script. Otherwise keep it so
1✔
1760
        // that it can be reused for the next transaction and causes no address
1✔
1761
        // inflation.
1✔
1762
        s.currentOutputScript = fn.None[lnwallet.AddrWithKey]()
1✔
1763

1✔
1764
        return nil
1✔
1765
}
1766

1767
// handleBumpEventTxFatal handles the case where there's an unexpected error
1768
// when creating or publishing the sweeping tx. In this case, the tx will be
1769
// removed from the sweeper store and the inputs will be marked as `Failed`,
1770
// which means they will not be retried.
1771
func (s *UtxoSweeper) handleBumpEventTxFatal(resp *bumpResp) error {
2✔
1772
        r := resp.result
2✔
1773

2✔
1774
        // Remove the tx from the sweeper store if there is one. Since this is
2✔
1775
        // a broadcast error, it's likely there isn't a tx here.
2✔
1776
        if r.Tx != nil {
4✔
1777
                txid := r.Tx.TxHash()
2✔
1778
                log.Infof("Tx=%v failed with unexpected error: %v", txid, r.Err)
2✔
1779

2✔
1780
                // Remove the tx from the sweeper db if it exists.
2✔
1781
                if err := s.cfg.Store.DeleteTx(txid); err != nil {
3✔
1782
                        return fmt.Errorf("delete tx record for %v: %w", txid,
1✔
1783
                                err)
1✔
1784
                }
1✔
1785
        }
1786

1787
        // Mark the inputs as failed.
1788
        s.markInputsFailed(resp.set, r.Err)
1✔
1789

1✔
1790
        return nil
1✔
1791
}
1792

1793
// markInputsFailed marks all inputs found in the tx as failed. It will also
1794
// notify all the subscribers of these inputs.
1795
func (s *UtxoSweeper) markInputsFailed(set InputSet, err error) {
2✔
1796
        for _, inp := range set.Inputs() {
9✔
1797
                outpoint := inp.OutPoint()
7✔
1798

7✔
1799
                input, ok := s.inputs[outpoint]
7✔
1800
                if !ok {
7✔
1801
                        // It's very likely that a spending tx contains inputs
×
1802
                        // that we don't know.
×
1803
                        log.Tracef("Skipped marking input as failed: %v not "+
×
1804
                                "found in pending inputs", outpoint)
×
1805

×
1806
                        continue
×
1807
                }
1808

1809
                // If the input is already in a terminal state, we don't want
1810
                // to rewrite it, which also indicates an error as we only get
1811
                // an error event during the initial broadcast.
1812
                if input.terminated() {
10✔
1813
                        log.Errorf("Skipped marking input=%v as failed due to "+
3✔
1814
                                "unexpected state=%v", outpoint, input.state)
3✔
1815

3✔
1816
                        continue
3✔
1817
                }
1818

1819
                s.markInputFailed(input, err)
4✔
1820
        }
1821
}
1822

1823
// handleBumpEvent handles the result sent from the bumper based on its event
1824
// type.
1825
//
1826
// NOTE: TxConfirmed event is not handled, since we already subscribe to the
1827
// input's spending event, we don't need to do anything here.
1828
func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
1✔
1829
        log.Debugf("Received bump result %v", r.result)
1✔
1830

1✔
1831
        switch r.result.Event {
1✔
1832
        // The tx has been published, we update the inputs' state and create a
1833
        // record to be stored in the sweeper db.
1834
        case TxPublished:
×
1835
                return s.handleBumpEventTxPublished(r)
×
1836

1837
        // The tx has failed, we update the inputs' state.
1838
        case TxFailed:
1✔
1839
                s.handleBumpEventTxFailed(r)
1✔
1840
                return nil
1✔
1841

1842
        // The tx has been replaced, we will remove the old tx and replace it
1843
        // with the new one.
1844
        case TxReplaced:
×
1845
                return s.handleBumpEventTxReplaced(r)
×
1846

1847
        // There's a fatal error in creating the tx, we will remove the tx from
1848
        // the sweeper db and mark the inputs as failed.
1849
        case TxFatal:
×
1850
                return s.handleBumpEventTxFatal(r)
×
1851
        }
1852

1853
        return nil
×
1854
}
1855

1856
// IsSweeperOutpoint determines whether the outpoint was created by the sweeper.
1857
//
1858
// NOTE: It is enough to check the txid because the sweeper will create
1859
// outpoints which solely belong to the internal LND wallet.
1860
func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool {
×
1861
        found, err := s.cfg.Store.IsOurTx(op.Hash)
×
1862
        // In case there is an error fetching the transaction details from the
×
1863
        // sweeper store we assume the outpoint is still used by the sweeper
×
1864
        // (worst case scenario).
×
1865
        //
×
1866
        // TODO(ziggie): Ensure that confirmed outpoints are deleted from the
×
1867
        // bucket.
×
1868
        if err != nil && !errors.Is(err, errNoTxHashesBucket) {
×
1869
                log.Errorf("failed to fetch info for outpoint(%v:%d) "+
×
1870
                        "with: %v, we assume it is still in use by the sweeper",
×
1871
                        op.Hash, op.Index, err)
×
1872

×
1873
                return true
×
1874
        }
×
1875

1876
        return found
×
1877
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc