• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13236757158

10 Feb 2025 08:39AM UTC coverage: 57.649% (-1.2%) from 58.815%
13236757158

Pull #9493

github

ziggie1984
lncli: for some cmds we don't replace the data of the response.

For some cmds it is not practical to replace the JSON output,
because we might pipe it into other commands. For example, when
creating a route we want to pipe the result into sendtoroute.
Pull Request #9493: For some lncli cmds we should not replace the content with other data

0 of 9 new or added lines in 2 files covered. (0.0%)

19535 existing lines in 252 files now uncovered.

103517 of 179563 relevant lines covered (57.65%)

24878.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.9
/sweep/sweeper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/davecgh/go-spew/spew"
13
        "github.com/lightningnetwork/lnd/chainio"
14
        "github.com/lightningnetwork/lnd/chainntnfs"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/input"
17
        "github.com/lightningnetwork/lnd/lnutils"
18
        "github.com/lightningnetwork/lnd/lnwallet"
19
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
20
)
21

22
var (
	// ErrRemoteSpend signals that an output we were trying to sweep got
	// confirmed in a transaction of the remote party instead.
	ErrRemoteSpend = errors.New("remote party swept utxo")

	// ErrFeePreferenceTooLow signals that the given fee preference maps to
	// a fee rate below the relay fee rate.
	ErrFeePreferenceTooLow = errors.New("fee preference too low")

	// ErrExclusiveGroupSpend signals that another input belonging to the
	// same exclusive group has been spent.
	ErrExclusiveGroupSpend = errors.New(
		"other member of exclusive group was spent",
	)

	// ErrSweeperShuttingDown is handed back to clients whose request
	// cannot be served because the UtxoSweeper is stopping or has already
	// stopped.
	ErrSweeperShuttingDown = errors.New("utxo sweeper shutting down")

	// DefaultDeadlineDelta is the default deadline delta (1 week) applied
	// when sweeping inputs that have no deadline pressure.
	DefaultDeadlineDelta int32 = 1008
)
45

46
// Params contains the parameters that control the sweeping process.
47
type Params struct {
48
        // ExclusiveGroup is an identifier that, if set, prevents other inputs
49
        // with the same identifier from being batched together.
50
        ExclusiveGroup *uint64
51

52
        // DeadlineHeight specifies an absolute block height that this input
53
        // should be confirmed by. This value is used by the fee bumper to
54
        // decide its urgency and adjust its feerate used.
55
        DeadlineHeight fn.Option[int32]
56

57
        // Budget specifies the maximum amount of satoshis that can be spent on
58
        // fees for this sweep.
59
        Budget btcutil.Amount
60

61
        // Immediate indicates that the input should be swept immediately
62
        // without waiting for blocks to come to trigger the sweeping of
63
        // inputs.
64
        Immediate bool
65

66
        // StartingFeeRate is an optional parameter that can be used to specify
67
        // the initial fee rate to use for the fee function.
68
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
69
}
70

71
// String returns a human readable interpretation of the sweep parameters.
UNCOV
72
func (p Params) String() string {
×
UNCOV
73
        deadline := "none"
×
UNCOV
74
        p.DeadlineHeight.WhenSome(func(d int32) {
×
UNCOV
75
                deadline = fmt.Sprintf("%d", d)
×
UNCOV
76
        })
×
77

UNCOV
78
        exclusiveGroup := "none"
×
UNCOV
79
        if p.ExclusiveGroup != nil {
×
UNCOV
80
                exclusiveGroup = fmt.Sprintf("%d", *p.ExclusiveGroup)
×
UNCOV
81
        }
×
82

UNCOV
83
        return fmt.Sprintf("startingFeeRate=%v, immediate=%v, "+
×
UNCOV
84
                "exclusive_group=%v, budget=%v, deadline=%v", p.StartingFeeRate,
×
UNCOV
85
                p.Immediate, exclusiveGroup, p.Budget, deadline)
×
86
}
87

88
// SweepState enumerates the stages a pending input can be in.
//
//nolint:revive
type SweepState uint8

const (
	// Init is the state a pending input starts in. It is assigned when a
	// new sweeping request is made for the input.
	Init SweepState = iota

	// PendingPublish marks an input that is already part of a sweeping tx
	// which has not been published yet. Such inputs should not be used
	// for grouping again.
	PendingPublish

	// Published marks an input whose sweeping tx has been published
	// successfully. Such inputs can only be updated via RBF.
	Published

	// PublishFailed marks an input whose sweeping tx returned an error on
	// publishing. Such inputs can be re-grouped into a new sweeping tx.
	PublishFailed

	// Swept is a final state, assigned once the input has been swept
	// successfully.
	Swept

	// Excluded marks an input that has been excluded and can no longer be
	// swept. For instance, when one of the three anchor sweeping
	// transactions confirmed, the remaining two will be excluded.
	Excluded

	// Fatal is a final state; inputs ending here won't be retried. This
	// could happen,
	// - when a pending input has too many failed publish attempts;
	// - the input has been spent by another party;
	// - unknown broadcast error is returned.
	Fatal
)

// String maps the sweep state to its human readable name. Values outside the
// declared set are rendered as "Unknown".
func (s SweepState) String() string {
	// Indexed by state value so the name lookup is a single bounds check.
	names := [...]string{
		Init:           "Init",
		PendingPublish: "PendingPublish",
		Published:      "Published",
		PublishFailed:  "PublishFailed",
		Swept:          "Swept",
		Excluded:       "Excluded",
		Fatal:          "Fatal",
	}

	if int(s) >= len(names) {
		return "Unknown"
	}

	return names[s]
}
158

159
// RBFInfo stores the information required to perform a RBF bump on a pending
160
// sweeping tx.
161
type RBFInfo struct {
162
        // Txid is the txid of the sweeping tx.
163
        Txid chainhash.Hash
164

165
        // FeeRate is the fee rate of the sweeping tx.
166
        FeeRate chainfee.SatPerKWeight
167

168
        // Fee is the total fee of the sweeping tx.
169
        Fee btcutil.Amount
170
}
171

172
// SweeperInput is created when an input reaches the main loop for the first
173
// time. It wraps the input and tracks all relevant state that is needed for
174
// sweeping.
175
type SweeperInput struct {
176
        input.Input
177

178
        // state tracks the current state of the input.
179
        state SweepState
180

181
        // listeners is a list of channels over which the final outcome of the
182
        // sweep needs to be broadcasted.
183
        listeners []chan Result
184

185
        // ntfnRegCancel is populated with a function that cancels the chain
186
        // notifier spend registration.
187
        ntfnRegCancel func()
188

189
        // publishAttempts records the number of attempts that have already been
190
        // made to sweep this tx.
191
        publishAttempts int
192

193
        // params contains the parameters that control the sweeping process.
194
        params Params
195

196
        // lastFeeRate is the most recent fee rate used for this input within a
197
        // transaction broadcast to the network.
198
        lastFeeRate chainfee.SatPerKWeight
199

200
        // rbf records the RBF constraints.
201
        rbf fn.Option[RBFInfo]
202

203
        // DeadlineHeight is the deadline height for this input. This is
204
        // different from the DeadlineHeight in its params as it's an actual
205
        // value than an option.
206
        DeadlineHeight int32
207
}
208

209
// String returns a human readable interpretation of the pending input.
210
func (p *SweeperInput) String() string {
26✔
211
        return fmt.Sprintf("%v (%v)", p.Input.OutPoint(), p.Input.WitnessType())
26✔
212
}
26✔
213

214
// terminated returns a boolean indicating whether the input has reached a
215
// final state.
216
func (p *SweeperInput) terminated() bool {
22✔
217
        switch p.state {
22✔
218
        // If the input has reached a final state, that it's either
219
        // been swept, or failed, or excluded, we will remove it from
220
        // our sweeper.
221
        case Fatal, Swept, Excluded:
8✔
222
                return true
8✔
223

224
        default:
14✔
225
                return false
14✔
226
        }
227
}
228

229
// isMature returns a boolean indicating whether the input has a timelock that
230
// has been reached or not. The locktime found is also returned.
231
func (p *SweeperInput) isMature(currentHeight uint32) (bool, uint32) {
4✔
232
        locktime, _ := p.RequiredLockTime()
4✔
233
        if currentHeight < locktime {
5✔
234
                log.Debugf("Input %v has locktime=%v, current height is %v",
1✔
235
                        p, locktime, currentHeight)
1✔
236

1✔
237
                return false, locktime
1✔
238
        }
1✔
239

240
        // If the input has a CSV that's not yet reached, we will skip
241
        // this input and wait for the expiry.
242
        //
243
        // NOTE: We need to consider whether this input can be included in the
244
        // next block or not, which means the CSV will be checked against the
245
        // currentHeight plus one.
246
        locktime = p.BlocksToMaturity() + p.HeightHint()
3✔
247
        if currentHeight+1 < locktime {
4✔
248
                log.Debugf("Input %v has CSV expiry=%v, current height is %v, "+
1✔
249
                        "skipped sweeping", p, locktime, currentHeight)
1✔
250

1✔
251
                return false, locktime
1✔
252
        }
1✔
253

254
        return true, locktime
2✔
255
}
256

257
// InputsMap is a type alias for a set of pending inputs.
258
type InputsMap = map[wire.OutPoint]*SweeperInput
259

260
// inputsMapToString returns a human readable interpretation of the pending
261
// inputs.
UNCOV
262
func inputsMapToString(inputs InputsMap) string {
×
UNCOV
263
        if len(inputs) == 0 {
×
UNCOV
264
                return ""
×
UNCOV
265
        }
×
266

UNCOV
267
        inps := make([]input.Input, 0, len(inputs))
×
UNCOV
268
        for _, in := range inputs {
×
UNCOV
269
                inps = append(inps, in)
×
UNCOV
270
        }
×
271

UNCOV
272
        return "\n" + inputTypeSummary(inps)
×
273
}
274

275
// pendingSweepsReq is an internal message we'll use to represent an external
276
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
277
// attempting to sweep.
278
type pendingSweepsReq struct {
279
        respChan chan map[wire.OutPoint]*PendingInputResponse
280
        errChan  chan error
281
}
282

283
// PendingInputResponse contains information about an input that is currently
284
// being swept by the UtxoSweeper.
285
type PendingInputResponse struct {
286
        // OutPoint is the identify outpoint of the input being swept.
287
        OutPoint wire.OutPoint
288

289
        // WitnessType is the witness type of the input being swept.
290
        WitnessType input.WitnessType
291

292
        // Amount is the amount of the input being swept.
293
        Amount btcutil.Amount
294

295
        // LastFeeRate is the most recent fee rate used for the input being
296
        // swept within a transaction broadcast to the network.
297
        LastFeeRate chainfee.SatPerKWeight
298

299
        // BroadcastAttempts is the number of attempts we've made to sweept the
300
        // input.
301
        BroadcastAttempts int
302

303
        // Params contains the sweep parameters for this pending request.
304
        Params Params
305

306
        // DeadlineHeight records the deadline height of this input.
307
        DeadlineHeight uint32
308
}
309

310
// updateReq is an internal message we'll use to represent an external caller's
311
// intent to update the sweep parameters of a given input.
312
type updateReq struct {
313
        input        wire.OutPoint
314
        params       Params
315
        responseChan chan *updateResp
316
}
317

318
// updateResp is an internal message we'll use to hand off the response of a
319
// updateReq from the UtxoSweeper's main event loop back to the caller.
320
type updateResp struct {
321
        resultChan chan Result
322
        err        error
323
}
324

325
// UtxoSweeper is responsible for sweeping outputs back into the wallet
326
type UtxoSweeper struct {
327
        started uint32 // To be used atomically.
328
        stopped uint32 // To be used atomically.
329

330
        // Embed the blockbeat consumer struct to get access to the method
331
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
332
        chainio.BeatConsumer
333

334
        cfg *UtxoSweeperConfig
335

336
        newInputs chan *sweepInputMessage
337
        spendChan chan *chainntnfs.SpendDetail
338

339
        // pendingSweepsReq is a channel that will be sent requests by external
340
        // callers in order to retrieve the set of pending inputs the
341
        // UtxoSweeper is attempting to sweep.
342
        pendingSweepsReqs chan *pendingSweepsReq
343

344
        // updateReqs is a channel that will be sent requests by external
345
        // callers who wish to bump the fee rate of a given input.
346
        updateReqs chan *updateReq
347

348
        // inputs is the total set of inputs the UtxoSweeper has been requested
349
        // to sweep.
350
        inputs InputsMap
351

352
        currentOutputScript fn.Option[lnwallet.AddrWithKey]
353

354
        relayFeeRate chainfee.SatPerKWeight
355

356
        quit chan struct{}
357
        wg   sync.WaitGroup
358

359
        // currentHeight is the best known height of the main chain. This is
360
        // updated whenever a new block epoch is received.
361
        currentHeight int32
362

363
        // bumpRespChan is a channel that receives broadcast results from the
364
        // TxPublisher.
365
        bumpRespChan chan *bumpResp
366
}
367

368
// Compile-time check for the chainio.Consumer interface.
369
var _ chainio.Consumer = (*UtxoSweeper)(nil)
370

371
// UtxoSweeperConfig contains dependencies of UtxoSweeper.
372
type UtxoSweeperConfig struct {
373
        // GenSweepScript generates a P2WKH script belonging to the wallet where
374
        // funds can be swept.
375
        GenSweepScript func() fn.Result[lnwallet.AddrWithKey]
376

377
        // FeeEstimator is used when crafting sweep transactions to estimate
378
        // the necessary fee relative to the expected size of the sweep
379
        // transaction.
380
        FeeEstimator chainfee.Estimator
381

382
        // Wallet contains the wallet functions that sweeper requires.
383
        Wallet Wallet
384

385
        // Notifier is an instance of a chain notifier we'll use to watch for
386
        // certain on-chain events.
387
        Notifier chainntnfs.ChainNotifier
388

389
        // Mempool is the mempool watcher that will be used to query whether a
390
        // given input is already being spent by a transaction in the mempool.
391
        Mempool chainntnfs.MempoolWatcher
392

393
        // Store stores the published sweeper txes.
394
        Store SweeperStore
395

396
        // Signer is used by the sweeper to generate valid witnesses at the
397
        // time the incubated outputs need to be spent.
398
        Signer input.Signer
399

400
        // MaxInputsPerTx specifies the default maximum number of inputs allowed
401
        // in a single sweep tx. If more need to be swept, multiple txes are
402
        // created and published.
403
        MaxInputsPerTx uint32
404

405
        // MaxFeeRate is the maximum fee rate allowed within the UtxoSweeper.
406
        MaxFeeRate chainfee.SatPerVByte
407

408
        // Aggregator is used to group inputs into clusters based on its
409
        // implemention-specific strategy.
410
        Aggregator UtxoAggregator
411

412
        // Publisher is used to publish the sweep tx crafted here and monitors
413
        // it for potential fee bumps.
414
        Publisher Bumper
415

416
        // NoDeadlineConfTarget is the conf target to use when sweeping
417
        // non-time-sensitive outputs.
418
        NoDeadlineConfTarget uint32
419
}
420

421
// Result is the struct that is pushed through the result channel. Callers can
422
// use this to be informed of the final sweep result. In case of a remote
423
// spend, Err will be ErrRemoteSpend.
424
type Result struct {
425
        // Err is the final result of the sweep. It is nil when the input is
426
        // swept successfully by us. ErrRemoteSpend is returned when another
427
        // party took the input.
428
        Err error
429

430
        // Tx is the transaction that spent the input.
431
        Tx *wire.MsgTx
432
}
433

434
// sweepInputMessage structs are used in the internal channel between the
435
// SweepInput call and the sweeper main loop.
436
type sweepInputMessage struct {
437
        input      input.Input
438
        params     Params
439
        resultChan chan Result
440
}
441

442
// New returns a new Sweeper instance.
443
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
16✔
444
        s := &UtxoSweeper{
16✔
445
                cfg:               cfg,
16✔
446
                newInputs:         make(chan *sweepInputMessage),
16✔
447
                spendChan:         make(chan *chainntnfs.SpendDetail),
16✔
448
                updateReqs:        make(chan *updateReq),
16✔
449
                pendingSweepsReqs: make(chan *pendingSweepsReq),
16✔
450
                quit:              make(chan struct{}),
16✔
451
                inputs:            make(InputsMap),
16✔
452
                bumpRespChan:      make(chan *bumpResp, 100),
16✔
453
        }
16✔
454

16✔
455
        // Mount the block consumer.
16✔
456
        s.BeatConsumer = chainio.NewBeatConsumer(s.quit, s.Name())
16✔
457

16✔
458
        return s
16✔
459
}
16✔
460

461
// Start starts the process of constructing and publish sweep txes.
UNCOV
462
func (s *UtxoSweeper) Start(beat chainio.Blockbeat) error {
×
UNCOV
463
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
×
464
                return nil
×
465
        }
×
466

UNCOV
467
        log.Info("Sweeper starting")
×
UNCOV
468

×
UNCOV
469
        // Retrieve relay fee for dust limit calculation. Assume that this will
×
UNCOV
470
        // not change from here on.
×
UNCOV
471
        s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
×
UNCOV
472

×
UNCOV
473
        // Set the current height.
×
UNCOV
474
        s.currentHeight = beat.Height()
×
UNCOV
475

×
UNCOV
476
        // Start sweeper main loop.
×
UNCOV
477
        s.wg.Add(1)
×
UNCOV
478
        go s.collector()
×
UNCOV
479

×
UNCOV
480
        return nil
×
481
}
482

483
// RelayFeePerKW returns the minimum fee rate required for transactions to be
484
// relayed.
485
func (s *UtxoSweeper) RelayFeePerKW() chainfee.SatPerKWeight {
×
486
        return s.relayFeeRate
×
487
}
×
488

489
// Stop stops sweeper from listening to block epochs and constructing sweep
490
// txes.
UNCOV
491
func (s *UtxoSweeper) Stop() error {
×
UNCOV
492
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
×
493
                return nil
×
494
        }
×
495

UNCOV
496
        log.Info("Sweeper shutting down...")
×
UNCOV
497
        defer log.Debug("Sweeper shutdown complete")
×
UNCOV
498

×
UNCOV
499
        close(s.quit)
×
UNCOV
500
        s.wg.Wait()
×
UNCOV
501

×
UNCOV
502
        return nil
×
503
}
504

505
// NOTE: part of the `chainio.Consumer` interface.
506
func (s *UtxoSweeper) Name() string {
16✔
507
        return "UtxoSweeper"
16✔
508
}
16✔
509

510
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
511
// swept after the batch time window ends. A custom fee preference can be
512
// provided to determine what fee rate should be used for the input. Note that
513
// the input may not always be swept with this exact value, as its possible for
514
// it to be batched under the same transaction with other similar fee rate
515
// inputs.
516
//
517
// NOTE: Extreme care needs to be taken that input isn't changed externally.
518
// Because it is an interface and we don't know what is exactly behind it, we
519
// cannot make a local copy in sweeper.
520
//
521
// TODO(yy): make sure the caller is using the Result chan.
522
func (s *UtxoSweeper) SweepInput(inp input.Input,
UNCOV
523
        params Params) (chan Result, error) {
×
UNCOV
524

×
UNCOV
525
        if inp == nil || inp.OutPoint() == input.EmptyOutPoint ||
×
UNCOV
526
                inp.SignDesc() == nil {
×
527

×
528
                return nil, errors.New("nil input received")
×
529
        }
×
530

UNCOV
531
        absoluteTimeLock, _ := inp.RequiredLockTime()
×
UNCOV
532
        log.Debugf("Sweep request received: out_point=%v, witness_type=%v, "+
×
UNCOV
533
                "relative_time_lock=%v, absolute_time_lock=%v, amount=%v, "+
×
UNCOV
534
                "parent=(%v), params=(%v)", inp.OutPoint(), inp.WitnessType(),
×
UNCOV
535
                inp.BlocksToMaturity(), absoluteTimeLock,
×
UNCOV
536
                btcutil.Amount(inp.SignDesc().Output.Value),
×
UNCOV
537
                inp.UnconfParent(), params)
×
UNCOV
538

×
UNCOV
539
        sweeperInput := &sweepInputMessage{
×
UNCOV
540
                input:      inp,
×
UNCOV
541
                params:     params,
×
UNCOV
542
                resultChan: make(chan Result, 1),
×
UNCOV
543
        }
×
UNCOV
544

×
UNCOV
545
        // Deliver input to the main event loop.
×
UNCOV
546
        select {
×
UNCOV
547
        case s.newInputs <- sweeperInput:
×
548
        case <-s.quit:
×
549
                return nil, ErrSweeperShuttingDown
×
550
        }
551

UNCOV
552
        return sweeperInput.resultChan, nil
×
553
}
554

555
// removeConflictSweepDescendants removes any transactions from the wallet that
556
// spend outputs included in the passed outpoint set. This needs to be done in
557
// cases where we're not the only ones that can sweep an output, but there may
558
// exist unconfirmed spends that spend outputs created by a sweep transaction.
559
// The most common case for this is when someone sweeps our anchor outputs
560
// after 16 blocks. Moreover this is also needed for wallets which use neutrino
561
// as a backend when a channel is force closed and anchor cpfp txns are
562
// created to bump the initial commitment transaction. In this case an anchor
563
// cpfp is broadcasted for up to 3 commitment transactions (local,
564
// remote-dangling, remote). Using neutrino all of those transactions will be
565
// accepted (the commitment tx will be different in all of those cases) and have
566
// to be removed as soon as one of them confirmes (they do have the same
567
// ExclusiveGroup). For neutrino backends the corresponding BIP 157 serving full
568
// nodes do not signal invalid transactions anymore.
569
func (s *UtxoSweeper) removeConflictSweepDescendants(
UNCOV
570
        outpoints map[wire.OutPoint]struct{}) error {
×
UNCOV
571

×
UNCOV
572
        // Obtain all the past sweeps that we've done so far. We'll need these
×
UNCOV
573
        // to ensure that if the spendingTx spends any of the same inputs, then
×
UNCOV
574
        // we remove any transaction that may be spending those inputs from the
×
UNCOV
575
        // wallet.
×
UNCOV
576
        //
×
UNCOV
577
        // TODO(roasbeef): can be last sweep here if we remove anything confirmed
×
UNCOV
578
        // from the store?
×
UNCOV
579
        pastSweepHashes, err := s.cfg.Store.ListSweeps()
×
UNCOV
580
        if err != nil {
×
581
                return err
×
582
        }
×
583

584
        // We'll now go through each past transaction we published during this
585
        // epoch and cross reference the spent inputs. If there're any inputs
586
        // in common with the inputs the spendingTx spent, then we'll remove
587
        // those.
588
        //
589
        // TODO(roasbeef): need to start to remove all transaction hashes after
590
        // every N blocks (assumed point of no return)
UNCOV
591
        for _, sweepHash := range pastSweepHashes {
×
UNCOV
592
                sweepTx, err := s.cfg.Wallet.FetchTx(sweepHash)
×
UNCOV
593
                if err != nil {
×
594
                        return err
×
595
                }
×
596

597
                // Transaction wasn't found in the wallet, may have already
598
                // been replaced/removed.
UNCOV
599
                if sweepTx == nil {
×
UNCOV
600
                        // If it was removed, then we'll play it safe and mark
×
UNCOV
601
                        // it as no longer need to be rebroadcasted.
×
UNCOV
602
                        s.cfg.Wallet.CancelRebroadcast(sweepHash)
×
UNCOV
603
                        continue
×
604
                }
605

606
                // Check to see if this past sweep transaction spent any of the
607
                // same inputs as spendingTx.
UNCOV
608
                var isConflicting bool
×
UNCOV
609
                for _, txIn := range sweepTx.TxIn {
×
UNCOV
610
                        if _, ok := outpoints[txIn.PreviousOutPoint]; ok {
×
UNCOV
611
                                isConflicting = true
×
UNCOV
612
                                break
×
613
                        }
614
                }
615

UNCOV
616
                if !isConflicting {
×
UNCOV
617
                        continue
×
618
                }
619

620
                // If it is conflicting, then we'll signal the wallet to remove
621
                // all the transactions that are descendants of outputs created
622
                // by the sweepTx and the sweepTx itself.
UNCOV
623
                log.Debugf("Removing sweep txid=%v from wallet: %v",
×
UNCOV
624
                        sweepTx.TxHash(), spew.Sdump(sweepTx))
×
UNCOV
625

×
UNCOV
626
                err = s.cfg.Wallet.RemoveDescendants(sweepTx)
×
UNCOV
627
                if err != nil {
×
628
                        log.Warnf("Unable to remove descendants: %v", err)
×
629
                }
×
630

631
                // If this transaction was conflicting, then we'll stop
632
                // rebroadcasting it in the background.
UNCOV
633
                s.cfg.Wallet.CancelRebroadcast(sweepHash)
×
634
        }
635

UNCOV
636
        return nil
×
637
}
638

639
// collector is the sweeper main loop. It processes new inputs, spend
640
// notifications and counts down to publication of the sweep tx.
UNCOV
641
func (s *UtxoSweeper) collector() {
×
UNCOV
642
        defer s.wg.Done()
×
UNCOV
643

×
UNCOV
644
        for {
×
UNCOV
645
                // Clean inputs, which will remove inputs that are swept,
×
UNCOV
646
                // failed, or excluded from the sweeper and return inputs that
×
UNCOV
647
                // are either new or has been published but failed back, which
×
UNCOV
648
                // will be retried again here.
×
UNCOV
649
                s.updateSweeperInputs()
×
UNCOV
650

×
UNCOV
651
                select {
×
652
                // A new inputs is offered to the sweeper. We check to see if
653
                // we are already trying to sweep this input and if not, set up
654
                // a listener to spend and schedule a sweep.
UNCOV
655
                case input := <-s.newInputs:
×
UNCOV
656
                        err := s.handleNewInput(input)
×
UNCOV
657
                        if err != nil {
×
658
                                log.Criticalf("Unable to handle new input: %v",
×
659
                                        err)
×
660

×
661
                                return
×
662
                        }
×
663

664
                        // If this input is forced, we perform an sweep
665
                        // immediately.
666
                        //
667
                        // TODO(ziggie): Make sure when `immediate` is selected
668
                        // as a parameter that we only trigger the sweeping of
669
                        // this specific input rather than triggering the sweeps
670
                        // of all current pending inputs registered with the
671
                        // sweeper.
UNCOV
672
                        if input.params.Immediate {
×
UNCOV
673
                                inputs := s.updateSweeperInputs()
×
UNCOV
674
                                s.sweepPendingInputs(inputs)
×
UNCOV
675
                        }
×
676

677
                // A spend of one of our inputs is detected. Signal sweep
678
                // results to the caller(s).
UNCOV
679
                case spend := <-s.spendChan:
×
UNCOV
680
                        s.handleInputSpent(spend)
×
681

682
                // A new external request has been received to retrieve all of
683
                // the inputs we're currently attempting to sweep.
UNCOV
684
                case req := <-s.pendingSweepsReqs:
×
UNCOV
685
                        s.handlePendingSweepsReq(req)
×
686

687
                // A new external request has been received to bump the fee rate
688
                // of a given input.
UNCOV
689
                case req := <-s.updateReqs:
×
UNCOV
690
                        resultChan, err := s.handleUpdateReq(req)
×
UNCOV
691
                        req.responseChan <- &updateResp{
×
UNCOV
692
                                resultChan: resultChan,
×
UNCOV
693
                                err:        err,
×
UNCOV
694
                        }
×
UNCOV
695

×
UNCOV
696
                        // Perform an sweep immediately if asked.
×
UNCOV
697
                        if req.params.Immediate {
×
UNCOV
698
                                inputs := s.updateSweeperInputs()
×
UNCOV
699
                                s.sweepPendingInputs(inputs)
×
UNCOV
700
                        }
×
701

UNCOV
702
                case resp := <-s.bumpRespChan:
×
UNCOV
703
                        // Handle the bump event.
×
UNCOV
704
                        err := s.handleBumpEvent(resp)
×
UNCOV
705
                        if err != nil {
×
UNCOV
706
                                log.Errorf("Failed to handle bump event: %v",
×
UNCOV
707
                                        err)
×
UNCOV
708
                        }
×
709

710
                // A new block comes in, update the bestHeight, perform a check
711
                // over all pending inputs and publish sweeping txns if needed.
UNCOV
712
                case beat := <-s.BlockbeatChan:
×
UNCOV
713
                        // Update the sweeper to the best height.
×
UNCOV
714
                        s.currentHeight = beat.Height()
×
UNCOV
715

×
UNCOV
716
                        // Update the inputs with the latest height.
×
UNCOV
717
                        inputs := s.updateSweeperInputs()
×
UNCOV
718

×
UNCOV
719
                        log.Debugf("Received new block: height=%v, attempt "+
×
UNCOV
720
                                "sweeping %d inputs:%s", s.currentHeight,
×
UNCOV
721
                                len(inputs),
×
UNCOV
722
                                lnutils.NewLogClosure(func() string {
×
UNCOV
723
                                        return inputsMapToString(inputs)
×
UNCOV
724
                                }))
×
725

726
                        // Attempt to sweep any pending inputs.
UNCOV
727
                        s.sweepPendingInputs(inputs)
×
UNCOV
728

×
UNCOV
729
                        // Notify we've processed the block.
×
UNCOV
730
                        s.NotifyBlockProcessed(beat, nil)
×
731

UNCOV
732
                case <-s.quit:
×
UNCOV
733
                        return
×
734
                }
735
        }
736
}
737

738
// removeExclusiveGroup removes all inputs in the given exclusive group. This
739
// function is called when one of the exclusive group inputs has been spent. The
740
// other inputs won't ever be spendable and can be removed. This also prevents
741
// them from being part of future sweep transactions that would fail. In
742
// addition sweep transactions of those inputs will be removed from the wallet.
UNCOV
743
func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
×
UNCOV
744
        for outpoint, input := range s.inputs {
×
UNCOV
745
                outpoint := outpoint
×
UNCOV
746

×
UNCOV
747
                // Skip inputs that aren't exclusive.
×
UNCOV
748
                if input.params.ExclusiveGroup == nil {
×
UNCOV
749
                        continue
×
750
                }
751

752
                // Skip inputs from other exclusive groups.
UNCOV
753
                if *input.params.ExclusiveGroup != group {
×
754
                        continue
×
755
                }
756

757
                // Skip inputs that are already terminated.
UNCOV
758
                if input.terminated() {
×
UNCOV
759
                        log.Tracef("Skipped sending error result for "+
×
UNCOV
760
                                "input %v, state=%v", outpoint, input.state)
×
UNCOV
761

×
UNCOV
762
                        continue
×
763
                }
764

765
                // Signal result channels.
UNCOV
766
                s.signalResult(input, Result{
×
UNCOV
767
                        Err: ErrExclusiveGroupSpend,
×
UNCOV
768
                })
×
UNCOV
769

×
UNCOV
770
                // Update the input's state as it can no longer be swept.
×
UNCOV
771
                input.state = Excluded
×
UNCOV
772

×
UNCOV
773
                // Remove all unconfirmed transactions from the wallet which
×
UNCOV
774
                // spend the passed outpoint of the same exclusive group.
×
UNCOV
775
                outpoints := map[wire.OutPoint]struct{}{
×
UNCOV
776
                        outpoint: {},
×
UNCOV
777
                }
×
UNCOV
778
                err := s.removeConflictSweepDescendants(outpoints)
×
UNCOV
779
                if err != nil {
×
780
                        log.Warnf("Unable to remove conflicting sweep tx from "+
×
781
                                "wallet for outpoint %v : %v", outpoint, err)
×
782
                }
×
783
        }
784
}
785

786
// signalResult notifies the listeners of the final result of the input sweep.
787
// It also cancels any pending spend notification.
788
func (s *UtxoSweeper) signalResult(pi *SweeperInput, result Result) {
7✔
789
        op := pi.OutPoint()
7✔
790
        listeners := pi.listeners
7✔
791

7✔
792
        if result.Err == nil {
9✔
793
                log.Tracef("Dispatching sweep success for %v to %v listeners",
2✔
794
                        op, len(listeners),
2✔
795
                )
2✔
796
        } else {
7✔
797
                log.Tracef("Dispatching sweep error for %v to %v listeners: %v",
5✔
798
                        op, len(listeners), result.Err,
5✔
799
                )
5✔
800
        }
5✔
801

802
        // Signal all listeners. Channel is buffered. Because we only send once
803
        // on every channel, it should never block.
804
        for _, resultChan := range listeners {
7✔
UNCOV
805
                resultChan <- result
×
UNCOV
806
        }
×
807

808
        // Cancel spend notification with chain notifier. This is not necessary
809
        // in case of a success, except for that a reorg could still happen.
810
        if pi.ntfnRegCancel != nil {
7✔
UNCOV
811
                log.Debugf("Canceling spend ntfn for %v", op)
×
UNCOV
812

×
UNCOV
813
                pi.ntfnRegCancel()
×
UNCOV
814
        }
×
815
}
816

817
// sweep takes a set of preselected inputs, creates a sweep tx and publishes
818
// the tx. The output address is only marked as used if the publish succeeds.
819
func (s *UtxoSweeper) sweep(set InputSet) error {
2✔
820
        // Generate an output script if there isn't an unused script available.
2✔
821
        if s.currentOutputScript.IsNone() {
3✔
822
                addr, err := s.cfg.GenSweepScript().Unpack()
1✔
823
                if err != nil {
1✔
824
                        return fmt.Errorf("gen sweep script: %w", err)
×
825
                }
×
826
                s.currentOutputScript = fn.Some(addr)
1✔
827
        }
828

829
        sweepAddr, err := s.currentOutputScript.UnwrapOrErr(
2✔
830
                fmt.Errorf("none sweep script"),
2✔
831
        )
2✔
832
        if err != nil {
2✔
833
                return err
×
834
        }
×
835

836
        // Create a fee bump request and ask the publisher to broadcast it. The
837
        // publisher will then take over and start monitoring the tx for
838
        // potential fee bump.
839
        req := &BumpRequest{
2✔
840
                Inputs:          set.Inputs(),
2✔
841
                Budget:          set.Budget(),
2✔
842
                DeadlineHeight:  set.DeadlineHeight(),
2✔
843
                DeliveryAddress: sweepAddr,
2✔
844
                MaxFeeRate:      s.cfg.MaxFeeRate.FeePerKWeight(),
2✔
845
                StartingFeeRate: set.StartingFeeRate(),
2✔
846
                Immediate:       set.Immediate(),
2✔
847
                // TODO(yy): pass the strategy here.
2✔
848
        }
2✔
849

2✔
850
        // Reschedule the inputs that we just tried to sweep. This is done in
2✔
851
        // case the following publish fails, we'd like to update the inputs'
2✔
852
        // publish attempts and rescue them in the next sweep.
2✔
853
        s.markInputsPendingPublish(set)
2✔
854

2✔
855
        // Broadcast will return a read-only chan that we will listen to for
2✔
856
        // this publish result and future RBF attempt.
2✔
857
        resp := s.cfg.Publisher.Broadcast(req)
2✔
858

2✔
859
        // Successfully sent the broadcast attempt, we now handle the result by
2✔
860
        // subscribing to the result chan and listen for future updates about
2✔
861
        // this tx.
2✔
862
        s.wg.Add(1)
2✔
863
        go s.monitorFeeBumpResult(set, resp)
2✔
864

2✔
865
        return nil
2✔
866
}
867

868
// markInputsPendingPublish updates the pending inputs with the given tx
869
// inputs. It also increments the `publishAttempts`.
870
func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) {
3✔
871
        // Reschedule sweep.
3✔
872
        for _, input := range set.Inputs() {
6✔
873
                op := input.OutPoint()
3✔
874
                pi, ok := s.inputs[op]
3✔
875
                if !ok {
3✔
UNCOV
876
                        // It could be that this input is an additional wallet
×
UNCOV
877
                        // input that was attached. In that case there also
×
UNCOV
878
                        // isn't a pending input to update.
×
UNCOV
879
                        log.Tracef("Skipped marking input as pending "+
×
UNCOV
880
                                "published: %v not found in pending inputs", op)
×
UNCOV
881

×
UNCOV
882
                        continue
×
883
                }
884

885
                // If this input has already terminated, there's clearly
886
                // something wrong as it would have been removed. In this case
887
                // we log an error and skip marking this input as pending
888
                // publish.
889
                if pi.terminated() {
4✔
890
                        log.Errorf("Expect input %v to not have terminated "+
1✔
891
                                "state, instead it has %v", op, pi.state)
1✔
892

1✔
893
                        continue
1✔
894
                }
895

896
                // Update the input's state.
897
                pi.state = PendingPublish
2✔
898

2✔
899
                // Record another publish attempt.
2✔
900
                pi.publishAttempts++
2✔
901
        }
902
}
903

904
// markInputsPublished updates the sweeping tx in db and marks the list of
905
// inputs as published.
906
func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, set InputSet) error {
4✔
907
        // Mark this tx in db once successfully published.
4✔
908
        //
4✔
909
        // NOTE: this will behave as an overwrite, which is fine as the record
4✔
910
        // is small.
4✔
911
        tr.Published = true
4✔
912
        err := s.cfg.Store.StoreTx(tr)
4✔
913
        if err != nil {
5✔
914
                return fmt.Errorf("store tx: %w", err)
1✔
915
        }
1✔
916

917
        // Reschedule sweep.
918
        for _, input := range set.Inputs() {
7✔
919
                op := input.OutPoint()
4✔
920
                pi, ok := s.inputs[op]
4✔
921
                if !ok {
4✔
UNCOV
922
                        // It could be that this input is an additional wallet
×
UNCOV
923
                        // input that was attached. In that case there also
×
UNCOV
924
                        // isn't a pending input to update.
×
UNCOV
925
                        log.Tracef("Skipped marking input as published: %v "+
×
UNCOV
926
                                "not found in pending inputs", op)
×
UNCOV
927

×
UNCOV
928
                        continue
×
929
                }
930

931
                // Valdiate that the input is in an expected state.
932
                if pi.state != PendingPublish {
5✔
933
                        // We may get a Published if this is a replacement tx.
1✔
934
                        log.Debugf("Expect input %v to have %v, instead it "+
1✔
935
                                "has %v", op, PendingPublish, pi.state)
1✔
936

1✔
937
                        continue
1✔
938
                }
939

940
                // Update the input's state.
941
                pi.state = Published
3✔
942

3✔
943
                // Update the input's latest fee rate.
3✔
944
                pi.lastFeeRate = chainfee.SatPerKWeight(tr.FeeRate)
3✔
945
        }
946

947
        return nil
3✔
948
}
949

950
// markInputsPublishFailed marks the list of inputs as failed to be published.
951
func (s *UtxoSweeper) markInputsPublishFailed(set InputSet) {
2✔
952
        // Reschedule sweep.
2✔
953
        for _, inp := range set.Inputs() {
12✔
954
                op := inp.OutPoint()
10✔
955
                pi, ok := s.inputs[op]
10✔
956
                if !ok {
10✔
UNCOV
957
                        // It could be that this input is an additional wallet
×
UNCOV
958
                        // input that was attached. In that case there also
×
UNCOV
959
                        // isn't a pending input to update.
×
UNCOV
960
                        log.Tracef("Skipped marking input as publish failed: "+
×
UNCOV
961
                                "%v not found in pending inputs", op)
×
UNCOV
962

×
UNCOV
963
                        continue
×
964
                }
965

966
                // Valdiate that the input is in an expected state.
967
                if pi.state != PendingPublish && pi.state != Published {
15✔
968
                        log.Debugf("Expect input %v to have %v, instead it "+
5✔
969
                                "has %v", op, PendingPublish, pi.state)
5✔
970

5✔
971
                        continue
5✔
972
                }
973

974
                log.Warnf("Failed to publish input %v", op)
5✔
975

5✔
976
                // Update the input's state.
5✔
977
                pi.state = PublishFailed
5✔
978
        }
979
}
980

981
// monitorSpend registers a spend notification with the chain notifier. It
982
// returns a cancel function that can be used to cancel the registration.
983
func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint,
UNCOV
984
        script []byte, heightHint uint32) (func(), error) {
×
UNCOV
985

×
UNCOV
986
        log.Tracef("Wait for spend of %v at heightHint=%v",
×
UNCOV
987
                outpoint, heightHint)
×
UNCOV
988

×
UNCOV
989
        spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn(
×
UNCOV
990
                &outpoint, script, heightHint,
×
UNCOV
991
        )
×
UNCOV
992
        if err != nil {
×
993
                return nil, fmt.Errorf("register spend ntfn: %w", err)
×
994
        }
×
995

UNCOV
996
        s.wg.Add(1)
×
UNCOV
997
        go func() {
×
UNCOV
998
                defer s.wg.Done()
×
UNCOV
999

×
UNCOV
1000
                select {
×
UNCOV
1001
                case spend, ok := <-spendEvent.Spend:
×
UNCOV
1002
                        if !ok {
×
UNCOV
1003
                                log.Debugf("Spend ntfn for %v canceled",
×
UNCOV
1004
                                        outpoint)
×
UNCOV
1005
                                return
×
UNCOV
1006
                        }
×
1007

UNCOV
1008
                        log.Debugf("Delivering spend ntfn for %v", outpoint)
×
UNCOV
1009

×
UNCOV
1010
                        select {
×
UNCOV
1011
                        case s.spendChan <- spend:
×
UNCOV
1012
                                log.Debugf("Delivered spend ntfn for %v",
×
UNCOV
1013
                                        outpoint)
×
1014

1015
                        case <-s.quit:
×
1016
                        }
UNCOV
1017
                case <-s.quit:
×
1018
                }
1019
        }()
1020

UNCOV
1021
        return spendEvent.Cancel, nil
×
1022
}
1023

1024
// PendingInputs returns the set of inputs that the UtxoSweeper is currently
1025
// attempting to sweep.
1026
func (s *UtxoSweeper) PendingInputs() (
UNCOV
1027
        map[wire.OutPoint]*PendingInputResponse, error) {
×
UNCOV
1028

×
UNCOV
1029
        respChan := make(chan map[wire.OutPoint]*PendingInputResponse, 1)
×
UNCOV
1030
        errChan := make(chan error, 1)
×
UNCOV
1031
        select {
×
1032
        case s.pendingSweepsReqs <- &pendingSweepsReq{
1033
                respChan: respChan,
1034
                errChan:  errChan,
UNCOV
1035
        }:
×
1036
        case <-s.quit:
×
1037
                return nil, ErrSweeperShuttingDown
×
1038
        }
1039

UNCOV
1040
        select {
×
UNCOV
1041
        case pendingSweeps := <-respChan:
×
UNCOV
1042
                return pendingSweeps, nil
×
1043
        case err := <-errChan:
×
1044
                return nil, err
×
1045
        case <-s.quit:
×
1046
                return nil, ErrSweeperShuttingDown
×
1047
        }
1048
}
1049

1050
// handlePendingSweepsReq handles a request to retrieve all pending inputs the
1051
// UtxoSweeper is attempting to sweep.
1052
func (s *UtxoSweeper) handlePendingSweepsReq(
UNCOV
1053
        req *pendingSweepsReq) map[wire.OutPoint]*PendingInputResponse {
×
UNCOV
1054

×
UNCOV
1055
        resps := make(map[wire.OutPoint]*PendingInputResponse, len(s.inputs))
×
UNCOV
1056
        for _, inp := range s.inputs {
×
UNCOV
1057
                // Skip immature inputs for compatibility.
×
UNCOV
1058
                mature, _ := inp.isMature(uint32(s.currentHeight))
×
UNCOV
1059
                if !mature {
×
UNCOV
1060
                        continue
×
1061
                }
1062

1063
                // Only the exported fields are set, as we expect the response
1064
                // to only be consumed externally.
UNCOV
1065
                op := inp.OutPoint()
×
UNCOV
1066
                resps[op] = &PendingInputResponse{
×
UNCOV
1067
                        OutPoint:    op,
×
UNCOV
1068
                        WitnessType: inp.WitnessType(),
×
UNCOV
1069
                        Amount: btcutil.Amount(
×
UNCOV
1070
                                inp.SignDesc().Output.Value,
×
UNCOV
1071
                        ),
×
UNCOV
1072
                        LastFeeRate:       inp.lastFeeRate,
×
UNCOV
1073
                        BroadcastAttempts: inp.publishAttempts,
×
UNCOV
1074
                        Params:            inp.params,
×
UNCOV
1075
                        DeadlineHeight:    uint32(inp.DeadlineHeight),
×
UNCOV
1076
                }
×
1077
        }
1078

UNCOV
1079
        select {
×
UNCOV
1080
        case req.respChan <- resps:
×
1081
        case <-s.quit:
×
1082
                log.Debug("Skipped sending pending sweep response due to " +
×
1083
                        "UtxoSweeper shutting down")
×
1084
        }
1085

UNCOV
1086
        return resps
×
1087
}
1088

1089
// UpdateParams allows updating the sweep parameters of a pending input in the
1090
// UtxoSweeper. This function can be used to provide an updated fee preference
1091
// and force flag that will be used for a new sweep transaction of the input
1092
// that will act as a replacement transaction (RBF) of the original sweeping
1093
// transaction, if any. The exclusive group is left unchanged.
1094
//
1095
// NOTE: This currently doesn't do any fee rate validation to ensure that a bump
1096
// is actually successful. The responsibility of doing so should be handled by
1097
// the caller.
1098
func (s *UtxoSweeper) UpdateParams(input wire.OutPoint,
UNCOV
1099
        params Params) (chan Result, error) {
×
UNCOV
1100

×
UNCOV
1101
        responseChan := make(chan *updateResp, 1)
×
UNCOV
1102
        select {
×
1103
        case s.updateReqs <- &updateReq{
1104
                input:        input,
1105
                params:       params,
1106
                responseChan: responseChan,
UNCOV
1107
        }:
×
1108
        case <-s.quit:
×
1109
                return nil, ErrSweeperShuttingDown
×
1110
        }
1111

UNCOV
1112
        select {
×
UNCOV
1113
        case response := <-responseChan:
×
UNCOV
1114
                return response.resultChan, response.err
×
1115
        case <-s.quit:
×
1116
                return nil, ErrSweeperShuttingDown
×
1117
        }
1118
}
1119

1120
// handleUpdateReq handles an update request by simply updating the sweep
1121
// parameters of the pending input. Currently, no validation is done on the new
1122
// fee preference to ensure it will properly create a replacement transaction.
1123
//
1124
// TODO(wilmer):
1125
//   - Validate fee preference to ensure we'll create a valid replacement
1126
//     transaction to allow the new fee rate to propagate throughout the
1127
//     network.
1128
//   - Ensure we don't combine this input with any other unconfirmed inputs that
1129
//     did not exist in the original sweep transaction, resulting in an invalid
1130
//     replacement transaction.
1131
func (s *UtxoSweeper) handleUpdateReq(req *updateReq) (
UNCOV
1132
        chan Result, error) {
×
UNCOV
1133

×
UNCOV
1134
        // If the UtxoSweeper is already trying to sweep this input, then we can
×
UNCOV
1135
        // simply just increase its fee rate. This will allow the input to be
×
UNCOV
1136
        // batched with others which also have a similar fee rate, creating a
×
UNCOV
1137
        // higher fee rate transaction that replaces the original input's
×
UNCOV
1138
        // sweeping transaction.
×
UNCOV
1139
        sweeperInput, ok := s.inputs[req.input]
×
UNCOV
1140
        if !ok {
×
1141
                return nil, lnwallet.ErrNotMine
×
1142
        }
×
1143

1144
        // Create the updated parameters struct. Leave the exclusive group
1145
        // unchanged.
UNCOV
1146
        newParams := Params{
×
UNCOV
1147
                StartingFeeRate: req.params.StartingFeeRate,
×
UNCOV
1148
                Immediate:       req.params.Immediate,
×
UNCOV
1149
                Budget:          req.params.Budget,
×
UNCOV
1150
                DeadlineHeight:  req.params.DeadlineHeight,
×
UNCOV
1151
                ExclusiveGroup:  sweeperInput.params.ExclusiveGroup,
×
UNCOV
1152
        }
×
UNCOV
1153

×
UNCOV
1154
        log.Debugf("Updating parameters for %v(state=%v) from (%v) to (%v)",
×
UNCOV
1155
                req.input, sweeperInput.state, sweeperInput.params, newParams)
×
UNCOV
1156

×
UNCOV
1157
        sweeperInput.params = newParams
×
UNCOV
1158

×
UNCOV
1159
        // We need to reset the state so this input will be attempted again by
×
UNCOV
1160
        // our sweeper.
×
UNCOV
1161
        //
×
UNCOV
1162
        // TODO(yy): a dedicated state?
×
UNCOV
1163
        sweeperInput.state = Init
×
UNCOV
1164

×
UNCOV
1165
        // If the new input specifies a deadline, update the deadline height.
×
UNCOV
1166
        sweeperInput.DeadlineHeight = req.params.DeadlineHeight.UnwrapOr(
×
UNCOV
1167
                sweeperInput.DeadlineHeight,
×
UNCOV
1168
        )
×
UNCOV
1169

×
UNCOV
1170
        resultChan := make(chan Result, 1)
×
UNCOV
1171
        sweeperInput.listeners = append(sweeperInput.listeners, resultChan)
×
UNCOV
1172

×
UNCOV
1173
        return resultChan, nil
×
1174
}
1175

1176
// ListSweeps returns a list of the sweeps recorded by the sweep store.
UNCOV
1177
func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
×
UNCOV
1178
        return s.cfg.Store.ListSweeps()
×
UNCOV
1179
}
×
1180

1181
// mempoolLookup takes an input's outpoint and queries the mempool to see
1182
// whether it's already been spent in a transaction found in the mempool.
1183
// Returns the transaction if found.
1184
func (s *UtxoSweeper) mempoolLookup(op wire.OutPoint) fn.Option[wire.MsgTx] {
7✔
1185
        // For neutrino backend, there's no mempool available, so we exit
7✔
1186
        // early.
7✔
1187
        if s.cfg.Mempool == nil {
8✔
1188
                log.Debugf("Skipping mempool lookup for %v, no mempool ", op)
1✔
1189

1✔
1190
                return fn.None[wire.MsgTx]()
1✔
1191
        }
1✔
1192

1193
        // Query this input in the mempool. If this outpoint is already spent
1194
        // in mempool, we should get a spending event back immediately.
1195
        return s.cfg.Mempool.LookupInputMempoolSpend(op)
6✔
1196
}
1197

1198
// calculateDefaultDeadline calculates the default deadline height for a sweep
1199
// request that has no deadline height specified.
UNCOV
1200
func (s *UtxoSweeper) calculateDefaultDeadline(pi *SweeperInput) int32 {
×
UNCOV
1201
        // Create a default deadline height, which will be used when there's no
×
UNCOV
1202
        // DeadlineHeight specified for a given input.
×
UNCOV
1203
        defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget)
×
UNCOV
1204

×
UNCOV
1205
        // If the input is immature and has a locktime, we'll use the locktime
×
UNCOV
1206
        // height as the starting height.
×
UNCOV
1207
        matured, locktime := pi.isMature(uint32(s.currentHeight))
×
UNCOV
1208
        if !matured {
×
UNCOV
1209
                defaultDeadline = int32(locktime + s.cfg.NoDeadlineConfTarget)
×
UNCOV
1210
                log.Debugf("Input %v is immature, using locktime=%v instead "+
×
UNCOV
1211
                        "of current height=%d as starting height",
×
UNCOV
1212
                        pi.OutPoint(), locktime, s.currentHeight)
×
UNCOV
1213
        }
×
1214

UNCOV
1215
        return defaultDeadline
×
1216
}
1217

1218
// handleNewInput processes a new input by registering spend notification and
1219
// scheduling sweeping for it.
UNCOV
1220
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
×
UNCOV
1221
        outpoint := input.input.OutPoint()
×
UNCOV
1222
        pi, pending := s.inputs[outpoint]
×
UNCOV
1223
        if pending {
×
UNCOV
1224
                log.Infof("Already has pending input %v received, old params: "+
×
UNCOV
1225
                        "%v, new params %v", outpoint, pi.params, input.params)
×
UNCOV
1226

×
UNCOV
1227
                s.handleExistingInput(input, pi)
×
UNCOV
1228

×
UNCOV
1229
                return nil
×
UNCOV
1230
        }
×
1231

1232
        // This is a new input, and we want to query the mempool to see if this
1233
        // input has already been spent. If so, we'll start the input with
1234
        // state Published and attach the RBFInfo.
UNCOV
1235
        state, rbfInfo := s.decideStateAndRBFInfo(input.input.OutPoint())
×
UNCOV
1236

×
UNCOV
1237
        // Create a new pendingInput and initialize the listeners slice with
×
UNCOV
1238
        // the passed in result channel. If this input is offered for sweep
×
UNCOV
1239
        // again, the result channel will be appended to this slice.
×
UNCOV
1240
        pi = &SweeperInput{
×
UNCOV
1241
                state:     state,
×
UNCOV
1242
                listeners: []chan Result{input.resultChan},
×
UNCOV
1243
                Input:     input.input,
×
UNCOV
1244
                params:    input.params,
×
UNCOV
1245
                rbf:       rbfInfo,
×
UNCOV
1246
        }
×
UNCOV
1247

×
UNCOV
1248
        // Set the acutal deadline height.
×
UNCOV
1249
        pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
×
UNCOV
1250
                s.calculateDefaultDeadline(pi),
×
UNCOV
1251
        )
×
UNCOV
1252

×
UNCOV
1253
        s.inputs[outpoint] = pi
×
UNCOV
1254
        log.Tracef("input %v, state=%v, added to inputs", outpoint, pi.state)
×
UNCOV
1255

×
UNCOV
1256
        log.Infof("Registered sweep request at block %d: out_point=%v, "+
×
UNCOV
1257
                "witness_type=%v, amount=%v, deadline=%d, state=%v, "+
×
UNCOV
1258
                "params=(%v)", s.currentHeight, pi.OutPoint(), pi.WitnessType(),
×
UNCOV
1259
                btcutil.Amount(pi.SignDesc().Output.Value), pi.DeadlineHeight,
×
UNCOV
1260
                pi.state, pi.params)
×
UNCOV
1261

×
UNCOV
1262
        // Start watching for spend of this input, either by us or the remote
×
UNCOV
1263
        // party.
×
UNCOV
1264
        cancel, err := s.monitorSpend(
×
UNCOV
1265
                outpoint, input.input.SignDesc().Output.PkScript,
×
UNCOV
1266
                input.input.HeightHint(),
×
UNCOV
1267
        )
×
UNCOV
1268
        if err != nil {
×
1269
                err := fmt.Errorf("wait for spend: %w", err)
×
1270
                s.markInputFatal(pi, err)
×
1271

×
1272
                return err
×
1273
        }
×
1274

UNCOV
1275
        pi.ntfnRegCancel = cancel
×
UNCOV
1276

×
UNCOV
1277
        return nil
×
1278
}
1279

1280
// decideStateAndRBFInfo queries the mempool to see whether the given input has
1281
// already been spent. If so, the state Published will be returned, otherwise
1282
// state Init. When spent, it will query the sweeper store to fetch the fee
1283
// info of the spending transction, and construct an RBFInfo based on it.
1284
// Suppose an error occurs, fn.None is returned.
1285
func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
1286
        SweepState, fn.Option[RBFInfo]) {
4✔
1287

4✔
1288
        // Check if we can find the spending tx of this input in mempool.
4✔
1289
        txOption := s.mempoolLookup(op)
4✔
1290

4✔
1291
        // Extract the spending tx from the option.
4✔
1292
        var tx *wire.MsgTx
4✔
1293
        txOption.WhenSome(func(t wire.MsgTx) {
7✔
1294
                tx = &t
3✔
1295
        })
3✔
1296

1297
        // Exit early if it's not found.
1298
        //
1299
        // NOTE: this is not accurate for backends that don't support mempool
1300
        // lookup:
1301
        // - for neutrino we don't have a mempool.
1302
        // - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
1303
        if tx == nil {
5✔
1304
                return Init, fn.None[RBFInfo]()
1✔
1305
        }
1✔
1306

1307
        // Otherwise the input is already spent in the mempool, so eventually
1308
        // we will return Published.
1309
        //
1310
        // We also need to update the RBF info for this input. If the sweeping
1311
        // transaction is broadcast by us, we can find the fee info in the
1312
        // sweeper store.
1313
        txid := tx.TxHash()
3✔
1314
        tr, err := s.cfg.Store.GetTx(txid)
3✔
1315

3✔
1316
        // If the tx is not found in the store, it means it's not broadcast by
3✔
1317
        // us, hence we can't find the fee info. This is fine as, later on when
3✔
1318
        // this tx is confirmed, we will remove the input from our inputs.
3✔
1319
        if errors.Is(err, ErrTxNotFound) {
4✔
1320
                log.Warnf("Spending tx %v not found in sweeper store", txid)
1✔
1321
                return Published, fn.None[RBFInfo]()
1✔
1322
        }
1✔
1323

1324
        // Exit if we get an db error.
1325
        if err != nil {
3✔
1326
                log.Errorf("Unable to get tx %v from sweeper store: %v",
1✔
1327
                        txid, err)
1✔
1328

1✔
1329
                return Published, fn.None[RBFInfo]()
1✔
1330
        }
1✔
1331

1332
        // Prepare the fee info and return it.
1333
        rbf := fn.Some(RBFInfo{
1✔
1334
                Txid:    txid,
1✔
1335
                Fee:     btcutil.Amount(tr.Fee),
1✔
1336
                FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
1✔
1337
        })
1✔
1338

1✔
1339
        return Published, rbf
1✔
1340
}
1341

1342
// handleExistingInput processes an input that is already known to the sweeper.
1343
// It will overwrite the params of the old input with the new ones.
1344
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
UNCOV
1345
        oldInput *SweeperInput) {
×
UNCOV
1346

×
UNCOV
1347
        // Before updating the input details, check if an exclusive group was
×
UNCOV
1348
        // set. In case the same input is registered again without an exclusive
×
UNCOV
1349
        // group set, the previous input and its sweep parameters are outdated
×
UNCOV
1350
        // hence need to be replaced. This scenario currently only happens for
×
UNCOV
1351
        // anchor outputs. When a channel is force closed, in the worst case 3
×
UNCOV
1352
        // different sweeps with the same exclusive group are registered with
×
UNCOV
1353
        // the sweeper to bump the closing transaction (cpfp) when its time
×
UNCOV
1354
        // critical. Receiving an input which was already registered with the
×
UNCOV
1355
        // sweeper but now without an exclusive group means non of the previous
×
UNCOV
1356
        // inputs were used as CPFP, so we need to make sure we update the
×
UNCOV
1357
        // sweep parameters but also remove all inputs with the same exclusive
×
UNCOV
1358
        // group because the are outdated too.
×
UNCOV
1359
        var prevExclGroup *uint64
×
UNCOV
1360
        if oldInput.params.ExclusiveGroup != nil &&
×
UNCOV
1361
                input.params.ExclusiveGroup == nil {
×
UNCOV
1362

×
UNCOV
1363
                prevExclGroup = new(uint64)
×
UNCOV
1364
                *prevExclGroup = *oldInput.params.ExclusiveGroup
×
UNCOV
1365
        }
×
1366

1367
        // Update input details and sweep parameters. The re-offered input
1368
        // details may contain a change to the unconfirmed parent tx info.
UNCOV
1369
        oldInput.params = input.params
×
UNCOV
1370
        oldInput.Input = input.input
×
UNCOV
1371

×
UNCOV
1372
        // If the new input specifies a deadline, update the deadline height.
×
UNCOV
1373
        oldInput.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
×
UNCOV
1374
                oldInput.DeadlineHeight,
×
UNCOV
1375
        )
×
UNCOV
1376

×
UNCOV
1377
        // Add additional result channel to signal spend of this input.
×
UNCOV
1378
        oldInput.listeners = append(oldInput.listeners, input.resultChan)
×
UNCOV
1379

×
UNCOV
1380
        if prevExclGroup != nil {
×
UNCOV
1381
                s.removeExclusiveGroup(*prevExclGroup)
×
UNCOV
1382
        }
×
1383
}
1384

1385
// handleInputSpent takes a spend event of our input and updates the sweeper's
1386
// internal state to remove the input.
UNCOV
1387
func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
×
UNCOV
1388
        // Query store to find out if we ever published this tx.
×
UNCOV
1389
        spendHash := *spend.SpenderTxHash
×
UNCOV
1390
        isOurTx, err := s.cfg.Store.IsOurTx(spendHash)
×
UNCOV
1391
        if err != nil {
×
1392
                log.Errorf("cannot determine if tx %v is ours: %v",
×
1393
                        spendHash, err)
×
1394
                return
×
1395
        }
×
1396

1397
        // If this isn't our transaction, it means someone else swept outputs
1398
        // that we were attempting to sweep. This can happen for anchor outputs
1399
        // as well as justice transactions. In this case, we'll notify the
1400
        // wallet to remove any spends that descent from this output.
UNCOV
1401
        if !isOurTx {
×
UNCOV
1402
                // Construct a map of the inputs this transaction spends.
×
UNCOV
1403
                spendingTx := spend.SpendingTx
×
UNCOV
1404
                inputsSpent := make(
×
UNCOV
1405
                        map[wire.OutPoint]struct{}, len(spendingTx.TxIn),
×
UNCOV
1406
                )
×
UNCOV
1407
                for _, txIn := range spendingTx.TxIn {
×
UNCOV
1408
                        inputsSpent[txIn.PreviousOutPoint] = struct{}{}
×
UNCOV
1409
                }
×
1410

UNCOV
1411
                log.Debugf("Attempting to remove descendant txns invalidated "+
×
UNCOV
1412
                        "by (txid=%v): %v", spendingTx.TxHash(),
×
UNCOV
1413
                        spew.Sdump(spendingTx))
×
UNCOV
1414

×
UNCOV
1415
                err := s.removeConflictSweepDescendants(inputsSpent)
×
UNCOV
1416
                if err != nil {
×
1417
                        log.Warnf("unable to remove descendant transactions "+
×
1418
                                "due to tx %v: ", spendHash)
×
1419
                }
×
1420

UNCOV
1421
                log.Debugf("Detected third party spend related to in flight "+
×
UNCOV
1422
                        "inputs (is_ours=%v): %v", isOurTx,
×
UNCOV
1423
                        lnutils.SpewLogClosure(spend.SpendingTx))
×
1424
        }
1425

1426
        // We now use the spending tx to update the state of the inputs.
UNCOV
1427
        s.markInputsSwept(spend.SpendingTx, isOurTx)
×
1428
}
1429

1430
// markInputsSwept marks all inputs swept by the spending transaction as swept.
1431
// It will also notify all the subscribers of this input.
1432
func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) {
1✔
1433
        for _, txIn := range tx.TxIn {
5✔
1434
                outpoint := txIn.PreviousOutPoint
4✔
1435

4✔
1436
                // Check if this input is known to us. It could probably be
4✔
1437
                // unknown if we canceled the registration, deleted from inputs
4✔
1438
                // map but the ntfn was in-flight already. Or this could be not
4✔
1439
                // one of our inputs.
4✔
1440
                input, ok := s.inputs[outpoint]
4✔
1441
                if !ok {
5✔
1442
                        // It's very likely that a spending tx contains inputs
1✔
1443
                        // that we don't know.
1✔
1444
                        log.Tracef("Skipped marking input as swept: %v not "+
1✔
1445
                                "found in pending inputs", outpoint)
1✔
1446

1✔
1447
                        continue
1✔
1448
                }
1449

1450
                // This input may already been marked as swept by a previous
1451
                // spend notification, which is likely to happen as one sweep
1452
                // transaction usually sweeps multiple inputs.
1453
                if input.terminated() {
4✔
1454
                        log.Debugf("Skipped marking input as swept: %v "+
1✔
1455
                                "state=%v", outpoint, input.state)
1✔
1456

1✔
1457
                        continue
1✔
1458
                }
1459

1460
                input.state = Swept
2✔
1461

2✔
1462
                // Return either a nil or a remote spend result.
2✔
1463
                var err error
2✔
1464
                if !isOurTx {
2✔
UNCOV
1465
                        log.Warnf("Input=%v was spent by remote or third "+
×
UNCOV
1466
                                "party in tx=%v", outpoint, tx.TxHash())
×
UNCOV
1467
                        err = ErrRemoteSpend
×
UNCOV
1468
                }
×
1469

1470
                // Signal result channels.
1471
                s.signalResult(input, Result{
2✔
1472
                        Tx:  tx,
2✔
1473
                        Err: err,
2✔
1474
                })
2✔
1475

2✔
1476
                // Remove all other inputs in this exclusive group.
2✔
1477
                if input.params.ExclusiveGroup != nil {
2✔
UNCOV
1478
                        s.removeExclusiveGroup(*input.params.ExclusiveGroup)
×
UNCOV
1479
                }
×
1480
        }
1481
}
1482

1483
// markInputFatal marks the given input as fatal and won't be retried. It
1484
// will also notify all the subscribers of this input.
1485
func (s *UtxoSweeper) markInputFatal(pi *SweeperInput, err error) {
5✔
1486
        log.Errorf("Failed to sweep input: %v, error: %v", pi, err)
5✔
1487

5✔
1488
        pi.state = Fatal
5✔
1489

5✔
1490
        s.signalResult(pi, Result{Err: err})
5✔
1491
}
5✔
1492

1493
// updateSweeperInputs updates the sweeper's internal state and returns a map
1494
// of inputs to be swept. It will remove the inputs that are in final states,
1495
// and returns a map of inputs that have either state Init or PublishFailed.
1496
func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
1✔
1497
        // Create a map of inputs to be swept.
1✔
1498
        inputs := make(InputsMap)
1✔
1499

1✔
1500
        // Iterate the pending inputs and update the sweeper's state.
1✔
1501
        //
1✔
1502
        // TODO(yy): sweeper is made to communicate via go channels, so no
1✔
1503
        // locks are needed to access the map. However, it'd be safer if we
1✔
1504
        // turn this inputs map into a SyncMap in case we wanna add concurrent
1✔
1505
        // access to the map in the future.
1✔
1506
        for op, input := range s.inputs {
10✔
1507
                log.Tracef("Checking input: %s, state=%v", input, input.state)
9✔
1508

9✔
1509
                // If the input has reached a final state, that it's either
9✔
1510
                // been swept, or failed, or excluded, we will remove it from
9✔
1511
                // our sweeper.
9✔
1512
                if input.terminated() {
12✔
1513
                        log.Debugf("Removing input(State=%v) %v from sweeper",
3✔
1514
                                input.state, op)
3✔
1515

3✔
1516
                        delete(s.inputs, op)
3✔
1517

3✔
1518
                        continue
3✔
1519
                }
1520

1521
                // If this input has been included in a sweep tx that's not
1522
                // published yet, we'd skip this input and wait for the sweep
1523
                // tx to be published.
1524
                if input.state == PendingPublish {
7✔
1525
                        continue
1✔
1526
                }
1527

1528
                // If this input has already been published, we will need to
1529
                // check the RBF condition before attempting another sweeping.
1530
                if input.state == Published {
6✔
1531
                        continue
1✔
1532
                }
1533

1534
                // If the input has a locktime that's not yet reached, we will
1535
                // skip this input and wait for the locktime to be reached.
1536
                mature, _ := input.isMature(uint32(s.currentHeight))
4✔
1537
                if !mature {
6✔
1538
                        continue
2✔
1539
                }
1540

1541
                // If this input is new or has been failed to be published,
1542
                // we'd retry it. The assumption here is that when an error is
1543
                // returned from `PublishTransaction`, it means the tx has
1544
                // failed to meet the policy, hence it's not in the mempool.
1545
                inputs[op] = input
2✔
1546
        }
1547

1548
        return inputs
1✔
1549
}
1550

1551
// sweepPendingInputs is called when the ticker fires. It will create clusters
1552
// and attempt to create and publish the sweeping transactions.
1553
func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) {
1✔
1554
        log.Debugf("Sweeping %v inputs", len(inputs))
1✔
1555

1✔
1556
        // Cluster all of our inputs based on the specific Aggregator.
1✔
1557
        sets := s.cfg.Aggregator.ClusterInputs(inputs)
1✔
1558

1✔
1559
        // sweepWithLock is a helper closure that executes the sweep within a
1✔
1560
        // coin select lock to prevent the coins being selected for other
1✔
1561
        // transactions like funding of a channel.
1✔
1562
        sweepWithLock := func(set InputSet) error {
2✔
1563
                return s.cfg.Wallet.WithCoinSelectLock(func() error {
2✔
1564
                        // Try to add inputs from our wallet.
1✔
1565
                        err := set.AddWalletInputs(s.cfg.Wallet)
1✔
1566
                        if err != nil {
1✔
UNCOV
1567
                                return err
×
UNCOV
1568
                        }
×
1569

1570
                        // Create sweeping transaction for each set.
1571
                        err = s.sweep(set)
1✔
1572
                        if err != nil {
1✔
1573
                                return err
×
1574
                        }
×
1575

1576
                        return nil
1✔
1577
                })
1578
        }
1579

1580
        for _, set := range sets {
3✔
1581
                var err error
2✔
1582
                if set.NeedWalletInput() {
3✔
1583
                        // Sweep the set of inputs that need the wallet inputs.
1✔
1584
                        err = sweepWithLock(set)
1✔
1585
                } else {
2✔
1586
                        // Sweep the set of inputs that don't need the wallet
1✔
1587
                        // inputs.
1✔
1588
                        err = s.sweep(set)
1✔
1589
                }
1✔
1590

1591
                if err != nil {
2✔
UNCOV
1592
                        log.Errorf("Failed to sweep %v: %v", set, err)
×
UNCOV
1593
                }
×
1594
        }
1595
}
1596

1597
// bumpResp wraps the result of a bump attempt returned from the fee bumper and
1598
// the inputs being used.
1599
type bumpResp struct {
1600
        // result is the result of the bump attempt returned from the fee
1601
        // bumper.
1602
        result *BumpResult
1603

1604
        // set is the input set that was used in the bump attempt.
1605
        set InputSet
1606
}
1607

1608
// monitorFeeBumpResult subscribes to the passed result chan to listen for
1609
// future updates about the sweeping tx.
1610
//
1611
// NOTE: must run as a goroutine.
1612
func (s *UtxoSweeper) monitorFeeBumpResult(set InputSet,
1613
        resultChan <-chan *BumpResult) {
6✔
1614

6✔
1615
        defer s.wg.Done()
6✔
1616

6✔
1617
        for {
13✔
1618
                select {
7✔
1619
                case r := <-resultChan:
3✔
1620
                        // Validate the result is valid.
3✔
1621
                        if err := r.Validate(); err != nil {
3✔
1622
                                log.Errorf("Received invalid result: %v", err)
×
1623
                                continue
×
1624
                        }
1625

1626
                        resp := &bumpResp{
3✔
1627
                                result: r,
3✔
1628
                                set:    set,
3✔
1629
                        }
3✔
1630

3✔
1631
                        // Send the result back to the main event loop.
3✔
1632
                        select {
3✔
1633
                        case s.bumpRespChan <- resp:
3✔
1634
                        case <-s.quit:
×
1635
                                log.Debug("Sweeper shutting down, skip " +
×
1636
                                        "sending bump result")
×
1637

×
1638
                                return
×
1639
                        }
1640

1641
                        // The sweeping tx has been confirmed, we can exit the
1642
                        // monitor now.
1643
                        //
1644
                        // TODO(yy): can instead remove the spend subscription
1645
                        // in sweeper and rely solely on this event to mark
1646
                        // inputs as Swept?
1647
                        if r.Event == TxConfirmed || r.Event == TxFailed {
5✔
1648
                                // Exit if the tx is failed to be created.
2✔
1649
                                if r.Tx == nil {
2✔
UNCOV
1650
                                        log.Debugf("Received %v for nil tx, "+
×
UNCOV
1651
                                                "exit monitor", r.Event)
×
UNCOV
1652

×
UNCOV
1653
                                        return
×
UNCOV
1654
                                }
×
1655

1656
                                log.Debugf("Received %v for sweep tx %v, exit "+
2✔
1657
                                        "fee bump monitor", r.Event,
2✔
1658
                                        r.Tx.TxHash())
2✔
1659

2✔
1660
                                // Cancel the rebroadcasting of the failed tx.
2✔
1661
                                s.cfg.Wallet.CancelRebroadcast(r.Tx.TxHash())
2✔
1662

2✔
1663
                                return
2✔
1664
                        }
1665

1666
                case <-s.quit:
2✔
1667
                        log.Debugf("Sweeper shutting down, exit fee " +
2✔
1668
                                "bump handler")
2✔
1669

2✔
1670
                        return
2✔
1671
                }
1672
        }
1673
}
1674

1675
// handleBumpEventTxFailed handles the case where the tx has been failed to
1676
// publish.
1677
func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) {
1✔
1678
        r := resp.result
1✔
1679
        tx, err := r.Tx, r.Err
1✔
1680

1✔
1681
        if tx != nil {
2✔
1682
                log.Warnf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(),
1✔
1683
                        err)
1✔
1684
        }
1✔
1685

1686
        // NOTE: When marking the inputs as failed, we are using the input set
1687
        // instead of the inputs found in the tx. This is fine for current
1688
        // version of the sweeper because we always create a tx using ALL of
1689
        // the inputs specified by the set.
1690
        //
1691
        // TODO(yy): should we also remove the failed tx from db?
1692
        s.markInputsPublishFailed(resp.set)
1✔
1693
}
1694

1695
// handleBumpEventTxReplaced handles the case where the sweeping tx has been
1696
// replaced by a new one.
1697
func (s *UtxoSweeper) handleBumpEventTxReplaced(resp *bumpResp) error {
3✔
1698
        r := resp.result
3✔
1699
        oldTx := r.ReplacedTx
3✔
1700
        newTx := r.Tx
3✔
1701

3✔
1702
        // Prepare a new record to replace the old one.
3✔
1703
        tr := &TxRecord{
3✔
1704
                Txid:    newTx.TxHash(),
3✔
1705
                FeeRate: uint64(r.FeeRate),
3✔
1706
                Fee:     uint64(r.Fee),
3✔
1707
        }
3✔
1708

3✔
1709
        // Get the old record for logging purpose.
3✔
1710
        oldTxid := oldTx.TxHash()
3✔
1711
        record, err := s.cfg.Store.GetTx(oldTxid)
3✔
1712
        if err != nil {
4✔
1713
                log.Errorf("Fetch tx record for %v: %v", oldTxid, err)
1✔
1714
                return err
1✔
1715
        }
1✔
1716

1717
        // Cancel the rebroadcasting of the replaced tx.
1718
        s.cfg.Wallet.CancelRebroadcast(oldTxid)
2✔
1719

2✔
1720
        log.Infof("RBFed tx=%v(fee=%v sats, feerate=%v sats/kw) with new "+
2✔
1721
                "tx=%v(fee=%v, "+"feerate=%v)", record.Txid, record.Fee,
2✔
1722
                record.FeeRate, tr.Txid, tr.Fee, tr.FeeRate)
2✔
1723

2✔
1724
        // The old sweeping tx has been replaced by a new one, we will update
2✔
1725
        // the tx record in the sweeper db.
2✔
1726
        //
2✔
1727
        // TODO(yy): we may also need to update the inputs in this tx to a new
2✔
1728
        // state. Suppose a replacing tx only spends a subset of the inputs
2✔
1729
        // here, we'd end up with the rest being marked as `Published` and
2✔
1730
        // won't be aggregated in the next sweep. Atm it's fine as we always
2✔
1731
        // RBF the same input set.
2✔
1732
        if err := s.cfg.Store.DeleteTx(oldTxid); err != nil {
3✔
1733
                log.Errorf("Delete tx record for %v: %v", oldTxid, err)
1✔
1734
                return err
1✔
1735
        }
1✔
1736

1737
        // Mark the inputs as published using the replacing tx.
1738
        return s.markInputsPublished(tr, resp.set)
1✔
1739
}
1740

1741
// handleBumpEventTxPublished handles the case where the sweeping tx has been
1742
// successfully published.
1743
func (s *UtxoSweeper) handleBumpEventTxPublished(resp *bumpResp) error {
1✔
1744
        r := resp.result
1✔
1745
        tx := r.Tx
1✔
1746
        tr := &TxRecord{
1✔
1747
                Txid:    tx.TxHash(),
1✔
1748
                FeeRate: uint64(r.FeeRate),
1✔
1749
                Fee:     uint64(r.Fee),
1✔
1750
        }
1✔
1751

1✔
1752
        // Inputs have been successfully published so we update their
1✔
1753
        // states.
1✔
1754
        err := s.markInputsPublished(tr, resp.set)
1✔
1755
        if err != nil {
1✔
1756
                return err
×
1757
        }
×
1758

1759
        log.Debugf("Published sweep tx %v, num_inputs=%v, height=%v",
1✔
1760
                tx.TxHash(), len(tx.TxIn), s.currentHeight)
1✔
1761

1✔
1762
        // If there's no error, remove the output script. Otherwise keep it so
1✔
1763
        // that it can be reused for the next transaction and causes no address
1✔
1764
        // inflation.
1✔
1765
        s.currentOutputScript = fn.None[lnwallet.AddrWithKey]()
1✔
1766

1✔
1767
        return nil
1✔
1768
}
1769

1770
// handleBumpEventTxFatal handles the case where there's an unexpected error
1771
// when creating or publishing the sweeping tx. In this case, the tx will be
1772
// removed from the sweeper store and the inputs will be marked as `Failed`,
1773
// which means they will not be retried.
1774
func (s *UtxoSweeper) handleBumpEventTxFatal(resp *bumpResp) error {
2✔
1775
        r := resp.result
2✔
1776

2✔
1777
        // Remove the tx from the sweeper store if there is one. Since this is
2✔
1778
        // a broadcast error, it's likely there isn't a tx here.
2✔
1779
        if r.Tx != nil {
4✔
1780
                txid := r.Tx.TxHash()
2✔
1781
                log.Infof("Tx=%v failed with unexpected error: %v", txid, r.Err)
2✔
1782

2✔
1783
                // Remove the tx from the sweeper db if it exists.
2✔
1784
                if err := s.cfg.Store.DeleteTx(txid); err != nil {
3✔
1785
                        return fmt.Errorf("delete tx record for %v: %w", txid,
1✔
1786
                                err)
1✔
1787
                }
1✔
1788
        }
1789

1790
        // Mark the inputs as fatal.
1791
        s.markInputsFatal(resp.set, r.Err)
1✔
1792

1✔
1793
        return nil
1✔
1794
}
1795

1796
// markInputsFatal  marks all inputs in the input set as failed. It will also
1797
// notify all the subscribers of these inputs.
1798
func (s *UtxoSweeper) markInputsFatal(set InputSet, err error) {
2✔
1799
        for _, inp := range set.Inputs() {
9✔
1800
                outpoint := inp.OutPoint()
7✔
1801

7✔
1802
                input, ok := s.inputs[outpoint]
7✔
1803
                if !ok {
7✔
UNCOV
1804
                        // It's very likely that a spending tx contains inputs
×
UNCOV
1805
                        // that we don't know.
×
UNCOV
1806
                        log.Tracef("Skipped marking input as failed: %v not "+
×
UNCOV
1807
                                "found in pending inputs", outpoint)
×
UNCOV
1808

×
UNCOV
1809
                        continue
×
1810
                }
1811

1812
                // If the input is already in a terminal state, we don't want
1813
                // to rewrite it, which also indicates an error as we only get
1814
                // an error event during the initial broadcast.
1815
                if input.terminated() {
10✔
1816
                        log.Errorf("Skipped marking input=%v as failed due to "+
3✔
1817
                                "unexpected state=%v", outpoint, input.state)
3✔
1818

3✔
1819
                        continue
3✔
1820
                }
1821

1822
                s.markInputFatal(input, err)
4✔
1823
        }
1824
}
1825

1826
// handleBumpEvent handles the result sent from the bumper based on its event
1827
// type.
1828
//
1829
// NOTE: TxConfirmed event is not handled, since we already subscribe to the
1830
// input's spending event, we don't need to do anything here.
1831
func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
1✔
1832
        log.Debugf("Received bump result %v", r.result)
1✔
1833

1✔
1834
        switch r.result.Event {
1✔
1835
        // The tx has been published, we update the inputs' state and create a
1836
        // record to be stored in the sweeper db.
UNCOV
1837
        case TxPublished:
×
UNCOV
1838
                return s.handleBumpEventTxPublished(r)
×
1839

1840
        // The tx has failed, we update the inputs' state.
1841
        case TxFailed:
1✔
1842
                s.handleBumpEventTxFailed(r)
1✔
1843
                return nil
1✔
1844

1845
        // The tx has been replaced, we will remove the old tx and replace it
1846
        // with the new one.
UNCOV
1847
        case TxReplaced:
×
UNCOV
1848
                return s.handleBumpEventTxReplaced(r)
×
1849

1850
        // There's a fatal error in creating the tx, we will remove the tx from
1851
        // the sweeper db and mark the inputs as failed.
UNCOV
1852
        case TxFatal:
×
UNCOV
1853
                return s.handleBumpEventTxFatal(r)
×
1854
        }
1855

UNCOV
1856
        return nil
×
1857
}
1858

1859
// IsSweeperOutpoint determines whether the outpoint was created by the sweeper.
1860
//
1861
// NOTE: It is enough to check the txid because the sweeper will create
1862
// outpoints which solely belong to the internal LND wallet.
UNCOV
1863
func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool {
×
UNCOV
1864
        found, err := s.cfg.Store.IsOurTx(op.Hash)
×
UNCOV
1865
        // In case there is an error fetching the transaction details from the
×
UNCOV
1866
        // sweeper store we assume the outpoint is still used by the sweeper
×
UNCOV
1867
        // (worst case scenario).
×
UNCOV
1868
        //
×
UNCOV
1869
        // TODO(ziggie): Ensure that confirmed outpoints are deleted from the
×
UNCOV
1870
        // bucket.
×
UNCOV
1871
        if err != nil && !errors.Is(err, errNoTxHashesBucket) {
×
1872
                log.Errorf("failed to fetch info for outpoint(%v:%d) "+
×
1873
                        "with: %v, we assume it is still in use by the sweeper",
×
1874
                        op.Hash, op.Index, err)
×
1875

×
1876
                return true
×
1877
        }
×
1878

UNCOV
1879
        return found
×
1880
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc