• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 18852986778

27 Oct 2025 07:10PM UTC coverage: 54.859% (-11.8%) from 66.648%
18852986778

Pull #10265

github

web-flow
Merge 45787b3d5 into 9a7b526c0
Pull Request #10265: multi: update close logic to handle re-orgs of depth n-1, where n is num confs - add min conf floor

529 of 828 new or added lines in 17 files covered. (63.89%)

24026 existing lines in 286 files now uncovered.

110927 of 202205 relevant lines covered (54.86%)

21658.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.34
/sweep/sweeper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/lightningnetwork/lnd/chainio"
13
        "github.com/lightningnetwork/lnd/chainntnfs"
14
        "github.com/lightningnetwork/lnd/fn/v2"
15
        "github.com/lightningnetwork/lnd/input"
16
        "github.com/lightningnetwork/lnd/lnutils"
17
        "github.com/lightningnetwork/lnd/lnwallet"
18
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
19
)
20

21
var (
22
        // ErrRemoteSpend is returned in case an output that we try to sweep is
23
        // confirmed in a tx of the remote party.
24
        ErrRemoteSpend = errors.New("remote party swept utxo")
25

26
        // ErrFeePreferenceTooLow is returned when the fee preference gives a
27
        // fee rate that's below the relay fee rate.
28
        ErrFeePreferenceTooLow = errors.New("fee preference too low")
29

30
        // ErrExclusiveGroupSpend is returned in case a different input of the
31
        // same exclusive group was spent.
32
        ErrExclusiveGroupSpend = errors.New("other member of exclusive group " +
33
                "was spent")
34

35
        // ErrSweeperShuttingDown is an error returned when a client attempts to
36
        // make a request to the UtxoSweeper, but it is unable to handle it as
37
        // it is/has already been stopped.
38
        ErrSweeperShuttingDown = errors.New("utxo sweeper shutting down")
39

40
        // DefaultDeadlineDelta defines a default deadline delta (1 week) to be
41
        // used when sweeping inputs with no deadline pressure.
42
        DefaultDeadlineDelta = int32(1008)
43
)
44

45
// Params contains the parameters that control the sweeping process.
46
type Params struct {
47
        // ExclusiveGroup is an identifier that, if set, ensures this input is
48
        // swept in a transaction by itself, and not batched with any other
49
        // inputs.
50
        ExclusiveGroup *uint64
51

52
        // DeadlineHeight specifies an absolute block height that this input
53
        // should be confirmed by. This value is used by the fee bumper to
54
        // decide its urgency and adjust its feerate used.
55
        DeadlineHeight fn.Option[int32]
56

57
        // Budget specifies the maximum amount of satoshis that can be spent on
58
        // fees for this sweep.
59
        Budget btcutil.Amount
60

61
        // Immediate indicates that the input should be swept immediately
62
        // without waiting for blocks to come to trigger the sweeping of
63
        // inputs.
64
        Immediate bool
65

66
        // StartingFeeRate is an optional parameter that can be used to specify
67
        // the initial fee rate to use for the fee function.
68
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
69
}
70

71
// String returns a human readable interpretation of the sweep parameters.
UNCOV
72
func (p Params) String() string {
×
UNCOV
73
        deadline := "none"
×
UNCOV
74
        p.DeadlineHeight.WhenSome(func(d int32) {
×
UNCOV
75
                deadline = fmt.Sprintf("%d", d)
×
UNCOV
76
        })
×
77

UNCOV
78
        exclusiveGroup := "none"
×
UNCOV
79
        if p.ExclusiveGroup != nil {
×
UNCOV
80
                exclusiveGroup = fmt.Sprintf("%d", *p.ExclusiveGroup)
×
UNCOV
81
        }
×
82

UNCOV
83
        return fmt.Sprintf("startingFeeRate=%v, immediate=%v, "+
×
UNCOV
84
                "exclusive_group=%v, budget=%v, deadline=%v", p.StartingFeeRate,
×
UNCOV
85
                p.Immediate, exclusiveGroup, p.Budget, deadline)
×
86
}
87

88
// SweepState represents the current state of a pending input.
89
//
90
//nolint:revive
91
type SweepState uint8
92

93
const (
94
        // Init is the initial state of a pending input. This is set when a new
95
        // sweeping request for a given input is made.
96
        Init SweepState = iota
97

98
        // PendingPublish specifies an input's state where it's already been
99
        // included in a sweeping tx but the tx is not published yet.  Inputs
100
        // in this state should not be used for grouping again.
101
        PendingPublish
102

103
        // Published is the state where the input's sweeping tx has
104
        // successfully been published. Inputs in this state can only be
105
        // updated via RBF.
106
        Published
107

108
        // PublishFailed is the state when an error is returned from publishing
109
        // the sweeping tx. Inputs in this state can be re-grouped in to a new
110
        // sweeping tx.
111
        PublishFailed
112

113
        // Swept is the final state of a pending input. This is set when the
114
        // input has been successfully swept.
115
        Swept
116

117
        // Excluded is the state of a pending input that has been excluded and
118
        // can no longer be swept. For instance, when one of the three anchor
119
        // sweeping transactions confirmed, the remaining two will be excluded.
120
        Excluded
121

122
        // Fatal is the final state of a pending input. Inputs ending in this
123
        // state won't be retried. This could happen,
124
        // - when a pending input has too many failed publish attempts;
125
        // - the input has been spent by another party;
126
        // - unknown broadcast error is returned.
127
        Fatal
128
)
129

130
// String gives a human readable text for the sweep states.
UNCOV
131
func (s SweepState) String() string {
×
UNCOV
132
        switch s {
×
UNCOV
133
        case Init:
×
UNCOV
134
                return "Init"
×
135

UNCOV
136
        case PendingPublish:
×
UNCOV
137
                return "PendingPublish"
×
138

UNCOV
139
        case Published:
×
UNCOV
140
                return "Published"
×
141

UNCOV
142
        case PublishFailed:
×
UNCOV
143
                return "PublishFailed"
×
144

UNCOV
145
        case Swept:
×
UNCOV
146
                return "Swept"
×
147

UNCOV
148
        case Excluded:
×
UNCOV
149
                return "Excluded"
×
150

UNCOV
151
        case Fatal:
×
UNCOV
152
                return "Fatal"
×
153

154
        default:
×
155
                return "Unknown"
×
156
        }
157
}
158

159
// RBFInfo stores the information required to perform a RBF bump on a pending
160
// sweeping tx.
161
type RBFInfo struct {
162
        // Txid is the txid of the sweeping tx.
163
        Txid chainhash.Hash
164

165
        // FeeRate is the fee rate of the sweeping tx.
166
        FeeRate chainfee.SatPerKWeight
167

168
        // Fee is the total fee of the sweeping tx.
169
        Fee btcutil.Amount
170
}
171

172
// SweeperInput is created when an input reaches the main loop for the first
173
// time. It wraps the input and tracks all relevant state that is needed for
174
// sweeping.
175
type SweeperInput struct {
176
        input.Input
177

178
        // state tracks the current state of the input.
179
        state SweepState
180

181
        // listeners is a list of channels over which the final outcome of the
182
        // sweep needs to be broadcasted.
183
        listeners []chan Result
184

185
        // ntfnRegCancel is populated with a function that cancels the chain
186
        // notifier spend registration.
187
        ntfnRegCancel func()
188

189
        // publishAttempts records the number of attempts that have already been
190
        // made to sweep this tx.
191
        publishAttempts int
192

193
        // params contains the parameters that control the sweeping process.
194
        params Params
195

196
        // lastFeeRate is the most recent fee rate used for this input within a
197
        // transaction broadcast to the network.
198
        lastFeeRate chainfee.SatPerKWeight
199

200
        // rbf records the RBF constraints.
201
        rbf fn.Option[RBFInfo]
202

203
        // DeadlineHeight is the deadline height for this input. This is
204
        // different from the DeadlineHeight in its params as it's an actual
205
        // value than an option.
206
        DeadlineHeight int32
207
}
208

209
// String returns a human readable interpretation of the pending input.
210
func (p *SweeperInput) String() string {
34✔
211
        return fmt.Sprintf("%v (%v)", p.Input.OutPoint(), p.Input.WitnessType())
34✔
212
}
34✔
213

214
// terminated returns a boolean indicating whether the input has reached a
215
// final state.
216
func (p *SweeperInput) terminated() bool {
39✔
217
        switch p.state {
39✔
218
        // If the input has reached a final state, that it's either
219
        // been swept, or failed, or excluded, we will remove it from
220
        // our sweeper.
221
        case Fatal, Swept, Excluded:
9✔
222
                return true
9✔
223

224
        default:
30✔
225
                return false
30✔
226
        }
227
}
228

229
// isMature returns a boolean indicating whether the input has a timelock that
230
// has been reached or not. The locktime found is also returned.
231
func (p *SweeperInput) isMature(currentHeight uint32) (bool, uint32) {
11✔
232
        locktime, _ := p.RequiredLockTime()
11✔
233
        if currentHeight < locktime {
12✔
234
                log.Debugf("Input %v has locktime=%v, current height is %v",
1✔
235
                        p, locktime, currentHeight)
1✔
236

1✔
237
                return false, locktime
1✔
238
        }
1✔
239

240
        // If the input has a CSV that's not yet reached, we will skip
241
        // this input and wait for the expiry.
242
        //
243
        // NOTE: We need to consider whether this input can be included in the
244
        // next block or not, which means the CSV will be checked against the
245
        // currentHeight plus one.
246
        locktime = p.BlocksToMaturity() + p.HeightHint()
10✔
247
        if currentHeight+1 < locktime {
11✔
248
                log.Debugf("Input %v has CSV expiry=%v, current height is %v, "+
1✔
249
                        "skipped sweeping", p, locktime, currentHeight)
1✔
250

1✔
251
                return false, locktime
1✔
252
        }
1✔
253

254
        return true, locktime
9✔
255
}
256

257
// InputsMap is a type alias for a set of pending inputs.
258
type InputsMap = map[wire.OutPoint]*SweeperInput
259

260
// inputsMapToString returns a human readable interpretation of the pending
261
// inputs.
UNCOV
262
func inputsMapToString(inputs InputsMap) string {
×
UNCOV
263
        if len(inputs) == 0 {
×
UNCOV
264
                return ""
×
UNCOV
265
        }
×
266

UNCOV
267
        inps := make([]input.Input, 0, len(inputs))
×
UNCOV
268
        for _, in := range inputs {
×
UNCOV
269
                inps = append(inps, in)
×
UNCOV
270
        }
×
271

UNCOV
272
        return "\n" + inputTypeSummary(inps)
×
273
}
274

275
// pendingSweepsReq is an internal message we'll use to represent an external
276
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
277
// attempting to sweep.
278
type pendingSweepsReq struct {
279
        respChan chan map[wire.OutPoint]*PendingInputResponse
280
        errChan  chan error
281
}
282

283
// PendingInputResponse contains information about an input that is currently
284
// being swept by the UtxoSweeper.
285
type PendingInputResponse struct {
286
        // OutPoint is the identify outpoint of the input being swept.
287
        OutPoint wire.OutPoint
288

289
        // WitnessType is the witness type of the input being swept.
290
        WitnessType input.WitnessType
291

292
        // Amount is the amount of the input being swept.
293
        Amount btcutil.Amount
294

295
        // LastFeeRate is the most recent fee rate used for the input being
296
        // swept within a transaction broadcast to the network.
297
        LastFeeRate chainfee.SatPerKWeight
298

299
        // BroadcastAttempts is the number of attempts we've made to sweept the
300
        // input.
301
        BroadcastAttempts int
302

303
        // Params contains the sweep parameters for this pending request.
304
        Params Params
305

306
        // DeadlineHeight records the deadline height of this input.
307
        DeadlineHeight uint32
308

309
        // MaturityHeight is the block height that this input's locktime will
310
        // be expired at. For inputs with no locktime this value is zero.
311
        MaturityHeight uint32
312
}
313

314
// updateReq is an internal message we'll use to represent an external caller's
315
// intent to update the sweep parameters of a given input.
316
type updateReq struct {
317
        input        wire.OutPoint
318
        params       Params
319
        responseChan chan *updateResp
320
}
321

322
// updateResp is an internal message we'll use to hand off the response of a
323
// updateReq from the UtxoSweeper's main event loop back to the caller.
324
type updateResp struct {
325
        resultChan chan Result
326
        err        error
327
}
328

329
// triggerSweepReq is an internal message we'll use to represent an external
330
// caller's intent to trigger an immediate sweep of all pending inputs.
331
type triggerSweepReq struct {
332
        respChan chan int
333
}
334

335
// UtxoSweeper is responsible for sweeping outputs back into the wallet
336
type UtxoSweeper struct {
337
        started uint32 // To be used atomically.
338
        stopped uint32 // To be used atomically.
339

340
        // Embed the blockbeat consumer struct to get access to the method
341
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
342
        chainio.BeatConsumer
343

344
        cfg *UtxoSweeperConfig
345

346
        newInputs chan *sweepInputMessage
347
        spendChan chan *chainntnfs.SpendDetail
348

349
        // pendingSweepsReq is a channel that will be sent requests by external
350
        // callers in order to retrieve the set of pending inputs the
351
        // UtxoSweeper is attempting to sweep.
352
        pendingSweepsReqs chan *pendingSweepsReq
353

354
        // updateReqs is a channel that will be sent requests by external
355
        // callers who wish to bump the fee rate of a given input.
356
        updateReqs chan *updateReq
357

358
        // triggerSweepReqs is a channel that will be sent requests by external
359
        // callers who wish to trigger an immediate sweep of all pending inputs.
360
        triggerSweepReqs chan *triggerSweepReq
361

362
        // inputs is the total set of inputs the UtxoSweeper has been requested
363
        // to sweep.
364
        inputs InputsMap
365

366
        currentOutputScript fn.Option[lnwallet.AddrWithKey]
367

368
        relayFeeRate chainfee.SatPerKWeight
369

370
        quit chan struct{}
371
        wg   sync.WaitGroup
372

373
        // currentHeight is the best known height of the main chain. This is
374
        // updated whenever a new block epoch is received.
375
        currentHeight int32
376

377
        // bumpRespChan is a channel that receives broadcast results from the
378
        // TxPublisher.
379
        bumpRespChan chan *bumpResp
380
}
381

382
// Compile-time check for the chainio.Consumer interface.
383
var _ chainio.Consumer = (*UtxoSweeper)(nil)
384

385
// UtxoSweeperConfig contains dependencies of UtxoSweeper.
386
type UtxoSweeperConfig struct {
387
        // GenSweepScript generates a P2WKH script belonging to the wallet where
388
        // funds can be swept.
389
        GenSweepScript func() fn.Result[lnwallet.AddrWithKey]
390

391
        // FeeEstimator is used when crafting sweep transactions to estimate
392
        // the necessary fee relative to the expected size of the sweep
393
        // transaction.
394
        FeeEstimator chainfee.Estimator
395

396
        // Wallet contains the wallet functions that sweeper requires.
397
        Wallet Wallet
398

399
        // Notifier is an instance of a chain notifier we'll use to watch for
400
        // certain on-chain events.
401
        Notifier chainntnfs.ChainNotifier
402

403
        // Mempool is the mempool watcher that will be used to query whether a
404
        // given input is already being spent by a transaction in the mempool.
405
        Mempool chainntnfs.MempoolWatcher
406

407
        // Store stores the published sweeper txes.
408
        Store SweeperStore
409

410
        // Signer is used by the sweeper to generate valid witnesses at the
411
        // time the incubated outputs need to be spent.
412
        Signer input.Signer
413

414
        // MaxInputsPerTx specifies the default maximum number of inputs allowed
415
        // in a single sweep tx. If more need to be swept, multiple txes are
416
        // created and published.
417
        MaxInputsPerTx uint32
418

419
        // MaxFeeRate is the maximum fee rate allowed within the UtxoSweeper.
420
        MaxFeeRate chainfee.SatPerVByte
421

422
        // Aggregator is used to group inputs into clusters based on its
423
        // implemention-specific strategy.
424
        Aggregator UtxoAggregator
425

426
        // Publisher is used to publish the sweep tx crafted here and monitors
427
        // it for potential fee bumps.
428
        Publisher Bumper
429

430
        // NoDeadlineConfTarget is the conf target to use when sweeping
431
        // non-time-sensitive outputs.
432
        NoDeadlineConfTarget uint32
433
}
434

435
// Result is the struct that is pushed through the result channel. Callers can
436
// use this to be informed of the final sweep result. In case of a remote
437
// spend, Err will be ErrRemoteSpend.
438
type Result struct {
439
        // Err is the final result of the sweep. It is nil when the input is
440
        // swept successfully by us. ErrRemoteSpend is returned when another
441
        // party took the input.
442
        Err error
443

444
        // Tx is the transaction that spent the input.
445
        Tx *wire.MsgTx
446
}
447

448
// sweepInputMessage structs are used in the internal channel between the
449
// SweepInput call and the sweeper main loop.
450
type sweepInputMessage struct {
451
        input      input.Input
452
        params     Params
453
        resultChan chan Result
454
}
455

456
// New returns a new Sweeper instance.
457
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
24✔
458
        s := &UtxoSweeper{
24✔
459
                cfg:               cfg,
24✔
460
                newInputs:         make(chan *sweepInputMessage),
24✔
461
                spendChan:         make(chan *chainntnfs.SpendDetail),
24✔
462
                updateReqs:        make(chan *updateReq),
24✔
463
                pendingSweepsReqs: make(chan *pendingSweepsReq),
24✔
464
                triggerSweepReqs:  make(chan *triggerSweepReq),
24✔
465
                quit:              make(chan struct{}),
24✔
466
                inputs:            make(InputsMap),
24✔
467
                bumpRespChan:      make(chan *bumpResp, 100),
24✔
468
        }
24✔
469

24✔
470
        // Mount the block consumer.
24✔
471
        s.BeatConsumer = chainio.NewBeatConsumer(s.quit, s.Name())
24✔
472

24✔
473
        return s
24✔
474
}
24✔
475

476
// Start starts the process of constructing and publish sweep txes.
UNCOV
477
func (s *UtxoSweeper) Start(beat chainio.Blockbeat) error {
×
UNCOV
478
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
×
479
                return nil
×
480
        }
×
481

UNCOV
482
        log.Info("Sweeper starting")
×
UNCOV
483

×
UNCOV
484
        // Retrieve relay fee for dust limit calculation. Assume that this will
×
UNCOV
485
        // not change from here on.
×
UNCOV
486
        s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
×
UNCOV
487

×
UNCOV
488
        // Set the current height.
×
UNCOV
489
        s.currentHeight = beat.Height()
×
UNCOV
490

×
UNCOV
491
        // Start sweeper main loop.
×
UNCOV
492
        s.wg.Add(1)
×
UNCOV
493
        go s.collector()
×
UNCOV
494

×
UNCOV
495
        return nil
×
496
}
497

498
// RelayFeePerKW returns the minimum fee rate required for transactions to be
499
// relayed.
500
func (s *UtxoSweeper) RelayFeePerKW() chainfee.SatPerKWeight {
×
501
        return s.relayFeeRate
×
502
}
×
503

504
// Stop stops sweeper from listening to block epochs and constructing sweep
505
// txes.
UNCOV
506
func (s *UtxoSweeper) Stop() error {
×
UNCOV
507
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
×
508
                return nil
×
509
        }
×
510

UNCOV
511
        log.Info("Sweeper shutting down...")
×
UNCOV
512
        defer log.Debug("Sweeper shutdown complete")
×
UNCOV
513

×
UNCOV
514
        close(s.quit)
×
UNCOV
515
        s.wg.Wait()
×
UNCOV
516

×
UNCOV
517
        return nil
×
518
}
519

520
// NOTE: part of the `chainio.Consumer` interface.
521
func (s *UtxoSweeper) Name() string {
24✔
522
        return "UtxoSweeper"
24✔
523
}
24✔
524

525
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
526
// swept after the batch time window ends. A custom fee preference can be
527
// provided to determine what fee rate should be used for the input. Note that
528
// the input may not always be swept with this exact value, as its possible for
529
// it to be batched under the same transaction with other similar fee rate
530
// inputs.
531
//
532
// NOTE: Extreme care needs to be taken that input isn't changed externally.
533
// Because it is an interface and we don't know what is exactly behind it, we
534
// cannot make a local copy in sweeper.
535
//
536
// TODO(yy): make sure the caller is using the Result chan.
537
func (s *UtxoSweeper) SweepInput(inp input.Input,
UNCOV
538
        params Params) (chan Result, error) {
×
UNCOV
539

×
UNCOV
540
        if inp == nil || inp.OutPoint() == input.EmptyOutPoint ||
×
UNCOV
541
                inp.SignDesc() == nil {
×
542

×
543
                return nil, errors.New("nil input received")
×
544
        }
×
545

UNCOV
546
        absoluteTimeLock, _ := inp.RequiredLockTime()
×
UNCOV
547
        log.Debugf("Sweep request received: out_point=%v, witness_type=%v, "+
×
UNCOV
548
                "relative_time_lock=%v, absolute_time_lock=%v, amount=%v, "+
×
UNCOV
549
                "parent=(%v), params=(%v)", inp.OutPoint(), inp.WitnessType(),
×
UNCOV
550
                inp.BlocksToMaturity(), absoluteTimeLock,
×
UNCOV
551
                btcutil.Amount(inp.SignDesc().Output.Value),
×
UNCOV
552
                inp.UnconfParent(), params)
×
UNCOV
553

×
UNCOV
554
        sweeperInput := &sweepInputMessage{
×
UNCOV
555
                input:      inp,
×
UNCOV
556
                params:     params,
×
UNCOV
557
                resultChan: make(chan Result, 1),
×
UNCOV
558
        }
×
UNCOV
559

×
UNCOV
560
        // Deliver input to the main event loop.
×
UNCOV
561
        select {
×
UNCOV
562
        case s.newInputs <- sweeperInput:
×
563
        case <-s.quit:
×
564
                return nil, ErrSweeperShuttingDown
×
565
        }
566

UNCOV
567
        return sweeperInput.resultChan, nil
×
568
}
569

570
// removeConflictSweepDescendants removes any transactions from the wallet that
571
// spend outputs included in the passed outpoint set. This needs to be done in
572
// cases where we're not the only ones that can sweep an output, but there may
573
// exist unconfirmed spends that spend outputs created by a sweep transaction.
574
// The most common case for this is when someone sweeps our anchor outputs
575
// after 16 blocks. Moreover this is also needed for wallets which use neutrino
576
// as a backend when a channel is force closed and anchor cpfp txns are
577
// created to bump the initial commitment transaction. In this case an anchor
578
// cpfp is broadcasted for up to 3 commitment transactions (local,
579
// remote-dangling, remote). Using neutrino all of those transactions will be
580
// accepted (the commitment tx will be different in all of those cases) and have
581
// to be removed as soon as one of them confirmes (they do have the same
582
// ExclusiveGroup). For neutrino backends the corresponding BIP 157 serving full
583
// nodes do not signal invalid transactions anymore.
584
func (s *UtxoSweeper) removeConflictSweepDescendants(
585
        outpoints map[wire.OutPoint]struct{}) error {
1✔
586

1✔
587
        // Obtain all the past sweeps that we've done so far. We'll need these
1✔
588
        // to ensure that if the spendingTx spends any of the same inputs, then
1✔
589
        // we remove any transaction that may be spending those inputs from the
1✔
590
        // wallet.
1✔
591
        //
1✔
592
        // TODO(roasbeef): can be last sweep here if we remove anything confirmed
1✔
593
        // from the store?
1✔
594
        pastSweepHashes, err := s.cfg.Store.ListSweeps()
1✔
595
        if err != nil {
1✔
596
                return err
×
597
        }
×
598

599
        // We'll now go through each past transaction we published during this
600
        // epoch and cross reference the spent inputs. If there're any inputs
601
        // in common with the inputs the spendingTx spent, then we'll remove
602
        // those.
603
        //
604
        // TODO(roasbeef): need to start to remove all transaction hashes after
605
        // every N blocks (assumed point of no return)
606
        for _, sweepHash := range pastSweepHashes {
1✔
UNCOV
607
                sweepTx, err := s.cfg.Wallet.FetchTx(sweepHash)
×
UNCOV
608
                if err != nil {
×
UNCOV
609
                        return err
×
UNCOV
610
                }
×
611

612
                // Transaction wasn't found in the wallet, may have already
613
                // been replaced/removed.
UNCOV
614
                if sweepTx == nil {
×
615
                        // If it was removed, then we'll play it safe and mark
×
616
                        // it as no longer need to be rebroadcasted.
×
617
                        s.cfg.Wallet.CancelRebroadcast(sweepHash)
×
618
                        continue
×
619
                }
620

621
                // Check to see if this past sweep transaction spent any of the
622
                // same inputs as spendingTx.
UNCOV
623
                var isConflicting bool
×
UNCOV
624
                for _, txIn := range sweepTx.TxIn {
×
UNCOV
625
                        if _, ok := outpoints[txIn.PreviousOutPoint]; ok {
×
UNCOV
626
                                isConflicting = true
×
UNCOV
627
                                break
×
628
                        }
629
                }
630

UNCOV
631
                if !isConflicting {
×
UNCOV
632
                        continue
×
633
                }
634

635
                // If it is conflicting, then we'll signal the wallet to remove
636
                // all the transactions that are descendants of outputs created
637
                // by the sweepTx and the sweepTx itself.
UNCOV
638
                log.Debugf("Removing sweep txid=%v from wallet: %v",
×
UNCOV
639
                        sweepTx.TxHash(), lnutils.SpewLogClosure(sweepTx))
×
UNCOV
640

×
UNCOV
641
                err = s.cfg.Wallet.RemoveDescendants(sweepTx)
×
UNCOV
642
                if err != nil {
×
643
                        log.Warnf("Unable to remove descendants: %v", err)
×
644
                }
×
645

646
                // If this transaction was conflicting, then we'll stop
647
                // rebroadcasting it in the background.
UNCOV
648
                s.cfg.Wallet.CancelRebroadcast(sweepHash)
×
649
        }
650

651
        return nil
1✔
652
}
653

654
// collector is the sweeper main loop. It processes new inputs, spend
655
// notifications and counts down to publication of the sweep tx.
656
func (s *UtxoSweeper) collector() {
3✔
657
        defer s.wg.Done()
3✔
658

3✔
659
        for {
9✔
660
                // Clean inputs, which will remove inputs that are swept,
6✔
661
                // failed, or excluded from the sweeper and return inputs that
6✔
662
                // are either new or has been published but failed back, which
6✔
663
                // will be retried again here.
6✔
664
                s.updateSweeperInputs()
6✔
665

6✔
666
                select {
6✔
667
                // A new inputs is offered to the sweeper. We check to see if
668
                // we are already trying to sweep this input and if not, set up
669
                // a listener to spend and schedule a sweep.
UNCOV
670
                case input := <-s.newInputs:
×
UNCOV
671
                        err := s.handleNewInput(input)
×
UNCOV
672
                        if err != nil {
×
673
                                log.Criticalf("Unable to handle new input: %v",
×
674
                                        err)
×
675

×
676
                                return
×
677
                        }
×
678

679
                        // If this input is forced, we perform an sweep
680
                        // immediately.
681
                        //
682
                        // TODO(ziggie): Make sure when `immediate` is selected
683
                        // as a parameter that we only trigger the sweeping of
684
                        // this specific input rather than triggering the sweeps
685
                        // of all current pending inputs registered with the
686
                        // sweeper.
UNCOV
687
                        if input.params.Immediate {
×
UNCOV
688
                                inputs := s.updateSweeperInputs()
×
UNCOV
689
                                s.sweepPendingInputs(inputs)
×
UNCOV
690
                        }
×
691

692
                // A spend of one of our inputs is detected. Signal sweep
693
                // results to the caller(s).
UNCOV
694
                case spend := <-s.spendChan:
×
UNCOV
695
                        s.handleInputSpent(spend)
×
696

697
                // A new external request has been received to retrieve all of
698
                // the inputs we're currently attempting to sweep.
UNCOV
699
                case req := <-s.pendingSweepsReqs:
×
UNCOV
700
                        s.handlePendingSweepsReq(req)
×
701

702
                // A new external request has been received to bump the fee rate
703
                // of a given input.
UNCOV
704
                case req := <-s.updateReqs:
×
UNCOV
705
                        resultChan, err := s.handleUpdateReq(req)
×
UNCOV
706
                        req.responseChan <- &updateResp{
×
UNCOV
707
                                resultChan: resultChan,
×
UNCOV
708
                                err:        err,
×
UNCOV
709
                        }
×
UNCOV
710

×
UNCOV
711
                        // Perform an sweep immediately if asked.
×
UNCOV
712
                        if req.params.Immediate {
×
UNCOV
713
                                inputs := s.updateSweeperInputs()
×
UNCOV
714
                                s.sweepPendingInputs(inputs)
×
UNCOV
715
                        }
×
716

UNCOV
717
                case resp := <-s.bumpRespChan:
×
UNCOV
718
                        // Handle the bump event.
×
UNCOV
719
                        err := s.handleBumpEvent(resp)
×
UNCOV
720
                        if err != nil {
×
UNCOV
721
                                log.Errorf("Failed to handle bump event: %v",
×
UNCOV
722
                                        err)
×
UNCOV
723
                        }
×
724

725
                // A new external request has been received to trigger an
726
                // immediate sweep of all pending inputs.
727
                case req := <-s.triggerSweepReqs:
3✔
728
                        // Update the inputs with the latest height.
3✔
729
                        inputs := s.updateSweeperInputs()
3✔
730

3✔
731
                        // Mark all inputs as immediate so they are broadcast
3✔
732
                        // right away. This is necessary for testing where we
3✔
733
                        // want to deterministically trigger sweeps.
3✔
734
                        for _, inp := range inputs {
6✔
735
                                inp.params.Immediate = true
3✔
736
                        }
3✔
737

738
                        // Attempt to sweep any pending inputs.
739
                        s.sweepPendingInputs(inputs)
3✔
740

3✔
741
                        // Send back the number of inputs we attempted to sweep.
3✔
742
                        req.respChan <- len(inputs)
3✔
743

744
                // A new block comes in, update the bestHeight, perform a check
745
                // over all pending inputs and publish sweeping txns if needed.
UNCOV
746
                case beat := <-s.BlockbeatChan:
×
UNCOV
747
                        // Update the sweeper to the best height.
×
UNCOV
748
                        s.currentHeight = beat.Height()
×
UNCOV
749

×
UNCOV
750
                        // Update the inputs with the latest height.
×
UNCOV
751
                        inputs := s.updateSweeperInputs()
×
UNCOV
752

×
UNCOV
753
                        log.Debugf("Received new block: height=%v, attempt "+
×
UNCOV
754
                                "sweeping %d inputs:%s", s.currentHeight,
×
UNCOV
755
                                len(inputs),
×
UNCOV
756
                                lnutils.NewLogClosure(func() string {
×
UNCOV
757
                                        return inputsMapToString(inputs)
×
UNCOV
758
                                }))
×
759

760
                        // Attempt to sweep any pending inputs.
UNCOV
761
                        s.sweepPendingInputs(inputs)
×
UNCOV
762

×
UNCOV
763
                        // Notify we've processed the block.
×
UNCOV
764
                        s.NotifyBlockProcessed(beat, nil)
×
765

766
                case <-s.quit:
3✔
767
                        return
3✔
768
                }
769
        }
770
}
771

772
// removeExclusiveGroup removes all inputs in the given exclusive group except
773
// the input specified by the outpoint. This function is called when one of the
774
// exclusive group inputs has been spent or updated. The other inputs won't ever
775
// be spendable and can be removed. This also prevents them from being part of
776
// future sweep transactions that would fail. In addition sweep transactions of
777
// those inputs will be removed from the wallet.
UNCOV
778
func (s *UtxoSweeper) removeExclusiveGroup(group uint64, op wire.OutPoint) {
×
UNCOV
779
        for outpoint, input := range s.inputs {
×
UNCOV
780
                outpoint := outpoint
×
UNCOV
781

×
UNCOV
782
                // Skip the input that caused the exclusive group to be removed.
×
UNCOV
783
                if outpoint == op {
×
UNCOV
784
                        log.Debugf("Skipped removing exclusive input %v", input)
×
UNCOV
785

×
UNCOV
786
                        continue
×
787
                }
788

789
                // Skip inputs that aren't exclusive.
UNCOV
790
                if input.params.ExclusiveGroup == nil {
×
UNCOV
791
                        continue
×
792
                }
793

794
                // Skip inputs from other exclusive groups.
UNCOV
795
                if *input.params.ExclusiveGroup != group {
×
796
                        continue
×
797
                }
798

799
                // Skip inputs that are already terminated.
UNCOV
800
                if input.terminated() {
×
801
                        log.Tracef("Skipped sending error result for "+
×
802
                                "input %v, state=%v", outpoint, input.state)
×
803

×
804
                        continue
×
805
                }
806

UNCOV
807
                log.Debugf("Removing exclusive group for input %v", input)
×
UNCOV
808

×
UNCOV
809
                // Signal result channels.
×
UNCOV
810
                s.signalResult(input, Result{
×
UNCOV
811
                        Err: ErrExclusiveGroupSpend,
×
UNCOV
812
                })
×
UNCOV
813

×
UNCOV
814
                // Update the input's state as it can no longer be swept.
×
UNCOV
815
                input.state = Excluded
×
UNCOV
816

×
UNCOV
817
                // Remove all unconfirmed transactions from the wallet which
×
UNCOV
818
                // spend the passed outpoint of the same exclusive group.
×
UNCOV
819
                outpoints := map[wire.OutPoint]struct{}{
×
UNCOV
820
                        outpoint: {},
×
UNCOV
821
                }
×
UNCOV
822
                err := s.removeConflictSweepDescendants(outpoints)
×
UNCOV
823
                if err != nil {
×
UNCOV
824
                        log.Warnf("Unable to remove conflicting sweep tx from "+
×
UNCOV
825
                                "wallet for outpoint %v : %v", outpoint, err)
×
UNCOV
826
                }
×
827
        }
828
}
829

830
// signalResult notifies the listeners of the final result of the input sweep.
831
// It also cancels any pending spend notification.
832
func (s *UtxoSweeper) signalResult(pi *SweeperInput, result Result) {
11✔
833
        op := pi.OutPoint()
11✔
834
        listeners := pi.listeners
11✔
835

11✔
836
        if result.Err == nil {
16✔
837
                log.Tracef("Dispatching sweep success for %v to %v listeners",
5✔
838
                        op, len(listeners),
5✔
839
                )
5✔
840
        } else {
11✔
841
                log.Tracef("Dispatching sweep error for %v to %v listeners: %v",
6✔
842
                        op, len(listeners), result.Err,
6✔
843
                )
6✔
844
        }
6✔
845

846
        // Signal all listeners. Channel is buffered. Because we only send once
847
        // on every channel, it should never block.
848
        for _, resultChan := range listeners {
11✔
UNCOV
849
                resultChan <- result
×
UNCOV
850
        }
×
851

852
        // Cancel spend notification with chain notifier. This is not necessary
853
        // in case of a success, except for that a reorg could still happen.
854
        if pi.ntfnRegCancel != nil {
11✔
UNCOV
855
                log.Debugf("Canceling spend ntfn for %v", op)
×
UNCOV
856

×
UNCOV
857
                pi.ntfnRegCancel()
×
UNCOV
858
        }
×
859
}
860

861
// sweep takes a set of preselected inputs, creates a sweep tx and publishes
862
// the tx. The output address is only marked as used if the publish succeeds.
863
func (s *UtxoSweeper) sweep(set InputSet) error {
4✔
864
        // Generate an output script if there isn't an unused script available.
4✔
865
        if s.currentOutputScript.IsNone() {
7✔
866
                addr, err := s.cfg.GenSweepScript().Unpack()
3✔
867
                if err != nil {
3✔
868
                        return fmt.Errorf("gen sweep script: %w", err)
×
869
                }
×
870
                s.currentOutputScript = fn.Some(addr)
3✔
871

3✔
872
                log.Debugf("Created sweep DeliveryAddress %x",
3✔
873
                        addr.DeliveryAddress)
3✔
874
        }
875

876
        sweepAddr, err := s.currentOutputScript.UnwrapOrErr(
4✔
877
                fmt.Errorf("none sweep script"),
4✔
878
        )
4✔
879
        if err != nil {
4✔
880
                return err
×
881
        }
×
882

883
        // Create a fee bump request and ask the publisher to broadcast it. The
884
        // publisher will then take over and start monitoring the tx for
885
        // potential fee bump.
886
        req := &BumpRequest{
4✔
887
                Inputs:          set.Inputs(),
4✔
888
                Budget:          set.Budget(),
4✔
889
                DeadlineHeight:  set.DeadlineHeight(),
4✔
890
                DeliveryAddress: sweepAddr,
4✔
891
                MaxFeeRate:      s.cfg.MaxFeeRate.FeePerKWeight(),
4✔
892
                StartingFeeRate: set.StartingFeeRate(),
4✔
893
                Immediate:       set.Immediate(),
4✔
894
                // TODO(yy): pass the strategy here.
4✔
895
        }
4✔
896

4✔
897
        // Reschedule the inputs that we just tried to sweep. This is done in
4✔
898
        // case the following publish fails, we'd like to update the inputs'
4✔
899
        // publish attempts and rescue them in the next sweep.
4✔
900
        s.markInputsPendingPublish(set)
4✔
901

4✔
902
        // Broadcast will return a read-only chan that we will listen to for
4✔
903
        // this publish result and future RBF attempt.
4✔
904
        resp := s.cfg.Publisher.Broadcast(req)
4✔
905

4✔
906
        // Successfully sent the broadcast attempt, we now handle the result by
4✔
907
        // subscribing to the result chan and listen for future updates about
4✔
908
        // this tx.
4✔
909
        s.wg.Add(1)
4✔
910
        go s.monitorFeeBumpResult(set, resp)
4✔
911

4✔
912
        return nil
4✔
913
}
914

915
// markInputsPendingPublish updates the pending inputs with the given tx
916
// inputs. It also increments the `publishAttempts`.
917
func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) {
5✔
918
        // Reschedule sweep.
5✔
919
        for _, input := range set.Inputs() {
11✔
920
                op := input.OutPoint()
6✔
921
                pi, ok := s.inputs[op]
6✔
922
                if !ok {
6✔
UNCOV
923
                        // It could be that this input is an additional wallet
×
UNCOV
924
                        // input that was attached. In that case there also
×
UNCOV
925
                        // isn't a pending input to update.
×
UNCOV
926
                        log.Tracef("Skipped marking input as pending "+
×
UNCOV
927
                                "published: %v not found in pending inputs", op)
×
UNCOV
928

×
UNCOV
929
                        continue
×
930
                }
931

932
                // If this input has already terminated, there's clearly
933
                // something wrong as it would have been removed. In this case
934
                // we log an error and skip marking this input as pending
935
                // publish.
936
                if pi.terminated() {
7✔
937
                        log.Errorf("Expect input %v to not have terminated "+
1✔
938
                                "state, instead it has %v", op, pi.state)
1✔
939

1✔
940
                        continue
1✔
941
                }
942

943
                // Update the input's state.
944
                pi.state = PendingPublish
5✔
945

5✔
946
                // Record another publish attempt.
5✔
947
                pi.publishAttempts++
5✔
948
        }
949
}
950

951
// markInputsPublished updates the sweeping tx in db and marks the list of
952
// inputs as published.
953
func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, set InputSet) error {
4✔
954
        // Mark this tx in db once successfully published.
4✔
955
        //
4✔
956
        // NOTE: this will behave as an overwrite, which is fine as the record
4✔
957
        // is small.
4✔
958
        tr.Published = true
4✔
959
        err := s.cfg.Store.StoreTx(tr)
4✔
960
        if err != nil {
5✔
961
                return fmt.Errorf("store tx: %w", err)
1✔
962
        }
1✔
963

964
        // Reschedule sweep.
965
        for _, input := range set.Inputs() {
7✔
966
                op := input.OutPoint()
4✔
967
                pi, ok := s.inputs[op]
4✔
968
                if !ok {
4✔
UNCOV
969
                        // It could be that this input is an additional wallet
×
UNCOV
970
                        // input that was attached. In that case there also
×
UNCOV
971
                        // isn't a pending input to update.
×
UNCOV
972
                        log.Tracef("Skipped marking input as published: %v "+
×
UNCOV
973
                                "not found in pending inputs", op)
×
UNCOV
974

×
UNCOV
975
                        continue
×
976
                }
977

978
                // Valdiate that the input is in an expected state.
979
                if pi.state != PendingPublish {
5✔
980
                        // We may get a Published if this is a replacement tx.
1✔
981
                        log.Debugf("Expect input %v to have %v, instead it "+
1✔
982
                                "has %v", op, PendingPublish, pi.state)
1✔
983

1✔
984
                        continue
1✔
985
                }
986

987
                // Update the input's state.
988
                pi.state = Published
3✔
989

3✔
990
                // Update the input's latest fee rate.
3✔
991
                pi.lastFeeRate = chainfee.SatPerKWeight(tr.FeeRate)
3✔
992
        }
993

994
        return nil
3✔
995
}
996

997
// markInputsPublishFailed marks the list of inputs as failed to be published.
998
func (s *UtxoSweeper) markInputsPublishFailed(set InputSet,
999
        feeRate chainfee.SatPerKWeight) {
4✔
1000

4✔
1001
        // Reschedule sweep.
4✔
1002
        for _, inp := range set.Inputs() {
17✔
1003
                op := inp.OutPoint()
13✔
1004
                pi, ok := s.inputs[op]
13✔
1005
                if !ok {
13✔
UNCOV
1006
                        // It could be that this input is an additional wallet
×
UNCOV
1007
                        // input that was attached. In that case there also
×
UNCOV
1008
                        // isn't a pending input to update.
×
UNCOV
1009
                        log.Tracef("Skipped marking input as publish failed: "+
×
UNCOV
1010
                                "%v not found in pending inputs", op)
×
UNCOV
1011

×
UNCOV
1012
                        continue
×
1013
                }
1014

1015
                // Valdiate that the input is in an expected state.
1016
                if pi.state != PendingPublish && pi.state != Published {
18✔
1017
                        log.Debugf("Expect input %v to have %v, instead it "+
5✔
1018
                                "has %v", op, PendingPublish, pi.state)
5✔
1019

5✔
1020
                        continue
5✔
1021
                }
1022

1023
                log.Warnf("Failed to publish input %v", op)
8✔
1024

8✔
1025
                // Update the input's state.
8✔
1026
                pi.state = PublishFailed
8✔
1027

8✔
1028
                log.Debugf("Input(%v): updating params: starting fee rate "+
8✔
1029
                        "[%v -> %v]", op, pi.params.StartingFeeRate,
8✔
1030
                        feeRate)
8✔
1031

8✔
1032
                // Update the input using the fee rate specified from the
8✔
1033
                // BumpResult, which should be the starting fee rate to use for
8✔
1034
                // the next sweeping attempt.
8✔
1035
                pi.params.StartingFeeRate = fn.Some(feeRate)
8✔
1036
        }
1037
}
1038

1039
// monitorSpend registers a spend notification with the chain notifier. It
1040
// returns a cancel function that can be used to cancel the registration.
1041
func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint,
UNCOV
1042
        script []byte, heightHint uint32) (func(), error) {
×
UNCOV
1043

×
UNCOV
1044
        log.Tracef("Wait for spend of %v at heightHint=%v",
×
UNCOV
1045
                outpoint, heightHint)
×
UNCOV
1046

×
UNCOV
1047
        spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn(
×
UNCOV
1048
                &outpoint, script, heightHint,
×
UNCOV
1049
        )
×
UNCOV
1050
        if err != nil {
×
1051
                return nil, fmt.Errorf("register spend ntfn: %w", err)
×
1052
        }
×
1053

UNCOV
1054
        s.wg.Add(1)
×
UNCOV
1055
        go func() {
×
UNCOV
1056
                defer s.wg.Done()
×
UNCOV
1057

×
UNCOV
1058
                select {
×
UNCOV
1059
                case spend, ok := <-spendEvent.Spend:
×
UNCOV
1060
                        if !ok {
×
UNCOV
1061
                                log.Debugf("Spend ntfn for %v canceled",
×
UNCOV
1062
                                        outpoint)
×
UNCOV
1063
                                return
×
UNCOV
1064
                        }
×
1065

UNCOV
1066
                        log.Debugf("Delivering spend ntfn for %v", outpoint)
×
UNCOV
1067

×
UNCOV
1068
                        select {
×
UNCOV
1069
                        case s.spendChan <- spend:
×
UNCOV
1070
                                log.Debugf("Delivered spend ntfn for %v",
×
UNCOV
1071
                                        outpoint)
×
1072

1073
                        case <-s.quit:
×
1074
                        }
UNCOV
1075
                case <-s.quit:
×
1076
                }
1077
        }()
1078

UNCOV
1079
        return spendEvent.Cancel, nil
×
1080
}
1081

1082
// PendingInputs returns the set of inputs that the UtxoSweeper is currently
1083
// attempting to sweep.
1084
func (s *UtxoSweeper) PendingInputs() (
UNCOV
1085
        map[wire.OutPoint]*PendingInputResponse, error) {
×
UNCOV
1086

×
UNCOV
1087
        respChan := make(chan map[wire.OutPoint]*PendingInputResponse, 1)
×
UNCOV
1088
        errChan := make(chan error, 1)
×
UNCOV
1089
        select {
×
1090
        case s.pendingSweepsReqs <- &pendingSweepsReq{
1091
                respChan: respChan,
1092
                errChan:  errChan,
UNCOV
1093
        }:
×
1094
        case <-s.quit:
×
1095
                return nil, ErrSweeperShuttingDown
×
1096
        }
1097

UNCOV
1098
        select {
×
UNCOV
1099
        case pendingSweeps := <-respChan:
×
UNCOV
1100
                return pendingSweeps, nil
×
1101
        case err := <-errChan:
×
1102
                return nil, err
×
1103
        case <-s.quit:
×
1104
                return nil, ErrSweeperShuttingDown
×
1105
        }
1106
}
1107

1108
// handlePendingSweepsReq handles a request to retrieve all pending inputs the
1109
// UtxoSweeper is attempting to sweep.
1110
func (s *UtxoSweeper) handlePendingSweepsReq(
UNCOV
1111
        req *pendingSweepsReq) map[wire.OutPoint]*PendingInputResponse {
×
UNCOV
1112

×
UNCOV
1113
        resps := make(map[wire.OutPoint]*PendingInputResponse, len(s.inputs))
×
UNCOV
1114
        for _, inp := range s.inputs {
×
UNCOV
1115
                _, maturityHeight := inp.isMature(uint32(s.currentHeight))
×
UNCOV
1116

×
UNCOV
1117
                // Only the exported fields are set, as we expect the response
×
UNCOV
1118
                // to only be consumed externally.
×
UNCOV
1119
                op := inp.OutPoint()
×
UNCOV
1120
                resps[op] = &PendingInputResponse{
×
UNCOV
1121
                        OutPoint:    op,
×
UNCOV
1122
                        WitnessType: inp.WitnessType(),
×
UNCOV
1123
                        Amount: btcutil.Amount(
×
UNCOV
1124
                                inp.SignDesc().Output.Value,
×
UNCOV
1125
                        ),
×
UNCOV
1126
                        LastFeeRate:       inp.lastFeeRate,
×
UNCOV
1127
                        BroadcastAttempts: inp.publishAttempts,
×
UNCOV
1128
                        Params:            inp.params,
×
UNCOV
1129
                        DeadlineHeight:    uint32(inp.DeadlineHeight),
×
UNCOV
1130
                        MaturityHeight:    maturityHeight,
×
UNCOV
1131
                }
×
UNCOV
1132
        }
×
1133

UNCOV
1134
        select {
×
UNCOV
1135
        case req.respChan <- resps:
×
1136
        case <-s.quit:
×
1137
                log.Debug("Skipped sending pending sweep response due to " +
×
1138
                        "UtxoSweeper shutting down")
×
1139
        }
1140

UNCOV
1141
        return resps
×
1142
}
1143

1144
// UpdateParams allows updating the sweep parameters of a pending input in the
1145
// UtxoSweeper. This function can be used to provide an updated fee preference
1146
// and force flag that will be used for a new sweep transaction of the input
1147
// that will act as a replacement transaction (RBF) of the original sweeping
1148
// transaction, if any. The exclusive group is left unchanged.
1149
//
1150
// NOTE: This currently doesn't do any fee rate validation to ensure that a bump
1151
// is actually successful. The responsibility of doing so should be handled by
1152
// the caller.
1153
func (s *UtxoSweeper) UpdateParams(input wire.OutPoint,
UNCOV
1154
        params Params) (chan Result, error) {
×
UNCOV
1155

×
UNCOV
1156
        responseChan := make(chan *updateResp, 1)
×
UNCOV
1157
        select {
×
1158
        case s.updateReqs <- &updateReq{
1159
                input:        input,
1160
                params:       params,
1161
                responseChan: responseChan,
UNCOV
1162
        }:
×
1163
        case <-s.quit:
×
1164
                return nil, ErrSweeperShuttingDown
×
1165
        }
1166

UNCOV
1167
        select {
×
UNCOV
1168
        case response := <-responseChan:
×
UNCOV
1169
                return response.resultChan, response.err
×
1170
        case <-s.quit:
×
1171
                return nil, ErrSweeperShuttingDown
×
1172
        }
1173
}
1174

1175
// handleUpdateReq handles an update request by simply updating the sweep
1176
// parameters of the pending input. Currently, no validation is done on the new
1177
// fee preference to ensure it will properly create a replacement transaction.
1178
//
1179
// TODO(wilmer):
1180
//   - Validate fee preference to ensure we'll create a valid replacement
1181
//     transaction to allow the new fee rate to propagate throughout the
1182
//     network.
1183
//   - Ensure we don't combine this input with any other unconfirmed inputs that
1184
//     did not exist in the original sweep transaction, resulting in an invalid
1185
//     replacement transaction.
1186
func (s *UtxoSweeper) handleUpdateReq(req *updateReq) (
UNCOV
1187
        chan Result, error) {
×
UNCOV
1188

×
UNCOV
1189
        // If the UtxoSweeper is already trying to sweep this input, then we can
×
UNCOV
1190
        // simply just increase its fee rate. This will allow the input to be
×
UNCOV
1191
        // batched with others which also have a similar fee rate, creating a
×
UNCOV
1192
        // higher fee rate transaction that replaces the original input's
×
UNCOV
1193
        // sweeping transaction.
×
UNCOV
1194
        sweeperInput, ok := s.inputs[req.input]
×
UNCOV
1195
        if !ok {
×
1196
                return nil, lnwallet.ErrNotMine
×
1197
        }
×
1198

1199
        // Create the updated parameters struct. Leave the exclusive group
1200
        // unchanged.
UNCOV
1201
        newParams := Params{
×
UNCOV
1202
                StartingFeeRate: req.params.StartingFeeRate,
×
UNCOV
1203
                Immediate:       req.params.Immediate,
×
UNCOV
1204
                Budget:          req.params.Budget,
×
UNCOV
1205
                DeadlineHeight:  req.params.DeadlineHeight,
×
UNCOV
1206
                ExclusiveGroup:  sweeperInput.params.ExclusiveGroup,
×
UNCOV
1207
        }
×
UNCOV
1208

×
UNCOV
1209
        log.Debugf("Updating parameters for %v(state=%v) from (%v) to (%v)",
×
UNCOV
1210
                req.input, sweeperInput.state, sweeperInput.params, newParams)
×
UNCOV
1211

×
UNCOV
1212
        sweeperInput.params = newParams
×
UNCOV
1213

×
UNCOV
1214
        // We need to reset the state so this input will be attempted again by
×
UNCOV
1215
        // our sweeper.
×
UNCOV
1216
        //
×
UNCOV
1217
        // TODO(yy): a dedicated state?
×
UNCOV
1218
        sweeperInput.state = Init
×
UNCOV
1219

×
UNCOV
1220
        // If the new input specifies a deadline, update the deadline height.
×
UNCOV
1221
        sweeperInput.DeadlineHeight = req.params.DeadlineHeight.UnwrapOr(
×
UNCOV
1222
                sweeperInput.DeadlineHeight,
×
UNCOV
1223
        )
×
UNCOV
1224

×
UNCOV
1225
        resultChan := make(chan Result, 1)
×
UNCOV
1226
        sweeperInput.listeners = append(sweeperInput.listeners, resultChan)
×
UNCOV
1227

×
UNCOV
1228
        return resultChan, nil
×
1229
}
1230

1231
// ListSweeps returns a list of the sweeps recorded by the sweep store.
UNCOV
1232
func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
×
UNCOV
1233
        return s.cfg.Store.ListSweeps()
×
UNCOV
1234
}
×
1235

1236
// TriggerSweep triggers an immediate attempt to create and broadcast sweep
1237
// transactions for all pending inputs. This is useful for testing to
1238
// deterministically control when sweeps are broadcast. This method is
1239
// thread-safe as it sends a message to the collector goroutine's event loop.
1240
func (s *UtxoSweeper) TriggerSweep() int {
4✔
1241
        req := &triggerSweepReq{
4✔
1242
                respChan: make(chan int, 1),
4✔
1243
        }
4✔
1244

4✔
1245
        select {
4✔
1246
        case s.triggerSweepReqs <- req:
3✔
1247
                return <-req.respChan
3✔
1248
        case <-s.quit:
1✔
1249
                return 0
1✔
1250
        }
1251
}
1252

1253
// mempoolLookup takes an input's outpoint and queries the mempool to see
1254
// whether it's already been spent in a transaction found in the mempool.
1255
// Returns the transaction if found.
1256
func (s *UtxoSweeper) mempoolLookup(op wire.OutPoint) fn.Option[wire.MsgTx] {
7✔
1257
        // For neutrino backend, there's no mempool available, so we exit
7✔
1258
        // early.
7✔
1259
        if s.cfg.Mempool == nil {
8✔
1260
                log.Debugf("Skipping mempool lookup for %v, no mempool ", op)
1✔
1261

1✔
1262
                return fn.None[wire.MsgTx]()
1✔
1263
        }
1✔
1264

1265
        // Query this input in the mempool. If this outpoint is already spent
1266
        // in mempool, we should get a spending event back immediately.
1267
        return s.cfg.Mempool.LookupInputMempoolSpend(op)
6✔
1268
}
1269

1270
// calculateDefaultDeadline calculates the default deadline height for a sweep
1271
// request that has no deadline height specified.
UNCOV
1272
func (s *UtxoSweeper) calculateDefaultDeadline(pi *SweeperInput) int32 {
×
UNCOV
1273
        // Create a default deadline height, which will be used when there's no
×
UNCOV
1274
        // DeadlineHeight specified for a given input.
×
UNCOV
1275
        defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget)
×
UNCOV
1276

×
UNCOV
1277
        // If the input is immature and has a locktime, we'll use the locktime
×
UNCOV
1278
        // height as the starting height.
×
UNCOV
1279
        matured, locktime := pi.isMature(uint32(s.currentHeight))
×
UNCOV
1280
        if !matured {
×
UNCOV
1281
                defaultDeadline = int32(locktime + s.cfg.NoDeadlineConfTarget)
×
UNCOV
1282
                log.Debugf("Input %v is immature, using locktime=%v instead "+
×
UNCOV
1283
                        "of current height=%d as starting height",
×
UNCOV
1284
                        pi.OutPoint(), locktime, s.currentHeight)
×
UNCOV
1285
        }
×
1286

UNCOV
1287
        return defaultDeadline
×
1288
}
1289

1290
// handleNewInput processes a new input by registering spend notification and
1291
// scheduling sweeping for it.
UNCOV
1292
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
×
UNCOV
1293
        outpoint := input.input.OutPoint()
×
UNCOV
1294
        pi, pending := s.inputs[outpoint]
×
UNCOV
1295
        if pending {
×
UNCOV
1296
                log.Infof("Already has pending input %v received, old params: "+
×
UNCOV
1297
                        "%v, new params %v", outpoint, pi.params, input.params)
×
UNCOV
1298

×
UNCOV
1299
                s.handleExistingInput(input, pi)
×
UNCOV
1300

×
UNCOV
1301
                return nil
×
UNCOV
1302
        }
×
1303

1304
        // This is a new input, and we want to query the mempool to see if this
1305
        // input has already been spent. If so, we'll start the input with the
1306
        // RBFInfo.
UNCOV
1307
        rbfInfo := s.decideRBFInfo(input.input.OutPoint())
×
UNCOV
1308

×
UNCOV
1309
        // Create a new pendingInput and initialize the listeners slice with
×
UNCOV
1310
        // the passed in result channel. If this input is offered for sweep
×
UNCOV
1311
        // again, the result channel will be appended to this slice.
×
UNCOV
1312
        pi = &SweeperInput{
×
UNCOV
1313
                state:     Init,
×
UNCOV
1314
                listeners: []chan Result{input.resultChan},
×
UNCOV
1315
                Input:     input.input,
×
UNCOV
1316
                params:    input.params,
×
UNCOV
1317
                rbf:       rbfInfo,
×
UNCOV
1318
        }
×
UNCOV
1319

×
UNCOV
1320
        // Set the starting fee rate if a previous sweeping tx is found.
×
UNCOV
1321
        rbfInfo.WhenSome(func(info RBFInfo) {
×
UNCOV
1322
                pi.params.StartingFeeRate = fn.Some(info.FeeRate)
×
UNCOV
1323
        })
×
1324

1325
        // Set the acutal deadline height.
UNCOV
1326
        pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
×
UNCOV
1327
                s.calculateDefaultDeadline(pi),
×
UNCOV
1328
        )
×
UNCOV
1329

×
UNCOV
1330
        s.inputs[outpoint] = pi
×
UNCOV
1331
        log.Tracef("input %v, state=%v, added to inputs", outpoint, pi.state)
×
UNCOV
1332

×
UNCOV
1333
        log.Infof("Registered sweep request at block %d: out_point=%v, "+
×
UNCOV
1334
                "witness_type=%v, amount=%v, deadline=%d, state=%v, "+
×
UNCOV
1335
                "params=(%v)", s.currentHeight, pi.OutPoint(), pi.WitnessType(),
×
UNCOV
1336
                btcutil.Amount(pi.SignDesc().Output.Value), pi.DeadlineHeight,
×
UNCOV
1337
                pi.state, pi.params)
×
UNCOV
1338

×
UNCOV
1339
        // Start watching for spend of this input, either by us or the remote
×
UNCOV
1340
        // party.
×
UNCOV
1341
        cancel, err := s.monitorSpend(
×
UNCOV
1342
                outpoint, input.input.SignDesc().Output.PkScript,
×
UNCOV
1343
                input.input.HeightHint(),
×
UNCOV
1344
        )
×
UNCOV
1345
        if err != nil {
×
1346
                err := fmt.Errorf("wait for spend: %w", err)
×
1347
                s.markInputFatal(pi, nil, err)
×
1348

×
1349
                return err
×
1350
        }
×
1351

UNCOV
1352
        pi.ntfnRegCancel = cancel
×
UNCOV
1353

×
UNCOV
1354
        return nil
×
1355
}
1356

1357
// decideRBFInfo queries the mempool to see whether the given input has already
1358
// been spent. When spent, it will query the sweeper store to fetch the fee info
1359
// of the spending transction, and construct an RBFInfo based on it. Suppose an
1360
// error occurs, fn.None is returned.
1361
func (s *UtxoSweeper) decideRBFInfo(
1362
        op wire.OutPoint) fn.Option[RBFInfo] {
4✔
1363

4✔
1364
        // Check if we can find the spending tx of this input in mempool.
4✔
1365
        txOption := s.mempoolLookup(op)
4✔
1366

4✔
1367
        // Extract the spending tx from the option.
4✔
1368
        var tx *wire.MsgTx
4✔
1369
        txOption.WhenSome(func(t wire.MsgTx) {
7✔
1370
                tx = &t
3✔
1371
        })
3✔
1372

1373
        // Exit early if it's not found.
1374
        //
1375
        // NOTE: this is not accurate for backends that don't support mempool
1376
        // lookup:
1377
        // - for neutrino we don't have a mempool.
1378
        // - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
1379
        if tx == nil {
5✔
1380
                return fn.None[RBFInfo]()
1✔
1381
        }
1✔
1382

1383
        // Otherwise the input is already spent in the mempool, so eventually
1384
        // we will return Published.
1385
        //
1386
        // We also need to update the RBF info for this input. If the sweeping
1387
        // transaction is broadcast by us, we can find the fee info in the
1388
        // sweeper store.
1389
        txid := tx.TxHash()
3✔
1390
        tr, err := s.cfg.Store.GetTx(txid)
3✔
1391

3✔
1392
        log.Debugf("Found spending tx %v in mempool for input %v", tx.TxHash(),
3✔
1393
                op)
3✔
1394

3✔
1395
        // If the tx is not found in the store, it means it's not broadcast by
3✔
1396
        // us, hence we can't find the fee info. This is fine as, later on when
3✔
1397
        // this tx is confirmed, we will remove the input from our inputs.
3✔
1398
        if errors.Is(err, ErrTxNotFound) {
4✔
1399
                log.Warnf("Spending tx %v not found in sweeper store", txid)
1✔
1400
                return fn.None[RBFInfo]()
1✔
1401
        }
1✔
1402

1403
        // Exit if we get an db error.
1404
        if err != nil {
3✔
1405
                log.Errorf("Unable to get tx %v from sweeper store: %v",
1✔
1406
                        txid, err)
1✔
1407

1✔
1408
                return fn.None[RBFInfo]()
1✔
1409
        }
1✔
1410

1411
        // Prepare the fee info and return it.
1412
        rbf := fn.Some(RBFInfo{
1✔
1413
                Txid:    txid,
1✔
1414
                Fee:     btcutil.Amount(tr.Fee),
1✔
1415
                FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
1✔
1416
        })
1✔
1417

1✔
1418
        return rbf
1✔
1419
}
1420

1421
// handleExistingInput processes an input that is already known to the sweeper.
1422
// It will overwrite the params of the old input with the new ones.
1423
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
UNCOV
1424
        oldInput *SweeperInput) {
×
UNCOV
1425

×
UNCOV
1426
        // Before updating the input details, check if a previous exclusive
×
UNCOV
1427
        // group was set. In case the same input is registered again, the
×
UNCOV
1428
        // previous input and its sweep parameters are outdated hence need to be
×
UNCOV
1429
        // replaced. This scenario currently only happens for anchor outputs.
×
UNCOV
1430
        // When a channel is force closed, in the worst case 3 different sweeps
×
UNCOV
1431
        // with the same exclusive group are registered with the sweeper to bump
×
UNCOV
1432
        // the closing transaction (cpfp) when its time critical. Receiving an
×
UNCOV
1433
        // input which was already registered with the sweeper means none of the
×
UNCOV
1434
        // previous inputs were used as CPFP, so we need to make sure we update
×
UNCOV
1435
        // the sweep parameters but also remove all inputs with the same
×
UNCOV
1436
        // exclusive group because they are outdated too.
×
UNCOV
1437
        var prevExclGroup *uint64
×
UNCOV
1438
        if oldInput.params.ExclusiveGroup != nil {
×
UNCOV
1439
                prevExclGroup = new(uint64)
×
UNCOV
1440
                *prevExclGroup = *oldInput.params.ExclusiveGroup
×
UNCOV
1441
        }
×
1442

1443
        // Update input details and sweep parameters. The re-offered input
1444
        // details may contain a change to the unconfirmed parent tx info.
UNCOV
1445
        oldInput.params = input.params
×
UNCOV
1446
        oldInput.Input = input.input
×
UNCOV
1447

×
UNCOV
1448
        // If the new input specifies a deadline, update the deadline height.
×
UNCOV
1449
        oldInput.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
×
UNCOV
1450
                oldInput.DeadlineHeight,
×
UNCOV
1451
        )
×
UNCOV
1452

×
UNCOV
1453
        // Add additional result channel to signal spend of this input.
×
UNCOV
1454
        oldInput.listeners = append(oldInput.listeners, input.resultChan)
×
UNCOV
1455

×
UNCOV
1456
        if prevExclGroup != nil {
×
UNCOV
1457
                s.removeExclusiveGroup(*prevExclGroup, input.input.OutPoint())
×
UNCOV
1458
        }
×
1459
}
1460

1461
// handleInputSpent takes a spend event of our input and updates the sweeper's
1462
// internal state to remove the input.
UNCOV
1463
func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
×
UNCOV
1464
        // Query store to find out if we ever published this tx.
×
UNCOV
1465
        spendHash := *spend.SpenderTxHash
×
UNCOV
1466
        isOurTx := s.cfg.Store.IsOurTx(spendHash)
×
UNCOV
1467

×
UNCOV
1468
        // If this isn't our transaction, it means someone else swept outputs
×
UNCOV
1469
        // that we were attempting to sweep. This can happen for anchor outputs
×
UNCOV
1470
        // as well as justice transactions. In this case, we'll notify the
×
UNCOV
1471
        // wallet to remove any spends that descent from this output.
×
UNCOV
1472
        if !isOurTx {
×
UNCOV
1473
                // Construct a map of the inputs this transaction spends.
×
UNCOV
1474
                spendingTx := spend.SpendingTx
×
UNCOV
1475
                inputsSpent := make(
×
UNCOV
1476
                        map[wire.OutPoint]struct{}, len(spendingTx.TxIn),
×
UNCOV
1477
                )
×
UNCOV
1478
                for _, txIn := range spendingTx.TxIn {
×
UNCOV
1479
                        inputsSpent[txIn.PreviousOutPoint] = struct{}{}
×
UNCOV
1480
                }
×
1481

UNCOV
1482
                log.Debugf("Attempting to remove descendant txns invalidated "+
×
UNCOV
1483
                        "by (txid=%v): %v", spendingTx.TxHash(),
×
UNCOV
1484
                        lnutils.SpewLogClosure(spendingTx))
×
UNCOV
1485

×
UNCOV
1486
                err := s.removeConflictSweepDescendants(inputsSpent)
×
UNCOV
1487
                if err != nil {
×
UNCOV
1488
                        log.Warnf("unable to remove descendant transactions "+
×
UNCOV
1489
                                "due to tx %v: ", spendHash)
×
UNCOV
1490
                }
×
1491

UNCOV
1492
                log.Debugf("Detected third party spend related to in flight "+
×
UNCOV
1493
                        "inputs (is_ours=%v): %v", isOurTx,
×
UNCOV
1494
                        lnutils.SpewLogClosure(spend.SpendingTx))
×
1495
        }
1496

1497
        // We now use the spending tx to update the state of the inputs.
UNCOV
1498
        s.markInputsSwept(spend.SpendingTx, isOurTx)
×
1499
}
1500

1501
// markInputsSwept marks all inputs swept by the spending transaction as swept.
1502
// It will also notify all the subscribers of this input.
1503
func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) {
1✔
1504
        for _, txIn := range tx.TxIn {
5✔
1505
                outpoint := txIn.PreviousOutPoint
4✔
1506

4✔
1507
                // Check if this input is known to us. It could probably be
4✔
1508
                // unknown if we canceled the registration, deleted from inputs
4✔
1509
                // map but the ntfn was in-flight already. Or this could be not
4✔
1510
                // one of our inputs.
4✔
1511
                input, ok := s.inputs[outpoint]
4✔
1512
                if !ok {
5✔
1513
                        // It's very likely that a spending tx contains inputs
1✔
1514
                        // that we don't know.
1✔
1515
                        log.Tracef("Skipped marking input as swept: %v not "+
1✔
1516
                                "found in pending inputs", outpoint)
1✔
1517

1✔
1518
                        continue
1✔
1519
                }
1520

1521
                // This input may already been marked as swept by a previous
1522
                // spend notification, which is likely to happen as one sweep
1523
                // transaction usually sweeps multiple inputs.
1524
                if input.terminated() {
4✔
1525
                        log.Debugf("Skipped marking input as swept: %v "+
1✔
1526
                                "state=%v", outpoint, input.state)
1✔
1527

1✔
1528
                        continue
1✔
1529
                }
1530

1531
                input.state = Swept
2✔
1532

2✔
1533
                // Return either a nil or a remote spend result.
2✔
1534
                var err error
2✔
1535
                if !isOurTx {
2✔
UNCOV
1536
                        log.Warnf("Input=%v was spent by remote or third "+
×
UNCOV
1537
                                "party in tx=%v", outpoint, tx.TxHash())
×
UNCOV
1538
                        err = ErrRemoteSpend
×
UNCOV
1539
                }
×
1540

1541
                // Signal result channels.
1542
                s.signalResult(input, Result{
2✔
1543
                        Tx:  tx,
2✔
1544
                        Err: err,
2✔
1545
                })
2✔
1546

2✔
1547
                // Remove all other inputs in this exclusive group.
2✔
1548
                if input.params.ExclusiveGroup != nil {
2✔
UNCOV
1549
                        s.removeExclusiveGroup(
×
UNCOV
1550
                                *input.params.ExclusiveGroup, outpoint,
×
UNCOV
1551
                        )
×
UNCOV
1552
                }
×
1553
        }
1554
}
1555

1556
// markInputFatal marks the given input as fatal and won't be retried. It
1557
// will also notify all the subscribers of this input.
1558
func (s *UtxoSweeper) markInputFatal(pi *SweeperInput, tx *wire.MsgTx,
1559
        err error) {
6✔
1560

6✔
1561
        log.Errorf("Failed to sweep input: %v, error: %v", pi, err)
6✔
1562

6✔
1563
        pi.state = Fatal
6✔
1564

6✔
1565
        s.signalResult(pi, Result{
6✔
1566
                Tx:  tx,
6✔
1567
                Err: err,
6✔
1568
        })
6✔
1569
}
6✔
1570

1571
// updateSweeperInputs updates the sweeper's internal state and returns a map
1572
// of inputs to be swept. It will remove the inputs that are in final states,
1573
// and returns a map of inputs that have either state Init or PublishFailed.
1574
func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
11✔
1575
        // Create a map of inputs to be swept.
11✔
1576
        inputs := make(InputsMap)
11✔
1577

11✔
1578
        // Iterate the pending inputs and update the sweeper's state.
11✔
1579
        //
11✔
1580
        // TODO(yy): sweeper is made to communicate via go channels, so no
11✔
1581
        // locks are needed to access the map. However, it'd be safer if we
11✔
1582
        // turn this inputs map into a SyncMap in case we wanna add concurrent
11✔
1583
        // access to the map in the future.
11✔
1584
        for op, input := range s.inputs {
34✔
1585
                log.Tracef("Checking input: %s, state=%v", input, input.state)
23✔
1586

23✔
1587
                // If the input has reached a final state, that it's either
23✔
1588
                // been swept, or failed, or excluded, we will remove it from
23✔
1589
                // our sweeper.
23✔
1590
                if input.terminated() {
27✔
1591
                        log.Debugf("Removing input(State=%v) %v from sweeper",
4✔
1592
                                input.state, op)
4✔
1593

4✔
1594
                        delete(s.inputs, op)
4✔
1595

4✔
1596
                        continue
4✔
1597
                }
1598

1599
                // If this input has been included in a sweep tx that's not
1600
                // published yet, we'd skip this input and wait for the sweep
1601
                // tx to be published.
1602
                if input.state == PendingPublish {
26✔
1603
                        continue
7✔
1604
                }
1605

1606
                // If this input has already been published, we will need to
1607
                // check the RBF condition before attempting another sweeping.
1608
                if input.state == Published {
13✔
1609
                        continue
1✔
1610
                }
1611

1612
                // If the input has a locktime that's not yet reached, we will
1613
                // skip this input and wait for the locktime to be reached.
1614
                mature, _ := input.isMature(uint32(s.currentHeight))
11✔
1615
                if !mature {
13✔
1616
                        continue
2✔
1617
                }
1618

1619
                // If this input is new or has been failed to be published,
1620
                // we'd retry it. The assumption here is that when an error is
1621
                // returned from `PublishTransaction`, it means the tx has
1622
                // failed to meet the policy, hence it's not in the mempool.
1623
                inputs[op] = input
9✔
1624
        }
1625

1626
        return inputs
11✔
1627
}
1628

1629
// sweepPendingInputs is called when the ticker fires. It will create clusters
1630
// and attempt to create and publish the sweeping transactions.
1631
func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) {
5✔
1632
        log.Debugf("Sweeping %v inputs", len(inputs))
5✔
1633

5✔
1634
        // Cluster all of our inputs based on the specific Aggregator.
5✔
1635
        sets := s.cfg.Aggregator.ClusterInputs(inputs)
5✔
1636

5✔
1637
        // sweepWithLock is a helper closure that executes the sweep within a
5✔
1638
        // coin select lock to prevent the coins being selected for other
5✔
1639
        // transactions like funding of a channel.
5✔
1640
        sweepWithLock := func(set InputSet) error {
6✔
1641
                return s.cfg.Wallet.WithCoinSelectLock(func() error {
2✔
1642
                        // Try to add inputs from our wallet.
1✔
1643
                        err := set.AddWalletInputs(s.cfg.Wallet)
1✔
1644
                        if err != nil {
1✔
UNCOV
1645
                                return err
×
UNCOV
1646
                        }
×
1647

1648
                        // Create sweeping transaction for each set.
1649
                        err = s.sweep(set)
1✔
1650
                        if err != nil {
1✔
1651
                                return err
×
1652
                        }
×
1653

1654
                        return nil
1✔
1655
                })
1656
        }
1657

1658
        for _, set := range sets {
9✔
1659
                var err error
4✔
1660
                if set.NeedWalletInput() {
5✔
1661
                        // Sweep the set of inputs that need the wallet inputs.
1✔
1662
                        err = sweepWithLock(set)
1✔
1663
                } else {
4✔
1664
                        // Sweep the set of inputs that don't need the wallet
3✔
1665
                        // inputs.
3✔
1666
                        err = s.sweep(set)
3✔
1667
                }
3✔
1668

1669
                if err != nil {
4✔
UNCOV
1670
                        log.Errorf("Failed to sweep %v: %v", set, err)
×
UNCOV
1671
                }
×
1672
        }
1673
}
1674

1675
// bumpResp wraps the result of a bump attempt returned from the fee bumper and
1676
// the inputs being used.
1677
type bumpResp struct {
1678
        // result is the result of the bump attempt returned from the fee
1679
        // bumper.
1680
        result *BumpResult
1681

1682
        // set is the input set that was used in the bump attempt.
1683
        set InputSet
1684
}
1685

1686
// monitorFeeBumpResult subscribes to the passed result chan to listen for
1687
// future updates about the sweeping tx.
1688
//
1689
// NOTE: must run as a goroutine.
1690
func (s *UtxoSweeper) monitorFeeBumpResult(set InputSet,
1691
        resultChan <-chan *BumpResult) {
8✔
1692

8✔
1693
        defer s.wg.Done()
8✔
1694

8✔
1695
        for {
17✔
1696
                select {
9✔
1697
                case r := <-resultChan:
3✔
1698
                        // Validate the result is valid.
3✔
1699
                        if err := r.Validate(); err != nil {
3✔
1700
                                log.Errorf("Received invalid result: %v", err)
×
1701
                                continue
×
1702
                        }
1703

1704
                        resp := &bumpResp{
3✔
1705
                                result: r,
3✔
1706
                                set:    set,
3✔
1707
                        }
3✔
1708

3✔
1709
                        // Send the result back to the main event loop.
3✔
1710
                        select {
3✔
1711
                        case s.bumpRespChan <- resp:
3✔
1712
                        case <-s.quit:
×
1713
                                log.Debug("Sweeper shutting down, skip " +
×
1714
                                        "sending bump result")
×
1715

×
1716
                                return
×
1717
                        }
1718

1719
                        // The sweeping tx has been confirmed, we can exit the
1720
                        // monitor now.
1721
                        //
1722
                        // TODO(yy): can instead remove the spend subscription
1723
                        // in sweeper and rely solely on this event to mark
1724
                        // inputs as Swept?
1725
                        if r.Event == TxConfirmed || r.Event == TxFailed {
5✔
1726
                                // Exit if the tx is failed to be created.
2✔
1727
                                if r.Tx == nil {
2✔
UNCOV
1728
                                        log.Debugf("Received %v for nil tx, "+
×
UNCOV
1729
                                                "exit monitor", r.Event)
×
UNCOV
1730

×
UNCOV
1731
                                        return
×
UNCOV
1732
                                }
×
1733

1734
                                log.Debugf("Received %v for sweep tx %v, exit "+
2✔
1735
                                        "fee bump monitor", r.Event,
2✔
1736
                                        r.Tx.TxHash())
2✔
1737

2✔
1738
                                // Cancel the rebroadcasting of the failed tx.
2✔
1739
                                s.cfg.Wallet.CancelRebroadcast(r.Tx.TxHash())
2✔
1740

2✔
1741
                                return
2✔
1742
                        }
1743

1744
                case <-s.quit:
4✔
1745
                        log.Debugf("Sweeper shutting down, exit fee " +
4✔
1746
                                "bump handler")
4✔
1747

4✔
1748
                        return
4✔
1749
                }
1750
        }
1751
}
1752

1753
// handleBumpEventTxFailed handles the case where the tx has been failed to
1754
// publish.
1755
func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) {
1✔
1756
        r := resp.result
1✔
1757
        tx, err := r.Tx, r.Err
1✔
1758

1✔
1759
        if tx != nil {
2✔
1760
                log.Warnf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(),
1✔
1761
                        err)
1✔
1762
        }
1✔
1763

1764
        // NOTE: When marking the inputs as failed, we are using the input set
1765
        // instead of the inputs found in the tx. This is fine for current
1766
        // version of the sweeper because we always create a tx using ALL of
1767
        // the inputs specified by the set.
1768
        //
1769
        // TODO(yy): should we also remove the failed tx from db?
1770
        s.markInputsPublishFailed(resp.set, resp.result.FeeRate)
1✔
1771
}
1772

1773
// handleBumpEventTxReplaced handles the case where the sweeping tx has been
1774
// replaced by a new one.
1775
func (s *UtxoSweeper) handleBumpEventTxReplaced(resp *bumpResp) error {
3✔
1776
        r := resp.result
3✔
1777
        oldTx := r.ReplacedTx
3✔
1778
        newTx := r.Tx
3✔
1779

3✔
1780
        // Prepare a new record to replace the old one.
3✔
1781
        tr := &TxRecord{
3✔
1782
                Txid:    newTx.TxHash(),
3✔
1783
                FeeRate: uint64(r.FeeRate),
3✔
1784
                Fee:     uint64(r.Fee),
3✔
1785
        }
3✔
1786

3✔
1787
        // Get the old record for logging purpose.
3✔
1788
        oldTxid := oldTx.TxHash()
3✔
1789
        record, err := s.cfg.Store.GetTx(oldTxid)
3✔
1790
        if err != nil {
4✔
1791
                log.Errorf("Fetch tx record for %v: %v", oldTxid, err)
1✔
1792
                return err
1✔
1793
        }
1✔
1794

1795
        // Cancel the rebroadcasting of the replaced tx.
1796
        s.cfg.Wallet.CancelRebroadcast(oldTxid)
2✔
1797

2✔
1798
        log.Infof("RBFed tx=%v(fee=%v sats, feerate=%v sats/kw) with new "+
2✔
1799
                "tx=%v(fee=%v sats, feerate=%v sats/kw)", record.Txid,
2✔
1800
                record.Fee, record.FeeRate, tr.Txid, tr.Fee, tr.FeeRate)
2✔
1801

2✔
1802
        // The old sweeping tx has been replaced by a new one, we will update
2✔
1803
        // the tx record in the sweeper db.
2✔
1804
        //
2✔
1805
        // TODO(yy): we may also need to update the inputs in this tx to a new
2✔
1806
        // state. Suppose a replacing tx only spends a subset of the inputs
2✔
1807
        // here, we'd end up with the rest being marked as `Published` and
2✔
1808
        // won't be aggregated in the next sweep. Atm it's fine as we always
2✔
1809
        // RBF the same input set.
2✔
1810
        if err := s.cfg.Store.DeleteTx(oldTxid); err != nil {
3✔
1811
                log.Errorf("Delete tx record for %v: %v", oldTxid, err)
1✔
1812
                return err
1✔
1813
        }
1✔
1814

1815
        // Mark the inputs as published using the replacing tx.
1816
        return s.markInputsPublished(tr, resp.set)
1✔
1817
}
1818

1819
// handleBumpEventTxPublished handles the case where the sweeping tx has been
1820
// successfully published.
1821
func (s *UtxoSweeper) handleBumpEventTxPublished(resp *bumpResp) error {
1✔
1822
        r := resp.result
1✔
1823
        tx := r.Tx
1✔
1824
        tr := &TxRecord{
1✔
1825
                Txid:    tx.TxHash(),
1✔
1826
                FeeRate: uint64(r.FeeRate),
1✔
1827
                Fee:     uint64(r.Fee),
1✔
1828
        }
1✔
1829

1✔
1830
        // Inputs have been successfully published so we update their
1✔
1831
        // states.
1✔
1832
        err := s.markInputsPublished(tr, resp.set)
1✔
1833
        if err != nil {
1✔
1834
                return err
×
1835
        }
×
1836

1837
        log.Debugf("Published sweep tx %v, num_inputs=%v, height=%v",
1✔
1838
                tx.TxHash(), len(tx.TxIn), s.currentHeight)
1✔
1839

1✔
1840
        // If there's no error, remove the output script. Otherwise keep it so
1✔
1841
        // that it can be reused for the next transaction and causes no address
1✔
1842
        // inflation.
1✔
1843
        s.currentOutputScript = fn.None[lnwallet.AddrWithKey]()
1✔
1844

1✔
1845
        return nil
1✔
1846
}
1847

1848
// handleBumpEventTxFatal handles the case where there's an unexpected error
1849
// when creating or publishing the sweeping tx. In this case, the tx will be
1850
// removed from the sweeper store and the inputs will be marked as `Failed`,
1851
// which means they will not be retried.
1852
func (s *UtxoSweeper) handleBumpEventTxFatal(resp *bumpResp) error {
2✔
1853
        r := resp.result
2✔
1854

2✔
1855
        // Remove the tx from the sweeper store if there is one. Since this is
2✔
1856
        // a broadcast error, it's likely there isn't a tx here.
2✔
1857
        if r.Tx != nil {
4✔
1858
                txid := r.Tx.TxHash()
2✔
1859
                log.Infof("Tx=%v failed with unexpected error: %v", txid, r.Err)
2✔
1860

2✔
1861
                // Remove the tx from the sweeper db if it exists.
2✔
1862
                if err := s.cfg.Store.DeleteTx(txid); err != nil {
3✔
1863
                        return fmt.Errorf("delete tx record for %v: %w", txid,
1✔
1864
                                err)
1✔
1865
                }
1✔
1866
        }
1867

1868
        // Mark the inputs as fatal.
1869
        s.markInputsFatal(resp.set, r.Err)
1✔
1870

1✔
1871
        return nil
1✔
1872
}
1873

1874
// markInputsFatal  marks all inputs in the input set as failed. It will also
1875
// notify all the subscribers of these inputs.
1876
func (s *UtxoSweeper) markInputsFatal(set InputSet, err error) {
2✔
1877
        for _, inp := range set.Inputs() {
9✔
1878
                outpoint := inp.OutPoint()
7✔
1879

7✔
1880
                input, ok := s.inputs[outpoint]
7✔
1881
                if !ok {
7✔
UNCOV
1882
                        // It's very likely that a spending tx contains inputs
×
UNCOV
1883
                        // that we don't know.
×
UNCOV
1884
                        log.Tracef("Skipped marking input as failed: %v not "+
×
UNCOV
1885
                                "found in pending inputs", outpoint)
×
UNCOV
1886

×
UNCOV
1887
                        continue
×
1888
                }
1889

1890
                // If the input is already in a terminal state, we don't want
1891
                // to rewrite it, which also indicates an error as we only get
1892
                // an error event during the initial broadcast.
1893
                if input.terminated() {
10✔
1894
                        log.Errorf("Skipped marking input=%v as failed due to "+
3✔
1895
                                "unexpected state=%v", outpoint, input.state)
3✔
1896

3✔
1897
                        continue
3✔
1898
                }
1899

1900
                s.markInputFatal(input, nil, err)
4✔
1901
        }
1902
}
1903

1904
// handleBumpEvent handles the result sent from the bumper based on its event
1905
// type.
1906
//
1907
// NOTE: TxConfirmed event is not handled, since we already subscribe to the
1908
// input's spending event, we don't need to do anything here.
1909
func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
1✔
1910
        log.Debugf("Received bump result %v", r.result)
1✔
1911

1✔
1912
        switch r.result.Event {
1✔
1913
        // The tx has been published, we update the inputs' state and create a
1914
        // record to be stored in the sweeper db.
UNCOV
1915
        case TxPublished:
×
UNCOV
1916
                return s.handleBumpEventTxPublished(r)
×
1917

1918
        // The tx has failed, we update the inputs' state.
1919
        case TxFailed:
1✔
1920
                s.handleBumpEventTxFailed(r)
1✔
1921
                return nil
1✔
1922

1923
        // The tx has been replaced, we will remove the old tx and replace it
1924
        // with the new one.
UNCOV
1925
        case TxReplaced:
×
UNCOV
1926
                return s.handleBumpEventTxReplaced(r)
×
1927

1928
        // There are inputs being spent in a tx which the fee bumper doesn't
1929
        // understand. We will remove the tx from the sweeper db and mark the
1930
        // inputs as swept.
UNCOV
1931
        case TxUnknownSpend:
×
UNCOV
1932
                s.handleBumpEventTxUnknownSpend(r)
×
1933

1934
        // There's a fatal error in creating the tx, we will remove the tx from
1935
        // the sweeper db and mark the inputs as failed.
UNCOV
1936
        case TxFatal:
×
UNCOV
1937
                return s.handleBumpEventTxFatal(r)
×
1938
        }
1939

UNCOV
1940
        return nil
×
1941
}
1942

1943
// IsSweeperOutpoint determines whether the outpoint was created by the sweeper.
1944
//
1945
// NOTE: It is enough to check the txid because the sweeper will create
1946
// outpoints which solely belong to the internal LND wallet.
UNCOV
1947
func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool {
×
UNCOV
1948
        return s.cfg.Store.IsOurTx(op.Hash)
×
UNCOV
1949
}
×
1950

1951
// markInputSwept marks the given input as swept by the tx. It will also notify
1952
// all the subscribers of this input.
1953
func (s *UtxoSweeper) markInputSwept(inp *SweeperInput, tx *wire.MsgTx) {
3✔
1954
        log.Debugf("Marking input as swept: %v from state=%v", inp.OutPoint(),
3✔
1955
                inp.state)
3✔
1956

3✔
1957
        inp.state = Swept
3✔
1958

3✔
1959
        // Signal result channels.
3✔
1960
        s.signalResult(inp, Result{
3✔
1961
                Tx: tx,
3✔
1962
        })
3✔
1963

3✔
1964
        // Remove all other inputs in this exclusive group.
3✔
1965
        if inp.params.ExclusiveGroup != nil {
3✔
1966
                s.removeExclusiveGroup(
×
1967
                        *inp.params.ExclusiveGroup, inp.OutPoint(),
×
1968
                )
×
1969
        }
×
1970
}
1971

1972
// handleUnknownSpendTx takes an input and its spending tx. If the spending tx
1973
// cannot be found in the sweeper store, the input will be marked as fatal,
1974
// otherwise it will be marked as swept.
1975
func (s *UtxoSweeper) handleUnknownSpendTx(inp *SweeperInput, tx *wire.MsgTx) {
4✔
1976
        op := inp.OutPoint()
4✔
1977
        txid := tx.TxHash()
4✔
1978

4✔
1979
        isOurTx := s.cfg.Store.IsOurTx(txid)
4✔
1980

4✔
1981
        // If this is our tx, it means it's a previous sweeping tx that got
4✔
1982
        // confirmed, which could happen when a restart happens during the
4✔
1983
        // sweeping process.
4✔
1984
        if isOurTx {
7✔
1985
                log.Debugf("Found our sweeping tx %v, marking input %v as "+
3✔
1986
                        "swept", txid, op)
3✔
1987

3✔
1988
                // We now use the spending tx to update the state of the inputs.
3✔
1989
                s.markInputSwept(inp, tx)
3✔
1990

3✔
1991
                return
3✔
1992
        }
3✔
1993

1994
        // Since the input is spent by others, we now mark it as fatal and won't
1995
        // be retried.
1996
        s.markInputFatal(inp, tx, ErrRemoteSpend)
1✔
1997

1✔
1998
        log.Debugf("Removing descendant txns invalidated by (txid=%v): %v",
1✔
1999
                txid, lnutils.SpewLogClosure(tx))
1✔
2000

1✔
2001
        // Construct a map of the inputs this transaction spends.
1✔
2002
        spentInputs := make(map[wire.OutPoint]struct{}, len(tx.TxIn))
1✔
2003
        for _, txIn := range tx.TxIn {
2✔
2004
                spentInputs[txIn.PreviousOutPoint] = struct{}{}
1✔
2005
        }
1✔
2006

2007
        err := s.removeConflictSweepDescendants(spentInputs)
1✔
2008
        if err != nil {
1✔
2009
                log.Warnf("unable to remove descendant transactions "+
×
2010
                        "due to tx %v: ", txid)
×
2011
        }
×
2012
}
2013

2014
// handleBumpEventTxUnknownSpend handles the case where the confirmed tx is
2015
// unknown to the fee bumper. In the case when the sweeping tx has been replaced
2016
// by another party with their tx being confirmed. It will retry sweeping the
2017
// "good" inputs once the "bad" ones are kicked out.
2018
func (s *UtxoSweeper) handleBumpEventTxUnknownSpend(r *bumpResp) {
2✔
2019
        // Mark the inputs as publish failed, which means they will be retried
2✔
2020
        // later.
2✔
2021
        s.markInputsPublishFailed(r.set, r.result.FeeRate)
2✔
2022

2✔
2023
        // Get all the inputs that are not spent in the current sweeping tx.
2✔
2024
        spentInputs := r.result.SpentInputs
2✔
2025

2✔
2026
        // Create a slice to track inputs to be retried.
2✔
2027
        inputsToRetry := make([]input.Input, 0, len(r.set.Inputs()))
2✔
2028

2✔
2029
        // Iterate all the inputs found in this bump and mark the ones spent by
2✔
2030
        // the third party as failed. The rest of inputs will then be updated
2✔
2031
        // with a new fee rate and be retried immediately.
2✔
2032
        for _, inp := range r.set.Inputs() {
5✔
2033
                op := inp.OutPoint()
3✔
2034
                input, ok := s.inputs[op]
3✔
2035

3✔
2036
                // Wallet inputs are not tracked so we will not find them from
3✔
2037
                // the inputs map.
3✔
2038
                if !ok {
3✔
UNCOV
2039
                        log.Debugf("Skipped marking input: %v not found in "+
×
UNCOV
2040
                                "pending inputs", op)
×
UNCOV
2041

×
UNCOV
2042
                        continue
×
2043
                }
2044

2045
                // Check whether this input has been spent, if so we mark it as
2046
                // fatal or swept based on whether this is one of our previous
2047
                // sweeping txns, then move to the next.
2048
                tx, spent := spentInputs[op]
3✔
2049
                if spent {
5✔
2050
                        s.handleUnknownSpendTx(input, tx)
2✔
2051

2✔
2052
                        continue
2✔
2053
                }
2054

2055
                log.Debugf("Input(%v): updating params: immediate [%v -> true]",
1✔
2056
                        op, r.result.FeeRate, input.params.Immediate)
1✔
2057

1✔
2058
                input.params.Immediate = true
1✔
2059
                inputsToRetry = append(inputsToRetry, input)
1✔
2060
        }
2061

2062
        // Exit early if there are no inputs to be retried.
2063
        if len(inputsToRetry) == 0 {
3✔
2064
                return
1✔
2065
        }
1✔
2066

2067
        log.Debugf("Retry sweeping inputs with updated params: %v",
1✔
2068
                inputTypeSummary(inputsToRetry))
1✔
2069

1✔
2070
        // Get the latest inputs, which should put the PublishFailed inputs back
1✔
2071
        // to the sweeping queue.
1✔
2072
        inputs := s.updateSweeperInputs()
1✔
2073

1✔
2074
        // Immediately sweep the remaining inputs - the previous inputs should
1✔
2075
        // now be swept with the updated StartingFeeRate immediately. We may
1✔
2076
        // also include more inputs in the new sweeping tx if new ones with the
1✔
2077
        // same deadline are offered.
1✔
2078
        s.sweepPendingInputs(inputs)
1✔
2079
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc