• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 17027244024

17 Aug 2025 11:32PM UTC coverage: 57.287% (-9.5%) from 66.765%
17027244024

Pull #10167

github

web-flow
Merge fcb4f4303 into fb1adfc21
Pull Request #10167: multi: bump Go to 1.24.6

3 of 18 new or added lines in 6 files covered. (16.67%)

28537 existing lines in 457 files now uncovered.

99094 of 172978 relevant lines covered (57.29%)

1.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.6
/sweep/sweeper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/davecgh/go-spew/spew"
13
        "github.com/lightningnetwork/lnd/chainio"
14
        "github.com/lightningnetwork/lnd/chainntnfs"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/input"
17
        "github.com/lightningnetwork/lnd/lnutils"
18
        "github.com/lightningnetwork/lnd/lnwallet"
19
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
20
)
21

22
// Sentinel errors and defaults exported by the sweeper.
var (
	// ErrRemoteSpend signals that the output we attempted to sweep was
	// instead confirmed in a transaction of the remote party.
	ErrRemoteSpend = errors.New("remote party swept utxo")

	// ErrFeePreferenceTooLow signals that the given fee preference
	// resolves to a fee rate lower than the relay fee rate.
	ErrFeePreferenceTooLow = errors.New("fee preference too low")

	// ErrExclusiveGroupSpend signals that another input belonging to the
	// same exclusive group has been spent.
	ErrExclusiveGroupSpend = errors.New(
		"other member of exclusive group was spent",
	)

	// ErrSweeperShuttingDown is handed back to a client whose request
	// cannot be served because the UtxoSweeper is stopping or has already
	// been stopped.
	ErrSweeperShuttingDown = errors.New("utxo sweeper shutting down")

	// DefaultDeadlineDelta is the default deadline delta of 1008 (1 week)
	// applied when sweeping inputs that carry no deadline pressure.
	DefaultDeadlineDelta int32 = 1008
)
45

46
// Params contains the parameters that control the sweeping process.
47
type Params struct {
48
        // ExclusiveGroup is an identifier that, if set, ensures this input is
49
        // swept in a transaction by itself, and not batched with any other
50
        // inputs.
51
        ExclusiveGroup *uint64
52

53
        // DeadlineHeight specifies an absolute block height that this input
54
        // should be confirmed by. This value is used by the fee bumper to
55
        // decide its urgency and adjust its feerate used.
56
        DeadlineHeight fn.Option[int32]
57

58
        // Budget specifies the maximum amount of satoshis that can be spent on
59
        // fees for this sweep.
60
        Budget btcutil.Amount
61

62
        // Immediate indicates that the input should be swept immediately
63
        // without waiting for blocks to come to trigger the sweeping of
64
        // inputs.
65
        Immediate bool
66

67
        // StartingFeeRate is an optional parameter that can be used to specify
68
        // the initial fee rate to use for the fee function.
69
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
70
}
71

72
// String returns a human readable interpretation of the sweep parameters.
73
func (p Params) String() string {
3✔
74
        deadline := "none"
3✔
75
        p.DeadlineHeight.WhenSome(func(d int32) {
6✔
76
                deadline = fmt.Sprintf("%d", d)
3✔
77
        })
3✔
78

79
        exclusiveGroup := "none"
3✔
80
        if p.ExclusiveGroup != nil {
6✔
81
                exclusiveGroup = fmt.Sprintf("%d", *p.ExclusiveGroup)
3✔
82
        }
3✔
83

84
        return fmt.Sprintf("startingFeeRate=%v, immediate=%v, "+
3✔
85
                "exclusive_group=%v, budget=%v, deadline=%v", p.StartingFeeRate,
3✔
86
                p.Immediate, exclusiveGroup, p.Budget, deadline)
3✔
87
}
88

89
// SweepState enumerates the states a pending input can be in.
//
//nolint:revive
type SweepState uint8

const (
	// Init is the state a pending input starts in. It is assigned when a
	// new sweeping request for the input is made.
	Init SweepState = iota

	// PendingPublish marks an input that is already part of a sweeping tx
	// which has not been published yet. Such inputs should not be used
	// for grouping again.
	PendingPublish

	// Published marks an input whose sweeping tx has been published
	// successfully. Such inputs can only be updated via RBF.
	Published

	// PublishFailed marks an input for which publishing the sweeping tx
	// returned an error. Such inputs can be re-grouped into a new sweeping
	// tx.
	PublishFailed

	// Swept is a final state, assigned once the input has been swept
	// successfully.
	Swept

	// Excluded marks an input that has been excluded and can no longer be
	// swept. For instance, once one of the three anchor sweeping
	// transactions confirms, the remaining two are excluded.
	Excluded

	// Fatal is a final state for inputs that won't be retried. This could
	// happen when,
	// - a pending input has too many failed publish attempts;
	// - the input has been spent by another party;
	// - an unknown broadcast error is returned.
	Fatal
)

// String maps the sweep state to its human readable name. Any value outside
// the declared states yields "Unknown".
func (s SweepState) String() string {
	// The table is indexed by the state constants themselves, so each
	// name stays attached to its state.
	names := [...]string{
		Init:           "Init",
		PendingPublish: "PendingPublish",
		Published:      "Published",
		PublishFailed:  "PublishFailed",
		Swept:          "Swept",
		Excluded:       "Excluded",
		Fatal:          "Fatal",
	}

	if int(s) >= len(names) {
		return "Unknown"
	}

	return names[s]
}
159

160
// RBFInfo stores the information required to perform a RBF bump on a pending
161
// sweeping tx.
162
type RBFInfo struct {
163
        // Txid is the txid of the sweeping tx.
164
        Txid chainhash.Hash
165

166
        // FeeRate is the fee rate of the sweeping tx.
167
        FeeRate chainfee.SatPerKWeight
168

169
        // Fee is the total fee of the sweeping tx.
170
        Fee btcutil.Amount
171
}
172

173
// SweeperInput is created when an input reaches the main loop for the first
174
// time. It wraps the input and tracks all relevant state that is needed for
175
// sweeping.
176
type SweeperInput struct {
177
        input.Input
178

179
        // state tracks the current state of the input.
180
        state SweepState
181

182
        // listeners is a list of channels over which the final outcome of the
183
        // sweep needs to be broadcasted.
184
        listeners []chan Result
185

186
        // ntfnRegCancel is populated with a function that cancels the chain
187
        // notifier spend registration.
188
        ntfnRegCancel func()
189

190
        // publishAttempts records the number of attempts that have already been
191
        // made to sweep this tx.
192
        publishAttempts int
193

194
        // params contains the parameters that control the sweeping process.
195
        params Params
196

197
        // lastFeeRate is the most recent fee rate used for this input within a
198
        // transaction broadcast to the network.
199
        lastFeeRate chainfee.SatPerKWeight
200

201
        // rbf records the RBF constraints.
202
        rbf fn.Option[RBFInfo]
203

204
        // DeadlineHeight is the deadline height for this input. This is
205
        // different from the DeadlineHeight in its params as it's an actual
206
        // value than an option.
207
        DeadlineHeight int32
208
}
209

210
// String returns a human readable interpretation of the pending input.
211
func (p *SweeperInput) String() string {
3✔
212
        return fmt.Sprintf("%v (%v)", p.Input.OutPoint(), p.Input.WitnessType())
3✔
213
}
3✔
214

215
// terminated returns a boolean indicating whether the input has reached a
216
// final state.
217
func (p *SweeperInput) terminated() bool {
3✔
218
        switch p.state {
3✔
219
        // If the input has reached a final state, that it's either
220
        // been swept, or failed, or excluded, we will remove it from
221
        // our sweeper.
222
        case Fatal, Swept, Excluded:
3✔
223
                return true
3✔
224

225
        default:
3✔
226
                return false
3✔
227
        }
228
}
229

230
// isMature returns a boolean indicating whether the input has a timelock that
231
// has been reached or not. The locktime found is also returned.
232
func (p *SweeperInput) isMature(currentHeight uint32) (bool, uint32) {
3✔
233
        locktime, _ := p.RequiredLockTime()
3✔
234
        if currentHeight < locktime {
6✔
235
                log.Debugf("Input %v has locktime=%v, current height is %v",
3✔
236
                        p, locktime, currentHeight)
3✔
237

3✔
238
                return false, locktime
3✔
239
        }
3✔
240

241
        // If the input has a CSV that's not yet reached, we will skip
242
        // this input and wait for the expiry.
243
        //
244
        // NOTE: We need to consider whether this input can be included in the
245
        // next block or not, which means the CSV will be checked against the
246
        // currentHeight plus one.
247
        locktime = p.BlocksToMaturity() + p.HeightHint()
3✔
248
        if currentHeight+1 < locktime {
6✔
249
                log.Debugf("Input %v has CSV expiry=%v, current height is %v, "+
3✔
250
                        "skipped sweeping", p, locktime, currentHeight)
3✔
251

3✔
252
                return false, locktime
3✔
253
        }
3✔
254

255
        return true, locktime
3✔
256
}
257

258
// InputsMap is a type alias for a set of pending inputs.
259
type InputsMap = map[wire.OutPoint]*SweeperInput
260

261
// inputsMapToString returns a human readable interpretation of the pending
262
// inputs.
263
func inputsMapToString(inputs InputsMap) string {
3✔
264
        if len(inputs) == 0 {
6✔
265
                return ""
3✔
266
        }
3✔
267

268
        inps := make([]input.Input, 0, len(inputs))
3✔
269
        for _, in := range inputs {
6✔
270
                inps = append(inps, in)
3✔
271
        }
3✔
272

273
        return "\n" + inputTypeSummary(inps)
3✔
274
}
275

276
// pendingSweepsReq is an internal message we'll use to represent an external
277
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
278
// attempting to sweep.
279
type pendingSweepsReq struct {
280
        respChan chan map[wire.OutPoint]*PendingInputResponse
281
        errChan  chan error
282
}
283

284
// PendingInputResponse contains information about an input that is currently
285
// being swept by the UtxoSweeper.
286
type PendingInputResponse struct {
287
        // OutPoint is the identify outpoint of the input being swept.
288
        OutPoint wire.OutPoint
289

290
        // WitnessType is the witness type of the input being swept.
291
        WitnessType input.WitnessType
292

293
        // Amount is the amount of the input being swept.
294
        Amount btcutil.Amount
295

296
        // LastFeeRate is the most recent fee rate used for the input being
297
        // swept within a transaction broadcast to the network.
298
        LastFeeRate chainfee.SatPerKWeight
299

300
        // BroadcastAttempts is the number of attempts we've made to sweept the
301
        // input.
302
        BroadcastAttempts int
303

304
        // Params contains the sweep parameters for this pending request.
305
        Params Params
306

307
        // DeadlineHeight records the deadline height of this input.
308
        DeadlineHeight uint32
309

310
        // MaturityHeight is the block height that this input's locktime will
311
        // be expired at. For inputs with no locktime this value is zero.
312
        MaturityHeight uint32
313
}
314

315
// updateReq is an internal message we'll use to represent an external caller's
316
// intent to update the sweep parameters of a given input.
317
type updateReq struct {
318
        input        wire.OutPoint
319
        params       Params
320
        responseChan chan *updateResp
321
}
322

323
// updateResp is an internal message we'll use to hand off the response of a
324
// updateReq from the UtxoSweeper's main event loop back to the caller.
325
type updateResp struct {
326
        resultChan chan Result
327
        err        error
328
}
329

330
// UtxoSweeper is responsible for sweeping outputs back into the wallet
331
type UtxoSweeper struct {
332
        started uint32 // To be used atomically.
333
        stopped uint32 // To be used atomically.
334

335
        // Embed the blockbeat consumer struct to get access to the method
336
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
337
        chainio.BeatConsumer
338

339
        cfg *UtxoSweeperConfig
340

341
        newInputs chan *sweepInputMessage
342
        spendChan chan *chainntnfs.SpendDetail
343

344
        // pendingSweepsReq is a channel that will be sent requests by external
345
        // callers in order to retrieve the set of pending inputs the
346
        // UtxoSweeper is attempting to sweep.
347
        pendingSweepsReqs chan *pendingSweepsReq
348

349
        // updateReqs is a channel that will be sent requests by external
350
        // callers who wish to bump the fee rate of a given input.
351
        updateReqs chan *updateReq
352

353
        // inputs is the total set of inputs the UtxoSweeper has been requested
354
        // to sweep.
355
        inputs InputsMap
356

357
        currentOutputScript fn.Option[lnwallet.AddrWithKey]
358

359
        relayFeeRate chainfee.SatPerKWeight
360

361
        quit chan struct{}
362
        wg   sync.WaitGroup
363

364
        // currentHeight is the best known height of the main chain. This is
365
        // updated whenever a new block epoch is received.
366
        currentHeight int32
367

368
        // bumpRespChan is a channel that receives broadcast results from the
369
        // TxPublisher.
370
        bumpRespChan chan *bumpResp
371
}
372

373
// Compile-time check for the chainio.Consumer interface.
374
var _ chainio.Consumer = (*UtxoSweeper)(nil)
375

376
// UtxoSweeperConfig contains dependencies of UtxoSweeper.
377
type UtxoSweeperConfig struct {
378
        // GenSweepScript generates a P2WKH script belonging to the wallet where
379
        // funds can be swept.
380
        GenSweepScript func() fn.Result[lnwallet.AddrWithKey]
381

382
        // FeeEstimator is used when crafting sweep transactions to estimate
383
        // the necessary fee relative to the expected size of the sweep
384
        // transaction.
385
        FeeEstimator chainfee.Estimator
386

387
        // Wallet contains the wallet functions that sweeper requires.
388
        Wallet Wallet
389

390
        // Notifier is an instance of a chain notifier we'll use to watch for
391
        // certain on-chain events.
392
        Notifier chainntnfs.ChainNotifier
393

394
        // Mempool is the mempool watcher that will be used to query whether a
395
        // given input is already being spent by a transaction in the mempool.
396
        Mempool chainntnfs.MempoolWatcher
397

398
        // Store stores the published sweeper txes.
399
        Store SweeperStore
400

401
        // Signer is used by the sweeper to generate valid witnesses at the
402
        // time the incubated outputs need to be spent.
403
        Signer input.Signer
404

405
        // MaxInputsPerTx specifies the default maximum number of inputs allowed
406
        // in a single sweep tx. If more need to be swept, multiple txes are
407
        // created and published.
408
        MaxInputsPerTx uint32
409

410
        // MaxFeeRate is the maximum fee rate allowed within the UtxoSweeper.
411
        MaxFeeRate chainfee.SatPerVByte
412

413
        // Aggregator is used to group inputs into clusters based on its
414
        // implemention-specific strategy.
415
        Aggregator UtxoAggregator
416

417
        // Publisher is used to publish the sweep tx crafted here and monitors
418
        // it for potential fee bumps.
419
        Publisher Bumper
420

421
        // NoDeadlineConfTarget is the conf target to use when sweeping
422
        // non-time-sensitive outputs.
423
        NoDeadlineConfTarget uint32
424
}
425

426
// Result is the struct that is pushed through the result channel. Callers can
427
// use this to be informed of the final sweep result. In case of a remote
428
// spend, Err will be ErrRemoteSpend.
429
type Result struct {
430
        // Err is the final result of the sweep. It is nil when the input is
431
        // swept successfully by us. ErrRemoteSpend is returned when another
432
        // party took the input.
433
        Err error
434

435
        // Tx is the transaction that spent the input.
436
        Tx *wire.MsgTx
437
}
438

439
// sweepInputMessage structs are used in the internal channel between the
440
// SweepInput call and the sweeper main loop.
441
type sweepInputMessage struct {
442
        input      input.Input
443
        params     Params
444
        resultChan chan Result
445
}
446

447
// New returns a new Sweeper instance.
448
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
3✔
449
        s := &UtxoSweeper{
3✔
450
                cfg:               cfg,
3✔
451
                newInputs:         make(chan *sweepInputMessage),
3✔
452
                spendChan:         make(chan *chainntnfs.SpendDetail),
3✔
453
                updateReqs:        make(chan *updateReq),
3✔
454
                pendingSweepsReqs: make(chan *pendingSweepsReq),
3✔
455
                quit:              make(chan struct{}),
3✔
456
                inputs:            make(InputsMap),
3✔
457
                bumpRespChan:      make(chan *bumpResp, 100),
3✔
458
        }
3✔
459

3✔
460
        // Mount the block consumer.
3✔
461
        s.BeatConsumer = chainio.NewBeatConsumer(s.quit, s.Name())
3✔
462

3✔
463
        return s
3✔
464
}
3✔
465

466
// Start starts the process of constructing and publish sweep txes.
467
func (s *UtxoSweeper) Start(beat chainio.Blockbeat) error {
3✔
468
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
3✔
469
                return nil
×
470
        }
×
471

472
        log.Info("Sweeper starting")
3✔
473

3✔
474
        // Retrieve relay fee for dust limit calculation. Assume that this will
3✔
475
        // not change from here on.
3✔
476
        s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
3✔
477

3✔
478
        // Set the current height.
3✔
479
        s.currentHeight = beat.Height()
3✔
480

3✔
481
        // Start sweeper main loop.
3✔
482
        s.wg.Add(1)
3✔
483
        go s.collector()
3✔
484

3✔
485
        return nil
3✔
486
}
487

488
// RelayFeePerKW returns the minimum fee rate required for transactions to be
489
// relayed.
490
func (s *UtxoSweeper) RelayFeePerKW() chainfee.SatPerKWeight {
×
491
        return s.relayFeeRate
×
492
}
×
493

494
// Stop stops sweeper from listening to block epochs and constructing sweep
495
// txes.
496
func (s *UtxoSweeper) Stop() error {
3✔
497
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
3✔
498
                return nil
×
499
        }
×
500

501
        log.Info("Sweeper shutting down...")
3✔
502
        defer log.Debug("Sweeper shutdown complete")
3✔
503

3✔
504
        close(s.quit)
3✔
505
        s.wg.Wait()
3✔
506

3✔
507
        return nil
3✔
508
}
509

510
// NOTE: part of the `chainio.Consumer` interface.
511
func (s *UtxoSweeper) Name() string {
3✔
512
        return "UtxoSweeper"
3✔
513
}
3✔
514

515
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
516
// swept after the batch time window ends. A custom fee preference can be
517
// provided to determine what fee rate should be used for the input. Note that
518
// the input may not always be swept with this exact value, as its possible for
519
// it to be batched under the same transaction with other similar fee rate
520
// inputs.
521
//
522
// NOTE: Extreme care needs to be taken that input isn't changed externally.
523
// Because it is an interface and we don't know what is exactly behind it, we
524
// cannot make a local copy in sweeper.
525
//
526
// TODO(yy): make sure the caller is using the Result chan.
527
func (s *UtxoSweeper) SweepInput(inp input.Input,
528
        params Params) (chan Result, error) {
3✔
529

3✔
530
        if inp == nil || inp.OutPoint() == input.EmptyOutPoint ||
3✔
531
                inp.SignDesc() == nil {
3✔
532

×
533
                return nil, errors.New("nil input received")
×
534
        }
×
535

536
        absoluteTimeLock, _ := inp.RequiredLockTime()
3✔
537
        log.Debugf("Sweep request received: out_point=%v, witness_type=%v, "+
3✔
538
                "relative_time_lock=%v, absolute_time_lock=%v, amount=%v, "+
3✔
539
                "parent=(%v), params=(%v)", inp.OutPoint(), inp.WitnessType(),
3✔
540
                inp.BlocksToMaturity(), absoluteTimeLock,
3✔
541
                btcutil.Amount(inp.SignDesc().Output.Value),
3✔
542
                inp.UnconfParent(), params)
3✔
543

3✔
544
        sweeperInput := &sweepInputMessage{
3✔
545
                input:      inp,
3✔
546
                params:     params,
3✔
547
                resultChan: make(chan Result, 1),
3✔
548
        }
3✔
549

3✔
550
        // Deliver input to the main event loop.
3✔
551
        select {
3✔
552
        case s.newInputs <- sweeperInput:
3✔
553
        case <-s.quit:
×
554
                return nil, ErrSweeperShuttingDown
×
555
        }
556

557
        return sweeperInput.resultChan, nil
3✔
558
}
559

560
// removeConflictSweepDescendants removes any transactions from the wallet that
561
// spend outputs included in the passed outpoint set. This needs to be done in
562
// cases where we're not the only ones that can sweep an output, but there may
563
// exist unconfirmed spends that spend outputs created by a sweep transaction.
564
// The most common case for this is when someone sweeps our anchor outputs
565
// after 16 blocks. Moreover this is also needed for wallets which use neutrino
566
// as a backend when a channel is force closed and anchor cpfp txns are
567
// created to bump the initial commitment transaction. In this case an anchor
568
// cpfp is broadcasted for up to 3 commitment transactions (local,
569
// remote-dangling, remote). Using neutrino all of those transactions will be
570
// accepted (the commitment tx will be different in all of those cases) and have
571
// to be removed as soon as one of them confirmes (they do have the same
572
// ExclusiveGroup). For neutrino backends the corresponding BIP 157 serving full
573
// nodes do not signal invalid transactions anymore.
574
func (s *UtxoSweeper) removeConflictSweepDescendants(
575
        outpoints map[wire.OutPoint]struct{}) error {
3✔
576

3✔
577
        // Obtain all the past sweeps that we've done so far. We'll need these
3✔
578
        // to ensure that if the spendingTx spends any of the same inputs, then
3✔
579
        // we remove any transaction that may be spending those inputs from the
3✔
580
        // wallet.
3✔
581
        //
3✔
582
        // TODO(roasbeef): can be last sweep here if we remove anything confirmed
3✔
583
        // from the store?
3✔
584
        pastSweepHashes, err := s.cfg.Store.ListSweeps()
3✔
585
        if err != nil {
3✔
586
                return err
×
587
        }
×
588

589
        // We'll now go through each past transaction we published during this
590
        // epoch and cross reference the spent inputs. If there're any inputs
591
        // in common with the inputs the spendingTx spent, then we'll remove
592
        // those.
593
        //
594
        // TODO(roasbeef): need to start to remove all transaction hashes after
595
        // every N blocks (assumed point of no return)
596
        for _, sweepHash := range pastSweepHashes {
6✔
597
                sweepTx, err := s.cfg.Wallet.FetchTx(sweepHash)
3✔
598
                if err != nil {
5✔
599
                        return err
2✔
600
                }
2✔
601

602
                // Transaction wasn't found in the wallet, may have already
603
                // been replaced/removed.
604
                if sweepTx == nil {
3✔
605
                        // If it was removed, then we'll play it safe and mark
×
606
                        // it as no longer need to be rebroadcasted.
×
607
                        s.cfg.Wallet.CancelRebroadcast(sweepHash)
×
608
                        continue
×
609
                }
610

611
                // Check to see if this past sweep transaction spent any of the
612
                // same inputs as spendingTx.
613
                var isConflicting bool
3✔
614
                for _, txIn := range sweepTx.TxIn {
6✔
615
                        if _, ok := outpoints[txIn.PreviousOutPoint]; ok {
6✔
616
                                isConflicting = true
3✔
617
                                break
3✔
618
                        }
619
                }
620

621
                if !isConflicting {
6✔
622
                        continue
3✔
623
                }
624

625
                // If it is conflicting, then we'll signal the wallet to remove
626
                // all the transactions that are descendants of outputs created
627
                // by the sweepTx and the sweepTx itself.
628
                log.Debugf("Removing sweep txid=%v from wallet: %v",
3✔
629
                        sweepTx.TxHash(), spew.Sdump(sweepTx))
3✔
630

3✔
631
                err = s.cfg.Wallet.RemoveDescendants(sweepTx)
3✔
632
                if err != nil {
3✔
633
                        log.Warnf("Unable to remove descendants: %v", err)
×
634
                }
×
635

636
                // If this transaction was conflicting, then we'll stop
637
                // rebroadcasting it in the background.
638
                s.cfg.Wallet.CancelRebroadcast(sweepHash)
3✔
639
        }
640

641
        return nil
3✔
642
}
643

644
// collector is the sweeper main loop. It processes new inputs, spend
645
// notifications and counts down to publication of the sweep tx.
646
func (s *UtxoSweeper) collector() {
3✔
647
        defer s.wg.Done()
3✔
648

3✔
649
        for {
6✔
650
                // Clean inputs, which will remove inputs that are swept,
3✔
651
                // failed, or excluded from the sweeper and return inputs that
3✔
652
                // are either new or has been published but failed back, which
3✔
653
                // will be retried again here.
3✔
654
                s.updateSweeperInputs()
3✔
655

3✔
656
                select {
3✔
657
                // A new inputs is offered to the sweeper. We check to see if
658
                // we are already trying to sweep this input and if not, set up
659
                // a listener to spend and schedule a sweep.
660
                case input := <-s.newInputs:
3✔
661
                        err := s.handleNewInput(input)
3✔
662
                        if err != nil {
3✔
663
                                log.Criticalf("Unable to handle new input: %v",
×
664
                                        err)
×
665

×
666
                                return
×
667
                        }
×
668

669
                        // If this input is forced, we perform an sweep
670
                        // immediately.
671
                        //
672
                        // TODO(ziggie): Make sure when `immediate` is selected
673
                        // as a parameter that we only trigger the sweeping of
674
                        // this specific input rather than triggering the sweeps
675
                        // of all current pending inputs registered with the
676
                        // sweeper.
677
                        if input.params.Immediate {
6✔
678
                                inputs := s.updateSweeperInputs()
3✔
679
                                s.sweepPendingInputs(inputs)
3✔
680
                        }
3✔
681

682
                // A spend of one of our inputs is detected. Signal sweep
683
                // results to the caller(s).
684
                case spend := <-s.spendChan:
3✔
685
                        s.handleInputSpent(spend)
3✔
686

687
                // A new external request has been received to retrieve all of
688
                // the inputs we're currently attempting to sweep.
689
                case req := <-s.pendingSweepsReqs:
3✔
690
                        s.handlePendingSweepsReq(req)
3✔
691

692
                // A new external request has been received to bump the fee rate
693
                // of a given input.
694
                case req := <-s.updateReqs:
3✔
695
                        resultChan, err := s.handleUpdateReq(req)
3✔
696
                        req.responseChan <- &updateResp{
3✔
697
                                resultChan: resultChan,
3✔
698
                                err:        err,
3✔
699
                        }
3✔
700

3✔
701
                        // Perform an sweep immediately if asked.
3✔
702
                        if req.params.Immediate {
6✔
703
                                inputs := s.updateSweeperInputs()
3✔
704
                                s.sweepPendingInputs(inputs)
3✔
705
                        }
3✔
706

707
                case resp := <-s.bumpRespChan:
3✔
708
                        // Handle the bump event.
3✔
709
                        err := s.handleBumpEvent(resp)
3✔
710
                        if err != nil {
6✔
711
                                log.Errorf("Failed to handle bump event: %v",
3✔
712
                                        err)
3✔
713
                        }
3✔
714

715
                // A new block comes in, update the bestHeight, perform a check
716
                // over all pending inputs and publish sweeping txns if needed.
717
                case beat := <-s.BlockbeatChan:
3✔
718
                        // Update the sweeper to the best height.
3✔
719
                        s.currentHeight = beat.Height()
3✔
720

3✔
721
                        // Update the inputs with the latest height.
3✔
722
                        inputs := s.updateSweeperInputs()
3✔
723

3✔
724
                        log.Debugf("Received new block: height=%v, attempt "+
3✔
725
                                "sweeping %d inputs:%s", s.currentHeight,
3✔
726
                                len(inputs),
3✔
727
                                lnutils.NewLogClosure(func() string {
6✔
728
                                        return inputsMapToString(inputs)
3✔
729
                                }))
3✔
730

731
                        // Attempt to sweep any pending inputs.
732
                        s.sweepPendingInputs(inputs)
3✔
733

3✔
734
                        // Notify we've processed the block.
3✔
735
                        s.NotifyBlockProcessed(beat, nil)
3✔
736

737
                case <-s.quit:
3✔
738
                        return
3✔
739
                }
740
        }
741
}
742

743
// removeExclusiveGroup removes all inputs in the given exclusive group except
744
// the input specified by the outpoint. This function is called when one of the
745
// exclusive group inputs has been spent or updated. The other inputs won't ever
746
// be spendable and can be removed. This also prevents them from being part of
747
// future sweep transactions that would fail. In addition sweep transactions of
748
// those inputs will be removed from the wallet.
749
func (s *UtxoSweeper) removeExclusiveGroup(group uint64, op wire.OutPoint) {
3✔
750
        for outpoint, input := range s.inputs {
6✔
751
                outpoint := outpoint
3✔
752

3✔
753
                // Skip the input that caused the exclusive group to be removed.
3✔
754
                if outpoint == op {
6✔
755
                        log.Debugf("Skipped removing exclusive input %v", input)
3✔
756

3✔
757
                        continue
3✔
758
                }
759

760
                // Skip inputs that aren't exclusive.
761
                if input.params.ExclusiveGroup == nil {
6✔
762
                        continue
3✔
763
                }
764

765
                // Skip inputs from other exclusive groups.
766
                if *input.params.ExclusiveGroup != group {
3✔
767
                        continue
×
768
                }
769

770
                // Skip inputs that are already terminated.
771
                if input.terminated() {
3✔
772
                        log.Tracef("Skipped sending error result for "+
×
773
                                "input %v, state=%v", outpoint, input.state)
×
774

×
775
                        continue
×
776
                }
777

778
                log.Debugf("Removing exclusive group for input %v", input)
3✔
779

3✔
780
                // Signal result channels.
3✔
781
                s.signalResult(input, Result{
3✔
782
                        Err: ErrExclusiveGroupSpend,
3✔
783
                })
3✔
784

3✔
785
                // Update the input's state as it can no longer be swept.
3✔
786
                input.state = Excluded
3✔
787

3✔
788
                // Remove all unconfirmed transactions from the wallet which
3✔
789
                // spend the passed outpoint of the same exclusive group.
3✔
790
                outpoints := map[wire.OutPoint]struct{}{
3✔
791
                        outpoint: {},
3✔
792
                }
3✔
793
                err := s.removeConflictSweepDescendants(outpoints)
3✔
794
                if err != nil {
4✔
795
                        log.Warnf("Unable to remove conflicting sweep tx from "+
1✔
796
                                "wallet for outpoint %v : %v", outpoint, err)
1✔
797
                }
1✔
798
        }
799
}
800

801
// signalResult notifies the listeners of the final result of the input sweep.
802
// It also cancels any pending spend notification.
803
func (s *UtxoSweeper) signalResult(pi *SweeperInput, result Result) {
3✔
804
        op := pi.OutPoint()
3✔
805
        listeners := pi.listeners
3✔
806

3✔
807
        if result.Err == nil {
6✔
808
                log.Tracef("Dispatching sweep success for %v to %v listeners",
3✔
809
                        op, len(listeners),
3✔
810
                )
3✔
811
        } else {
6✔
812
                log.Tracef("Dispatching sweep error for %v to %v listeners: %v",
3✔
813
                        op, len(listeners), result.Err,
3✔
814
                )
3✔
815
        }
3✔
816

817
        // Signal all listeners. Channel is buffered. Because we only send once
818
        // on every channel, it should never block.
819
        for _, resultChan := range listeners {
6✔
820
                resultChan <- result
3✔
821
        }
3✔
822

823
        // Cancel spend notification with chain notifier. This is not necessary
824
        // in case of a success, except for that a reorg could still happen.
825
        if pi.ntfnRegCancel != nil {
6✔
826
                log.Debugf("Canceling spend ntfn for %v", op)
3✔
827

3✔
828
                pi.ntfnRegCancel()
3✔
829
        }
3✔
830
}
831

832
// sweep takes a set of preselected inputs, creates a sweep tx and publishes
833
// the tx. The output address is only marked as used if the publish succeeds.
834
func (s *UtxoSweeper) sweep(set InputSet) error {
3✔
835
        // Generate an output script if there isn't an unused script available.
3✔
836
        if s.currentOutputScript.IsNone() {
6✔
837
                addr, err := s.cfg.GenSweepScript().Unpack()
3✔
838
                if err != nil {
3✔
839
                        return fmt.Errorf("gen sweep script: %w", err)
×
840
                }
×
841
                s.currentOutputScript = fn.Some(addr)
3✔
842

3✔
843
                log.Debugf("Created sweep DeliveryAddress %x",
3✔
844
                        addr.DeliveryAddress)
3✔
845
        }
846

847
        sweepAddr, err := s.currentOutputScript.UnwrapOrErr(
3✔
848
                fmt.Errorf("none sweep script"),
3✔
849
        )
3✔
850
        if err != nil {
3✔
851
                return err
×
852
        }
×
853

854
        // Create a fee bump request and ask the publisher to broadcast it. The
855
        // publisher will then take over and start monitoring the tx for
856
        // potential fee bump.
857
        req := &BumpRequest{
3✔
858
                Inputs:          set.Inputs(),
3✔
859
                Budget:          set.Budget(),
3✔
860
                DeadlineHeight:  set.DeadlineHeight(),
3✔
861
                DeliveryAddress: sweepAddr,
3✔
862
                MaxFeeRate:      s.cfg.MaxFeeRate.FeePerKWeight(),
3✔
863
                StartingFeeRate: set.StartingFeeRate(),
3✔
864
                Immediate:       set.Immediate(),
3✔
865
                // TODO(yy): pass the strategy here.
3✔
866
        }
3✔
867

3✔
868
        // Reschedule the inputs that we just tried to sweep. This is done in
3✔
869
        // case the following publish fails, we'd like to update the inputs'
3✔
870
        // publish attempts and rescue them in the next sweep.
3✔
871
        s.markInputsPendingPublish(set)
3✔
872

3✔
873
        // Broadcast will return a read-only chan that we will listen to for
3✔
874
        // this publish result and future RBF attempt.
3✔
875
        resp := s.cfg.Publisher.Broadcast(req)
3✔
876

3✔
877
        // Successfully sent the broadcast attempt, we now handle the result by
3✔
878
        // subscribing to the result chan and listen for future updates about
3✔
879
        // this tx.
3✔
880
        s.wg.Add(1)
3✔
881
        go s.monitorFeeBumpResult(set, resp)
3✔
882

3✔
883
        return nil
3✔
884
}
885

886
// markInputsPendingPublish updates the pending inputs with the given tx
887
// inputs. It also increments the `publishAttempts`.
888
func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) {
3✔
889
        // Reschedule sweep.
3✔
890
        for _, input := range set.Inputs() {
6✔
891
                op := input.OutPoint()
3✔
892
                pi, ok := s.inputs[op]
3✔
893
                if !ok {
6✔
894
                        // It could be that this input is an additional wallet
3✔
895
                        // input that was attached. In that case there also
3✔
896
                        // isn't a pending input to update.
3✔
897
                        log.Tracef("Skipped marking input as pending "+
3✔
898
                                "published: %v not found in pending inputs", op)
3✔
899

3✔
900
                        continue
3✔
901
                }
902

903
                // If this input has already terminated, there's clearly
904
                // something wrong as it would have been removed. In this case
905
                // we log an error and skip marking this input as pending
906
                // publish.
907
                if pi.terminated() {
3✔
UNCOV
908
                        log.Errorf("Expect input %v to not have terminated "+
×
UNCOV
909
                                "state, instead it has %v", op, pi.state)
×
UNCOV
910

×
UNCOV
911
                        continue
×
912
                }
913

914
                // Update the input's state.
915
                pi.state = PendingPublish
3✔
916

3✔
917
                // Record another publish attempt.
3✔
918
                pi.publishAttempts++
3✔
919
        }
920
}
921

922
// markInputsPublished updates the sweeping tx in db and marks the list of
923
// inputs as published.
924
func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, set InputSet) error {
3✔
925
        // Mark this tx in db once successfully published.
3✔
926
        //
3✔
927
        // NOTE: this will behave as an overwrite, which is fine as the record
3✔
928
        // is small.
3✔
929
        tr.Published = true
3✔
930
        err := s.cfg.Store.StoreTx(tr)
3✔
931
        if err != nil {
3✔
UNCOV
932
                return fmt.Errorf("store tx: %w", err)
×
UNCOV
933
        }
×
934

935
        // Reschedule sweep.
936
        for _, input := range set.Inputs() {
6✔
937
                op := input.OutPoint()
3✔
938
                pi, ok := s.inputs[op]
3✔
939
                if !ok {
6✔
940
                        // It could be that this input is an additional wallet
3✔
941
                        // input that was attached. In that case there also
3✔
942
                        // isn't a pending input to update.
3✔
943
                        log.Tracef("Skipped marking input as published: %v "+
3✔
944
                                "not found in pending inputs", op)
3✔
945

3✔
946
                        continue
3✔
947
                }
948

949
                // Valdiate that the input is in an expected state.
950
                if pi.state != PendingPublish {
6✔
951
                        // We may get a Published if this is a replacement tx.
3✔
952
                        log.Debugf("Expect input %v to have %v, instead it "+
3✔
953
                                "has %v", op, PendingPublish, pi.state)
3✔
954

3✔
955
                        continue
3✔
956
                }
957

958
                // Update the input's state.
959
                pi.state = Published
3✔
960

3✔
961
                // Update the input's latest fee rate.
3✔
962
                pi.lastFeeRate = chainfee.SatPerKWeight(tr.FeeRate)
3✔
963
        }
964

965
        return nil
3✔
966
}
967

968
// markInputsPublishFailed marks the list of inputs as failed to be published.
969
func (s *UtxoSweeper) markInputsPublishFailed(set InputSet,
970
        feeRate chainfee.SatPerKWeight) {
3✔
971

3✔
972
        // Reschedule sweep.
3✔
973
        for _, inp := range set.Inputs() {
6✔
974
                op := inp.OutPoint()
3✔
975
                pi, ok := s.inputs[op]
3✔
976
                if !ok {
6✔
977
                        // It could be that this input is an additional wallet
3✔
978
                        // input that was attached. In that case there also
3✔
979
                        // isn't a pending input to update.
3✔
980
                        log.Tracef("Skipped marking input as publish failed: "+
3✔
981
                                "%v not found in pending inputs", op)
3✔
982

3✔
983
                        continue
3✔
984
                }
985

986
                // Valdiate that the input is in an expected state.
987
                if pi.state != PendingPublish && pi.state != Published {
3✔
UNCOV
988
                        log.Debugf("Expect input %v to have %v, instead it "+
×
UNCOV
989
                                "has %v", op, PendingPublish, pi.state)
×
UNCOV
990

×
UNCOV
991
                        continue
×
992
                }
993

994
                log.Warnf("Failed to publish input %v", op)
3✔
995

3✔
996
                // Update the input's state.
3✔
997
                pi.state = PublishFailed
3✔
998

3✔
999
                log.Debugf("Input(%v): updating params: starting fee rate "+
3✔
1000
                        "[%v -> %v]", op, pi.params.StartingFeeRate,
3✔
1001
                        feeRate)
3✔
1002

3✔
1003
                // Update the input using the fee rate specified from the
3✔
1004
                // BumpResult, which should be the starting fee rate to use for
3✔
1005
                // the next sweeping attempt.
3✔
1006
                pi.params.StartingFeeRate = fn.Some(feeRate)
3✔
1007
        }
1008
}
1009

1010
// monitorSpend registers a spend notification with the chain notifier. It
1011
// returns a cancel function that can be used to cancel the registration.
1012
func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint,
1013
        script []byte, heightHint uint32) (func(), error) {
3✔
1014

3✔
1015
        log.Tracef("Wait for spend of %v at heightHint=%v",
3✔
1016
                outpoint, heightHint)
3✔
1017

3✔
1018
        spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn(
3✔
1019
                &outpoint, script, heightHint,
3✔
1020
        )
3✔
1021
        if err != nil {
3✔
1022
                return nil, fmt.Errorf("register spend ntfn: %w", err)
×
1023
        }
×
1024

1025
        s.wg.Add(1)
3✔
1026
        go func() {
6✔
1027
                defer s.wg.Done()
3✔
1028

3✔
1029
                select {
3✔
1030
                case spend, ok := <-spendEvent.Spend:
3✔
1031
                        if !ok {
6✔
1032
                                log.Debugf("Spend ntfn for %v canceled",
3✔
1033
                                        outpoint)
3✔
1034
                                return
3✔
1035
                        }
3✔
1036

1037
                        log.Debugf("Delivering spend ntfn for %v", outpoint)
3✔
1038

3✔
1039
                        select {
3✔
1040
                        case s.spendChan <- spend:
3✔
1041
                                log.Debugf("Delivered spend ntfn for %v",
3✔
1042
                                        outpoint)
3✔
1043

1044
                        case <-s.quit:
×
1045
                        }
1046
                case <-s.quit:
3✔
1047
                }
1048
        }()
1049

1050
        return spendEvent.Cancel, nil
3✔
1051
}
1052

1053
// PendingInputs returns the set of inputs that the UtxoSweeper is currently
1054
// attempting to sweep.
1055
func (s *UtxoSweeper) PendingInputs() (
1056
        map[wire.OutPoint]*PendingInputResponse, error) {
3✔
1057

3✔
1058
        respChan := make(chan map[wire.OutPoint]*PendingInputResponse, 1)
3✔
1059
        errChan := make(chan error, 1)
3✔
1060
        select {
3✔
1061
        case s.pendingSweepsReqs <- &pendingSweepsReq{
1062
                respChan: respChan,
1063
                errChan:  errChan,
1064
        }:
3✔
1065
        case <-s.quit:
×
1066
                return nil, ErrSweeperShuttingDown
×
1067
        }
1068

1069
        select {
3✔
1070
        case pendingSweeps := <-respChan:
3✔
1071
                return pendingSweeps, nil
3✔
1072
        case err := <-errChan:
×
1073
                return nil, err
×
1074
        case <-s.quit:
×
1075
                return nil, ErrSweeperShuttingDown
×
1076
        }
1077
}
1078

1079
// handlePendingSweepsReq handles a request to retrieve all pending inputs the
1080
// UtxoSweeper is attempting to sweep.
1081
func (s *UtxoSweeper) handlePendingSweepsReq(
1082
        req *pendingSweepsReq) map[wire.OutPoint]*PendingInputResponse {
3✔
1083

3✔
1084
        resps := make(map[wire.OutPoint]*PendingInputResponse, len(s.inputs))
3✔
1085
        for _, inp := range s.inputs {
6✔
1086
                _, maturityHeight := inp.isMature(uint32(s.currentHeight))
3✔
1087

3✔
1088
                // Only the exported fields are set, as we expect the response
3✔
1089
                // to only be consumed externally.
3✔
1090
                op := inp.OutPoint()
3✔
1091
                resps[op] = &PendingInputResponse{
3✔
1092
                        OutPoint:    op,
3✔
1093
                        WitnessType: inp.WitnessType(),
3✔
1094
                        Amount: btcutil.Amount(
3✔
1095
                                inp.SignDesc().Output.Value,
3✔
1096
                        ),
3✔
1097
                        LastFeeRate:       inp.lastFeeRate,
3✔
1098
                        BroadcastAttempts: inp.publishAttempts,
3✔
1099
                        Params:            inp.params,
3✔
1100
                        DeadlineHeight:    uint32(inp.DeadlineHeight),
3✔
1101
                        MaturityHeight:    maturityHeight,
3✔
1102
                }
3✔
1103
        }
3✔
1104

1105
        select {
3✔
1106
        case req.respChan <- resps:
3✔
1107
        case <-s.quit:
×
1108
                log.Debug("Skipped sending pending sweep response due to " +
×
1109
                        "UtxoSweeper shutting down")
×
1110
        }
1111

1112
        return resps
3✔
1113
}
1114

1115
// UpdateParams allows updating the sweep parameters of a pending input in the
1116
// UtxoSweeper. This function can be used to provide an updated fee preference
1117
// and force flag that will be used for a new sweep transaction of the input
1118
// that will act as a replacement transaction (RBF) of the original sweeping
1119
// transaction, if any. The exclusive group is left unchanged.
1120
//
1121
// NOTE: This currently doesn't do any fee rate validation to ensure that a bump
1122
// is actually successful. The responsibility of doing so should be handled by
1123
// the caller.
1124
func (s *UtxoSweeper) UpdateParams(input wire.OutPoint,
1125
        params Params) (chan Result, error) {
3✔
1126

3✔
1127
        responseChan := make(chan *updateResp, 1)
3✔
1128
        select {
3✔
1129
        case s.updateReqs <- &updateReq{
1130
                input:        input,
1131
                params:       params,
1132
                responseChan: responseChan,
1133
        }:
3✔
1134
        case <-s.quit:
×
1135
                return nil, ErrSweeperShuttingDown
×
1136
        }
1137

1138
        select {
3✔
1139
        case response := <-responseChan:
3✔
1140
                return response.resultChan, response.err
3✔
1141
        case <-s.quit:
×
1142
                return nil, ErrSweeperShuttingDown
×
1143
        }
1144
}
1145

1146
// handleUpdateReq handles an update request by simply updating the sweep
1147
// parameters of the pending input. Currently, no validation is done on the new
1148
// fee preference to ensure it will properly create a replacement transaction.
1149
//
1150
// TODO(wilmer):
1151
//   - Validate fee preference to ensure we'll create a valid replacement
1152
//     transaction to allow the new fee rate to propagate throughout the
1153
//     network.
1154
//   - Ensure we don't combine this input with any other unconfirmed inputs that
1155
//     did not exist in the original sweep transaction, resulting in an invalid
1156
//     replacement transaction.
1157
func (s *UtxoSweeper) handleUpdateReq(req *updateReq) (
1158
        chan Result, error) {
3✔
1159

3✔
1160
        // If the UtxoSweeper is already trying to sweep this input, then we can
3✔
1161
        // simply just increase its fee rate. This will allow the input to be
3✔
1162
        // batched with others which also have a similar fee rate, creating a
3✔
1163
        // higher fee rate transaction that replaces the original input's
3✔
1164
        // sweeping transaction.
3✔
1165
        sweeperInput, ok := s.inputs[req.input]
3✔
1166
        if !ok {
3✔
1167
                return nil, lnwallet.ErrNotMine
×
1168
        }
×
1169

1170
        // Create the updated parameters struct. Leave the exclusive group
1171
        // unchanged.
1172
        newParams := Params{
3✔
1173
                StartingFeeRate: req.params.StartingFeeRate,
3✔
1174
                Immediate:       req.params.Immediate,
3✔
1175
                Budget:          req.params.Budget,
3✔
1176
                DeadlineHeight:  req.params.DeadlineHeight,
3✔
1177
                ExclusiveGroup:  sweeperInput.params.ExclusiveGroup,
3✔
1178
        }
3✔
1179

3✔
1180
        log.Debugf("Updating parameters for %v(state=%v) from (%v) to (%v)",
3✔
1181
                req.input, sweeperInput.state, sweeperInput.params, newParams)
3✔
1182

3✔
1183
        sweeperInput.params = newParams
3✔
1184

3✔
1185
        // We need to reset the state so this input will be attempted again by
3✔
1186
        // our sweeper.
3✔
1187
        //
3✔
1188
        // TODO(yy): a dedicated state?
3✔
1189
        sweeperInput.state = Init
3✔
1190

3✔
1191
        // If the new input specifies a deadline, update the deadline height.
3✔
1192
        sweeperInput.DeadlineHeight = req.params.DeadlineHeight.UnwrapOr(
3✔
1193
                sweeperInput.DeadlineHeight,
3✔
1194
        )
3✔
1195

3✔
1196
        resultChan := make(chan Result, 1)
3✔
1197
        sweeperInput.listeners = append(sweeperInput.listeners, resultChan)
3✔
1198

3✔
1199
        return resultChan, nil
3✔
1200
}
1201

1202
// ListSweeps returns a list of the sweeps recorded by the sweep store.
1203
func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
3✔
1204
        return s.cfg.Store.ListSweeps()
3✔
1205
}
3✔
1206

1207
// mempoolLookup takes an input's outpoint and queries the mempool to see
1208
// whether it's already been spent in a transaction found in the mempool.
1209
// Returns the transaction if found.
1210
func (s *UtxoSweeper) mempoolLookup(op wire.OutPoint) fn.Option[wire.MsgTx] {
3✔
1211
        // For neutrino backend, there's no mempool available, so we exit
3✔
1212
        // early.
3✔
1213
        if s.cfg.Mempool == nil {
4✔
1214
                log.Debugf("Skipping mempool lookup for %v, no mempool ", op)
1✔
1215

1✔
1216
                return fn.None[wire.MsgTx]()
1✔
1217
        }
1✔
1218

1219
        // Query this input in the mempool. If this outpoint is already spent
1220
        // in mempool, we should get a spending event back immediately.
1221
        return s.cfg.Mempool.LookupInputMempoolSpend(op)
2✔
1222
}
1223

1224
// calculateDefaultDeadline calculates the default deadline height for a sweep
1225
// request that has no deadline height specified.
1226
func (s *UtxoSweeper) calculateDefaultDeadline(pi *SweeperInput) int32 {
3✔
1227
        // Create a default deadline height, which will be used when there's no
3✔
1228
        // DeadlineHeight specified for a given input.
3✔
1229
        defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget)
3✔
1230

3✔
1231
        // If the input is immature and has a locktime, we'll use the locktime
3✔
1232
        // height as the starting height.
3✔
1233
        matured, locktime := pi.isMature(uint32(s.currentHeight))
3✔
1234
        if !matured {
6✔
1235
                defaultDeadline = int32(locktime + s.cfg.NoDeadlineConfTarget)
3✔
1236
                log.Debugf("Input %v is immature, using locktime=%v instead "+
3✔
1237
                        "of current height=%d as starting height",
3✔
1238
                        pi.OutPoint(), locktime, s.currentHeight)
3✔
1239
        }
3✔
1240

1241
        return defaultDeadline
3✔
1242
}
1243

1244
// handleNewInput processes a new input by registering spend notification and
1245
// scheduling sweeping for it.
1246
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
3✔
1247
        outpoint := input.input.OutPoint()
3✔
1248
        pi, pending := s.inputs[outpoint]
3✔
1249
        if pending {
6✔
1250
                log.Infof("Already has pending input %v received, old params: "+
3✔
1251
                        "%v, new params %v", outpoint, pi.params, input.params)
3✔
1252

3✔
1253
                s.handleExistingInput(input, pi)
3✔
1254

3✔
1255
                return nil
3✔
1256
        }
3✔
1257

1258
        // This is a new input, and we want to query the mempool to see if this
1259
        // input has already been spent. If so, we'll start the input with the
1260
        // RBFInfo.
1261
        rbfInfo := s.decideRBFInfo(input.input.OutPoint())
3✔
1262

3✔
1263
        // Create a new pendingInput and initialize the listeners slice with
3✔
1264
        // the passed in result channel. If this input is offered for sweep
3✔
1265
        // again, the result channel will be appended to this slice.
3✔
1266
        pi = &SweeperInput{
3✔
1267
                state:     Init,
3✔
1268
                listeners: []chan Result{input.resultChan},
3✔
1269
                Input:     input.input,
3✔
1270
                params:    input.params,
3✔
1271
                rbf:       rbfInfo,
3✔
1272
        }
3✔
1273

3✔
1274
        // Set the starting fee rate if a previous sweeping tx is found.
3✔
1275
        rbfInfo.WhenSome(func(info RBFInfo) {
5✔
1276
                pi.params.StartingFeeRate = fn.Some(info.FeeRate)
2✔
1277
        })
2✔
1278

1279
        // Set the acutal deadline height.
1280
        pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
3✔
1281
                s.calculateDefaultDeadline(pi),
3✔
1282
        )
3✔
1283

3✔
1284
        s.inputs[outpoint] = pi
3✔
1285
        log.Tracef("input %v, state=%v, added to inputs", outpoint, pi.state)
3✔
1286

3✔
1287
        log.Infof("Registered sweep request at block %d: out_point=%v, "+
3✔
1288
                "witness_type=%v, amount=%v, deadline=%d, state=%v, "+
3✔
1289
                "params=(%v)", s.currentHeight, pi.OutPoint(), pi.WitnessType(),
3✔
1290
                btcutil.Amount(pi.SignDesc().Output.Value), pi.DeadlineHeight,
3✔
1291
                pi.state, pi.params)
3✔
1292

3✔
1293
        // Start watching for spend of this input, either by us or the remote
3✔
1294
        // party.
3✔
1295
        cancel, err := s.monitorSpend(
3✔
1296
                outpoint, input.input.SignDesc().Output.PkScript,
3✔
1297
                input.input.HeightHint(),
3✔
1298
        )
3✔
1299
        if err != nil {
3✔
1300
                err := fmt.Errorf("wait for spend: %w", err)
×
1301
                s.markInputFatal(pi, nil, err)
×
1302

×
1303
                return err
×
1304
        }
×
1305

1306
        pi.ntfnRegCancel = cancel
3✔
1307

3✔
1308
        return nil
3✔
1309
}
1310

1311
// decideRBFInfo queries the mempool to see whether the given input has already
1312
// been spent. When spent, it will query the sweeper store to fetch the fee info
1313
// of the spending transction, and construct an RBFInfo based on it. Suppose an
1314
// error occurs, fn.None is returned.
1315
func (s *UtxoSweeper) decideRBFInfo(
1316
        op wire.OutPoint) fn.Option[RBFInfo] {
3✔
1317

3✔
1318
        // Check if we can find the spending tx of this input in mempool.
3✔
1319
        txOption := s.mempoolLookup(op)
3✔
1320

3✔
1321
        // Extract the spending tx from the option.
3✔
1322
        var tx *wire.MsgTx
3✔
1323
        txOption.WhenSome(func(t wire.MsgTx) {
5✔
1324
                tx = &t
2✔
1325
        })
2✔
1326

1327
        // Exit early if it's not found.
1328
        //
1329
        // NOTE: this is not accurate for backends that don't support mempool
1330
        // lookup:
1331
        // - for neutrino we don't have a mempool.
1332
        // - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
1333
        if tx == nil {
6✔
1334
                return fn.None[RBFInfo]()
3✔
1335
        }
3✔
1336

1337
        // Otherwise the input is already spent in the mempool, so eventually
1338
        // we will return Published.
1339
        //
1340
        // We also need to update the RBF info for this input. If the sweeping
1341
        // transaction is broadcast by us, we can find the fee info in the
1342
        // sweeper store.
1343
        txid := tx.TxHash()
2✔
1344
        tr, err := s.cfg.Store.GetTx(txid)
2✔
1345

2✔
1346
        log.Debugf("Found spending tx %v in mempool for input %v", tx.TxHash(),
2✔
1347
                op)
2✔
1348

2✔
1349
        // If the tx is not found in the store, it means it's not broadcast by
2✔
1350
        // us, hence we can't find the fee info. This is fine as, later on when
2✔
1351
        // this tx is confirmed, we will remove the input from our inputs.
2✔
1352
        if errors.Is(err, ErrTxNotFound) {
4✔
1353
                log.Warnf("Spending tx %v not found in sweeper store", txid)
2✔
1354
                return fn.None[RBFInfo]()
2✔
1355
        }
2✔
1356

1357
        // Exit if we get an db error.
1358
        if err != nil {
2✔
UNCOV
1359
                log.Errorf("Unable to get tx %v from sweeper store: %v",
×
UNCOV
1360
                        txid, err)
×
UNCOV
1361

×
UNCOV
1362
                return fn.None[RBFInfo]()
×
UNCOV
1363
        }
×
1364

1365
        // Prepare the fee info and return it.
1366
        rbf := fn.Some(RBFInfo{
2✔
1367
                Txid:    txid,
2✔
1368
                Fee:     btcutil.Amount(tr.Fee),
2✔
1369
                FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
2✔
1370
        })
2✔
1371

2✔
1372
        return rbf
2✔
1373
}
1374

1375
// handleExistingInput processes an input that is already known to the sweeper.
1376
// It will overwrite the params of the old input with the new ones.
1377
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
1378
        oldInput *SweeperInput) {
3✔
1379

3✔
1380
        // Before updating the input details, check if a previous exclusive
3✔
1381
        // group was set. In case the same input is registered again, the
3✔
1382
        // previous input and its sweep parameters are outdated hence need to be
3✔
1383
        // replaced. This scenario currently only happens for anchor outputs.
3✔
1384
        // When a channel is force closed, in the worst case 3 different sweeps
3✔
1385
        // with the same exclusive group are registered with the sweeper to bump
3✔
1386
        // the closing transaction (cpfp) when its time critical. Receiving an
3✔
1387
        // input which was already registered with the sweeper means none of the
3✔
1388
        // previous inputs were used as CPFP, so we need to make sure we update
3✔
1389
        // the sweep parameters but also remove all inputs with the same
3✔
1390
        // exclusive group because they are outdated too.
3✔
1391
        var prevExclGroup *uint64
3✔
1392
        if oldInput.params.ExclusiveGroup != nil {
6✔
1393
                prevExclGroup = new(uint64)
3✔
1394
                *prevExclGroup = *oldInput.params.ExclusiveGroup
3✔
1395
        }
3✔
1396

1397
        // Update input details and sweep parameters. The re-offered input
1398
        // details may contain a change to the unconfirmed parent tx info.
1399
        oldInput.params = input.params
3✔
1400
        oldInput.Input = input.input
3✔
1401

3✔
1402
        // If the new input specifies a deadline, update the deadline height.
3✔
1403
        oldInput.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
3✔
1404
                oldInput.DeadlineHeight,
3✔
1405
        )
3✔
1406

3✔
1407
        // Add additional result channel to signal spend of this input.
3✔
1408
        oldInput.listeners = append(oldInput.listeners, input.resultChan)
3✔
1409

3✔
1410
        if prevExclGroup != nil {
6✔
1411
                s.removeExclusiveGroup(*prevExclGroup, input.input.OutPoint())
3✔
1412
        }
3✔
1413
}
1414

1415
// handleInputSpent takes a spend event of our input and updates the sweeper's
1416
// internal state to remove the input.
1417
func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
3✔
1418
        // Query store to find out if we ever published this tx.
3✔
1419
        spendHash := *spend.SpenderTxHash
3✔
1420
        isOurTx := s.cfg.Store.IsOurTx(spendHash)
3✔
1421

3✔
1422
        // If this isn't our transaction, it means someone else swept outputs
3✔
1423
        // that we were attempting to sweep. This can happen for anchor outputs
3✔
1424
        // as well as justice transactions. In this case, we'll notify the
3✔
1425
        // wallet to remove any spends that descent from this output.
3✔
1426
        if !isOurTx {
6✔
1427
                // Construct a map of the inputs this transaction spends.
3✔
1428
                spendingTx := spend.SpendingTx
3✔
1429
                inputsSpent := make(
3✔
1430
                        map[wire.OutPoint]struct{}, len(spendingTx.TxIn),
3✔
1431
                )
3✔
1432
                for _, txIn := range spendingTx.TxIn {
6✔
1433
                        inputsSpent[txIn.PreviousOutPoint] = struct{}{}
3✔
1434
                }
3✔
1435

1436
                log.Debugf("Attempting to remove descendant txns invalidated "+
3✔
1437
                        "by (txid=%v): %v", spendingTx.TxHash(),
3✔
1438
                        spew.Sdump(spendingTx))
3✔
1439

3✔
1440
                err := s.removeConflictSweepDescendants(inputsSpent)
3✔
1441
                if err != nil {
5✔
1442
                        log.Warnf("unable to remove descendant transactions "+
2✔
1443
                                "due to tx %v: ", spendHash)
2✔
1444
                }
2✔
1445

1446
                log.Debugf("Detected third party spend related to in flight "+
3✔
1447
                        "inputs (is_ours=%v): %v", isOurTx,
3✔
1448
                        lnutils.SpewLogClosure(spend.SpendingTx))
3✔
1449
        }
1450

1451
        // We now use the spending tx to update the state of the inputs.
1452
        s.markInputsSwept(spend.SpendingTx, isOurTx)
3✔
1453
}
1454

1455
// markInputsSwept marks all inputs swept by the spending transaction as swept.
1456
// It will also notify all the subscribers of this input.
1457
func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) {
3✔
1458
        for _, txIn := range tx.TxIn {
6✔
1459
                outpoint := txIn.PreviousOutPoint
3✔
1460

3✔
1461
                // Check if this input is known to us. It could probably be
3✔
1462
                // unknown if we canceled the registration, deleted from inputs
3✔
1463
                // map but the ntfn was in-flight already. Or this could be not
3✔
1464
                // one of our inputs.
3✔
1465
                input, ok := s.inputs[outpoint]
3✔
1466
                if !ok {
6✔
1467
                        // It's very likely that a spending tx contains inputs
3✔
1468
                        // that we don't know.
3✔
1469
                        log.Tracef("Skipped marking input as swept: %v not "+
3✔
1470
                                "found in pending inputs", outpoint)
3✔
1471

3✔
1472
                        continue
3✔
1473
                }
1474

1475
                // This input may already been marked as swept by a previous
1476
                // spend notification, which is likely to happen as one sweep
1477
                // transaction usually sweeps multiple inputs.
1478
                if input.terminated() {
3✔
UNCOV
1479
                        log.Debugf("Skipped marking input as swept: %v "+
×
UNCOV
1480
                                "state=%v", outpoint, input.state)
×
UNCOV
1481

×
UNCOV
1482
                        continue
×
1483
                }
1484

1485
                input.state = Swept
3✔
1486

3✔
1487
                // Return either a nil or a remote spend result.
3✔
1488
                var err error
3✔
1489
                if !isOurTx {
6✔
1490
                        log.Warnf("Input=%v was spent by remote or third "+
3✔
1491
                                "party in tx=%v", outpoint, tx.TxHash())
3✔
1492
                        err = ErrRemoteSpend
3✔
1493
                }
3✔
1494

1495
                // Signal result channels.
1496
                s.signalResult(input, Result{
3✔
1497
                        Tx:  tx,
3✔
1498
                        Err: err,
3✔
1499
                })
3✔
1500

3✔
1501
                // Remove all other inputs in this exclusive group.
3✔
1502
                if input.params.ExclusiveGroup != nil {
6✔
1503
                        s.removeExclusiveGroup(
3✔
1504
                                *input.params.ExclusiveGroup, outpoint,
3✔
1505
                        )
3✔
1506
                }
3✔
1507
        }
1508
}
1509

1510
// markInputFatal marks the given input as fatal and won't be retried. It
1511
// will also notify all the subscribers of this input.
1512
func (s *UtxoSweeper) markInputFatal(pi *SweeperInput, tx *wire.MsgTx,
1513
        err error) {
2✔
1514

2✔
1515
        log.Errorf("Failed to sweep input: %v, error: %v", pi, err)
2✔
1516

2✔
1517
        pi.state = Fatal
2✔
1518

2✔
1519
        s.signalResult(pi, Result{
2✔
1520
                Tx:  tx,
2✔
1521
                Err: err,
2✔
1522
        })
2✔
1523
}
2✔
1524

1525
// updateSweeperInputs updates the sweeper's internal state and returns a map
1526
// of inputs to be swept. It will remove the inputs that are in final states,
1527
// and returns a map of inputs that have either state Init or PublishFailed.
1528
func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
3✔
1529
        // Create a map of inputs to be swept.
3✔
1530
        inputs := make(InputsMap)
3✔
1531

3✔
1532
        // Iterate the pending inputs and update the sweeper's state.
3✔
1533
        //
3✔
1534
        // TODO(yy): sweeper is made to communicate via go channels, so no
3✔
1535
        // locks are needed to access the map. However, it'd be safer if we
3✔
1536
        // turn this inputs map into a SyncMap in case we wanna add concurrent
3✔
1537
        // access to the map in the future.
3✔
1538
        for op, input := range s.inputs {
6✔
1539
                log.Tracef("Checking input: %s, state=%v", input, input.state)
3✔
1540

3✔
1541
                // If the input has reached a final state, that it's either
3✔
1542
                // been swept, or failed, or excluded, we will remove it from
3✔
1543
                // our sweeper.
3✔
1544
                if input.terminated() {
6✔
1545
                        log.Debugf("Removing input(State=%v) %v from sweeper",
3✔
1546
                                input.state, op)
3✔
1547

3✔
1548
                        delete(s.inputs, op)
3✔
1549

3✔
1550
                        continue
3✔
1551
                }
1552

1553
                // If this input has been included in a sweep tx that's not
1554
                // published yet, we'd skip this input and wait for the sweep
1555
                // tx to be published.
1556
                if input.state == PendingPublish {
6✔
1557
                        continue
3✔
1558
                }
1559

1560
                // If this input has already been published, we will need to
1561
                // check the RBF condition before attempting another sweeping.
1562
                if input.state == Published {
6✔
1563
                        continue
3✔
1564
                }
1565

1566
                // If the input has a locktime that's not yet reached, we will
1567
                // skip this input and wait for the locktime to be reached.
1568
                mature, _ := input.isMature(uint32(s.currentHeight))
3✔
1569
                if !mature {
6✔
1570
                        continue
3✔
1571
                }
1572

1573
                // If this input is new or has been failed to be published,
1574
                // we'd retry it. The assumption here is that when an error is
1575
                // returned from `PublishTransaction`, it means the tx has
1576
                // failed to meet the policy, hence it's not in the mempool.
1577
                inputs[op] = input
3✔
1578
        }
1579

1580
        return inputs
3✔
1581
}
1582

1583
// sweepPendingInputs is called when the ticker fires. It will create clusters
1584
// and attempt to create and publish the sweeping transactions.
1585
func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) {
3✔
1586
        log.Debugf("Sweeping %v inputs", len(inputs))
3✔
1587

3✔
1588
        // Cluster all of our inputs based on the specific Aggregator.
3✔
1589
        sets := s.cfg.Aggregator.ClusterInputs(inputs)
3✔
1590

3✔
1591
        // sweepWithLock is a helper closure that executes the sweep within a
3✔
1592
        // coin select lock to prevent the coins being selected for other
3✔
1593
        // transactions like funding of a channel.
3✔
1594
        sweepWithLock := func(set InputSet) error {
6✔
1595
                return s.cfg.Wallet.WithCoinSelectLock(func() error {
6✔
1596
                        // Try to add inputs from our wallet.
3✔
1597
                        err := set.AddWalletInputs(s.cfg.Wallet)
3✔
1598
                        if err != nil {
6✔
1599
                                return err
3✔
1600
                        }
3✔
1601

1602
                        // Create sweeping transaction for each set.
1603
                        err = s.sweep(set)
3✔
1604
                        if err != nil {
3✔
1605
                                return err
×
1606
                        }
×
1607

1608
                        return nil
3✔
1609
                })
1610
        }
1611

1612
        for _, set := range sets {
6✔
1613
                var err error
3✔
1614
                if set.NeedWalletInput() {
6✔
1615
                        // Sweep the set of inputs that need the wallet inputs.
3✔
1616
                        err = sweepWithLock(set)
3✔
1617
                } else {
6✔
1618
                        // Sweep the set of inputs that don't need the wallet
3✔
1619
                        // inputs.
3✔
1620
                        err = s.sweep(set)
3✔
1621
                }
3✔
1622

1623
                if err != nil {
6✔
1624
                        log.Errorf("Failed to sweep %v: %v", set, err)
3✔
1625
                }
3✔
1626
        }
1627
}
1628

1629
// bumpResp wraps the result of a bump attempt returned from the fee bumper and
1630
// the inputs being used.
1631
type bumpResp struct {
1632
        // result is the result of the bump attempt returned from the fee
1633
        // bumper.
1634
        result *BumpResult
1635

1636
        // set is the input set that was used in the bump attempt.
1637
        set InputSet
1638
}
1639

1640
// monitorFeeBumpResult subscribes to the passed result chan to listen for
1641
// future updates about the sweeping tx.
1642
//
1643
// NOTE: must run as a goroutine.
1644
func (s *UtxoSweeper) monitorFeeBumpResult(set InputSet,
1645
        resultChan <-chan *BumpResult) {
3✔
1646

3✔
1647
        defer s.wg.Done()
3✔
1648

3✔
1649
        for {
6✔
1650
                select {
3✔
1651
                case r := <-resultChan:
3✔
1652
                        // Validate the result is valid.
3✔
1653
                        if err := r.Validate(); err != nil {
3✔
1654
                                log.Errorf("Received invalid result: %v", err)
×
1655
                                continue
×
1656
                        }
1657

1658
                        resp := &bumpResp{
3✔
1659
                                result: r,
3✔
1660
                                set:    set,
3✔
1661
                        }
3✔
1662

3✔
1663
                        // Send the result back to the main event loop.
3✔
1664
                        select {
3✔
1665
                        case s.bumpRespChan <- resp:
3✔
1666
                        case <-s.quit:
×
1667
                                log.Debug("Sweeper shutting down, skip " +
×
1668
                                        "sending bump result")
×
1669

×
1670
                                return
×
1671
                        }
1672

1673
                        // The sweeping tx has been confirmed, we can exit the
1674
                        // monitor now.
1675
                        //
1676
                        // TODO(yy): can instead remove the spend subscription
1677
                        // in sweeper and rely solely on this event to mark
1678
                        // inputs as Swept?
1679
                        if r.Event == TxConfirmed || r.Event == TxFailed {
6✔
1680
                                // Exit if the tx is failed to be created.
3✔
1681
                                if r.Tx == nil {
6✔
1682
                                        log.Debugf("Received %v for nil tx, "+
3✔
1683
                                                "exit monitor", r.Event)
3✔
1684

3✔
1685
                                        return
3✔
1686
                                }
3✔
1687

1688
                                log.Debugf("Received %v for sweep tx %v, exit "+
3✔
1689
                                        "fee bump monitor", r.Event,
3✔
1690
                                        r.Tx.TxHash())
3✔
1691

3✔
1692
                                // Cancel the rebroadcasting of the failed tx.
3✔
1693
                                s.cfg.Wallet.CancelRebroadcast(r.Tx.TxHash())
3✔
1694

3✔
1695
                                return
3✔
1696
                        }
1697

1698
                case <-s.quit:
3✔
1699
                        log.Debugf("Sweeper shutting down, exit fee " +
3✔
1700
                                "bump handler")
3✔
1701

3✔
1702
                        return
3✔
1703
                }
1704
        }
1705
}
1706

1707
// handleBumpEventTxFailed handles the case where the tx has been failed to
1708
// publish.
1709
func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) {
3✔
1710
        r := resp.result
3✔
1711
        tx, err := r.Tx, r.Err
3✔
1712

3✔
1713
        if tx != nil {
6✔
1714
                log.Warnf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(),
3✔
1715
                        err)
3✔
1716
        }
3✔
1717

1718
        // NOTE: When marking the inputs as failed, we are using the input set
1719
        // instead of the inputs found in the tx. This is fine for current
1720
        // version of the sweeper because we always create a tx using ALL of
1721
        // the inputs specified by the set.
1722
        //
1723
        // TODO(yy): should we also remove the failed tx from db?
1724
        s.markInputsPublishFailed(resp.set, resp.result.FeeRate)
3✔
1725
}
1726

1727
// handleBumpEventTxReplaced handles the case where the sweeping tx has been
1728
// replaced by a new one.
1729
func (s *UtxoSweeper) handleBumpEventTxReplaced(resp *bumpResp) error {
3✔
1730
        r := resp.result
3✔
1731
        oldTx := r.ReplacedTx
3✔
1732
        newTx := r.Tx
3✔
1733

3✔
1734
        // Prepare a new record to replace the old one.
3✔
1735
        tr := &TxRecord{
3✔
1736
                Txid:    newTx.TxHash(),
3✔
1737
                FeeRate: uint64(r.FeeRate),
3✔
1738
                Fee:     uint64(r.Fee),
3✔
1739
        }
3✔
1740

3✔
1741
        // Get the old record for logging purpose.
3✔
1742
        oldTxid := oldTx.TxHash()
3✔
1743
        record, err := s.cfg.Store.GetTx(oldTxid)
3✔
1744
        if err != nil {
6✔
1745
                log.Errorf("Fetch tx record for %v: %v", oldTxid, err)
3✔
1746
                return err
3✔
1747
        }
3✔
1748

1749
        // Cancel the rebroadcasting of the replaced tx.
1750
        s.cfg.Wallet.CancelRebroadcast(oldTxid)
3✔
1751

3✔
1752
        log.Infof("RBFed tx=%v(fee=%v sats, feerate=%v sats/kw) with new "+
3✔
1753
                "tx=%v(fee=%v sats, feerate=%v sats/kw)", record.Txid,
3✔
1754
                record.Fee, record.FeeRate, tr.Txid, tr.Fee, tr.FeeRate)
3✔
1755

3✔
1756
        // The old sweeping tx has been replaced by a new one, we will update
3✔
1757
        // the tx record in the sweeper db.
3✔
1758
        //
3✔
1759
        // TODO(yy): we may also need to update the inputs in this tx to a new
3✔
1760
        // state. Suppose a replacing tx only spends a subset of the inputs
3✔
1761
        // here, we'd end up with the rest being marked as `Published` and
3✔
1762
        // won't be aggregated in the next sweep. Atm it's fine as we always
3✔
1763
        // RBF the same input set.
3✔
1764
        if err := s.cfg.Store.DeleteTx(oldTxid); err != nil {
3✔
UNCOV
1765
                log.Errorf("Delete tx record for %v: %v", oldTxid, err)
×
UNCOV
1766
                return err
×
UNCOV
1767
        }
×
1768

1769
        // Mark the inputs as published using the replacing tx.
1770
        return s.markInputsPublished(tr, resp.set)
3✔
1771
}
1772

1773
// handleBumpEventTxPublished handles the case where the sweeping tx has been
1774
// successfully published.
1775
func (s *UtxoSweeper) handleBumpEventTxPublished(resp *bumpResp) error {
3✔
1776
        r := resp.result
3✔
1777
        tx := r.Tx
3✔
1778
        tr := &TxRecord{
3✔
1779
                Txid:    tx.TxHash(),
3✔
1780
                FeeRate: uint64(r.FeeRate),
3✔
1781
                Fee:     uint64(r.Fee),
3✔
1782
        }
3✔
1783

3✔
1784
        // Inputs have been successfully published so we update their
3✔
1785
        // states.
3✔
1786
        err := s.markInputsPublished(tr, resp.set)
3✔
1787
        if err != nil {
3✔
1788
                return err
×
1789
        }
×
1790

1791
        log.Debugf("Published sweep tx %v, num_inputs=%v, height=%v",
3✔
1792
                tx.TxHash(), len(tx.TxIn), s.currentHeight)
3✔
1793

3✔
1794
        // If there's no error, remove the output script. Otherwise keep it so
3✔
1795
        // that it can be reused for the next transaction and causes no address
3✔
1796
        // inflation.
3✔
1797
        s.currentOutputScript = fn.None[lnwallet.AddrWithKey]()
3✔
1798

3✔
1799
        return nil
3✔
1800
}
1801

1802
// handleBumpEventTxFatal handles the case where there's an unexpected error
1803
// when creating or publishing the sweeping tx. In this case, the tx will be
1804
// removed from the sweeper store and the inputs will be marked as `Failed`,
1805
// which means they will not be retried.
1806
func (s *UtxoSweeper) handleBumpEventTxFatal(resp *bumpResp) error {
2✔
1807
        r := resp.result
2✔
1808

2✔
1809
        // Remove the tx from the sweeper store if there is one. Since this is
2✔
1810
        // a broadcast error, it's likely there isn't a tx here.
2✔
1811
        if r.Tx != nil {
4✔
1812
                txid := r.Tx.TxHash()
2✔
1813
                log.Infof("Tx=%v failed with unexpected error: %v", txid, r.Err)
2✔
1814

2✔
1815
                // Remove the tx from the sweeper db if it exists.
2✔
1816
                if err := s.cfg.Store.DeleteTx(txid); err != nil {
2✔
UNCOV
1817
                        return fmt.Errorf("delete tx record for %v: %w", txid,
×
UNCOV
1818
                                err)
×
UNCOV
1819
                }
×
1820
        }
1821

1822
        // Mark the inputs as fatal.
1823
        s.markInputsFatal(resp.set, r.Err)
2✔
1824

2✔
1825
        return nil
2✔
1826
}
1827

1828
// markInputsFatal  marks all inputs in the input set as failed. It will also
1829
// notify all the subscribers of these inputs.
1830
func (s *UtxoSweeper) markInputsFatal(set InputSet, err error) {
2✔
1831
        for _, inp := range set.Inputs() {
4✔
1832
                outpoint := inp.OutPoint()
2✔
1833

2✔
1834
                input, ok := s.inputs[outpoint]
2✔
1835
                if !ok {
4✔
1836
                        // It's very likely that a spending tx contains inputs
2✔
1837
                        // that we don't know.
2✔
1838
                        log.Tracef("Skipped marking input as failed: %v not "+
2✔
1839
                                "found in pending inputs", outpoint)
2✔
1840

2✔
1841
                        continue
2✔
1842
                }
1843

1844
                // If the input is already in a terminal state, we don't want
1845
                // to rewrite it, which also indicates an error as we only get
1846
                // an error event during the initial broadcast.
1847
                if input.terminated() {
2✔
UNCOV
1848
                        log.Errorf("Skipped marking input=%v as failed due to "+
×
UNCOV
1849
                                "unexpected state=%v", outpoint, input.state)
×
UNCOV
1850

×
UNCOV
1851
                        continue
×
1852
                }
1853

1854
                s.markInputFatal(input, nil, err)
2✔
1855
        }
1856
}
1857

1858
// handleBumpEvent handles the result sent from the bumper based on its event
1859
// type.
1860
//
1861
// NOTE: TxConfirmed event is not handled, since we already subscribe to the
1862
// input's spending event, we don't need to do anything here.
1863
func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
3✔
1864
        log.Debugf("Received bump result %v", r.result)
3✔
1865

3✔
1866
        switch r.result.Event {
3✔
1867
        // The tx has been published, we update the inputs' state and create a
1868
        // record to be stored in the sweeper db.
1869
        case TxPublished:
3✔
1870
                return s.handleBumpEventTxPublished(r)
3✔
1871

1872
        // The tx has failed, we update the inputs' state.
1873
        case TxFailed:
3✔
1874
                s.handleBumpEventTxFailed(r)
3✔
1875
                return nil
3✔
1876

1877
        // The tx has been replaced, we will remove the old tx and replace it
1878
        // with the new one.
1879
        case TxReplaced:
3✔
1880
                return s.handleBumpEventTxReplaced(r)
3✔
1881

1882
        // There are inputs being spent in a tx which the fee bumper doesn't
1883
        // understand. We will remove the tx from the sweeper db and mark the
1884
        // inputs as swept.
1885
        case TxUnknownSpend:
3✔
1886
                s.handleBumpEventTxUnknownSpend(r)
3✔
1887

1888
        // There's a fatal error in creating the tx, we will remove the tx from
1889
        // the sweeper db and mark the inputs as failed.
1890
        case TxFatal:
2✔
1891
                return s.handleBumpEventTxFatal(r)
2✔
1892
        }
1893

1894
        return nil
3✔
1895
}
1896

1897
// IsSweeperOutpoint determines whether the outpoint was created by the sweeper.
1898
//
1899
// NOTE: It is enough to check the txid because the sweeper will create
1900
// outpoints which solely belong to the internal LND wallet.
1901
func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool {
3✔
1902
        return s.cfg.Store.IsOurTx(op.Hash)
3✔
1903
}
3✔
1904

1905
// markInputSwept marks the given input as swept by the tx. It will also notify
1906
// all the subscribers of this input.
UNCOV
1907
func (s *UtxoSweeper) markInputSwept(inp *SweeperInput, tx *wire.MsgTx) {
×
UNCOV
1908
        log.Debugf("Marking input as swept: %v from state=%v", inp.OutPoint(),
×
UNCOV
1909
                inp.state)
×
UNCOV
1910

×
UNCOV
1911
        inp.state = Swept
×
UNCOV
1912

×
UNCOV
1913
        // Signal result channels.
×
UNCOV
1914
        s.signalResult(inp, Result{
×
UNCOV
1915
                Tx: tx,
×
UNCOV
1916
        })
×
UNCOV
1917

×
UNCOV
1918
        // Remove all other inputs in this exclusive group.
×
UNCOV
1919
        if inp.params.ExclusiveGroup != nil {
×
1920
                s.removeExclusiveGroup(
×
1921
                        *inp.params.ExclusiveGroup, inp.OutPoint(),
×
1922
                )
×
1923
        }
×
1924
}
1925

1926
// handleUnknownSpendTx takes an input and its spending tx. If the spending tx
1927
// cannot be found in the sweeper store, the input will be marked as fatal,
1928
// otherwise it will be marked as swept.
UNCOV
1929
func (s *UtxoSweeper) handleUnknownSpendTx(inp *SweeperInput, tx *wire.MsgTx) {
×
UNCOV
1930
        op := inp.OutPoint()
×
UNCOV
1931
        txid := tx.TxHash()
×
UNCOV
1932

×
UNCOV
1933
        isOurTx := s.cfg.Store.IsOurTx(txid)
×
UNCOV
1934

×
UNCOV
1935
        // If this is our tx, it means it's a previous sweeping tx that got
×
UNCOV
1936
        // confirmed, which could happen when a restart happens during the
×
UNCOV
1937
        // sweeping process.
×
UNCOV
1938
        if isOurTx {
×
UNCOV
1939
                log.Debugf("Found our sweeping tx %v, marking input %v as "+
×
UNCOV
1940
                        "swept", txid, op)
×
UNCOV
1941

×
UNCOV
1942
                // We now use the spending tx to update the state of the inputs.
×
UNCOV
1943
                s.markInputSwept(inp, tx)
×
UNCOV
1944

×
UNCOV
1945
                return
×
UNCOV
1946
        }
×
1947

1948
        // Since the input is spent by others, we now mark it as fatal and won't
1949
        // be retried.
UNCOV
1950
        s.markInputFatal(inp, tx, ErrRemoteSpend)
×
UNCOV
1951

×
UNCOV
1952
        log.Debugf("Removing descendant txns invalidated by (txid=%v): %v",
×
UNCOV
1953
                txid, lnutils.SpewLogClosure(tx))
×
UNCOV
1954

×
UNCOV
1955
        // Construct a map of the inputs this transaction spends.
×
UNCOV
1956
        spentInputs := make(map[wire.OutPoint]struct{}, len(tx.TxIn))
×
UNCOV
1957
        for _, txIn := range tx.TxIn {
×
UNCOV
1958
                spentInputs[txIn.PreviousOutPoint] = struct{}{}
×
UNCOV
1959
        }
×
1960

UNCOV
1961
        err := s.removeConflictSweepDescendants(spentInputs)
×
UNCOV
1962
        if err != nil {
×
1963
                log.Warnf("unable to remove descendant transactions "+
×
1964
                        "due to tx %v: ", txid)
×
1965
        }
×
1966
}
1967

1968
// handleBumpEventTxUnknownSpend handles the case where the confirmed tx is
1969
// unknown to the fee bumper. In the case when the sweeping tx has been replaced
1970
// by another party with their tx being confirmed. It will retry sweeping the
1971
// "good" inputs once the "bad" ones are kicked out.
1972
func (s *UtxoSweeper) handleBumpEventTxUnknownSpend(r *bumpResp) {
3✔
1973
        // Mark the inputs as publish failed, which means they will be retried
3✔
1974
        // later.
3✔
1975
        s.markInputsPublishFailed(r.set, r.result.FeeRate)
3✔
1976

3✔
1977
        // Get all the inputs that are not spent in the current sweeping tx.
3✔
1978
        spentInputs := r.result.SpentInputs
3✔
1979

3✔
1980
        // Create a slice to track inputs to be retried.
3✔
1981
        inputsToRetry := make([]input.Input, 0, len(r.set.Inputs()))
3✔
1982

3✔
1983
        // Iterate all the inputs found in this bump and mark the ones spent by
3✔
1984
        // the third party as failed. The rest of inputs will then be updated
3✔
1985
        // with a new fee rate and be retried immediately.
3✔
1986
        for _, inp := range r.set.Inputs() {
6✔
1987
                op := inp.OutPoint()
3✔
1988
                input, ok := s.inputs[op]
3✔
1989

3✔
1990
                // Wallet inputs are not tracked so we will not find them from
3✔
1991
                // the inputs map.
3✔
1992
                if !ok {
6✔
1993
                        log.Debugf("Skipped marking input: %v not found in "+
3✔
1994
                                "pending inputs", op)
3✔
1995

3✔
1996
                        continue
3✔
1997
                }
1998

1999
                // Check whether this input has been spent, if so we mark it as
2000
                // fatal or swept based on whether this is one of our previous
2001
                // sweeping txns, then move to the next.
2002
                tx, spent := spentInputs[op]
3✔
2003
                if spent {
3✔
UNCOV
2004
                        s.handleUnknownSpendTx(input, tx)
×
UNCOV
2005

×
UNCOV
2006
                        continue
×
2007
                }
2008

2009
                log.Debugf("Input(%v): updating params: immediate [%v -> true]",
3✔
2010
                        op, r.result.FeeRate, input.params.Immediate)
3✔
2011

3✔
2012
                input.params.Immediate = true
3✔
2013
                inputsToRetry = append(inputsToRetry, input)
3✔
2014
        }
2015

2016
        // Exit early if there are no inputs to be retried.
2017
        if len(inputsToRetry) == 0 {
6✔
2018
                return
3✔
2019
        }
3✔
2020

2021
        log.Debugf("Retry sweeping inputs with updated params: %v",
3✔
2022
                inputTypeSummary(inputsToRetry))
3✔
2023

3✔
2024
        // Get the latest inputs, which should put the PublishFailed inputs back
3✔
2025
        // to the sweeping queue.
3✔
2026
        inputs := s.updateSweeperInputs()
3✔
2027

3✔
2028
        // Immediately sweep the remaining inputs - the previous inputs should
3✔
2029
        // now be swept with the updated StartingFeeRate immediately. We may
3✔
2030
        // also include more inputs in the new sweeping tx if new ones with the
3✔
2031
        // same deadline are offered.
3✔
2032
        s.sweepPendingInputs(inputs)
3✔
2033
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc