• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13157844420

05 Feb 2025 12:55PM UTC coverage: 58.821% (+1.1%) from 57.712%
13157844420

Pull #9448

github

yyforyongyu
docs: add release notes
Pull Request #9448: sweep: properly handle failed sweeping txns

300 of 342 new or added lines in 3 files covered. (87.72%)

35 existing lines in 7 files now uncovered.

136339 of 231787 relevant lines covered (58.82%)

19230.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.04
/sweep/sweeper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/davecgh/go-spew/spew"
13
        "github.com/lightningnetwork/lnd/chainio"
14
        "github.com/lightningnetwork/lnd/chainntnfs"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/input"
17
        "github.com/lightningnetwork/lnd/lnutils"
18
        "github.com/lightningnetwork/lnd/lnwallet"
19
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
20
)
21

22
var (
23
        // ErrRemoteSpend is returned in case an output that we try to sweep is
24
        // confirmed in a tx of the remote party.
25
        ErrRemoteSpend = errors.New("remote party swept utxo")
26

27
        // ErrFeePreferenceTooLow is returned when the fee preference gives a
28
        // fee rate that's below the relay fee rate.
29
        ErrFeePreferenceTooLow = errors.New("fee preference too low")
30

31
        // ErrExclusiveGroupSpend is returned in case a different input of the
32
        // same exclusive group was spent.
33
        ErrExclusiveGroupSpend = errors.New("other member of exclusive group " +
34
                "was spent")
35

36
        // ErrSweeperShuttingDown is an error returned when a client attempts to
37
        // make a request to the UtxoSweeper, but it is unable to handle it as
38
        // it is/has already been stopped.
39
        ErrSweeperShuttingDown = errors.New("utxo sweeper shutting down")
40

41
        // DefaultDeadlineDelta defines a default deadline delta (1 week) to be
42
        // used when sweeping inputs with no deadline pressure.
43
        DefaultDeadlineDelta = int32(1008)
44
)
45

46
// Params contains the parameters that control the sweeping process.
47
type Params struct {
48
        // ExclusiveGroup is an identifier that, if set, prevents other inputs
49
        // with the same identifier from being batched together.
50
        ExclusiveGroup *uint64
51

52
        // DeadlineHeight specifies an absolute block height that this input
53
        // should be confirmed by. This value is used by the fee bumper to
54
        // decide its urgency and adjust its feerate used.
55
        DeadlineHeight fn.Option[int32]
56

57
        // Budget specifies the maximum amount of satoshis that can be spent on
58
        // fees for this sweep.
59
        Budget btcutil.Amount
60

61
        // Immediate indicates that the input should be swept immediately
62
        // without waiting for blocks to come to trigger the sweeping of
63
        // inputs.
64
        Immediate bool
65

66
        // StartingFeeRate is an optional parameter that can be used to specify
67
        // the initial fee rate to use for the fee function.
68
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
69
}
70

71
// String returns a human readable interpretation of the sweep parameters.
72
func (p Params) String() string {
3✔
73
        deadline := "none"
3✔
74
        p.DeadlineHeight.WhenSome(func(d int32) {
6✔
75
                deadline = fmt.Sprintf("%d", d)
3✔
76
        })
3✔
77

78
        exclusiveGroup := "none"
3✔
79
        if p.ExclusiveGroup != nil {
6✔
80
                exclusiveGroup = fmt.Sprintf("%d", *p.ExclusiveGroup)
3✔
81
        }
3✔
82

83
        return fmt.Sprintf("startingFeeRate=%v, immediate=%v, "+
3✔
84
                "exclusive_group=%v, budget=%v, deadline=%v", p.StartingFeeRate,
3✔
85
                p.Immediate, exclusiveGroup, p.Budget, deadline)
3✔
86
}
87

88
// SweepState represents the current state of a pending input.
89
//
90
//nolint:revive
91
type SweepState uint8
92

93
const (
94
        // Init is the initial state of a pending input. This is set when a new
95
        // sweeping request for a given input is made.
96
        Init SweepState = iota
97

98
        // PendingPublish specifies an input's state where it's already been
99
        // included in a sweeping tx but the tx is not published yet.  Inputs
100
        // in this state should not be used for grouping again.
101
        PendingPublish
102

103
        // Published is the state where the input's sweeping tx has
104
        // successfully been published. Inputs in this state can only be
105
        // updated via RBF.
106
        Published
107

108
        // PublishFailed is the state when an error is returned from publishing
109
        // the sweeping tx. Inputs in this state can be re-grouped in to a new
110
        // sweeping tx.
111
        PublishFailed
112

113
        // Swept is the final state of a pending input. This is set when the
114
        // input has been successfully swept.
115
        Swept
116

117
        // Excluded is the state of a pending input that has been excluded and
118
        // can no longer be swept. For instance, when one of the three anchor
119
        // sweeping transactions confirmed, the remaining two will be excluded.
120
        Excluded
121

122
        // Fatal is the final state of a pending input. Inputs ending in this
123
        // state won't be retried. This could happen,
124
        // - when a pending input has too many failed publish attempts;
125
        // - the input has been spent by another party;
126
        // - unknown broadcast error is returned.
127
        Fatal
128
)
129

130
// String gives a human readable text for the sweep states.
131
func (s SweepState) String() string {
3✔
132
        switch s {
3✔
133
        case Init:
3✔
134
                return "Init"
3✔
135

136
        case PendingPublish:
3✔
137
                return "PendingPublish"
3✔
138

139
        case Published:
3✔
140
                return "Published"
3✔
141

142
        case PublishFailed:
3✔
143
                return "PublishFailed"
3✔
144

145
        case Swept:
3✔
146
                return "Swept"
3✔
147

148
        case Excluded:
3✔
149
                return "Excluded"
3✔
150

151
        case Fatal:
3✔
152
                return "Fatal"
3✔
153

154
        default:
×
155
                return "Unknown"
×
156
        }
157
}
158

159
// RBFInfo stores the information required to perform a RBF bump on a pending
160
// sweeping tx.
161
type RBFInfo struct {
162
        // Txid is the txid of the sweeping tx.
163
        Txid chainhash.Hash
164

165
        // FeeRate is the fee rate of the sweeping tx.
166
        FeeRate chainfee.SatPerKWeight
167

168
        // Fee is the total fee of the sweeping tx.
169
        Fee btcutil.Amount
170
}
171

172
// SweeperInput is created when an input reaches the main loop for the first
173
// time. It wraps the input and tracks all relevant state that is needed for
174
// sweeping.
175
type SweeperInput struct {
176
        input.Input
177

178
        // state tracks the current state of the input.
179
        state SweepState
180

181
        // listeners is a list of channels over which the final outcome of the
182
        // sweep needs to be broadcasted.
183
        listeners []chan Result
184

185
        // publishAttempts records the number of attempts that have already been
186
        // made to sweep this tx.
187
        publishAttempts int
188

189
        // params contains the parameters that control the sweeping process.
190
        params Params
191

192
        // lastFeeRate is the most recent fee rate used for this input within a
193
        // transaction broadcast to the network.
194
        lastFeeRate chainfee.SatPerKWeight
195

196
        // rbf records the RBF constraints.
197
        rbf fn.Option[RBFInfo]
198

199
        // DeadlineHeight is the deadline height for this input. This is
200
        // different from the DeadlineHeight in its params as it's an actual
201
        // value than an option.
202
        DeadlineHeight int32
203
}
204

205
// String returns a human readable interpretation of the pending input.
206
func (p *SweeperInput) String() string {
31✔
207
        return fmt.Sprintf("%v (%v)", p.Input.OutPoint(), p.Input.WitnessType())
31✔
208
}
31✔
209

210
// terminated returns a boolean indicating whether the input has reached a
211
// final state.
212
func (p *SweeperInput) terminated() bool {
27✔
213
        switch p.state {
27✔
214
        // If the input has reached a final state, that it's either
215
        // been swept, or failed, or excluded, we will remove it from
216
        // our sweeper.
217
        case Fatal, Swept, Excluded:
12✔
218
                return true
12✔
219

220
        default:
18✔
221
                return false
18✔
222
        }
223
}
224

225
// isMature returns a boolean indicating whether the input has a timelock that
226
// has been reached or not. The locktime found is also returned.
227
func (p *SweeperInput) isMature(currentHeight uint32) (bool, uint32) {
8✔
228
        locktime, _ := p.RequiredLockTime()
8✔
229
        if currentHeight < locktime {
12✔
230
                log.Debugf("Input %v has locktime=%v, current height is %v",
4✔
231
                        p, locktime, currentHeight)
4✔
232

4✔
233
                return false, locktime
4✔
234
        }
4✔
235

236
        // If the input has a CSV that's not yet reached, we will skip
237
        // this input and wait for the expiry.
238
        //
239
        // NOTE: We need to consider whether this input can be included in the
240
        // next block or not, which means the CSV will be checked against the
241
        // currentHeight plus one.
242
        locktime = p.BlocksToMaturity() + p.HeightHint()
7✔
243
        if currentHeight+1 < locktime {
11✔
244
                log.Debugf("Input %v has CSV expiry=%v, current height is %v, "+
4✔
245
                        "skipped sweeping", p, locktime, currentHeight)
4✔
246

4✔
247
                return false, locktime
4✔
248
        }
4✔
249

250
        return true, locktime
6✔
251
}
252

253
// InputsMap is a type alias for a set of pending inputs.
254
type InputsMap = map[wire.OutPoint]*SweeperInput
255

256
// inputsMapToString returns a human readable interpretation of the pending
257
// inputs.
258
func inputsMapToString(inputs InputsMap) string {
3✔
259
        if len(inputs) == 0 {
6✔
260
                return ""
3✔
261
        }
3✔
262

263
        inps := make([]input.Input, 0, len(inputs))
3✔
264
        for _, in := range inputs {
6✔
265
                inps = append(inps, in)
3✔
266
        }
3✔
267

268
        return "\n" + inputTypeSummary(inps)
3✔
269
}
270

271
// pendingSweepsReq is an internal message we'll use to represent an external
272
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
273
// attempting to sweep.
274
type pendingSweepsReq struct {
275
        respChan chan map[wire.OutPoint]*PendingInputResponse
276
        errChan  chan error
277
}
278

279
// PendingInputResponse contains information about an input that is currently
280
// being swept by the UtxoSweeper.
281
type PendingInputResponse struct {
282
        // OutPoint is the identify outpoint of the input being swept.
283
        OutPoint wire.OutPoint
284

285
        // WitnessType is the witness type of the input being swept.
286
        WitnessType input.WitnessType
287

288
        // Amount is the amount of the input being swept.
289
        Amount btcutil.Amount
290

291
        // LastFeeRate is the most recent fee rate used for the input being
292
        // swept within a transaction broadcast to the network.
293
        LastFeeRate chainfee.SatPerKWeight
294

295
        // BroadcastAttempts is the number of attempts we've made to sweept the
296
        // input.
297
        BroadcastAttempts int
298

299
        // Params contains the sweep parameters for this pending request.
300
        Params Params
301

302
        // DeadlineHeight records the deadline height of this input.
303
        DeadlineHeight uint32
304
}
305

306
// updateReq is an internal message we'll use to represent an external caller's
307
// intent to update the sweep parameters of a given input.
308
type updateReq struct {
309
        input        wire.OutPoint
310
        params       Params
311
        responseChan chan *updateResp
312
}
313

314
// updateResp is an internal message we'll use to hand off the response of a
315
// updateReq from the UtxoSweeper's main event loop back to the caller.
316
type updateResp struct {
317
        resultChan chan Result
318
        err        error
319
}
320

321
// UtxoSweeper is responsible for sweeping outputs back into the wallet
322
type UtxoSweeper struct {
323
        started uint32 // To be used atomically.
324
        stopped uint32 // To be used atomically.
325

326
        // Embed the blockbeat consumer struct to get access to the method
327
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
328
        chainio.BeatConsumer
329

330
        cfg *UtxoSweeperConfig
331

332
        newInputs chan *sweepInputMessage
333

334
        // pendingSweepsReq is a channel that will be sent requests by external
335
        // callers in order to retrieve the set of pending inputs the
336
        // UtxoSweeper is attempting to sweep.
337
        pendingSweepsReqs chan *pendingSweepsReq
338

339
        // updateReqs is a channel that will be sent requests by external
340
        // callers who wish to bump the fee rate of a given input.
341
        updateReqs chan *updateReq
342

343
        // inputs is the total set of inputs the UtxoSweeper has been requested
344
        // to sweep.
345
        inputs InputsMap
346

347
        currentOutputScript fn.Option[lnwallet.AddrWithKey]
348

349
        relayFeeRate chainfee.SatPerKWeight
350

351
        quit chan struct{}
352
        wg   sync.WaitGroup
353

354
        // currentHeight is the best known height of the main chain. This is
355
        // updated whenever a new block epoch is received.
356
        currentHeight int32
357

358
        // bumpRespChan is a channel that receives broadcast results from the
359
        // TxPublisher.
360
        bumpRespChan chan *bumpResp
361
}
362

363
// Compile-time check for the chainio.Consumer interface.
364
var _ chainio.Consumer = (*UtxoSweeper)(nil)
365

366
// UtxoSweeperConfig contains dependencies of UtxoSweeper.
367
type UtxoSweeperConfig struct {
368
        // GenSweepScript generates a P2WKH script belonging to the wallet where
369
        // funds can be swept.
370
        GenSweepScript func() fn.Result[lnwallet.AddrWithKey]
371

372
        // FeeEstimator is used when crafting sweep transactions to estimate
373
        // the necessary fee relative to the expected size of the sweep
374
        // transaction.
375
        FeeEstimator chainfee.Estimator
376

377
        // Wallet contains the wallet functions that sweeper requires.
378
        Wallet Wallet
379

380
        // Notifier is an instance of a chain notifier we'll use to watch for
381
        // certain on-chain events.
382
        Notifier chainntnfs.ChainNotifier
383

384
        // Mempool is the mempool watcher that will be used to query whether a
385
        // given input is already being spent by a transaction in the mempool.
386
        Mempool chainntnfs.MempoolWatcher
387

388
        // Store stores the published sweeper txes.
389
        //
390
        // TODO(yy): remove this store - we can use walletdb to do the job.
391
        Store SweeperStore
392

393
        // Signer is used by the sweeper to generate valid witnesses at the
394
        // time the incubated outputs need to be spent.
395
        Signer input.Signer
396

397
        // MaxInputsPerTx specifies the default maximum number of inputs allowed
398
        // in a single sweep tx. If more need to be swept, multiple txes are
399
        // created and published.
400
        MaxInputsPerTx uint32
401

402
        // MaxFeeRate is the maximum fee rate allowed within the UtxoSweeper.
403
        MaxFeeRate chainfee.SatPerVByte
404

405
        // Aggregator is used to group inputs into clusters based on its
406
        // implemention-specific strategy.
407
        Aggregator UtxoAggregator
408

409
        // Publisher is used to publish the sweep tx crafted here and monitors
410
        // it for potential fee bumps.
411
        Publisher Bumper
412

413
        // NoDeadlineConfTarget is the conf target to use when sweeping
414
        // non-time-sensitive outputs.
415
        NoDeadlineConfTarget uint32
416
}
417

418
// Result is the struct that is pushed through the result channel. Callers can
419
// use this to be informed of the final sweep result. In case of a remote
420
// spend, Err will be ErrRemoteSpend.
421
type Result struct {
422
        // Err is the final result of the sweep. It is nil when the input is
423
        // swept successfully by us. ErrRemoteSpend is returned when another
424
        // party took the input.
425
        Err error
426

427
        // Tx is the transaction that spent the input.
428
        Tx *wire.MsgTx
429
}
430

431
// sweepInputMessage structs are used in the internal channel between the
432
// SweepInput call and the sweeper main loop.
433
type sweepInputMessage struct {
434
        input      input.Input
435
        params     Params
436
        resultChan chan Result
437
}
438

439
// New returns a new Sweeper instance.
440
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
23✔
441
        s := &UtxoSweeper{
23✔
442
                cfg:               cfg,
23✔
443
                newInputs:         make(chan *sweepInputMessage),
23✔
444
                updateReqs:        make(chan *updateReq),
23✔
445
                pendingSweepsReqs: make(chan *pendingSweepsReq),
23✔
446
                quit:              make(chan struct{}),
23✔
447
                inputs:            make(InputsMap),
23✔
448
                bumpRespChan:      make(chan *bumpResp, 100),
23✔
449
        }
23✔
450

23✔
451
        // Mount the block consumer.
23✔
452
        s.BeatConsumer = chainio.NewBeatConsumer(s.quit, s.Name())
23✔
453

23✔
454
        return s
23✔
455
}
23✔
456

457
// Start starts the process of constructing and publish sweep txes.
458
func (s *UtxoSweeper) Start(beat chainio.Blockbeat) error {
3✔
459
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
3✔
460
                return nil
×
461
        }
×
462

463
        log.Info("Sweeper starting")
3✔
464

3✔
465
        // Retrieve relay fee for dust limit calculation. Assume that this will
3✔
466
        // not change from here on.
3✔
467
        s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
3✔
468

3✔
469
        // Set the current height.
3✔
470
        s.currentHeight = beat.Height()
3✔
471

3✔
472
        // Start sweeper main loop.
3✔
473
        s.wg.Add(1)
3✔
474
        go s.collector()
3✔
475

3✔
476
        return nil
3✔
477
}
478

479
// RelayFeePerKW returns the minimum fee rate required for transactions to be
480
// relayed.
481
func (s *UtxoSweeper) RelayFeePerKW() chainfee.SatPerKWeight {
×
482
        return s.relayFeeRate
×
483
}
×
484

485
// Stop stops sweeper from listening to block epochs and constructing sweep
486
// txes.
487
func (s *UtxoSweeper) Stop() error {
3✔
488
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
3✔
489
                return nil
×
490
        }
×
491

492
        log.Info("Sweeper shutting down...")
3✔
493
        defer log.Debug("Sweeper shutdown complete")
3✔
494

3✔
495
        close(s.quit)
3✔
496
        s.wg.Wait()
3✔
497

3✔
498
        return nil
3✔
499
}
500

501
// NOTE: part of the `chainio.Consumer` interface.
502
func (s *UtxoSweeper) Name() string {
23✔
503
        return "UtxoSweeper"
23✔
504
}
23✔
505

506
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
507
// swept after the batch time window ends. A custom fee preference can be
508
// provided to determine what fee rate should be used for the input. Note that
509
// the input may not always be swept with this exact value, as its possible for
510
// it to be batched under the same transaction with other similar fee rate
511
// inputs.
512
//
513
// NOTE: Extreme care needs to be taken that input isn't changed externally.
514
// Because it is an interface and we don't know what is exactly behind it, we
515
// cannot make a local copy in sweeper.
516
//
517
// TODO(yy): make sure the caller is using the Result chan.
518
func (s *UtxoSweeper) SweepInput(inp input.Input,
519
        params Params) (chan Result, error) {
3✔
520

3✔
521
        if inp == nil || inp.OutPoint() == input.EmptyOutPoint ||
3✔
522
                inp.SignDesc() == nil {
3✔
523

×
524
                return nil, errors.New("nil input received")
×
525
        }
×
526

527
        absoluteTimeLock, _ := inp.RequiredLockTime()
3✔
528
        log.Debugf("Sweep request received: out_point=%v, witness_type=%v, "+
3✔
529
                "relative_time_lock=%v, absolute_time_lock=%v, amount=%v, "+
3✔
530
                "parent=(%v), params=(%v)", inp.OutPoint(), inp.WitnessType(),
3✔
531
                inp.BlocksToMaturity(), absoluteTimeLock,
3✔
532
                btcutil.Amount(inp.SignDesc().Output.Value),
3✔
533
                inp.UnconfParent(), params)
3✔
534

3✔
535
        sweeperInput := &sweepInputMessage{
3✔
536
                input:      inp,
3✔
537
                params:     params,
3✔
538
                resultChan: make(chan Result, 1),
3✔
539
        }
3✔
540

3✔
541
        // Deliver input to the main event loop.
3✔
542
        select {
3✔
543
        case s.newInputs <- sweeperInput:
3✔
544
        case <-s.quit:
×
545
                return nil, ErrSweeperShuttingDown
×
546
        }
547

548
        return sweeperInput.resultChan, nil
3✔
549
}
550

551
// removeConflictSweepDescendants removes any transactions from the wallet that
552
// spend outputs included in the passed outpoint set. This needs to be done in
553
// cases where we're not the only ones that can sweep an output, but there may
554
// exist unconfirmed spends that spend outputs created by a sweep transaction.
555
// The most common case for this is when someone sweeps our anchor outputs
556
// after 16 blocks. Moreover this is also needed for wallets which use neutrino
557
// as a backend when a channel is force closed and anchor cpfp txns are
558
// created to bump the initial commitment transaction. In this case an anchor
559
// cpfp is broadcasted for up to 3 commitment transactions (local,
560
// remote-dangling, remote). Using neutrino all of those transactions will be
561
// accepted (the commitment tx will be different in all of those cases) and have
562
// to be removed as soon as one of them confirmes (they do have the same
563
// ExclusiveGroup). For neutrino backends the corresponding BIP 157 serving full
564
// nodes do not signal invalid transactions anymore.
565
func (s *UtxoSweeper) removeConflictSweepDescendants(
566
        outpoints map[wire.OutPoint]struct{}) error {
4✔
567

4✔
568
        // Obtain all the past sweeps that we've done so far. We'll need these
4✔
569
        // to ensure that if the spendingTx spends any of the same inputs, then
4✔
570
        // we remove any transaction that may be spending those inputs from the
4✔
571
        // wallet.
4✔
572
        //
4✔
573
        // TODO(roasbeef): can be last sweep here if we remove anything confirmed
4✔
574
        // from the store?
4✔
575
        pastSweepHashes, err := s.cfg.Store.ListSweeps()
4✔
576
        if err != nil {
4✔
577
                return err
×
578
        }
×
579

580
        // We'll now go through each past transaction we published during this
581
        // epoch and cross reference the spent inputs. If there're any inputs
582
        // in common with the inputs the spendingTx spent, then we'll remove
583
        // those.
584
        //
585
        // TODO(roasbeef): need to start to remove all transaction hashes after
586
        // every N blocks (assumed point of no return)
587
        for _, sweepHash := range pastSweepHashes {
7✔
588
                sweepTx, err := s.cfg.Wallet.FetchTx(sweepHash)
3✔
589
                if err != nil {
3✔
590
                        return err
×
591
                }
×
592

593
                // Transaction wasn't found in the wallet, may have already
594
                // been replaced/removed.
595
                if sweepTx == nil {
4✔
596
                        // If it was removed, then we'll play it safe and mark
1✔
597
                        // it as no longer need to be rebroadcasted.
1✔
598
                        s.cfg.Wallet.CancelRebroadcast(sweepHash)
1✔
599
                        continue
1✔
600
                }
601

602
                // Check to see if this past sweep transaction spent any of the
603
                // same inputs as spendingTx.
604
                var isConflicting bool
3✔
605
                for _, txIn := range sweepTx.TxIn {
6✔
606
                        if _, ok := outpoints[txIn.PreviousOutPoint]; ok {
6✔
607
                                isConflicting = true
3✔
608
                                break
3✔
609
                        }
610
                }
611

612
                if !isConflicting {
6✔
613
                        continue
3✔
614
                }
615

616
                // If it is conflicting, then we'll signal the wallet to remove
617
                // all the transactions that are descendants of outputs created
618
                // by the sweepTx and the sweepTx itself.
619
                log.Debugf("Removing sweep txid=%v from wallet: %v",
3✔
620
                        sweepTx.TxHash(), spew.Sdump(sweepTx))
3✔
621

3✔
622
                err = s.cfg.Wallet.RemoveDescendants(sweepTx)
3✔
623
                if err != nil {
3✔
624
                        log.Warnf("Unable to remove descendants: %v", err)
×
625
                }
×
626

627
                // If this transaction was conflicting, then we'll stop
628
                // rebroadcasting it in the background.
629
                s.cfg.Wallet.CancelRebroadcast(sweepHash)
3✔
630
        }
631

632
        return nil
4✔
633
}
634

635
// collector is the sweeper main loop. It processes new inputs, spend
636
// notifications and counts down to publication of the sweep tx.
637
func (s *UtxoSweeper) collector() {
3✔
638
        defer s.wg.Done()
3✔
639

3✔
640
        for {
6✔
641
                // Clean inputs, which will remove inputs that are swept,
3✔
642
                // failed, or excluded from the sweeper and return inputs that
3✔
643
                // are either new or has been published but failed back, which
3✔
644
                // will be retried again here.
3✔
645
                s.updateSweeperInputs()
3✔
646

3✔
647
                select {
3✔
648
                // A new inputs is offered to the sweeper. We check to see if
649
                // we are already trying to sweep this input and if not, set up
650
                // a listener to spend and schedule a sweep.
651
                case input := <-s.newInputs:
3✔
652
                        err := s.handleNewInput(input)
3✔
653
                        if err != nil {
3✔
654
                                log.Criticalf("Unable to handle new input: %v",
×
655
                                        err)
×
656

×
657
                                return
×
658
                        }
×
659

660
                        // If this input is forced, we perform an sweep
661
                        // immediately.
662
                        //
663
                        // TODO(ziggie): Make sure when `immediate` is selected
664
                        // as a parameter that we only trigger the sweeping of
665
                        // this specific input rather than triggering the sweeps
666
                        // of all current pending inputs registered with the
667
                        // sweeper.
668
                        if input.params.Immediate {
5✔
669
                                inputs := s.updateSweeperInputs()
2✔
670
                                s.sweepPendingInputs(inputs)
2✔
671
                        }
2✔
672

673
                // A new external request has been received to retrieve all of
674
                // the inputs we're currently attempting to sweep.
675
                case req := <-s.pendingSweepsReqs:
3✔
676
                        s.handlePendingSweepsReq(req)
3✔
677

678
                // A new external request has been received to bump the fee rate
679
                // of a given input.
680
                case req := <-s.updateReqs:
3✔
681
                        resultChan, err := s.handleUpdateReq(req)
3✔
682
                        req.responseChan <- &updateResp{
3✔
683
                                resultChan: resultChan,
3✔
684
                                err:        err,
3✔
685
                        }
3✔
686

3✔
687
                        // Perform an sweep immediately if asked.
3✔
688
                        if req.params.Immediate {
6✔
689
                                inputs := s.updateSweeperInputs()
3✔
690
                                s.sweepPendingInputs(inputs)
3✔
691
                        }
3✔
692

693
                case resp := <-s.bumpRespChan:
3✔
694
                        // Handle the bump event.
3✔
695
                        err := s.handleBumpEvent(resp)
3✔
696
                        if err != nil {
6✔
697
                                log.Errorf("Failed to handle bump event: %v",
3✔
698
                                        err)
3✔
699
                        }
3✔
700

701
                // A new block comes in, update the bestHeight, perform a check
702
                // over all pending inputs and publish sweeping txns if needed.
703
                case beat := <-s.BlockbeatChan:
3✔
704
                        // Update the sweeper to the best height.
3✔
705
                        s.currentHeight = beat.Height()
3✔
706

3✔
707
                        // Update the inputs with the latest height.
3✔
708
                        inputs := s.updateSweeperInputs()
3✔
709

3✔
710
                        log.Debugf("Received new block: height=%v, attempt "+
3✔
711
                                "sweeping %d inputs:%s", s.currentHeight,
3✔
712
                                len(inputs),
3✔
713
                                lnutils.NewLogClosure(func() string {
6✔
714
                                        return inputsMapToString(inputs)
3✔
715
                                }))
3✔
716

717
                        // Attempt to sweep any pending inputs.
718
                        s.sweepPendingInputs(inputs)
3✔
719

3✔
720
                        // Notify we've processed the block.
3✔
721
                        s.NotifyBlockProcessed(beat, nil)
3✔
722

723
                case <-s.quit:
3✔
724
                        return
3✔
725
                }
726
        }
727
}
728

729
// removeExclusiveGroup removes all inputs in the given exclusive group. This
730
// function is called when one of the exclusive group inputs has been spent. The
731
// other inputs won't ever be spendable and can be removed. This also prevents
732
// them from being part of future sweep transactions that would fail. In
733
// addition sweep transactions of those inputs will be removed from the wallet.
734
func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
3✔
735
        for outpoint, input := range s.inputs {
6✔
736
                outpoint := outpoint
3✔
737

3✔
738
                // Skip inputs that aren't exclusive.
3✔
739
                if input.params.ExclusiveGroup == nil {
6✔
740
                        continue
3✔
741
                }
742

743
                // Skip inputs from other exclusive groups.
744
                if *input.params.ExclusiveGroup != group {
3✔
745
                        continue
×
746
                }
747

748
                // Skip inputs that are already terminated.
749
                if input.terminated() {
3✔
750
                        log.Tracef("Skipped sending error result for "+
×
751
                                "input %v, state=%v", outpoint, input.state)
×
752

×
753
                        continue
×
754
                }
755

756
                // Signal result channels.
757
                s.signalResult(input, Result{
3✔
758
                        Err: ErrExclusiveGroupSpend,
3✔
759
                })
3✔
760

3✔
761
                // Update the input's state as it can no longer be swept.
3✔
762
                input.state = Excluded
3✔
763

3✔
764
                // Remove all unconfirmed transactions from the wallet which
3✔
765
                // spend the passed outpoint of the same exclusive group.
3✔
766
                outpoints := map[wire.OutPoint]struct{}{
3✔
767
                        outpoint: {},
3✔
768
                }
3✔
769
                err := s.removeConflictSweepDescendants(outpoints)
3✔
770
                if err != nil {
3✔
771
                        log.Warnf("Unable to remove conflicting sweep tx from "+
×
772
                                "wallet for outpoint %v : %v", outpoint, err)
×
773
                }
×
774
        }
775
}
776

777
// signalResult notifies the listeners of the final result of the input sweep.
778
// It also cancels any pending spend notification.
779
func (s *UtxoSweeper) signalResult(pi *SweeperInput, result Result) {
14✔
780
        op := pi.OutPoint()
14✔
781
        listeners := pi.listeners
14✔
782

14✔
783
        if result.Err == nil {
22✔
784
                log.Tracef("Dispatching sweep success for %v to %v listeners",
8✔
785
                        op, len(listeners),
8✔
786
                )
8✔
787
        } else {
17✔
788
                log.Tracef("Dispatching sweep error for %v to %v listeners: %v",
9✔
789
                        op, len(listeners), result.Err,
9✔
790
                )
9✔
791
        }
9✔
792

793
        // Signal all listeners. Channel is buffered. Because we only send once
794
        // on every channel, it should never block.
795
        for _, resultChan := range listeners {
17✔
796
                resultChan <- result
3✔
797
        }
3✔
798
}
799

800
// sweep takes a set of preselected inputs, creates a sweep tx and publishes
801
// the tx. The output address is only marked as used if the publish succeeds.
802
func (s *UtxoSweeper) sweep(set InputSet) error {
5✔
803
        // Generate an output script if there isn't an unused script available.
5✔
804
        if s.currentOutputScript.IsNone() {
9✔
805
                addr, err := s.cfg.GenSweepScript().Unpack()
4✔
806
                if err != nil {
4✔
807
                        return fmt.Errorf("gen sweep script: %w", err)
×
808
                }
×
809
                s.currentOutputScript = fn.Some(addr)
4✔
810
        }
811

812
        sweepAddr, err := s.currentOutputScript.UnwrapOrErr(
5✔
813
                fmt.Errorf("none sweep script"),
5✔
814
        )
5✔
815
        if err != nil {
5✔
816
                return err
×
817
        }
×
818

819
        // Create a fee bump request and ask the publisher to broadcast it. The
820
        // publisher will then take over and start monitoring the tx for
821
        // potential fee bump.
822
        req := &BumpRequest{
5✔
823
                Inputs:          set.Inputs(),
5✔
824
                Budget:          set.Budget(),
5✔
825
                DeadlineHeight:  set.DeadlineHeight(),
5✔
826
                DeliveryAddress: sweepAddr,
5✔
827
                MaxFeeRate:      s.cfg.MaxFeeRate.FeePerKWeight(),
5✔
828
                StartingFeeRate: set.StartingFeeRate(),
5✔
829
                Immediate:       set.Immediate(),
5✔
830
                // TODO(yy): pass the strategy here.
5✔
831
        }
5✔
832

5✔
833
        // Reschedule the inputs that we just tried to sweep. This is done in
5✔
834
        // case the following publish fails, we'd like to update the inputs'
5✔
835
        // publish attempts and rescue them in the next sweep.
5✔
836
        s.markInputsPendingPublish(set)
5✔
837

5✔
838
        // Broadcast will return a read-only chan that we will listen to for
5✔
839
        // this publish result and future RBF attempt.
5✔
840
        resp := s.cfg.Publisher.Broadcast(req)
5✔
841

5✔
842
        // Successfully sent the broadcast attempt, we now handle the result by
5✔
843
        // subscribing to the result chan and listen for future updates about
5✔
844
        // this tx.
5✔
845
        s.wg.Add(1)
5✔
846
        go s.monitorFeeBumpResult(set, resp)
5✔
847

5✔
848
        return nil
5✔
849
}
850

851
// markInputsPendingPublish updates the pending inputs with the given tx
852
// inputs. It also increments the `publishAttempts`.
853
func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) {
6✔
854
        // Reschedule sweep.
6✔
855
        for _, input := range set.Inputs() {
12✔
856
                op := input.OutPoint()
6✔
857
                pi, ok := s.inputs[op]
6✔
858
                if !ok {
9✔
859
                        // It could be that this input is an additional wallet
3✔
860
                        // input that was attached. In that case there also
3✔
861
                        // isn't a pending input to update.
3✔
862
                        log.Tracef("Skipped marking input as pending "+
3✔
863
                                "published: %v not found in pending inputs", op)
3✔
864

3✔
865
                        continue
3✔
866
                }
867

868
                // If this input has already terminated, there's clearly
869
                // something wrong as it would have been removed. In this case
870
                // we log an error and skip marking this input as pending
871
                // publish.
872
                if pi.terminated() {
7✔
873
                        log.Errorf("Expect input %v to not have terminated "+
1✔
874
                                "state, instead it has %v", op, pi.state)
1✔
875

1✔
876
                        continue
1✔
877
                }
878

879
                // Update the input's state.
880
                pi.state = PendingPublish
5✔
881

5✔
882
                // Record another publish attempt.
5✔
883
                pi.publishAttempts++
5✔
884
        }
885
}
886

887
// markInputsPublished updates the sweeping tx in db and marks the list of
888
// inputs as published.
889
func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, set InputSet) error {
7✔
890
        // Mark this tx in db once successfully published.
7✔
891
        //
7✔
892
        // NOTE: this will behave as an overwrite, which is fine as the record
7✔
893
        // is small.
7✔
894
        tr.Published = true
7✔
895
        err := s.cfg.Store.StoreTx(tr)
7✔
896
        if err != nil {
8✔
897
                return fmt.Errorf("store tx: %w", err)
1✔
898
        }
1✔
899

900
        // Reschedule sweep.
901
        for _, input := range set.Inputs() {
13✔
902
                op := input.OutPoint()
7✔
903
                pi, ok := s.inputs[op]
7✔
904
                if !ok {
10✔
905
                        // It could be that this input is an additional wallet
3✔
906
                        // input that was attached. In that case there also
3✔
907
                        // isn't a pending input to update.
3✔
908
                        log.Tracef("Skipped marking input as published: %v "+
3✔
909
                                "not found in pending inputs", op)
3✔
910

3✔
911
                        continue
3✔
912
                }
913

914
                // Valdiate that the input is in an expected state.
915
                if pi.state != PendingPublish {
11✔
916
                        // We may get a Published if this is a replacement tx.
4✔
917
                        log.Debugf("Expect input %v to have %v, instead it "+
4✔
918
                                "has %v", op, PendingPublish, pi.state)
4✔
919

4✔
920
                        continue
4✔
921
                }
922

923
                // Update the input's state.
924
                pi.state = Published
6✔
925

6✔
926
                // Update the input's latest fee rate.
6✔
927
                pi.lastFeeRate = chainfee.SatPerKWeight(tr.FeeRate)
6✔
928
        }
929

930
        return nil
6✔
931
}
932

933
// markInputsPublishFailed marks the list of inputs as failed to be published.
934
func (s *UtxoSweeper) markInputsPublishFailed(set InputSet) {
7✔
935
        // Reschedule sweep.
7✔
936
        for _, inp := range set.Inputs() {
23✔
937
                op := inp.OutPoint()
16✔
938
                pi, ok := s.inputs[op]
16✔
939
                if !ok {
19✔
940
                        // It could be that this input is an additional wallet
3✔
941
                        // input that was attached. In that case there also
3✔
942
                        // isn't a pending input to update.
3✔
943
                        log.Tracef("Skipped marking input as publish failed: "+
3✔
944
                                "%v not found in pending inputs", op)
3✔
945

3✔
946
                        continue
3✔
947
                }
948

949
                // Valdiate that the input is in an expected state.
950
                if pi.state != PendingPublish && pi.state != Published {
22✔
951
                        log.Debugf("Expect input %v to have %v, instead it "+
6✔
952
                                "has %v", op, PendingPublish, pi.state)
6✔
953

6✔
954
                        continue
6✔
955
                }
956

957
                log.Warnf("Failed to publish input %v", op)
11✔
958

11✔
959
                // Update the input's state.
11✔
960
                pi.state = PublishFailed
11✔
961
        }
962
}
963

964
// PendingInputs returns the set of inputs that the UtxoSweeper is currently
965
// attempting to sweep.
966
func (s *UtxoSweeper) PendingInputs() (
967
        map[wire.OutPoint]*PendingInputResponse, error) {
3✔
968

3✔
969
        respChan := make(chan map[wire.OutPoint]*PendingInputResponse, 1)
3✔
970
        errChan := make(chan error, 1)
3✔
971
        select {
3✔
972
        case s.pendingSweepsReqs <- &pendingSweepsReq{
973
                respChan: respChan,
974
                errChan:  errChan,
975
        }:
3✔
976
        case <-s.quit:
×
977
                return nil, ErrSweeperShuttingDown
×
978
        }
979

980
        select {
3✔
981
        case pendingSweeps := <-respChan:
3✔
982
                return pendingSweeps, nil
3✔
983
        case err := <-errChan:
×
984
                return nil, err
×
985
        case <-s.quit:
×
986
                return nil, ErrSweeperShuttingDown
×
987
        }
988
}
989

990
// handlePendingSweepsReq handles a request to retrieve all pending inputs the
991
// UtxoSweeper is attempting to sweep.
992
func (s *UtxoSweeper) handlePendingSweepsReq(
993
        req *pendingSweepsReq) map[wire.OutPoint]*PendingInputResponse {
3✔
994

3✔
995
        resps := make(map[wire.OutPoint]*PendingInputResponse, len(s.inputs))
3✔
996
        for _, inp := range s.inputs {
6✔
997
                // Skip immature inputs for compatibility.
3✔
998
                mature, _ := inp.isMature(uint32(s.currentHeight))
3✔
999
                if !mature {
6✔
1000
                        continue
3✔
1001
                }
1002

1003
                // Only the exported fields are set, as we expect the response
1004
                // to only be consumed externally.
1005
                op := inp.OutPoint()
3✔
1006
                resps[op] = &PendingInputResponse{
3✔
1007
                        OutPoint:    op,
3✔
1008
                        WitnessType: inp.WitnessType(),
3✔
1009
                        Amount: btcutil.Amount(
3✔
1010
                                inp.SignDesc().Output.Value,
3✔
1011
                        ),
3✔
1012
                        LastFeeRate:       inp.lastFeeRate,
3✔
1013
                        BroadcastAttempts: inp.publishAttempts,
3✔
1014
                        Params:            inp.params,
3✔
1015
                        DeadlineHeight:    uint32(inp.DeadlineHeight),
3✔
1016
                }
3✔
1017
        }
1018

1019
        select {
3✔
1020
        case req.respChan <- resps:
3✔
1021
        case <-s.quit:
×
1022
                log.Debug("Skipped sending pending sweep response due to " +
×
1023
                        "UtxoSweeper shutting down")
×
1024
        }
1025

1026
        return resps
3✔
1027
}
1028

1029
// UpdateParams allows updating the sweep parameters of a pending input in the
1030
// UtxoSweeper. This function can be used to provide an updated fee preference
1031
// and force flag that will be used for a new sweep transaction of the input
1032
// that will act as a replacement transaction (RBF) of the original sweeping
1033
// transaction, if any. The exclusive group is left unchanged.
1034
//
1035
// NOTE: This currently doesn't do any fee rate validation to ensure that a bump
1036
// is actually successful. The responsibility of doing so should be handled by
1037
// the caller.
1038
func (s *UtxoSweeper) UpdateParams(input wire.OutPoint,
1039
        params Params) (chan Result, error) {
3✔
1040

3✔
1041
        responseChan := make(chan *updateResp, 1)
3✔
1042
        select {
3✔
1043
        case s.updateReqs <- &updateReq{
1044
                input:        input,
1045
                params:       params,
1046
                responseChan: responseChan,
1047
        }:
3✔
1048
        case <-s.quit:
×
1049
                return nil, ErrSweeperShuttingDown
×
1050
        }
1051

1052
        select {
3✔
1053
        case response := <-responseChan:
3✔
1054
                return response.resultChan, response.err
3✔
1055
        case <-s.quit:
×
1056
                return nil, ErrSweeperShuttingDown
×
1057
        }
1058
}
1059

1060
// handleUpdateReq handles an update request by simply updating the sweep
1061
// parameters of the pending input. Currently, no validation is done on the new
1062
// fee preference to ensure it will properly create a replacement transaction.
1063
//
1064
// TODO(wilmer):
1065
//   - Validate fee preference to ensure we'll create a valid replacement
1066
//     transaction to allow the new fee rate to propagate throughout the
1067
//     network.
1068
//   - Ensure we don't combine this input with any other unconfirmed inputs that
1069
//     did not exist in the original sweep transaction, resulting in an invalid
1070
//     replacement transaction.
1071
func (s *UtxoSweeper) handleUpdateReq(req *updateReq) (
1072
        chan Result, error) {
3✔
1073

3✔
1074
        // If the UtxoSweeper is already trying to sweep this input, then we can
3✔
1075
        // simply just increase its fee rate. This will allow the input to be
3✔
1076
        // batched with others which also have a similar fee rate, creating a
3✔
1077
        // higher fee rate transaction that replaces the original input's
3✔
1078
        // sweeping transaction.
3✔
1079
        sweeperInput, ok := s.inputs[req.input]
3✔
1080
        if !ok {
3✔
1081
                return nil, lnwallet.ErrNotMine
×
1082
        }
×
1083

1084
        // Create the updated parameters struct. Leave the exclusive group
1085
        // unchanged.
1086
        newParams := Params{
3✔
1087
                StartingFeeRate: req.params.StartingFeeRate,
3✔
1088
                Immediate:       req.params.Immediate,
3✔
1089
                Budget:          req.params.Budget,
3✔
1090
                DeadlineHeight:  req.params.DeadlineHeight,
3✔
1091
                ExclusiveGroup:  sweeperInput.params.ExclusiveGroup,
3✔
1092
        }
3✔
1093

3✔
1094
        log.Debugf("Updating parameters for %v(state=%v) from (%v) to (%v)",
3✔
1095
                req.input, sweeperInput.state, sweeperInput.params, newParams)
3✔
1096

3✔
1097
        sweeperInput.params = newParams
3✔
1098

3✔
1099
        // We need to reset the state so this input will be attempted again by
3✔
1100
        // our sweeper.
3✔
1101
        //
3✔
1102
        // TODO(yy): a dedicated state?
3✔
1103
        sweeperInput.state = Init
3✔
1104

3✔
1105
        // If the new input specifies a deadline, update the deadline height.
3✔
1106
        sweeperInput.DeadlineHeight = req.params.DeadlineHeight.UnwrapOr(
3✔
1107
                sweeperInput.DeadlineHeight,
3✔
1108
        )
3✔
1109

3✔
1110
        resultChan := make(chan Result, 1)
3✔
1111
        sweeperInput.listeners = append(sweeperInput.listeners, resultChan)
3✔
1112

3✔
1113
        return resultChan, nil
3✔
1114
}
1115

1116
// ListSweeps returns a list of the sweeps recorded by the sweep store.
1117
func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
3✔
1118
        return s.cfg.Store.ListSweeps()
3✔
1119
}
3✔
1120

1121
// mempoolLookup takes an input's outpoint and queries the mempool to see
1122
// whether it's already been spent in a transaction found in the mempool.
1123
// Returns the transaction if found.
1124
func (s *UtxoSweeper) mempoolLookup(op wire.OutPoint) fn.Option[wire.MsgTx] {
10✔
1125
        // For neutrino backend, there's no mempool available, so we exit
10✔
1126
        // early.
10✔
1127
        if s.cfg.Mempool == nil {
12✔
1128
                log.Debugf("Skipping mempool lookup for %v, no mempool ", op)
2✔
1129

2✔
1130
                return fn.None[wire.MsgTx]()
2✔
1131
        }
2✔
1132

1133
        // Query this input in the mempool. If this outpoint is already spent
1134
        // in mempool, we should get a spending event back immediately.
1135
        return s.cfg.Mempool.LookupInputMempoolSpend(op)
8✔
1136
}
1137

1138
// calculateDefaultDeadline calculates the default deadline height for a sweep
1139
// request that has no deadline height specified.
1140
func (s *UtxoSweeper) calculateDefaultDeadline(pi *SweeperInput) int32 {
3✔
1141
        // Create a default deadline height, which will be used when there's no
3✔
1142
        // DeadlineHeight specified for a given input.
3✔
1143
        defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget)
3✔
1144

3✔
1145
        // If the input is immature and has a locktime, we'll use the locktime
3✔
1146
        // height as the starting height.
3✔
1147
        matured, locktime := pi.isMature(uint32(s.currentHeight))
3✔
1148
        if !matured {
6✔
1149
                defaultDeadline = int32(locktime + s.cfg.NoDeadlineConfTarget)
3✔
1150
                log.Debugf("Input %v is immature, using locktime=%v instead "+
3✔
1151
                        "of current height=%d as starting height",
3✔
1152
                        pi.OutPoint(), locktime, s.currentHeight)
3✔
1153
        }
3✔
1154

1155
        return defaultDeadline
3✔
1156
}
1157

1158
// handleNewInput processes a new input by registering spend notification and
1159
// scheduling sweeping for it.
1160
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
3✔
1161
        outpoint := input.input.OutPoint()
3✔
1162
        pi, pending := s.inputs[outpoint]
3✔
1163
        if pending {
6✔
1164
                log.Infof("Already has pending input %v received, old params: "+
3✔
1165
                        "%v, new params %v", outpoint, pi.params, input.params)
3✔
1166

3✔
1167
                s.handleExistingInput(input, pi)
3✔
1168

3✔
1169
                return nil
3✔
1170
        }
3✔
1171

1172
        // This is a new input, and we want to query the mempool to see if this
1173
        // input has already been spent. If so, we'll start the input with the
1174
        // RBFInfo.
1175
        rbfInfo := s.decideRBFInfo(input.input.OutPoint())
3✔
1176

3✔
1177
        // Create a new pendingInput and initialize the listeners slice with
3✔
1178
        // the passed in result channel. If this input is offered for sweep
3✔
1179
        // again, the result channel will be appended to this slice.
3✔
1180
        pi = &SweeperInput{
3✔
1181
                state:     Init,
3✔
1182
                listeners: []chan Result{input.resultChan},
3✔
1183
                Input:     input.input,
3✔
1184
                params:    input.params,
3✔
1185
                rbf:       rbfInfo,
3✔
1186
        }
3✔
1187

3✔
1188
        // Set the starting fee rate if a previous sweeping tx is found.
3✔
1189
        rbfInfo.WhenSome(func(info RBFInfo) {
5✔
1190
                pi.params.StartingFeeRate = fn.Some(info.FeeRate)
2✔
1191
        })
2✔
1192

1193
        // Set the acutal deadline height.
1194
        pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
3✔
1195
                s.calculateDefaultDeadline(pi),
3✔
1196
        )
3✔
1197

3✔
1198
        s.inputs[outpoint] = pi
3✔
1199
        log.Tracef("input %v, state=%v, added to inputs", outpoint, pi.state)
3✔
1200

3✔
1201
        log.Infof("Registered sweep request at block %d: out_point=%v, "+
3✔
1202
                "witness_type=%v, amount=%v, deadline=%d, state=%v, "+
3✔
1203
                "params=(%v)", s.currentHeight, pi.OutPoint(), pi.WitnessType(),
3✔
1204
                btcutil.Amount(pi.SignDesc().Output.Value), pi.DeadlineHeight,
3✔
1205
                pi.state, pi.params)
3✔
1206

3✔
1207
        // Once the input is registered, quickly check whether it has already
3✔
1208
        // been spent.
3✔
1209
        err := s.checkInputSpent(pi)
3✔
1210
        if err != nil {
3✔
NEW
1211
                log.Errorf("Failed to check input spend: %v", err)
×
NEW
1212
                return err
×
NEW
1213
        }
×
1214

1215
        return nil
3✔
1216
}
1217

1218
// checkInputSpent checks whether the registered input has already been spent.
1219
// If so, we will hanlde it immediately via handleThirdPartySpent.
1220
func (s *UtxoSweeper) checkInputSpent(inp *SweeperInput) error {
3✔
1221
        op := inp.OutPoint()
3✔
1222

3✔
1223
        // If the input has already been spent after the height hint, a spend
3✔
1224
        // event is sent back immediately.
3✔
1225
        spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn(
3✔
1226
                &op, inp.SignDesc().Output.PkScript, inp.HeightHint(),
3✔
1227
        )
3✔
1228
        if err != nil {
3✔
1229
                return err
×
1230
        }
×
1231

1232
        // Remove the subscription when exit.
1233
        defer spendEvent.Cancel()
3✔
1234

3✔
1235
        // Do a non-blocking read to see if the output has been spent.
3✔
1236
        select {
3✔
NEW
1237
        case spend, ok := <-spendEvent.Spend:
×
NEW
1238
                if !ok {
×
NEW
1239
                        log.Debugf("Spend ntfn for %v canceled", op)
×
NEW
1240
                        return nil
×
NEW
1241
                }
×
1242

NEW
1243
                spendingTx := spend.SpendingTx
×
NEW
1244

×
NEW
1245
                log.Debugf("Detected spent of input=%v in tx=%v", op,
×
NEW
1246
                        spendingTx.TxHash())
×
NEW
1247

×
NEW
1248
                s.handleThirdPartySpent(inp, spendingTx)
×
NEW
1249

×
NEW
1250
                return nil
×
1251

1252
        default:
3✔
1253
                log.Tracef("Input %v not spent yet", op)
3✔
1254
        }
1255

1256
        return nil
3✔
1257
}
1258

1259
// decideRBFInfo queries the mempool to see whether the given input has already
1260
// been spent. When spent, it will query the sweeper store to fetch the fee info
1261
// of the spending transction, and construct an RBFInfo based on it. Suppose an
1262
// error occurs, fn.None is returned.
1263
func (s *UtxoSweeper) decideRBFInfo(
1264
        op wire.OutPoint) fn.Option[RBFInfo] {
7✔
1265

7✔
1266
        // Check if we can find the spending tx of this input in mempool.
7✔
1267
        txOption := s.mempoolLookup(op)
7✔
1268

7✔
1269
        // Extract the spending tx from the option.
7✔
1270
        var tx *wire.MsgTx
7✔
1271
        txOption.WhenSome(func(t wire.MsgTx) {
12✔
1272
                tx = &t
5✔
1273
        })
5✔
1274

1275
        // Exit early if it's not found.
1276
        //
1277
        // NOTE: this is not accurate for backends that don't support mempool
1278
        // lookup:
1279
        // - for neutrino we don't have a mempool.
1280
        // - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
1281
        if tx == nil {
11✔
1282
                return fn.None[RBFInfo]()
4✔
1283
        }
4✔
1284

1285
        // Otherwise the input is already spent in the mempool, so eventually
1286
        // we will return Published.
1287
        //
1288
        // We also need to update the RBF info for this input. If the sweeping
1289
        // transaction is broadcast by us, we can find the fee info in the
1290
        // sweeper store.
1291
        txid := tx.TxHash()
5✔
1292
        tr, err := s.cfg.Store.GetTx(txid)
5✔
1293

5✔
1294
        log.Debugf("Found spending tx %v in mempool for input %v", tx.TxHash(),
5✔
1295
                op)
5✔
1296

5✔
1297
        // If the tx is not found in the store, it means it's not broadcast by
5✔
1298
        // us, hence we can't find the fee info. This is fine as, later on when
5✔
1299
        // this tx is confirmed, we will remove the input from our inputs.
5✔
1300
        if errors.Is(err, ErrTxNotFound) {
8✔
1301
                log.Warnf("Spending tx %v not found in sweeper store", txid)
3✔
1302
                return fn.None[RBFInfo]()
3✔
1303
        }
3✔
1304

1305
        // Exit if we get an db error.
1306
        if err != nil {
5✔
1307
                log.Errorf("Unable to get tx %v from sweeper store: %v",
1✔
1308
                        txid, err)
1✔
1309

1✔
1310
                return fn.None[RBFInfo]()
1✔
1311
        }
1✔
1312

1313
        // Prepare the fee info and return it.
1314
        rbf := fn.Some(RBFInfo{
3✔
1315
                Txid:    txid,
3✔
1316
                Fee:     btcutil.Amount(tr.Fee),
3✔
1317
                FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
3✔
1318
        })
3✔
1319

3✔
1320
        return rbf
3✔
1321
}
1322

1323
// handleExistingInput processes an input that is already known to the sweeper.
1324
// It will overwrite the params of the old input with the new ones.
1325
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
1326
        oldInput *SweeperInput) {
3✔
1327

3✔
1328
        // Before updating the input details, check if an exclusive group was
3✔
1329
        // set. In case the same input is registered again without an exclusive
3✔
1330
        // group set, the previous input and its sweep parameters are outdated
3✔
1331
        // hence need to be replaced. This scenario currently only happens for
3✔
1332
        // anchor outputs. When a channel is force closed, in the worst case 3
3✔
1333
        // different sweeps with the same exclusive group are registered with
3✔
1334
        // the sweeper to bump the closing transaction (cpfp) when its time
3✔
1335
        // critical. Receiving an input which was already registered with the
3✔
1336
        // sweeper but now without an exclusive group means non of the previous
3✔
1337
        // inputs were used as CPFP, so we need to make sure we update the
3✔
1338
        // sweep parameters but also remove all inputs with the same exclusive
3✔
1339
        // group because the are outdated too.
3✔
1340
        var prevExclGroup *uint64
3✔
1341
        if oldInput.params.ExclusiveGroup != nil &&
3✔
1342
                input.params.ExclusiveGroup == nil {
6✔
1343

3✔
1344
                prevExclGroup = new(uint64)
3✔
1345
                *prevExclGroup = *oldInput.params.ExclusiveGroup
3✔
1346
        }
3✔
1347

1348
        // Update input details and sweep parameters. The re-offered input
1349
        // details may contain a change to the unconfirmed parent tx info.
1350
        oldInput.params = input.params
3✔
1351
        oldInput.Input = input.input
3✔
1352

3✔
1353
        // If the new input specifies a deadline, update the deadline height.
3✔
1354
        oldInput.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
3✔
1355
                oldInput.DeadlineHeight,
3✔
1356
        )
3✔
1357

3✔
1358
        // Add additional result channel to signal spend of this input.
3✔
1359
        oldInput.listeners = append(oldInput.listeners, input.resultChan)
3✔
1360

3✔
1361
        if prevExclGroup != nil {
6✔
1362
                s.removeExclusiveGroup(*prevExclGroup)
3✔
1363
        }
3✔
1364
}
1365

1366
// handleBumpEventTxConfirmed marks all inputs swept by the spending transaction
1367
// as swept. It will also notify all the subscribers of this input.
1368
func (s *UtxoSweeper) handleBumpEventTxConfirmed(r *bumpResp) {
4✔
1369
        tx := r.result.Tx
4✔
1370

4✔
1371
        for _, txIn := range tx.TxIn {
11✔
1372
                outpoint := txIn.PreviousOutPoint
7✔
1373

7✔
1374
                // Check if this input is known to us. It could probably be
7✔
1375
                // unknown if we canceled the registration, deleted from inputs
7✔
1376
                // map but the ntfn was in-flight already. Or this could be not
7✔
1377
                // one of our inputs.
7✔
1378
                input, ok := s.inputs[outpoint]
7✔
1379
                if !ok {
11✔
1380
                        // It's very likely that a spending tx contains inputs
4✔
1381
                        // that we don't know.
4✔
1382
                        log.Tracef("Skipped marking input as swept: %v not "+
4✔
1383
                                "found in pending inputs", outpoint)
4✔
1384

4✔
1385
                        continue
4✔
1386
                }
1387

1388
                // This input may already been marked as swept by a previous
1389
                // spend notification, which is likely to happen as one sweep
1390
                // transaction usually sweeps multiple inputs.
1391
                if input.terminated() {
7✔
1392
                        log.Debugf("Skipped marking input as swept: %v "+
1✔
1393
                                "state=%v", outpoint, input.state)
1✔
1394

1✔
1395
                        continue
1✔
1396
                }
1397

1398
                input.state = Swept
5✔
1399

5✔
1400
                // Signal result channels.
5✔
1401
                s.signalResult(input, Result{
5✔
1402
                        Tx: tx,
5✔
1403
                })
5✔
1404

5✔
1405
                // Remove all other inputs in this exclusive group.
5✔
1406
                if input.params.ExclusiveGroup != nil {
5✔
1407
                        s.removeExclusiveGroup(*input.params.ExclusiveGroup)
×
1408
                }
×
1409
        }
1410
}
1411

1412
// markInputFatal marks the given input as fatal and won't be retried. It
1413
// will also notify all the subscribers of this input.
1414
func (s *UtxoSweeper) markInputFatal(pi *SweeperInput, tx *wire.MsgTx,
1415
        err error) {
9✔
1416

9✔
1417
        log.Errorf("Failed to sweep input: %v, error: %v", pi, err)
9✔
1418

9✔
1419
        pi.state = Fatal
9✔
1420

9✔
1421
        s.signalResult(pi, Result{
9✔
1422
                Tx:  tx,
9✔
1423
                Err: err,
9✔
1424
        })
9✔
1425
}
9✔
1426

1427
// updateSweeperInputs updates the sweeper's internal state and returns a map
1428
// of inputs to be swept. It will remove the inputs that are in final states,
1429
// and returns a map of inputs that have either state Init or PublishFailed.
1430
func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
5✔
1431
        // Create a map of inputs to be swept.
5✔
1432
        inputs := make(InputsMap)
5✔
1433

5✔
1434
        // Iterate the pending inputs and update the sweeper's state.
5✔
1435
        //
5✔
1436
        // TODO(yy): sweeper is made to communicate via go channels, so no
5✔
1437
        // locks are needed to access the map. However, it'd be safer if we
5✔
1438
        // turn this inputs map into a SyncMap in case we wanna add concurrent
5✔
1439
        // access to the map in the future.
5✔
1440
        for op, input := range s.inputs {
19✔
1441
                log.Tracef("Checking input: %s, state=%v", input, input.state)
14✔
1442

14✔
1443
                // If the input has reached a final state, that it's either
14✔
1444
                // been swept, or failed, or excluded, we will remove it from
14✔
1445
                // our sweeper.
14✔
1446
                if input.terminated() {
21✔
1447
                        log.Debugf("Removing input(State=%v) %v from sweeper",
7✔
1448
                                input.state, op)
7✔
1449

7✔
1450
                        delete(s.inputs, op)
7✔
1451

7✔
1452
                        continue
7✔
1453
                }
1454

1455
                // If this input has been included in a sweep tx that's not
1456
                // published yet, we'd skip this input and wait for the sweep
1457
                // tx to be published.
1458
                if input.state == PendingPublish {
14✔
1459
                        continue
4✔
1460
                }
1461

1462
                // If this input has already been published, we will need to
1463
                // check the RBF condition before attempting another sweeping.
1464
                if input.state == Published {
13✔
1465
                        continue
4✔
1466
                }
1467

1468
                // If the input has a locktime that's not yet reached, we will
1469
                // skip this input and wait for the locktime to be reached.
1470
                mature, _ := input.isMature(uint32(s.currentHeight))
8✔
1471
                if !mature {
13✔
1472
                        continue
5✔
1473
                }
1474

1475
                // If this input is new or has been failed to be published,
1476
                // we'd retry it. The assumption here is that when an error is
1477
                // returned from `PublishTransaction`, it means the tx has
1478
                // failed to meet the policy, hence it's not in the mempool.
1479
                inputs[op] = input
6✔
1480
        }
1481

1482
        return inputs
5✔
1483
}
1484

1485
// sweepPendingInputs is called when the ticker fires. It will create clusters
1486
// and attempt to create and publish the sweeping transactions.
1487
func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) {
5✔
1488
        log.Debugf("Sweeping %v inputs", len(inputs))
5✔
1489

5✔
1490
        // Cluster all of our inputs based on the specific Aggregator.
5✔
1491
        sets := s.cfg.Aggregator.ClusterInputs(inputs)
5✔
1492

5✔
1493
        // sweepWithLock is a helper closure that executes the sweep within a
5✔
1494
        // coin select lock to prevent the coins being selected for other
5✔
1495
        // transactions like funding of a channel.
5✔
1496
        sweepWithLock := func(set InputSet) error {
9✔
1497
                return s.cfg.Wallet.WithCoinSelectLock(func() error {
8✔
1498
                        // Try to add inputs from our wallet.
4✔
1499
                        err := set.AddWalletInputs(s.cfg.Wallet)
4✔
1500
                        if err != nil {
7✔
1501
                                return err
3✔
1502
                        }
3✔
1503

1504
                        // Create sweeping transaction for each set.
1505
                        err = s.sweep(set)
4✔
1506
                        if err != nil {
4✔
1507
                                return err
×
1508
                        }
×
1509

1510
                        return nil
4✔
1511
                })
1512
        }
1513

1514
        for _, set := range sets {
10✔
1515
                var err error
5✔
1516
                if set.NeedWalletInput() {
9✔
1517
                        // Sweep the set of inputs that need the wallet inputs.
4✔
1518
                        err = sweepWithLock(set)
4✔
1519
                } else {
8✔
1520
                        // Sweep the set of inputs that don't need the wallet
4✔
1521
                        // inputs.
4✔
1522
                        err = s.sweep(set)
4✔
1523
                }
4✔
1524

1525
                if err != nil {
8✔
1526
                        log.Errorf("Failed to sweep %v: %v", set, err)
3✔
1527
                }
3✔
1528
        }
1529
}
1530

1531
// bumpResp wraps the result of a bump attempt returned from the fee bumper and
1532
// the inputs being used.
1533
type bumpResp struct {
1534
        // result is the result of the bump attempt returned from the fee
1535
        // bumper.
1536
        result *BumpResult
1537

1538
        // set is the input set that was used in the bump attempt.
1539
        set InputSet
1540
}
1541

1542
// monitorFeeBumpResult subscribes to the passed result chan to listen for
1543
// future updates about the sweeping tx.
1544
//
1545
// NOTE: must run as a goroutine.
1546
func (s *UtxoSweeper) monitorFeeBumpResult(set InputSet,
1547
        resultChan <-chan *BumpResult) {
9✔
1548

9✔
1549
        defer s.wg.Done()
9✔
1550

9✔
1551
        for {
19✔
1552
                select {
10✔
1553
                case r := <-resultChan:
6✔
1554
                        // Validate the result is valid.
6✔
1555
                        if err := r.Validate(); err != nil {
6✔
1556
                                log.Errorf("Received invalid result: %v", err)
×
1557
                                continue
×
1558
                        }
1559

1560
                        resp := &bumpResp{
6✔
1561
                                result: r,
6✔
1562
                                set:    set,
6✔
1563
                        }
6✔
1564

6✔
1565
                        // Send the result back to the main event loop.
6✔
1566
                        select {
6✔
1567
                        case s.bumpRespChan <- resp:
6✔
1568
                        case <-s.quit:
×
1569
                                log.Debug("Sweeper shutting down, skip " +
×
1570
                                        "sending bump result")
×
1571

×
1572
                                return
×
1573
                        }
1574

1575
                        // The sweeping tx has been confirmed, we can exit the
1576
                        // monitor now.
1577
                        //
1578
                        // TODO(yy): can instead remove the spend subscription
1579
                        // in sweeper and rely solely on this event to mark
1580
                        // inputs as Swept?
1581
                        if r.Event == TxConfirmed || r.Event == TxFailed {
11✔
1582
                                // Exit if the tx is failed to be created.
5✔
1583
                                if r.Tx == nil {
8✔
1584
                                        log.Debugf("Received %v for nil tx, "+
3✔
1585
                                                "exit monitor", r.Event)
3✔
1586

3✔
1587
                                        return
3✔
1588
                                }
3✔
1589

1590
                                log.Debugf("Received %v for sweep tx %v, exit "+
5✔
1591
                                        "fee bump monitor", r.Event,
5✔
1592
                                        r.Tx.TxHash())
5✔
1593

5✔
1594
                                // Cancel the rebroadcasting of the failed tx.
5✔
1595
                                s.cfg.Wallet.CancelRebroadcast(r.Tx.TxHash())
5✔
1596

5✔
1597
                                return
5✔
1598
                        }
1599

1600
                case <-s.quit:
5✔
1601
                        log.Debugf("Sweeper shutting down, exit fee " +
5✔
1602
                                "bump handler")
5✔
1603

5✔
1604
                        return
5✔
1605
                }
1606
        }
1607
}
1608

1609
// handleBumpEventTxFailed handles the case where the tx has been failed to
1610
// publish.
1611
func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) {
4✔
1612
        r := resp.result
4✔
1613
        tx, err := r.Tx, r.Err
4✔
1614

4✔
1615
        if tx != nil {
5✔
1616
                log.Warnf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(),
1✔
1617
                        err)
1✔
1618
        }
1✔
1619

1620
        // NOTE: When marking the inputs as failed, we are using the input set
1621
        // instead of the inputs found in the tx. This is fine for current
1622
        // version of the sweeper because we always create a tx using ALL of
1623
        // the inputs specified by the set.
1624
        //
1625
        // TODO(yy): should we also remove the failed tx from db?
1626
        s.markInputsPublishFailed(resp.set)
4✔
1627
}
1628

1629
// handleBumpEventTxReplaced handles the case where the sweeping tx has been
1630
// replaced by a new one.
1631
func (s *UtxoSweeper) handleBumpEventTxReplaced(resp *bumpResp) error {
6✔
1632
        r := resp.result
6✔
1633
        oldTx := r.ReplacedTx
6✔
1634
        newTx := r.Tx
6✔
1635

6✔
1636
        // Prepare a new record to replace the old one.
6✔
1637
        tr := &TxRecord{
6✔
1638
                Txid:    newTx.TxHash(),
6✔
1639
                FeeRate: uint64(r.FeeRate),
6✔
1640
                Fee:     uint64(r.Fee),
6✔
1641
        }
6✔
1642

6✔
1643
        // Get the old record for logging purpose.
6✔
1644
        oldTxid := oldTx.TxHash()
6✔
1645
        record, err := s.cfg.Store.GetTx(oldTxid)
6✔
1646
        if err != nil {
10✔
1647
                log.Errorf("Fetch tx record for %v: %v", oldTxid, err)
4✔
1648
                return err
4✔
1649
        }
4✔
1650

1651
        // Cancel the rebroadcasting of the replaced tx.
1652
        s.cfg.Wallet.CancelRebroadcast(oldTxid)
5✔
1653

5✔
1654
        log.Infof("RBFed tx=%v(fee=%v sats, feerate=%v sats/kw) with new "+
5✔
1655
                "tx=%v(fee=%v, "+"feerate=%v)", record.Txid, record.Fee,
5✔
1656
                record.FeeRate, tr.Txid, tr.Fee, tr.FeeRate)
5✔
1657

5✔
1658
        // The old sweeping tx has been replaced by a new one, we will update
5✔
1659
        // the tx record in the sweeper db.
5✔
1660
        //
5✔
1661
        // TODO(yy): we may also need to update the inputs in this tx to a new
5✔
1662
        // state. Suppose a replacing tx only spends a subset of the inputs
5✔
1663
        // here, we'd end up with the rest being marked as `Published` and
5✔
1664
        // won't be aggregated in the next sweep. Atm it's fine as we always
5✔
1665
        // RBF the same input set.
5✔
1666
        if err := s.cfg.Store.DeleteTx(oldTxid); err != nil {
6✔
1667
                log.Errorf("Delete tx record for %v: %v", oldTxid, err)
1✔
1668
                return err
1✔
1669
        }
1✔
1670

1671
        // Mark the inputs as published using the replacing tx.
1672
        return s.markInputsPublished(tr, resp.set)
4✔
1673
}
1674

1675
// handleBumpEventTxPublished handles the case where the sweeping tx has been
1676
// successfully published.
1677
func (s *UtxoSweeper) handleBumpEventTxPublished(resp *bumpResp) error {
4✔
1678
        r := resp.result
4✔
1679
        tx := r.Tx
4✔
1680
        tr := &TxRecord{
4✔
1681
                Txid:    tx.TxHash(),
4✔
1682
                FeeRate: uint64(r.FeeRate),
4✔
1683
                Fee:     uint64(r.Fee),
4✔
1684
        }
4✔
1685

4✔
1686
        // Inputs have been successfully published so we update their
4✔
1687
        // states.
4✔
1688
        err := s.markInputsPublished(tr, resp.set)
4✔
1689
        if err != nil {
4✔
1690
                return err
×
1691
        }
×
1692

1693
        log.Debugf("Published sweep tx %v, num_inputs=%v, height=%v",
4✔
1694
                tx.TxHash(), len(tx.TxIn), s.currentHeight)
4✔
1695

4✔
1696
        // If there's no error, remove the output script. Otherwise keep it so
4✔
1697
        // that it can be reused for the next transaction and causes no address
4✔
1698
        // inflation.
4✔
1699
        s.currentOutputScript = fn.None[lnwallet.AddrWithKey]()
4✔
1700

4✔
1701
        return nil
4✔
1702
}
1703

1704
// handleBumpEventTxFatal handles the case where there's an unexpected error
1705
// when creating or publishing the sweeping tx. In this case, the tx will be
1706
// removed from the sweeper store and the inputs will be marked as `Failed`,
1707
// which means they will not be retried.
1708
func (s *UtxoSweeper) handleBumpEventTxFatal(resp *bumpResp) error {
4✔
1709
        r := resp.result
4✔
1710

4✔
1711
        // Remove the tx from the sweeper store if there is one. Since this is
4✔
1712
        // a broadcast error, it's likely there isn't a tx here.
4✔
1713
        if r.Tx != nil {
8✔
1714
                txid := r.Tx.TxHash()
4✔
1715
                log.Infof("Tx=%v failed with unexpected error: %v", txid, r.Err)
4✔
1716

4✔
1717
                // Remove the tx from the sweeper db if it exists.
4✔
1718
                if err := s.cfg.Store.DeleteTx(txid); err != nil {
5✔
1719
                        return fmt.Errorf("delete tx record for %v: %w", txid,
1✔
1720
                                err)
1✔
1721
                }
1✔
1722
        }
1723

1724
        // Mark the inputs as fatal.
1725
        s.markInputsFatal(resp.set, r.Tx, r.Err)
3✔
1726

3✔
1727
        return nil
3✔
1728
}
1729

1730
// markInputsFatal  marks all inputs in the input set as failed. It will also
1731
// notify all the subscribers of these inputs.
1732
func (s *UtxoSweeper) markInputsFatal(set InputSet, tx *wire.MsgTx, err error) {
4✔
1733
        for _, inp := range set.Inputs() {
13✔
1734
                outpoint := inp.OutPoint()
9✔
1735

9✔
1736
                input, ok := s.inputs[outpoint]
9✔
1737
                if !ok {
11✔
1738
                        // It's very likely that a spending tx contains inputs
2✔
1739
                        // that we don't know.
2✔
1740
                        log.Tracef("Skipped marking input as failed: %v not "+
2✔
1741
                                "found in pending inputs", outpoint)
2✔
1742

2✔
1743
                        continue
2✔
1744
                }
1745

1746
                // If the input is already in a terminal state, we don't want
1747
                // to rewrite it, which also indicates an error as we only get
1748
                // an error event during the initial broadcast.
1749
                if input.terminated() {
12✔
1750
                        log.Errorf("Skipped marking input=%v as failed due to "+
3✔
1751
                                "unexpected state=%v", outpoint, input.state)
3✔
1752

3✔
1753
                        continue
3✔
1754
                }
1755

1756
                s.markInputFatal(input, tx, err)
6✔
1757
        }
1758
}
1759

1760
// handleBumpEvent handles the result sent from the bumper based on its event
1761
// type.
1762
//
1763
// NOTE: TxConfirmed event is not handled, since we already subscribe to the
1764
// input's spending event, we don't need to do anything here.
1765
func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
4✔
1766
        log.Debugf("Received bump result %v", r.result)
4✔
1767

4✔
1768
        switch r.result.Event {
4✔
1769
        // The tx has been published, we update the inputs' state and create a
1770
        // record to be stored in the sweeper db.
1771
        case TxPublished:
3✔
1772
                return s.handleBumpEventTxPublished(r)
3✔
1773

1774
        // The tx has failed, we update the inputs' state.
1775
        case TxFailed:
4✔
1776
                s.handleBumpEventTxFailed(r)
4✔
1777
                return nil
4✔
1778

1779
        // The tx has been replaced, we will remove the old tx and replace it
1780
        // with the new one.
1781
        case TxReplaced:
3✔
1782
                return s.handleBumpEventTxReplaced(r)
3✔
1783

1784
        // There are inputs being spent in a tx which the fee bumper doesn't
1785
        // understand. We will remove the tx from the sweeper db and mark the
1786
        // inputs as swept.
1787
        case TxUnknownSpend:
3✔
1788
                s.handleBumpEventTxUnknownSpent(r)
3✔
1789

1790
        // There's a fatal error in creating the tx, we will remove the tx from
1791
        // the sweeper db and mark the inputs as failed.
1792
        case TxFatal:
2✔
1793
                return s.handleBumpEventTxFatal(r)
2✔
1794

1795
        // The sweeping tx is confirmed, we now mark the inputs as swept.
1796
        case TxConfirmed:
3✔
1797
                // We now use the spending tx to update the state of the inputs.
3✔
1798
                s.handleBumpEventTxConfirmed(r)
3✔
1799
        }
1800

1801
        return nil
3✔
1802
}
1803

1804
// IsSweeperOutpoint determines whether the outpoint was created by the sweeper.
1805
//
1806
// NOTE: It is enough to check the txid because the sweeper will create
1807
// outpoints which solely belong to the internal LND wallet.
1808
func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool {
3✔
1809
        found, err := s.cfg.Store.IsOurTx(op.Hash)
3✔
1810
        // In case there is an error fetching the transaction details from the
3✔
1811
        // sweeper store we assume the outpoint is still used by the sweeper
3✔
1812
        // (worst case scenario).
3✔
1813
        //
3✔
1814
        // TODO(ziggie): Ensure that confirmed outpoints are deleted from the
3✔
1815
        // bucket.
3✔
1816
        if err != nil && !errors.Is(err, errNoTxHashesBucket) {
3✔
1817
                log.Errorf("failed to fetch info for outpoint(%v:%d) "+
×
1818
                        "with: %v, we assume it is still in use by the sweeper",
×
1819
                        op.Hash, op.Index, err)
×
1820

×
1821
                return true
×
1822
        }
×
1823

1824
        return found
3✔
1825
}
1826

1827
// markInputSwept marks the given input as swept by the tx. It will also notify
1828
// all the subscribers of this input.
1829
func (s *UtxoSweeper) markInputSwept(inp *SweeperInput, tx *wire.MsgTx) {
5✔
1830
        log.Debugf("Marking input as swept: %v from state=%v", inp.OutPoint(),
5✔
1831
                inp.state)
5✔
1832

5✔
1833
        inp.state = Swept
5✔
1834

5✔
1835
        // Signal result channels.
5✔
1836
        s.signalResult(inp, Result{
5✔
1837
                Tx: tx,
5✔
1838
        })
5✔
1839

5✔
1840
        // Remove all other inputs in this exclusive group.
5✔
1841
        if inp.params.ExclusiveGroup != nil {
5✔
NEW
1842
                s.removeExclusiveGroup(*inp.params.ExclusiveGroup)
×
NEW
1843
        }
×
1844
}
1845

1846
// handleThirdPartySpent takes an input and its spending tx. If the spending tx
1847
// cannot be found in the sweeper store, the input will be marked as fatal,
1848
// otherwise it will be marked as swept.
1849
func (s *UtxoSweeper) handleThirdPartySpent(inp *SweeperInput, tx *wire.MsgTx) {
7✔
1850
        op := inp.OutPoint()
7✔
1851
        txid := tx.TxHash()
7✔
1852

7✔
1853
        isOurTx, err := s.cfg.Store.IsOurTx(txid)
7✔
1854
        if err != nil {
7✔
NEW
1855
                log.Errorf("Cannot determine if tx %v is ours: %v", txid, err)
×
NEW
1856
                return
×
NEW
1857
        }
×
1858

1859
        // If this is our tx, it means it's a previous sweeping tx that got
1860
        // confirmed, which could happen when a restart happens during the
1861
        // sweeping process.
1862
        if isOurTx {
12✔
1863
                log.Debugf("Found our sweeping tx %v, marking input %v as "+
5✔
1864
                        "swept", txid, op)
5✔
1865

5✔
1866
                // We now use the spending tx to update the state of the inputs.
5✔
1867
                s.markInputSwept(inp, tx)
5✔
1868

5✔
1869
                return
5✔
1870
        }
5✔
1871

1872
        // Since the input is spent by others, we now mark it as fatal and won't
1873
        // be retried.
1874
        s.markInputFatal(inp, tx, ErrRemoteSpend)
4✔
1875

4✔
1876
        log.Debugf("Removing descendant txns invalidated by (txid=%v): %v",
4✔
1877
                tx.TxHash(), lnutils.SpewLogClosure(tx))
4✔
1878

4✔
1879
        // Construct a map of the inputs this transaction spends.
4✔
1880
        spentInputs := make(map[wire.OutPoint]struct{}, len(tx.TxIn))
4✔
1881
        for _, txIn := range tx.TxIn {
8✔
1882
                spentInputs[txIn.PreviousOutPoint] = struct{}{}
4✔
1883
        }
4✔
1884

1885
        err = s.removeConflictSweepDescendants(spentInputs)
4✔
1886
        if err != nil {
4✔
NEW
1887
                log.Warnf("unable to remove descendant transactions "+
×
NEW
1888
                        "due to tx %v: ", txid)
×
NEW
1889
        }
×
1890
}
1891

1892
// handleBumpEventTxUnknownSpent handles the case where the confirmed tx is
1893
// unknown to the fee bumper. In the case when the sweeping tx has been replaced
1894
// by another party with their tx being confirmed. It will retry sweeping the
1895
// "good" inputs once the "bad" ones are kicked out.
1896
func (s *UtxoSweeper) handleBumpEventTxUnknownSpent(r *bumpResp) {
5✔
1897
        // Mark the inputs as publish failed, which means they will be retried
5✔
1898
        // later.
5✔
1899
        s.markInputsPublishFailed(r.set)
5✔
1900

5✔
1901
        // Get all the inputs that are not spent in the current sweeping tx.
5✔
1902
        spentInputs := r.result.SpentInputs
5✔
1903

5✔
1904
        // Create a slice to track inputs to be retried.
5✔
1905
        inputsToRetry := make([]input.Input, 0, len(r.set.Inputs()))
5✔
1906

5✔
1907
        // Iterate all the inputs found in this bump and mark the ones spent by
5✔
1908
        // the third party as failed. The rest of inputs will then be updated
5✔
1909
        // with a new fee rate and be retried immediately.
5✔
1910
        for _, inp := range r.set.Inputs() {
11✔
1911
                op := inp.OutPoint()
6✔
1912
                input, ok := s.inputs[op]
6✔
1913

6✔
1914
                // Wallet inputs are not tracked so we will not find them from
6✔
1915
                // the inputs map.
6✔
1916
                if !ok {
9✔
1917
                        log.Debugf("Skipped marking input: %v not found in "+
3✔
1918
                                "pending inputs", op)
3✔
1919

3✔
1920
                        continue
3✔
1921
                }
1922

1923
                // Check whether this input has been spent, if so we mark it as
1924
                // fatal or swept based on whether this is one of our previous
1925
                // sweeping txns, then move to the next.
1926
                tx, spent := spentInputs[op]
6✔
1927
                if spent {
11✔
1928
                        s.handleThirdPartySpent(input, tx)
5✔
1929

5✔
1930
                        continue
5✔
1931
                }
1932

1933
                log.Debugf("Input(%v): updating params: starting fee rate "+
4✔
1934
                        "[%v -> %v], immediate [%v -> true]", op,
4✔
1935
                        input.params.StartingFeeRate, r.result.FeeRate,
4✔
1936
                        input.params.Immediate)
4✔
1937

4✔
1938
                // Update the input using the fee rate specified from the
4✔
1939
                // BumpResult, which should be the starting fee rate to use for
4✔
1940
                // the next sweeping attempt.
4✔
1941
                input.params.StartingFeeRate = fn.Some(r.result.FeeRate)
4✔
1942
                input.params.Immediate = true
4✔
1943
                inputsToRetry = append(inputsToRetry, input)
4✔
1944
        }
1945

1946
        // Exit early if there are no inputs to be retried.
1947
        if len(inputsToRetry) == 0 {
9✔
1948
                return
4✔
1949
        }
4✔
1950

1951
        log.Debugf("Retry sweeping inputs with updated params: %v",
4✔
1952
                inputTypeSummary(inputsToRetry))
4✔
1953

4✔
1954
        // Get the latest inputs, which should put the PublishFailed inputs back
4✔
1955
        // to the sweeping queue.
4✔
1956
        inputs := s.updateSweeperInputs()
4✔
1957

4✔
1958
        // Immediately sweep the remaining inputs - the previous inputs should
4✔
1959
        // now be swept with the updated StartingFeeRate immediately. We may
4✔
1960
        // also include more inputs in the new sweeping tx if new ones with the
4✔
1961
        // same deadline are offered.
4✔
1962
        s.sweepPendingInputs(inputs)
4✔
1963
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc