• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12320026784

13 Dec 2024 05:14PM UTC coverage: 57.562% (-1.1%) from 58.636%
12320026784

Pull #9315

github

web-flow
Merge pull request #9277 from yyforyongyu/yy-blockbeat-end

Beat [4/4]: implement `Consumer` in `chainWatcher`
Pull Request #9315: Implement `blockbeat`

1754 of 2518 new or added lines in 30 files covered. (69.66%)

19108 existing lines in 247 files now uncovered.

102548 of 178151 relevant lines covered (57.56%)

24794.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.35
/sweep/sweeper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/davecgh/go-spew/spew"
13
        "github.com/lightningnetwork/lnd/chainio"
14
        "github.com/lightningnetwork/lnd/chainntnfs"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/input"
17
        "github.com/lightningnetwork/lnd/lnutils"
18
        "github.com/lightningnetwork/lnd/lnwallet"
19
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
20
)
21

22
var (
23
        // ErrRemoteSpend is returned in case an output that we try to sweep is
24
        // confirmed in a tx of the remote party.
25
        ErrRemoteSpend = errors.New("remote party swept utxo")
26

27
        // ErrFeePreferenceTooLow is returned when the fee preference gives a
28
        // fee rate that's below the relay fee rate.
29
        ErrFeePreferenceTooLow = errors.New("fee preference too low")
30

31
        // ErrExclusiveGroupSpend is returned in case a different input of the
32
        // same exclusive group was spent.
33
        ErrExclusiveGroupSpend = errors.New("other member of exclusive group " +
34
                "was spent")
35

36
        // ErrSweeperShuttingDown is an error returned when a client attempts to
37
        // make a request to the UtxoSweeper, but it is unable to handle it as
38
        // it is/has already been stopped.
39
        ErrSweeperShuttingDown = errors.New("utxo sweeper shutting down")
40

41
        // DefaultDeadlineDelta defines a default deadline delta (1 week) to be
42
        // used when sweeping inputs with no deadline pressure.
43
        DefaultDeadlineDelta = int32(1008)
44
)
45

46
// Params contains the parameters that control the sweeping process.
47
type Params struct {
48
        // ExclusiveGroup is an identifier that, if set, prevents other inputs
49
        // with the same identifier from being batched together.
50
        ExclusiveGroup *uint64
51

52
        // DeadlineHeight specifies an absolute block height that this input
53
        // should be confirmed by. This value is used by the fee bumper to
54
        // decide its urgency and adjust its feerate used.
55
        DeadlineHeight fn.Option[int32]
56

57
        // Budget specifies the maximum amount of satoshis that can be spent on
58
        // fees for this sweep.
59
        Budget btcutil.Amount
60

61
        // Immediate indicates that the input should be swept immediately
62
        // without waiting for blocks to come to trigger the sweeping of
63
        // inputs.
64
        Immediate bool
65

66
        // StartingFeeRate is an optional parameter that can be used to specify
67
        // the initial fee rate to use for the fee function.
68
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
69
}
70

71
// String returns a human readable interpretation of the sweep parameters.
UNCOV
72
func (p Params) String() string {
×
UNCOV
73
        deadline := "none"
×
UNCOV
74
        p.DeadlineHeight.WhenSome(func(d int32) {
×
UNCOV
75
                deadline = fmt.Sprintf("%d", d)
×
UNCOV
76
        })
×
77

UNCOV
78
        exclusiveGroup := "none"
×
UNCOV
79
        if p.ExclusiveGroup != nil {
×
UNCOV
80
                exclusiveGroup = fmt.Sprintf("%d", *p.ExclusiveGroup)
×
UNCOV
81
        }
×
82

UNCOV
83
        return fmt.Sprintf("startingFeeRate=%v, immediate=%v, "+
×
UNCOV
84
                "exclusive_group=%v, budget=%v, deadline=%v", p.StartingFeeRate,
×
UNCOV
85
                p.Immediate, exclusiveGroup, p.Budget, deadline)
×
86
}
87

88
// SweepState represents the current state of a pending input.
89
//
90
//nolint:revive
91
type SweepState uint8
92

93
const (
94
        // Init is the initial state of a pending input. This is set when a new
95
        // sweeping request for a given input is made.
96
        Init SweepState = iota
97

98
        // PendingPublish specifies an input's state where it's already been
99
        // included in a sweeping tx but the tx is not published yet.  Inputs
100
        // in this state should not be used for grouping again.
101
        PendingPublish
102

103
        // Published is the state where the input's sweeping tx has
104
        // successfully been published. Inputs in this state can only be
105
        // updated via RBF.
106
        Published
107

108
        // PublishFailed is the state when an error is returned from publishing
109
        // the sweeping tx. Inputs in this state can be re-grouped into a new
110
        // sweeping tx.
111
        PublishFailed
112

113
        // Swept is the final state of a pending input. This is set when the
114
        // input has been successfully swept.
115
        Swept
116

117
        // Excluded is the state of a pending input that has been excluded and
118
        // can no longer be swept. For instance, when one of the three anchor
119
        // sweeping transactions confirms, the remaining two will be excluded.
120
        Excluded
121

122
        // Failed is the state when a pending input has too many failed publish
123
        // attempts or an unknown broadcast error is returned.
124
        Failed
125
)
126

127
// String gives a human readable text for the sweep states.
UNCOV
128
func (s SweepState) String() string {
×
UNCOV
129
        switch s {
×
UNCOV
130
        case Init:
×
UNCOV
131
                return "Init"
×
132

UNCOV
133
        case PendingPublish:
×
UNCOV
134
                return "PendingPublish"
×
135

UNCOV
136
        case Published:
×
UNCOV
137
                return "Published"
×
138

UNCOV
139
        case PublishFailed:
×
UNCOV
140
                return "PublishFailed"
×
141

UNCOV
142
        case Swept:
×
UNCOV
143
                return "Swept"
×
144

UNCOV
145
        case Excluded:
×
UNCOV
146
                return "Excluded"
×
147

148
        case Failed:
×
149
                return "Failed"
×
150

151
        default:
×
152
                return "Unknown"
×
153
        }
154
}
155

156
// RBFInfo stores the information required to perform a RBF bump on a pending
157
// sweeping tx.
158
type RBFInfo struct {
159
        // Txid is the txid of the sweeping tx.
160
        Txid chainhash.Hash
161

162
        // FeeRate is the fee rate of the sweeping tx.
163
        FeeRate chainfee.SatPerKWeight
164

165
        // Fee is the total fee of the sweeping tx.
166
        Fee btcutil.Amount
167
}
168

169
// SweeperInput is created when an input reaches the main loop for the first
170
// time. It wraps the input and tracks all relevant state that is needed for
171
// sweeping.
172
type SweeperInput struct {
173
        input.Input
174

175
        // state tracks the current state of the input.
176
        state SweepState
177

178
        // listeners is a list of channels over which the final outcome of the
179
        // sweep needs to be broadcast.
180
        listeners []chan Result
181

182
        // ntfnRegCancel is populated with a function that cancels the chain
183
        // notifier spend registration.
184
        ntfnRegCancel func()
185

186
        // publishAttempts records the number of attempts that have already been
187
        // made to sweep this input.
188
        publishAttempts int
189

190
        // params contains the parameters that control the sweeping process.
191
        params Params
192

193
        // lastFeeRate is the most recent fee rate used for this input within a
194
        // transaction broadcast to the network.
195
        lastFeeRate chainfee.SatPerKWeight
196

197
        // rbf records the RBF constraints.
198
        rbf fn.Option[RBFInfo]
199

200
        // DeadlineHeight is the deadline height for this input. This is
201
        // different from the DeadlineHeight in its params as it's an actual
202
        // value rather than an option.
203
        DeadlineHeight int32
204
}
205

206
// String returns a human readable interpretation of the pending input.
207
func (p *SweeperInput) String() string {
26✔
208
        return fmt.Sprintf("%v (%v)", p.Input.OutPoint(), p.Input.WitnessType())
26✔
209
}
26✔
210

211
// terminated returns a boolean indicating whether the input has reached a
212
// final state.
213
func (p *SweeperInput) terminated() bool {
22✔
214
        switch p.state {
22✔
215
        // If the input has reached a final state, i.e., it's either
216
        // been swept, or failed, or excluded, we will remove it from
217
        // our sweeper.
218
        case Failed, Swept, Excluded:
8✔
219
                return true
8✔
220

221
        default:
14✔
222
                return false
14✔
223
        }
224
}
225

226
// isMature returns a boolean indicating whether the input has a timelock that
227
// has been reached or not. The locktime found is also returned.
228
func (p *SweeperInput) isMature(currentHeight uint32) (bool, uint32) {
4✔
229
        locktime, _ := p.RequiredLockTime()
4✔
230
        if currentHeight < locktime {
5✔
231
                log.Debugf("Input %v has locktime=%v, current height is %v",
1✔
232
                        p.OutPoint(), locktime, currentHeight)
1✔
233

1✔
234
                return false, locktime
1✔
235
        }
1✔
236

237
        // If the input has a CSV that's not yet reached, we will skip
238
        // this input and wait for the expiry.
239
        //
240
        // NOTE: We need to consider whether this input can be included in the
241
        // next block or not, which means the CSV will be checked against the
242
        // currentHeight plus one.
243
        locktime = p.BlocksToMaturity() + p.HeightHint()
3✔
244
        if currentHeight+1 < locktime {
4✔
245
                log.Debugf("Input %v has CSV expiry=%v, current height is %v, "+
1✔
246
                        "skipped sweeping", p.OutPoint(), locktime,
1✔
247
                        currentHeight)
1✔
248

1✔
249
                return false, locktime
1✔
250
        }
1✔
251

252
        return true, locktime
2✔
253
}
254

255
// InputsMap is a type alias for a set of pending inputs.
256
type InputsMap = map[wire.OutPoint]*SweeperInput
257

258
// pendingSweepsReq is an internal message we'll use to represent an external
259
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
260
// attempting to sweep.
261
type pendingSweepsReq struct {
262
        respChan chan map[wire.OutPoint]*PendingInputResponse
263
        errChan  chan error
264
}
265

266
// PendingInputResponse contains information about an input that is currently
267
// being swept by the UtxoSweeper.
268
type PendingInputResponse struct {
269
        // OutPoint is the identifying outpoint of the input being swept.
270
        OutPoint wire.OutPoint
271

272
        // WitnessType is the witness type of the input being swept.
273
        WitnessType input.WitnessType
274

275
        // Amount is the amount of the input being swept.
276
        Amount btcutil.Amount
277

278
        // LastFeeRate is the most recent fee rate used for the input being
279
        // swept within a transaction broadcast to the network.
280
        LastFeeRate chainfee.SatPerKWeight
281

282
        // BroadcastAttempts is the number of attempts we've made to sweep the
283
        // input.
284
        BroadcastAttempts int
285

286
        // Params contains the sweep parameters for this pending request.
287
        Params Params
288

289
        // DeadlineHeight records the deadline height of this input.
290
        DeadlineHeight uint32
291
}
292

293
// updateReq is an internal message we'll use to represent an external caller's
294
// intent to update the sweep parameters of a given input.
295
type updateReq struct {
296
        input        wire.OutPoint
297
        params       Params
298
        responseChan chan *updateResp
299
}
300

301
// updateResp is an internal message we'll use to hand off the response of an
302
// updateReq from the UtxoSweeper's main event loop back to the caller.
303
type updateResp struct {
304
        resultChan chan Result
305
        err        error
306
}
307

308
// UtxoSweeper is responsible for sweeping outputs back into the wallet.
309
type UtxoSweeper struct {
310
        started uint32 // To be used atomically.
311
        stopped uint32 // To be used atomically.
312

313
        // Embed the blockbeat consumer struct to get access to the method
314
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
315
        chainio.BeatConsumer
316

317
        cfg *UtxoSweeperConfig
318

319
        newInputs chan *sweepInputMessage
320
        spendChan chan *chainntnfs.SpendDetail
321

322
        // pendingSweepsReqs is a channel that will be sent requests by external
323
        // callers in order to retrieve the set of pending inputs the
324
        // UtxoSweeper is attempting to sweep.
325
        pendingSweepsReqs chan *pendingSweepsReq
326

327
        // updateReqs is a channel that will be sent requests by external
328
        // callers who wish to bump the fee rate of a given input.
329
        updateReqs chan *updateReq
330

331
        // inputs is the total set of inputs the UtxoSweeper has been requested
332
        // to sweep.
333
        inputs InputsMap
334

335
        currentOutputScript fn.Option[lnwallet.AddrWithKey]
336

337
        relayFeeRate chainfee.SatPerKWeight
338

339
        quit chan struct{}
340
        wg   sync.WaitGroup
341

342
        // currentHeight is the best known height of the main chain. This is
343
        // updated whenever a new blockbeat is received.
344
        currentHeight int32
345

346
        // bumpRespChan is a channel that receives broadcast results from the
347
        // TxPublisher.
348
        bumpRespChan chan *bumpResp
349
}
350

351
// Compile-time check for the chainio.Consumer interface.
352
var _ chainio.Consumer = (*UtxoSweeper)(nil)
353

354
// UtxoSweeperConfig contains dependencies of UtxoSweeper.
355
type UtxoSweeperConfig struct {
356
        // GenSweepScript generates a P2WKH script belonging to the wallet where
357
        // funds can be swept.
358
        GenSweepScript func() fn.Result[lnwallet.AddrWithKey]
359

360
        // FeeEstimator is used when crafting sweep transactions to estimate
361
        // the necessary fee relative to the expected size of the sweep
362
        // transaction.
363
        FeeEstimator chainfee.Estimator
364

365
        // Wallet contains the wallet functions that sweeper requires.
366
        Wallet Wallet
367

368
        // Notifier is an instance of a chain notifier we'll use to watch for
369
        // certain on-chain events.
370
        Notifier chainntnfs.ChainNotifier
371

372
        // Mempool is the mempool watcher that will be used to query whether a
373
        // given input is already being spent by a transaction in the mempool.
374
        Mempool chainntnfs.MempoolWatcher
375

376
        // Store stores the published sweeper txes.
377
        Store SweeperStore
378

379
        // Signer is used by the sweeper to generate valid witnesses at the
380
        // time the incubated outputs need to be spent.
381
        Signer input.Signer
382

383
        // MaxInputsPerTx specifies the default maximum number of inputs allowed
384
        // in a single sweep tx. If more need to be swept, multiple txes are
385
        // created and published.
386
        MaxInputsPerTx uint32
387

388
        // MaxFeeRate is the maximum fee rate allowed within the UtxoSweeper.
389
        MaxFeeRate chainfee.SatPerVByte
390

391
        // Aggregator is used to group inputs into clusters based on its
392
        // implementation-specific strategy.
393
        Aggregator UtxoAggregator
394

395
        // Publisher is used to publish the sweep tx crafted here and monitors
396
        // it for potential fee bumps.
397
        Publisher Bumper
398

399
        // NoDeadlineConfTarget is the conf target to use when sweeping
400
        // non-time-sensitive outputs.
401
        NoDeadlineConfTarget uint32
402
}
403

404
// Result is the struct that is pushed through the result channel. Callers can
405
// use this to be informed of the final sweep result. In case of a remote
406
// spend, Err will be ErrRemoteSpend.
407
type Result struct {
408
        // Err is the final result of the sweep. It is nil when the input is
409
        // swept successfully by us. ErrRemoteSpend is returned when another
410
        // party took the input.
411
        Err error
412

413
        // Tx is the transaction that spent the input.
414
        Tx *wire.MsgTx
415
}
416

417
// sweepInputMessage structs are used in the internal channel between the
418
// SweepInput call and the sweeper main loop.
419
type sweepInputMessage struct {
420
        input      input.Input
421
        params     Params
422
        resultChan chan Result
423
}
424

425
// New returns a new UtxoSweeper instance.
426
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
16✔
427
        s := &UtxoSweeper{
16✔
428
                cfg:               cfg,
16✔
429
                newInputs:         make(chan *sweepInputMessage),
16✔
430
                spendChan:         make(chan *chainntnfs.SpendDetail),
16✔
431
                updateReqs:        make(chan *updateReq),
16✔
432
                pendingSweepsReqs: make(chan *pendingSweepsReq),
16✔
433
                quit:              make(chan struct{}),
16✔
434
                inputs:            make(InputsMap),
16✔
435
                bumpRespChan:      make(chan *bumpResp, 100),
16✔
436
        }
16✔
437

16✔
438
        // Mount the block consumer.
16✔
439
        s.BeatConsumer = chainio.NewBeatConsumer(s.quit, s.Name())
16✔
440

16✔
441
        return s
16✔
442
}
16✔
443

444
// Start starts the process of constructing and publishing sweep txes.
NEW
445
func (s *UtxoSweeper) Start(beat chainio.Blockbeat) error {
×
UNCOV
446
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
×
447
                return nil
×
448
        }
×
449

UNCOV
450
        log.Info("Sweeper starting")
×
UNCOV
451

×
UNCOV
452
        // Retrieve relay fee for dust limit calculation. Assume that this will
×
UNCOV
453
        // not change from here on.
×
UNCOV
454
        s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
×
UNCOV
455

×
NEW
456
        // Set the current height.
×
NEW
457
        s.currentHeight = beat.Height()
×
UNCOV
458

×
UNCOV
459
        // Start sweeper main loop.
×
UNCOV
460
        s.wg.Add(1)
×
NEW
461
        go s.collector()
×
UNCOV
462

×
UNCOV
463
        return nil
×
464
}
465

466
// RelayFeePerKW returns the minimum fee rate required for transactions to be
467
// relayed.
468
func (s *UtxoSweeper) RelayFeePerKW() chainfee.SatPerKWeight {
×
469
        return s.relayFeeRate
×
470
}
×
471

472
// Stop stops sweeper from listening to block epochs and constructing sweep
473
// txes.
UNCOV
474
func (s *UtxoSweeper) Stop() error {
×
UNCOV
475
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
×
476
                return nil
×
477
        }
×
478

UNCOV
479
        log.Info("Sweeper shutting down...")
×
UNCOV
480
        defer log.Debug("Sweeper shutdown complete")
×
UNCOV
481

×
UNCOV
482
        close(s.quit)
×
UNCOV
483
        s.wg.Wait()
×
UNCOV
484

×
UNCOV
485
        return nil
×
486
}
487

488
// Name returns the sweeper's name. Part of the `chainio.Consumer` interface.
489
func (s *UtxoSweeper) Name() string {
16✔
490
        return "UtxoSweeper"
16✔
491
}
16✔
492

493
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
494
// swept after the batch time window ends. A custom fee preference can be
495
// provided to determine what fee rate should be used for the input. Note that
496
// the input may not always be swept with this exact value, as it's possible for
497
// it to be batched under the same transaction with other similar fee rate
498
// inputs.
499
//
500
// NOTE: Extreme care needs to be taken that input isn't changed externally.
501
// Because it is an interface and we don't know what is exactly behind it, we
502
// cannot make a local copy in sweeper.
503
//
504
// TODO(yy): make sure the caller is using the Result chan.
505
func (s *UtxoSweeper) SweepInput(inp input.Input,
UNCOV
506
        params Params) (chan Result, error) {
×
UNCOV
507

×
UNCOV
508
        if inp == nil || inp.OutPoint() == input.EmptyOutPoint ||
×
UNCOV
509
                inp.SignDesc() == nil {
×
510

×
511
                return nil, errors.New("nil input received")
×
512
        }
×
513

UNCOV
514
        absoluteTimeLock, _ := inp.RequiredLockTime()
×
NEW
515
        log.Debugf("Sweep request received: out_point=%v, witness_type=%v, "+
×
UNCOV
516
                "relative_time_lock=%v, absolute_time_lock=%v, amount=%v, "+
×
UNCOV
517
                "parent=(%v), params=(%v)", inp.OutPoint(), inp.WitnessType(),
×
UNCOV
518
                inp.BlocksToMaturity(), absoluteTimeLock,
×
UNCOV
519
                btcutil.Amount(inp.SignDesc().Output.Value),
×
UNCOV
520
                inp.UnconfParent(), params)
×
UNCOV
521

×
UNCOV
522
        sweeperInput := &sweepInputMessage{
×
UNCOV
523
                input:      inp,
×
UNCOV
524
                params:     params,
×
UNCOV
525
                resultChan: make(chan Result, 1),
×
UNCOV
526
        }
×
UNCOV
527

×
UNCOV
528
        // Deliver input to the main event loop.
×
UNCOV
529
        select {
×
UNCOV
530
        case s.newInputs <- sweeperInput:
×
531
        case <-s.quit:
×
532
                return nil, ErrSweeperShuttingDown
×
533
        }
534

UNCOV
535
        return sweeperInput.resultChan, nil
×
536
}
537

538
// removeConflictSweepDescendants removes any transactions from the wallet that
539
// spend outputs included in the passed outpoint set. This needs to be done in
540
// cases where we're not the only ones that can sweep an output, but there may
541
// exist unconfirmed spends that spend outputs created by a sweep transaction.
542
// The most common case for this is when someone sweeps our anchor outputs
543
// after 16 blocks. Moreover this is also needed for wallets which use neutrino
544
// as a backend when a channel is force closed and anchor cpfp txns are
545
// created to bump the initial commitment transaction. In this case an anchor
546
// cpfp is broadcast for up to 3 commitment transactions (local,
547
// remote-dangling, remote). Using neutrino all of those transactions will be
548
// accepted (the commitment tx will be different in all of those cases) and have
549
// to be removed as soon as one of them confirms (they do have the same
550
// ExclusiveGroup). For neutrino backends the corresponding BIP 157 serving full
551
// nodes do not signal invalid transactions anymore.
552
func (s *UtxoSweeper) removeConflictSweepDescendants(
UNCOV
553
        outpoints map[wire.OutPoint]struct{}) error {
×
UNCOV
554

×
UNCOV
555
        // Obtain all the past sweeps that we've done so far. We'll need these
×
UNCOV
556
        // to ensure that if the spendingTx spends any of the same inputs, then
×
UNCOV
557
        // we remove any transaction that may be spending those inputs from the
×
UNCOV
558
        // wallet.
×
UNCOV
559
        //
×
UNCOV
560
        // TODO(roasbeef): can be last sweep here if we remove anything confirmed
×
UNCOV
561
        // from the store?
×
UNCOV
562
        pastSweepHashes, err := s.cfg.Store.ListSweeps()
×
UNCOV
563
        if err != nil {
×
564
                return err
×
565
        }
×
566

567
        // We'll now go through each past transaction we published during this
568
        // epoch and cross reference the spent inputs. If there're any inputs
569
        // in common with the inputs the spendingTx spent, then we'll remove
570
        // those.
571
        //
572
        // TODO(roasbeef): need to start to remove all transaction hashes after
573
        // every N blocks (assumed point of no return)
UNCOV
574
        for _, sweepHash := range pastSweepHashes {
×
UNCOV
575
                sweepTx, err := s.cfg.Wallet.FetchTx(sweepHash)
×
UNCOV
576
                if err != nil {
×
577
                        return err
×
578
                }
×
579

580
                // Transaction wasn't found in the wallet, may have already
581
                // been replaced/removed.
UNCOV
582
                if sweepTx == nil {
×
UNCOV
583
                        // If it was removed, then we'll play it safe and mark
×
UNCOV
584
                        // it as no longer needing to be rebroadcast.
×
UNCOV
585
                        s.cfg.Wallet.CancelRebroadcast(sweepHash)
×
UNCOV
586
                        continue
×
587
                }
588

589
                // Check to see if this past sweep transaction spent any of the
590
                // same inputs as spendingTx.
UNCOV
591
                var isConflicting bool
×
UNCOV
592
                for _, txIn := range sweepTx.TxIn {
×
UNCOV
593
                        if _, ok := outpoints[txIn.PreviousOutPoint]; ok {
×
UNCOV
594
                                isConflicting = true
×
UNCOV
595
                                break
×
596
                        }
597
                }
598

UNCOV
599
                if !isConflicting {
×
UNCOV
600
                        continue
×
601
                }
602

603
                // If it is conflicting, then we'll signal the wallet to remove
604
                // all the transactions that are descendants of outputs created
605
                // by the sweepTx and the sweepTx itself.
UNCOV
606
                log.Debugf("Removing sweep txid=%v from wallet: %v",
×
UNCOV
607
                        sweepTx.TxHash(), spew.Sdump(sweepTx))
×
UNCOV
608

×
UNCOV
609
                err = s.cfg.Wallet.RemoveDescendants(sweepTx)
×
UNCOV
610
                if err != nil {
×
611
                        log.Warnf("Unable to remove descendants: %v", err)
×
612
                }
×
613

614
                // If this transaction was conflicting, then we'll stop
615
                // rebroadcasting it in the background.
UNCOV
616
                s.cfg.Wallet.CancelRebroadcast(sweepHash)
×
617
        }
618

UNCOV
619
        return nil
×
620
}
621

622
// collector is the sweeper main loop. It processes new inputs, spend
623
// notifications and counts down to publication of the sweep tx.
NEW
624
func (s *UtxoSweeper) collector() {
×
NEW
625
        defer s.wg.Done()
×
UNCOV
626

×
UNCOV
627
        for {
×
UNCOV
628
                // Clean inputs, which will remove inputs that are swept,
×
UNCOV
629
                // failed, or excluded from the sweeper and return inputs that
×
UNCOV
630
                // are either new or have been published but failed, which
×
UNCOV
631
                // will be retried again here.
×
UNCOV
632
                s.updateSweeperInputs()
×
UNCOV
633

×
UNCOV
634
                select {
×
635
                // A new input is offered to the sweeper. We check to see if
636
                // we are already trying to sweep this input and if not, set up
637
                // a listener to spend and schedule a sweep.
UNCOV
638
                case input := <-s.newInputs:
×
UNCOV
639
                        err := s.handleNewInput(input)
×
UNCOV
640
                        if err != nil {
×
641
                                log.Criticalf("Unable to handle new input: %v",
×
642
                                        err)
×
643

×
644
                                return
×
645
                        }
×
646

647
                        // If this input is forced, we perform a sweep
648
                        // immediately.
649
                        //
650
                        // TODO(ziggie): Make sure when `immediate` is selected
651
                        // as a parameter that we only trigger the sweeping of
652
                        // this specific input rather than triggering the sweeps
653
                        // of all current pending inputs registered with the
654
                        // sweeper.
UNCOV
655
                        if input.params.Immediate {
×
UNCOV
656
                                inputs := s.updateSweeperInputs()
×
UNCOV
657
                                s.sweepPendingInputs(inputs)
×
UNCOV
658
                        }
×
659

660
                // A spend of one of our inputs is detected. Signal sweep
661
                // results to the caller(s).
UNCOV
662
                case spend := <-s.spendChan:
×
UNCOV
663
                        s.handleInputSpent(spend)
×
664

665
                // A new external request has been received to retrieve all of
666
                // the inputs we're currently attempting to sweep.
UNCOV
667
                case req := <-s.pendingSweepsReqs:
×
UNCOV
668
                        s.handlePendingSweepsReq(req)
×
669

670
                // A new external request has been received to bump the fee rate
671
                // of a given input.
UNCOV
672
                case req := <-s.updateReqs:
×
UNCOV
673
                        resultChan, err := s.handleUpdateReq(req)
×
UNCOV
674
                        req.responseChan <- &updateResp{
×
UNCOV
675
                                resultChan: resultChan,
×
UNCOV
676
                                err:        err,
×
UNCOV
677
                        }
×
UNCOV
678

×
UNCOV
679
                        // Perform a sweep immediately if asked.
×
UNCOV
680
                        if req.params.Immediate {
×
UNCOV
681
                                inputs := s.updateSweeperInputs()
×
UNCOV
682
                                s.sweepPendingInputs(inputs)
×
UNCOV
683
                        }
×
684

NEW
685
                case resp := <-s.bumpRespChan:
×
UNCOV
686
                        // Handle the bump event.
×
NEW
687
                        err := s.handleBumpEvent(resp)
×
UNCOV
688
                        if err != nil {
×
UNCOV
689
                                log.Errorf("Failed to handle bump event: %v",
×
UNCOV
690
                                        err)
×
UNCOV
691
                        }
×
692

693
                // A new block comes in, update currentHeight, perform a check
694
                // over all pending inputs and publish sweeping txns if needed.
NEW
695
                case beat := <-s.BlockbeatChan:
×
UNCOV
696
                        // Update the sweeper to the best height.
×
NEW
697
                        s.currentHeight = beat.Height()
×
UNCOV
698

×
UNCOV
699
                        // Update the inputs with the latest height.
×
UNCOV
700
                        inputs := s.updateSweeperInputs()
×
UNCOV
701

×
UNCOV
702
                        log.Debugf("Received new block: height=%v, attempt "+
×
NEW
703
                                "sweeping %d inputs:\n%s",
×
NEW
704
                                s.currentHeight, len(inputs),
×
NEW
705
                                lnutils.NewLogClosure(func() string {
×
NEW
706
                                        inps := make(
×
NEW
707
                                                []input.Input, 0, len(inputs),
×
NEW
708
                                        )
×
NEW
709
                                        for _, in := range inputs {
×
NEW
710
                                                inps = append(inps, in)
×
NEW
711
                                        }
×
712

NEW
713
                                        return inputTypeSummary(inps)
×
714
                                }))
715

716
                        // Attempt to sweep any pending inputs.
UNCOV
717
                        s.sweepPendingInputs(inputs)
×
UNCOV
718

×
NEW
719
                        // Notify we've processed the block.
×
NEW
720
                        s.NotifyBlockProcessed(beat, nil)
×
721

UNCOV
722
                case <-s.quit:
×
UNCOV
723
                        return
×
724
                }
725
        }
726
}
727

728
// removeExclusiveGroup removes all inputs in the given exclusive group. This
729
// function is called when one of the exclusive group inputs has been spent. The
730
// other inputs won't ever be spendable and can be removed. This also prevents
731
// them from being part of future sweep transactions that would fail. In
732
// addition sweep transactions of those inputs will be removed from the wallet.
UNCOV
733
func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
×
UNCOV
734
        for outpoint, input := range s.inputs {
×
UNCOV
735
                outpoint := outpoint
×
UNCOV
736

×
UNCOV
737
                // Skip inputs that aren't exclusive.
×
UNCOV
738
                if input.params.ExclusiveGroup == nil {
×
UNCOV
739
                        continue
×
740
                }
741

742
                // Skip inputs from other exclusive groups.
UNCOV
743
                if *input.params.ExclusiveGroup != group {
×
744
                        continue
×
745
                }
746

747
                // Skip inputs that are already terminated.
UNCOV
748
                if input.terminated() {
×
UNCOV
749
                        log.Tracef("Skipped sending error result for "+
×
UNCOV
750
                                "input %v, state=%v", outpoint, input.state)
×
UNCOV
751

×
UNCOV
752
                        continue
×
753
                }
754

755
                // Signal result channels.
UNCOV
756
                s.signalResult(input, Result{
×
UNCOV
757
                        Err: ErrExclusiveGroupSpend,
×
UNCOV
758
                })
×
UNCOV
759

×
UNCOV
760
                // Update the input's state as it can no longer be swept.
×
UNCOV
761
                input.state = Excluded
×
UNCOV
762

×
UNCOV
763
                // Remove all unconfirmed transactions from the wallet which
×
UNCOV
764
                // spend the passed outpoint of the same exclusive group.
×
UNCOV
765
                outpoints := map[wire.OutPoint]struct{}{
×
UNCOV
766
                        outpoint: {},
×
UNCOV
767
                }
×
UNCOV
768
                err := s.removeConflictSweepDescendants(outpoints)
×
UNCOV
769
                if err != nil {
×
770
                        log.Warnf("Unable to remove conflicting sweep tx from "+
×
771
                                "wallet for outpoint %v : %v", outpoint, err)
×
772
                }
×
773
        }
774
}
775

776
// signalResult notifies the listeners of the final result of the input sweep.
777
// It also cancels any pending spend notification.
778
func (s *UtxoSweeper) signalResult(pi *SweeperInput, result Result) {
7✔
779
        op := pi.OutPoint()
7✔
780
        listeners := pi.listeners
7✔
781

7✔
782
        if result.Err == nil {
9✔
783
                log.Tracef("Dispatching sweep success for %v to %v listeners",
2✔
784
                        op, len(listeners),
2✔
785
                )
2✔
786
        } else {
7✔
787
                log.Tracef("Dispatching sweep error for %v to %v listeners: %v",
5✔
788
                        op, len(listeners), result.Err,
5✔
789
                )
5✔
790
        }
5✔
791

792
        // Signal all listeners. Channel is buffered. Because we only send once
793
        // on every channel, it should never block.
794
        for _, resultChan := range listeners {
7✔
UNCOV
795
                resultChan <- result
×
UNCOV
796
        }
×
797

798
        // Cancel spend notification with chain notifier. This is not necessary
799
        // in case of a success, except that a reorg could still happen.
800
        if pi.ntfnRegCancel != nil {
7✔
UNCOV
801
                log.Debugf("Canceling spend ntfn for %v", op)
×
UNCOV
802

×
UNCOV
803
                pi.ntfnRegCancel()
×
UNCOV
804
        }
×
805
}
806

807
// sweep takes a set of preselected inputs, creates a sweep tx and publishes
808
// the tx. The output address is only marked as used if the publish succeeds.
809
func (s *UtxoSweeper) sweep(set InputSet) error {
2✔
810
        // Generate an output script if there isn't an unused script available.
2✔
811
        if s.currentOutputScript.IsNone() {
3✔
812
                addr, err := s.cfg.GenSweepScript().Unpack()
1✔
813
                if err != nil {
1✔
814
                        return fmt.Errorf("gen sweep script: %w", err)
×
815
                }
×
816
                s.currentOutputScript = fn.Some(addr)
1✔
817
        }
818

819
        sweepAddr, err := s.currentOutputScript.UnwrapOrErr(
2✔
820
                fmt.Errorf("none sweep script"),
2✔
821
        )
2✔
822
        if err != nil {
2✔
823
                return err
×
824
        }
×
825

826
        // Create a fee bump request and ask the publisher to broadcast it. The
827
        // publisher will then take over and start monitoring the tx for
828
        // potential fee bump.
829
        req := &BumpRequest{
2✔
830
                Inputs:          set.Inputs(),
2✔
831
                Budget:          set.Budget(),
2✔
832
                DeadlineHeight:  set.DeadlineHeight(),
2✔
833
                DeliveryAddress: sweepAddr,
2✔
834
                MaxFeeRate:      s.cfg.MaxFeeRate.FeePerKWeight(),
2✔
835
                StartingFeeRate: set.StartingFeeRate(),
2✔
836
                Immediate:       set.Immediate(),
2✔
837
                // TODO(yy): pass the strategy here.
2✔
838
        }
2✔
839

2✔
840
        // Reschedule the inputs that we just tried to sweep. This is done in
2✔
841
        // case the following publish fails, we'd like to update the inputs'
2✔
842
        // publish attempts and rescue them in the next sweep.
2✔
843
        s.markInputsPendingPublish(set)
2✔
844

2✔
845
        // Broadcast will return a read-only chan that we will listen to for
2✔
846
        // this publish result and future RBF attempt.
2✔
847
        resp := s.cfg.Publisher.Broadcast(req)
2✔
848

2✔
849
        // Successfully sent the broadcast attempt, we now handle the result by
2✔
850
        // subscribing to the result chan and listen for future updates about
2✔
851
        // this tx.
2✔
852
        s.wg.Add(1)
2✔
853
        go s.monitorFeeBumpResult(set, resp)
2✔
854

2✔
855
        return nil
2✔
856
}
857

858
// markInputsPendingPublish updates the pending inputs with the given tx
859
// inputs. It also increments the `publishAttempts`.
860
func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) {
3✔
861
        // Reschedule sweep.
3✔
862
        for _, input := range set.Inputs() {
6✔
863
                op := input.OutPoint()
3✔
864
                pi, ok := s.inputs[op]
3✔
865
                if !ok {
3✔
UNCOV
866
                        // It could be that this input is an additional wallet
×
UNCOV
867
                        // input that was attached. In that case there also
×
UNCOV
868
                        // isn't a pending input to update.
×
UNCOV
869
                        log.Tracef("Skipped marking input as pending "+
×
NEW
870
                                "published: %v not found in pending inputs", op)
×
UNCOV
871

×
UNCOV
872
                        continue
×
873
                }
874

875
                // If this input has already terminated, there's clearly
876
                // something wrong as it would have been removed. In this case
877
                // we log an error and skip marking this input as pending
878
                // publish.
879
                if pi.terminated() {
4✔
880
                        log.Errorf("Expect input %v to not have terminated "+
1✔
881
                                "state, instead it has %v", op, pi.state)
1✔
882

1✔
883
                        continue
1✔
884
                }
885

886
                // Update the input's state.
887
                pi.state = PendingPublish
2✔
888

2✔
889
                // Record another publish attempt.
2✔
890
                pi.publishAttempts++
2✔
891
        }
892
}
893

894
// markInputsPublished updates the sweeping tx in db and marks the list of
895
// inputs as published.
896
func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, set InputSet) error {
4✔
897
        // Mark this tx in db once successfully published.
4✔
898
        //
4✔
899
        // NOTE: this will behave as an overwrite, which is fine as the record
4✔
900
        // is small.
4✔
901
        tr.Published = true
4✔
902
        err := s.cfg.Store.StoreTx(tr)
4✔
903
        if err != nil {
5✔
904
                return fmt.Errorf("store tx: %w", err)
1✔
905
        }
1✔
906

907
        // Reschedule sweep.
908
        for _, input := range set.Inputs() {
7✔
909
                op := input.OutPoint()
4✔
910
                pi, ok := s.inputs[op]
4✔
911
                if !ok {
4✔
UNCOV
912
                        // It could be that this input is an additional wallet
×
UNCOV
913
                        // input that was attached. In that case there also
×
UNCOV
914
                        // isn't a pending input to update.
×
UNCOV
915
                        log.Tracef("Skipped marking input as published: %v "+
×
NEW
916
                                "not found in pending inputs", op)
×
UNCOV
917

×
UNCOV
918
                        continue
×
919
                }
920

921
                // Validate that the input is in an expected state.
922
                if pi.state != PendingPublish {
5✔
923
                        // We may get a Published if this is a replacement tx.
1✔
924
                        log.Debugf("Expect input %v to have %v, instead it "+
1✔
925
                                "has %v", op, PendingPublish, pi.state)
1✔
926

1✔
927
                        continue
1✔
928
                }
929

930
                // Update the input's state.
931
                pi.state = Published
3✔
932

3✔
933
                // Update the input's latest fee rate.
3✔
934
                pi.lastFeeRate = chainfee.SatPerKWeight(tr.FeeRate)
3✔
935
        }
936

937
        return nil
3✔
938
}
939

940
// markInputsPublishFailed marks the list of inputs as failed to be published.
941
func (s *UtxoSweeper) markInputsPublishFailed(set InputSet) {
2✔
942
        // Reschedule sweep.
2✔
943
        for _, inp := range set.Inputs() {
12✔
944
                op := inp.OutPoint()
10✔
945
                pi, ok := s.inputs[op]
10✔
946
                if !ok {
10✔
UNCOV
947
                        // It could be that this input is an additional wallet
×
UNCOV
948
                        // input that was attached. In that case there also
×
UNCOV
949
                        // isn't a pending input to update.
×
UNCOV
950
                        log.Tracef("Skipped marking input as publish failed: "+
×
UNCOV
951
                                "%v not found in pending inputs", op)
×
UNCOV
952

×
UNCOV
953
                        continue
×
954
                }
955

956
                // Validate that the input is in an expected state.
957
                if pi.state != PendingPublish && pi.state != Published {
15✔
958
                        log.Debugf("Expect input %v to have %v, instead it "+
5✔
959
                                "has %v", op, PendingPublish, pi.state)
5✔
960

5✔
961
                        continue
5✔
962
                }
963

964
                log.Warnf("Failed to publish input %v", op)
5✔
965

5✔
966
                // Update the input's state.
5✔
967
                pi.state = PublishFailed
5✔
968
        }
969
}
970

971
// monitorSpend registers a spend notification with the chain notifier. It
972
// returns a cancel function that can be used to cancel the registration.
973
func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint,
UNCOV
974
        script []byte, heightHint uint32) (func(), error) {
×
UNCOV
975

×
UNCOV
976
        log.Tracef("Wait for spend of %v at heightHint=%v",
×
UNCOV
977
                outpoint, heightHint)
×
UNCOV
978

×
UNCOV
979
        spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn(
×
UNCOV
980
                &outpoint, script, heightHint,
×
UNCOV
981
        )
×
UNCOV
982
        if err != nil {
×
983
                return nil, fmt.Errorf("register spend ntfn: %w", err)
×
984
        }
×
985

UNCOV
986
        s.wg.Add(1)
×
UNCOV
987
        go func() {
×
UNCOV
988
                defer s.wg.Done()
×
UNCOV
989

×
UNCOV
990
                select {
×
UNCOV
991
                case spend, ok := <-spendEvent.Spend:
×
UNCOV
992
                        if !ok {
×
UNCOV
993
                                log.Debugf("Spend ntfn for %v canceled",
×
UNCOV
994
                                        outpoint)
×
UNCOV
995
                                return
×
UNCOV
996
                        }
×
997

UNCOV
998
                        log.Debugf("Delivering spend ntfn for %v", outpoint)
×
UNCOV
999

×
UNCOV
1000
                        select {
×
UNCOV
1001
                        case s.spendChan <- spend:
×
UNCOV
1002
                                log.Debugf("Delivered spend ntfn for %v",
×
UNCOV
1003
                                        outpoint)
×
1004

UNCOV
1005
                        case <-s.quit:
×
1006
                        }
UNCOV
1007
                case <-s.quit:
×
1008
                }
1009
        }()
1010

UNCOV
1011
        return spendEvent.Cancel, nil
×
1012
}
1013

1014
// PendingInputs returns the set of inputs that the UtxoSweeper is currently
1015
// attempting to sweep.
1016
func (s *UtxoSweeper) PendingInputs() (
UNCOV
1017
        map[wire.OutPoint]*PendingInputResponse, error) {
×
UNCOV
1018

×
UNCOV
1019
        respChan := make(chan map[wire.OutPoint]*PendingInputResponse, 1)
×
UNCOV
1020
        errChan := make(chan error, 1)
×
UNCOV
1021
        select {
×
1022
        case s.pendingSweepsReqs <- &pendingSweepsReq{
1023
                respChan: respChan,
1024
                errChan:  errChan,
UNCOV
1025
        }:
×
1026
        case <-s.quit:
×
1027
                return nil, ErrSweeperShuttingDown
×
1028
        }
1029

UNCOV
1030
        select {
×
UNCOV
1031
        case pendingSweeps := <-respChan:
×
UNCOV
1032
                return pendingSweeps, nil
×
1033
        case err := <-errChan:
×
1034
                return nil, err
×
1035
        case <-s.quit:
×
1036
                return nil, ErrSweeperShuttingDown
×
1037
        }
1038
}
1039

1040
// handlePendingSweepsReq handles a request to retrieve all pending inputs the
1041
// UtxoSweeper is attempting to sweep.
1042
func (s *UtxoSweeper) handlePendingSweepsReq(
UNCOV
1043
        req *pendingSweepsReq) map[wire.OutPoint]*PendingInputResponse {
×
UNCOV
1044

×
UNCOV
1045
        resps := make(map[wire.OutPoint]*PendingInputResponse, len(s.inputs))
×
UNCOV
1046
        for _, inp := range s.inputs {
×
NEW
1047
                // Skip immature inputs for compatibility.
×
NEW
1048
                mature, _ := inp.isMature(uint32(s.currentHeight))
×
NEW
1049
                if !mature {
×
NEW
1050
                        continue
×
1051
                }
1052

1053
                // Only the exported fields are set, as we expect the response
1054
                // to only be consumed externally.
UNCOV
1055
                op := inp.OutPoint()
×
UNCOV
1056
                resps[op] = &PendingInputResponse{
×
UNCOV
1057
                        OutPoint:    op,
×
UNCOV
1058
                        WitnessType: inp.WitnessType(),
×
UNCOV
1059
                        Amount: btcutil.Amount(
×
UNCOV
1060
                                inp.SignDesc().Output.Value,
×
UNCOV
1061
                        ),
×
UNCOV
1062
                        LastFeeRate:       inp.lastFeeRate,
×
UNCOV
1063
                        BroadcastAttempts: inp.publishAttempts,
×
UNCOV
1064
                        Params:            inp.params,
×
UNCOV
1065
                        DeadlineHeight:    uint32(inp.DeadlineHeight),
×
UNCOV
1066
                }
×
1067
        }
1068

UNCOV
1069
        select {
×
UNCOV
1070
        case req.respChan <- resps:
×
1071
        case <-s.quit:
×
1072
                log.Debug("Skipped sending pending sweep response due to " +
×
1073
                        "UtxoSweeper shutting down")
×
1074
        }
1075

UNCOV
1076
        return resps
×
1077
}
1078

1079
// UpdateParams allows updating the sweep parameters of a pending input in the
1080
// UtxoSweeper. This function can be used to provide an updated fee preference
1081
// and force flag that will be used for a new sweep transaction of the input
1082
// that will act as a replacement transaction (RBF) of the original sweeping
1083
// transaction, if any. The exclusive group is left unchanged.
1084
//
1085
// NOTE: This currently doesn't do any fee rate validation to ensure that a bump
1086
// is actually successful. The responsibility of doing so should be handled by
1087
// the caller.
1088
func (s *UtxoSweeper) UpdateParams(input wire.OutPoint,
UNCOV
1089
        params Params) (chan Result, error) {
×
UNCOV
1090

×
UNCOV
1091
        responseChan := make(chan *updateResp, 1)
×
UNCOV
1092
        select {
×
1093
        case s.updateReqs <- &updateReq{
1094
                input:        input,
1095
                params:       params,
1096
                responseChan: responseChan,
UNCOV
1097
        }:
×
1098
        case <-s.quit:
×
1099
                return nil, ErrSweeperShuttingDown
×
1100
        }
1101

UNCOV
1102
        select {
×
UNCOV
1103
        case response := <-responseChan:
×
UNCOV
1104
                return response.resultChan, response.err
×
1105
        case <-s.quit:
×
1106
                return nil, ErrSweeperShuttingDown
×
1107
        }
1108
}
1109

1110
// handleUpdateReq handles an update request by simply updating the sweep
1111
// parameters of the pending input. Currently, no validation is done on the new
1112
// fee preference to ensure it will properly create a replacement transaction.
1113
//
1114
// TODO(wilmer):
1115
//   - Validate fee preference to ensure we'll create a valid replacement
1116
//     transaction to allow the new fee rate to propagate throughout the
1117
//     network.
1118
//   - Ensure we don't combine this input with any other unconfirmed inputs that
1119
//     did not exist in the original sweep transaction, resulting in an invalid
1120
//     replacement transaction.
1121
func (s *UtxoSweeper) handleUpdateReq(req *updateReq) (
UNCOV
1122
        chan Result, error) {
×
UNCOV
1123

×
UNCOV
1124
        // If the UtxoSweeper is already trying to sweep this input, then we can
×
UNCOV
1125
        // simply just increase its fee rate. This will allow the input to be
×
UNCOV
1126
        // batched with others which also have a similar fee rate, creating a
×
UNCOV
1127
        // higher fee rate transaction that replaces the original input's
×
UNCOV
1128
        // sweeping transaction.
×
UNCOV
1129
        sweeperInput, ok := s.inputs[req.input]
×
UNCOV
1130
        if !ok {
×
1131
                return nil, lnwallet.ErrNotMine
×
1132
        }
×
1133

1134
        // Create the updated parameters struct. Leave the exclusive group
1135
        // unchanged.
UNCOV
1136
        newParams := Params{
×
UNCOV
1137
                StartingFeeRate: req.params.StartingFeeRate,
×
UNCOV
1138
                Immediate:       req.params.Immediate,
×
UNCOV
1139
                Budget:          req.params.Budget,
×
UNCOV
1140
                DeadlineHeight:  req.params.DeadlineHeight,
×
UNCOV
1141
                ExclusiveGroup:  sweeperInput.params.ExclusiveGroup,
×
UNCOV
1142
        }
×
UNCOV
1143

×
UNCOV
1144
        log.Debugf("Updating parameters for %v(state=%v) from (%v) to (%v)",
×
UNCOV
1145
                req.input, sweeperInput.state, sweeperInput.params, newParams)
×
UNCOV
1146

×
UNCOV
1147
        sweeperInput.params = newParams
×
UNCOV
1148

×
UNCOV
1149
        // We need to reset the state so this input will be attempted again by
×
UNCOV
1150
        // our sweeper.
×
UNCOV
1151
        //
×
UNCOV
1152
        // TODO(yy): a dedicated state?
×
UNCOV
1153
        sweeperInput.state = Init
×
UNCOV
1154

×
UNCOV
1155
        // If the new input specifies a deadline, update the deadline height.
×
UNCOV
1156
        sweeperInput.DeadlineHeight = req.params.DeadlineHeight.UnwrapOr(
×
UNCOV
1157
                sweeperInput.DeadlineHeight,
×
UNCOV
1158
        )
×
UNCOV
1159

×
UNCOV
1160
        resultChan := make(chan Result, 1)
×
UNCOV
1161
        sweeperInput.listeners = append(sweeperInput.listeners, resultChan)
×
UNCOV
1162

×
UNCOV
1163
        return resultChan, nil
×
1164
}
1165

1166
// ListSweeps returns a list of the sweeps recorded by the sweep store.
UNCOV
1167
func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
×
UNCOV
1168
        return s.cfg.Store.ListSweeps()
×
UNCOV
1169
}
×
1170

1171
// mempoolLookup takes an input's outpoint and queries the mempool to see
1172
// whether it's already been spent in a transaction found in the mempool.
1173
// Returns the transaction if found.
1174
func (s *UtxoSweeper) mempoolLookup(op wire.OutPoint) fn.Option[wire.MsgTx] {
7✔
1175
        // For neutrino backend, there's no mempool available, so we exit
7✔
1176
        // early.
7✔
1177
        if s.cfg.Mempool == nil {
8✔
1178
                log.Debugf("Skipping mempool lookup for %v, no mempool ", op)
1✔
1179

1✔
1180
                return fn.None[wire.MsgTx]()
1✔
1181
        }
1✔
1182

1183
        // Query this input in the mempool. If this outpoint is already spent
1184
        // in mempool, we should get a spending event back immediately.
1185
        return s.cfg.Mempool.LookupInputMempoolSpend(op)
6✔
1186
}
1187

1188
// calculateDefaultDeadline calculates the default deadline height for a sweep
1189
// request that has no deadline height specified.
NEW
1190
func (s *UtxoSweeper) calculateDefaultDeadline(pi *SweeperInput) int32 {
×
UNCOV
1191
        // Create a default deadline height, which will be used when there's no
×
UNCOV
1192
        // DeadlineHeight specified for a given input.
×
UNCOV
1193
        defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget)
×
UNCOV
1194

×
NEW
1195
        // If the input is immature and has a locktime, we'll use the locktime
×
NEW
1196
        // height as the starting height.
×
NEW
1197
        matured, locktime := pi.isMature(uint32(s.currentHeight))
×
NEW
1198
        if !matured {
×
NEW
1199
                defaultDeadline = int32(locktime + s.cfg.NoDeadlineConfTarget)
×
NEW
1200
                log.Debugf("Input %v is immature, using locktime=%v instead "+
×
NEW
1201
                        "of current height=%d as starting height",
×
NEW
1202
                        pi.OutPoint(), locktime, s.currentHeight)
×
NEW
1203
        }
×
1204

NEW
1205
        return defaultDeadline
×
1206
}
1207

1208
// handleNewInput processes a new input by registering spend notification and
1209
// scheduling sweeping for it.
NEW
1210
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
×
UNCOV
1211
        outpoint := input.input.OutPoint()
×
UNCOV
1212
        pi, pending := s.inputs[outpoint]
×
UNCOV
1213
        if pending {
×
NEW
1214
                log.Infof("Already has pending input %v received, old params: "+
×
NEW
1215
                        "%v, new params %v", outpoint, pi.params, input.params)
×
UNCOV
1216

×
UNCOV
1217
                s.handleExistingInput(input, pi)
×
UNCOV
1218

×
UNCOV
1219
                return nil
×
UNCOV
1220
        }
×
1221

1222
        // This is a new input, and we want to query the mempool to see if this
1223
        // input has already been spent. If so, we'll start the input with
1224
        // state Published and attach the RBFInfo.
UNCOV
1225
        state, rbfInfo := s.decideStateAndRBFInfo(input.input.OutPoint())
×
UNCOV
1226

×
UNCOV
1227
        // Create a new pendingInput and initialize the listeners slice with
×
UNCOV
1228
        // the passed in result channel. If this input is offered for sweep
×
UNCOV
1229
        // again, the result channel will be appended to this slice.
×
UNCOV
1230
        pi = &SweeperInput{
×
UNCOV
1231
                state:     state,
×
UNCOV
1232
                listeners: []chan Result{input.resultChan},
×
UNCOV
1233
                Input:     input.input,
×
UNCOV
1234
                params:    input.params,
×
UNCOV
1235
                rbf:       rbfInfo,
×
UNCOV
1236
        }
×
UNCOV
1237

×
NEW
1238
        // Set the actual deadline height.
×
NEW
1239
        pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
×
NEW
1240
                s.calculateDefaultDeadline(pi),
×
NEW
1241
        )
×
NEW
1242

×
UNCOV
1243
        s.inputs[outpoint] = pi
×
UNCOV
1244
        log.Tracef("input %v, state=%v, added to inputs", outpoint, pi.state)
×
UNCOV
1245

×
NEW
1246
        log.Infof("Registered sweep request at block %d: out_point=%v, "+
×
NEW
1247
                "witness_type=%v, amount=%v, deadline=%d, params=(%v)",
×
NEW
1248
                s.currentHeight, pi.OutPoint(), pi.WitnessType(),
×
NEW
1249
                btcutil.Amount(pi.SignDesc().Output.Value), pi.DeadlineHeight,
×
NEW
1250
                pi.params)
×
NEW
1251

×
UNCOV
1252
        // Start watching for spend of this input, either by us or the remote
×
UNCOV
1253
        // party.
×
UNCOV
1254
        cancel, err := s.monitorSpend(
×
UNCOV
1255
                outpoint, input.input.SignDesc().Output.PkScript,
×
UNCOV
1256
                input.input.HeightHint(),
×
UNCOV
1257
        )
×
UNCOV
1258
        if err != nil {
×
1259
                err := fmt.Errorf("wait for spend: %w", err)
×
1260
                s.markInputFailed(pi, err)
×
1261

×
1262
                return err
×
1263
        }
×
1264

UNCOV
1265
        pi.ntfnRegCancel = cancel
×
UNCOV
1266

×
UNCOV
1267
        return nil
×
1268
}
1269

1270
// decideStateAndRBFInfo queries the mempool to see whether the given input has
1271
// already been spent. If so, the state Published will be returned, otherwise
1272
// state Init. When spent, it will query the sweeper store to fetch the fee
1273
// info of the spending transaction, and construct an RBFInfo based on it.
1274
// If an error occurs, fn.None is returned.
1275
func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
1276
        SweepState, fn.Option[RBFInfo]) {
4✔
1277

4✔
1278
        // Check if we can find the spending tx of this input in mempool.
4✔
1279
        txOption := s.mempoolLookup(op)
4✔
1280

4✔
1281
        // Extract the spending tx from the option.
4✔
1282
        var tx *wire.MsgTx
4✔
1283
        txOption.WhenSome(func(t wire.MsgTx) {
7✔
1284
                tx = &t
3✔
1285
        })
3✔
1286

1287
        // Exit early if it's not found.
1288
        //
1289
        // NOTE: this is not accurate for backends that don't support mempool
1290
        // lookup:
1291
        // - for neutrino we don't have a mempool.
1292
        // - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
1293
        if tx == nil {
5✔
1294
                return Init, fn.None[RBFInfo]()
1✔
1295
        }
1✔
1296

1297
        // Otherwise the input is already spent in the mempool, so eventually
1298
        // we will return Published.
1299
        //
1300
        // We also need to update the RBF info for this input. If the sweeping
1301
        // transaction is broadcast by us, we can find the fee info in the
1302
        // sweeper store.
1303
        txid := tx.TxHash()
3✔
1304
        tr, err := s.cfg.Store.GetTx(txid)
3✔
1305

3✔
1306
        // If the tx is not found in the store, it means it's not broadcast by
3✔
1307
        // us, hence we can't find the fee info. This is fine as, later on when
3✔
1308
        // this tx is confirmed, we will remove the input from our inputs.
3✔
1309
        if errors.Is(err, ErrTxNotFound) {
4✔
1310
                log.Warnf("Spending tx %v not found in sweeper store", txid)
1✔
1311
                return Published, fn.None[RBFInfo]()
1✔
1312
        }
1✔
1313

1314
        // Exit if we get a db error.
1315
        if err != nil {
3✔
1316
                log.Errorf("Unable to get tx %v from sweeper store: %v",
1✔
1317
                        txid, err)
1✔
1318

1✔
1319
                return Published, fn.None[RBFInfo]()
1✔
1320
        }
1✔
1321

1322
        // Prepare the fee info and return it.
1323
        rbf := fn.Some(RBFInfo{
1✔
1324
                Txid:    txid,
1✔
1325
                Fee:     btcutil.Amount(tr.Fee),
1✔
1326
                FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
1✔
1327
        })
1✔
1328

1✔
1329
        return Published, rbf
1✔
1330
}
1331

1332
// handleExistingInput processes an input that is already known to the sweeper.
1333
// It will overwrite the params of the old input with the new ones.
1334
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
UNCOV
1335
        oldInput *SweeperInput) {
×
UNCOV
1336

×
UNCOV
1337
        // Before updating the input details, check if an exclusive group was
×
UNCOV
1338
        // set. In case the same input is registered again without an exclusive
×
UNCOV
1339
        // group set, the previous input and its sweep parameters are outdated
×
UNCOV
1340
        // hence need to be replaced. This scenario currently only happens for
×
UNCOV
1341
        // anchor outputs. When a channel is force closed, in the worst case 3
×
UNCOV
1342
        // different sweeps with the same exclusive group are registered with
×
UNCOV
1343
        // the sweeper to bump the closing transaction (cpfp) when it's time
×
UNCOV
1344
        // critical. Receiving an input which was already registered with the
×
UNCOV
1345
        // sweeper but now without an exclusive group means none of the previous
×
UNCOV
1346
        // inputs were used as CPFP, so we need to make sure we update the
×
UNCOV
1347
        // sweep parameters but also remove all inputs with the same exclusive
×
UNCOV
1348
        // group because they are outdated too.
×
UNCOV
1349
        var prevExclGroup *uint64
×
UNCOV
1350
        if oldInput.params.ExclusiveGroup != nil &&
×
UNCOV
1351
                input.params.ExclusiveGroup == nil {
×
UNCOV
1352

×
UNCOV
1353
                prevExclGroup = new(uint64)
×
UNCOV
1354
                *prevExclGroup = *oldInput.params.ExclusiveGroup
×
UNCOV
1355
        }
×
1356

1357
        // Update input details and sweep parameters. The re-offered input
1358
        // details may contain a change to the unconfirmed parent tx info.
UNCOV
1359
        oldInput.params = input.params
×
UNCOV
1360
        oldInput.Input = input.input
×
UNCOV
1361

×
UNCOV
1362
        // If the new input specifies a deadline, update the deadline height.
×
UNCOV
1363
        oldInput.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
×
UNCOV
1364
                oldInput.DeadlineHeight,
×
UNCOV
1365
        )
×
UNCOV
1366

×
UNCOV
1367
        // Add additional result channel to signal spend of this input.
×
UNCOV
1368
        oldInput.listeners = append(oldInput.listeners, input.resultChan)
×
UNCOV
1369

×
UNCOV
1370
        if prevExclGroup != nil {
×
UNCOV
1371
                s.removeExclusiveGroup(*prevExclGroup)
×
UNCOV
1372
        }
×
1373
}
1374

1375
// handleInputSpent takes a spend event of our input and updates the sweeper's
1376
// internal state to remove the input.
UNCOV
1377
func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
×
UNCOV
1378
        // Query store to find out if we ever published this tx.
×
UNCOV
1379
        spendHash := *spend.SpenderTxHash
×
UNCOV
1380
        isOurTx, err := s.cfg.Store.IsOurTx(spendHash)
×
UNCOV
1381
        if err != nil {
×
1382
                log.Errorf("cannot determine if tx %v is ours: %v",
×
1383
                        spendHash, err)
×
1384
                return
×
1385
        }
×
1386

1387
        // If this isn't our transaction, it means someone else swept outputs
1388
        // that we were attempting to sweep. This can happen for anchor outputs
1389
        // as well as justice transactions. In this case, we'll notify the
1390
        // wallet to remove any spends that descent from this output.
UNCOV
1391
        if !isOurTx {
×
UNCOV
1392
                // Construct a map of the inputs this transaction spends.
×
UNCOV
1393
                spendingTx := spend.SpendingTx
×
UNCOV
1394
                inputsSpent := make(
×
UNCOV
1395
                        map[wire.OutPoint]struct{}, len(spendingTx.TxIn),
×
UNCOV
1396
                )
×
UNCOV
1397
                for _, txIn := range spendingTx.TxIn {
×
UNCOV
1398
                        inputsSpent[txIn.PreviousOutPoint] = struct{}{}
×
UNCOV
1399
                }
×
1400

UNCOV
1401
                log.Debugf("Attempting to remove descendant txns invalidated "+
×
UNCOV
1402
                        "by (txid=%v): %v", spendingTx.TxHash(),
×
UNCOV
1403
                        spew.Sdump(spendingTx))
×
UNCOV
1404

×
UNCOV
1405
                err := s.removeConflictSweepDescendants(inputsSpent)
×
UNCOV
1406
                if err != nil {
×
1407
                        log.Warnf("unable to remove descendant transactions "+
×
1408
                                "due to tx %v: ", spendHash)
×
1409
                }
×
1410

UNCOV
1411
                log.Debugf("Detected third party spend related to in flight "+
×
UNCOV
1412
                        "inputs (is_ours=%v): %v", isOurTx,
×
UNCOV
1413
                        lnutils.SpewLogClosure(spend.SpendingTx))
×
1414
        }
1415

1416
        // We now use the spending tx to update the state of the inputs.
UNCOV
1417
        s.markInputsSwept(spend.SpendingTx, isOurTx)
×
1418
}
1419

1420
// markInputsSwept marks all inputs swept by the spending transaction as swept.
1421
// It will also notify all the subscribers of this input.
1422
func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) {
1✔
1423
        for _, txIn := range tx.TxIn {
5✔
1424
                outpoint := txIn.PreviousOutPoint
4✔
1425

4✔
1426
                // Check if this input is known to us. It could probably be
4✔
1427
                // unknown if we canceled the registration, deleted from inputs
4✔
1428
                // map but the ntfn was in-flight already. Or this could be not
4✔
1429
                // one of our inputs.
4✔
1430
                input, ok := s.inputs[outpoint]
4✔
1431
                if !ok {
5✔
1432
                        // It's very likely that a spending tx contains inputs
1✔
1433
                        // that we don't know.
1✔
1434
                        log.Tracef("Skipped marking input as swept: %v not "+
1✔
1435
                                "found in pending inputs", outpoint)
1✔
1436

1✔
1437
                        continue
1✔
1438
                }
1439

1440
                // This input may already been marked as swept by a previous
1441
                // spend notification, which is likely to happen as one sweep
1442
                // transaction usually sweeps multiple inputs.
1443
                if input.terminated() {
4✔
1444
                        log.Debugf("Skipped marking input as swept: %v "+
1✔
1445
                                "state=%v", outpoint, input.state)
1✔
1446

1✔
1447
                        continue
1✔
1448
                }
1449

1450
                input.state = Swept
2✔
1451

2✔
1452
                // Return either a nil or a remote spend result.
2✔
1453
                var err error
2✔
1454
                if !isOurTx {
2✔
UNCOV
1455
                        log.Warnf("Input=%v was spent by remote or third "+
×
UNCOV
1456
                                "party in tx=%v", outpoint, tx.TxHash())
×
UNCOV
1457
                        err = ErrRemoteSpend
×
UNCOV
1458
                }
×
1459

1460
                // Signal result channels.
1461
                s.signalResult(input, Result{
2✔
1462
                        Tx:  tx,
2✔
1463
                        Err: err,
2✔
1464
                })
2✔
1465

2✔
1466
                // Remove all other inputs in this exclusive group.
2✔
1467
                if input.params.ExclusiveGroup != nil {
2✔
UNCOV
1468
                        s.removeExclusiveGroup(*input.params.ExclusiveGroup)
×
UNCOV
1469
                }
×
1470
        }
1471
}
1472

1473
// markInputFailed marks the given input as failed and won't be retried. It
1474
// will also notify all the subscribers of this input.
1475
func (s *UtxoSweeper) markInputFailed(pi *SweeperInput, err error) {
5✔
1476
        log.Errorf("Failed to sweep input: %v, error: %v", pi, err)
5✔
1477

5✔
1478
        pi.state = Failed
5✔
1479

5✔
1480
        s.signalResult(pi, Result{Err: err})
5✔
1481
}
5✔
1482

1483
// updateSweeperInputs updates the sweeper's internal state and returns a map
1484
// of inputs to be swept. It will remove the inputs that are in final states,
1485
// and returns a map of inputs that have either state Init or PublishFailed.
1486
func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
1✔
1487
        // Create a map of inputs to be swept.
1✔
1488
        inputs := make(InputsMap)
1✔
1489

1✔
1490
        // Iterate the pending inputs and update the sweeper's state.
1✔
1491
        //
1✔
1492
        // TODO(yy): sweeper is made to communicate via go channels, so no
1✔
1493
        // locks are needed to access the map. However, it'd be safer if we
1✔
1494
        // turn this inputs map into a SyncMap in case we wanna add concurrent
1✔
1495
        // access to the map in the future.
1✔
1496
        for op, input := range s.inputs {
10✔
1497
                log.Tracef("Checking input: %s, state=%v", input, input.state)
9✔
1498

9✔
1499
                // If the input has reached a final state, that it's either
9✔
1500
                // been swept, or failed, or excluded, we will remove it from
9✔
1501
                // our sweeper.
9✔
1502
                if input.terminated() {
12✔
1503
                        log.Debugf("Removing input(State=%v) %v from sweeper",
3✔
1504
                                input.state, op)
3✔
1505

3✔
1506
                        delete(s.inputs, op)
3✔
1507

3✔
1508
                        continue
3✔
1509
                }
1510

1511
                // If this input has been included in a sweep tx that's not
1512
                // published yet, we'd skip this input and wait for the sweep
1513
                // tx to be published.
1514
                if input.state == PendingPublish {
7✔
1515
                        continue
1✔
1516
                }
1517

1518
                // If this input has already been published, we will need to
1519
                // check the RBF condition before attempting another sweeping.
1520
                if input.state == Published {
6✔
1521
                        continue
1✔
1522
                }
1523

1524
                // If the input has a locktime that's not yet reached, we will
1525
                // skip this input and wait for the locktime to be reached.
1526
                mature, locktime := input.isMature(uint32(s.currentHeight))
4✔
1527
                if !mature {
6✔
1528
                        log.Debugf("Skipping input %v due to locktime=%v not "+
2✔
1529
                                "reached, current height is %v", op, locktime,
2✔
1530
                                s.currentHeight)
2✔
1531

2✔
1532
                        continue
2✔
1533
                }
1534

1535
                // If this input is new or has been failed to be published,
1536
                // we'd retry it. The assumption here is that when an error is
1537
                // returned from `PublishTransaction`, it means the tx has
1538
                // failed to meet the policy, hence it's not in the mempool.
1539
                inputs[op] = input
2✔
1540
        }
1541

1542
        return inputs
1✔
1543
}
1544

1545
// sweepPendingInputs is called when the ticker fires. It will create clusters
1546
// and attempt to create and publish the sweeping transactions.
1547
func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) {
1✔
1548
        log.Debugf("Sweeping %v inputs", len(inputs))
1✔
1549

1✔
1550
        // Cluster all of our inputs based on the specific Aggregator.
1✔
1551
        sets := s.cfg.Aggregator.ClusterInputs(inputs)
1✔
1552

1✔
1553
        // sweepWithLock is a helper closure that executes the sweep within a
1✔
1554
        // coin select lock to prevent the coins being selected for other
1✔
1555
        // transactions like funding of a channel.
1✔
1556
        sweepWithLock := func(set InputSet) error {
2✔
1557
                return s.cfg.Wallet.WithCoinSelectLock(func() error {
2✔
1558
                        // Try to add inputs from our wallet.
1✔
1559
                        err := set.AddWalletInputs(s.cfg.Wallet)
1✔
1560
                        if err != nil {
1✔
UNCOV
1561
                                return err
×
UNCOV
1562
                        }
×
1563

1564
                        // Create sweeping transaction for each set.
1565
                        err = s.sweep(set)
1✔
1566
                        if err != nil {
1✔
UNCOV
1567
                                return err
×
UNCOV
1568
                        }
×
1569

1570
                        return nil
1✔
1571
                })
1572
        }
1573

1574
        for _, set := range sets {
3✔
1575
                var err error
2✔
1576
                if set.NeedWalletInput() {
3✔
1577
                        // Sweep the set of inputs that need the wallet inputs.
1✔
1578
                        err = sweepWithLock(set)
1✔
1579
                } else {
2✔
1580
                        // Sweep the set of inputs that don't need the wallet
1✔
1581
                        // inputs.
1✔
1582
                        err = s.sweep(set)
1✔
1583
                }
1✔
1584

1585
                if err != nil {
2✔
UNCOV
1586
                        log.Errorf("Failed to sweep %v: %v", set, err)
×
UNCOV
1587
                }
×
1588
        }
1589
}
1590

1591
// bumpResp wraps the result of a bump attempt returned from the fee bumper and
1592
// the inputs being used.
1593
type bumpResp struct {
1594
        // result is the result of the bump attempt returned from the fee
1595
        // bumper.
1596
        result *BumpResult
1597

1598
        // set is the input set that was used in the bump attempt.
1599
        set InputSet
1600
}
1601

1602
// monitorFeeBumpResult subscribes to the passed result chan to listen for
1603
// future updates about the sweeping tx.
1604
//
1605
// NOTE: must run as a goroutine.
1606
func (s *UtxoSweeper) monitorFeeBumpResult(set InputSet,
1607
        resultChan <-chan *BumpResult) {
6✔
1608

6✔
1609
        defer s.wg.Done()
6✔
1610

6✔
1611
        for {
13✔
1612
                select {
7✔
1613
                case r := <-resultChan:
3✔
1614
                        // Validate the result is valid.
3✔
1615
                        if err := r.Validate(); err != nil {
3✔
1616
                                log.Errorf("Received invalid result: %v", err)
×
1617
                                continue
×
1618
                        }
1619

1620
                        resp := &bumpResp{
3✔
1621
                                result: r,
3✔
1622
                                set:    set,
3✔
1623
                        }
3✔
1624

3✔
1625
                        // Send the result back to the main event loop.
3✔
1626
                        select {
3✔
1627
                        case s.bumpRespChan <- resp:
3✔
UNCOV
1628
                        case <-s.quit:
×
UNCOV
1629
                                log.Debug("Sweeper shutting down, skip " +
×
UNCOV
1630
                                        "sending bump result")
×
UNCOV
1631

×
UNCOV
1632
                                return
×
1633
                        }
1634

1635
                        // The sweeping tx has been confirmed, we can exit the
1636
                        // monitor now.
1637
                        //
1638
                        // TODO(yy): can instead remove the spend subscription
1639
                        // in sweeper and rely solely on this event to mark
1640
                        // inputs as Swept?
1641
                        if r.Event == TxConfirmed || r.Event == TxFailed {
5✔
1642
                                // Exit if the tx is failed to be created.
2✔
1643
                                if r.Tx == nil {
2✔
NEW
1644
                                        log.Debugf("Received %v for nil tx, "+
×
NEW
1645
                                                "exit monitor", r.Event)
×
NEW
1646

×
NEW
1647
                                        return
×
NEW
1648
                                }
×
1649

1650
                                log.Debugf("Received %v for sweep tx %v, exit "+
2✔
1651
                                        "fee bump monitor", r.Event,
2✔
1652
                                        r.Tx.TxHash())
2✔
1653

2✔
1654
                                // Cancel the rebroadcasting of the failed tx.
2✔
1655
                                s.cfg.Wallet.CancelRebroadcast(r.Tx.TxHash())
2✔
1656

2✔
1657
                                return
2✔
1658
                        }
1659

1660
                case <-s.quit:
2✔
1661
                        log.Debugf("Sweeper shutting down, exit fee " +
2✔
1662
                                "bump handler")
2✔
1663

2✔
1664
                        return
2✔
1665
                }
1666
        }
1667
}
1668

1669
// handleBumpEventTxFailed handles the case where the tx has been failed to
1670
// publish.
1671
func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) {
1✔
1672
        r := resp.result
1✔
1673
        tx, err := r.Tx, r.Err
1✔
1674

1✔
1675
        if tx != nil {
2✔
1676
                log.Warnf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(),
1✔
1677
                        err)
1✔
1678
        }
1✔
1679

1680
        // NOTE: When marking the inputs as failed, we are using the input set
1681
        // instead of the inputs found in the tx. This is fine for current
1682
        // version of the sweeper because we always create a tx using ALL of
1683
        // the inputs specified by the set.
1684
        //
1685
        // TODO(yy): should we also remove the failed tx from db?
1686
        s.markInputsPublishFailed(resp.set)
1✔
1687
}
1688

1689
// handleBumpEventTxReplaced handles the case where the sweeping tx has been
1690
// replaced by a new one.
1691
func (s *UtxoSweeper) handleBumpEventTxReplaced(resp *bumpResp) error {
3✔
1692
        r := resp.result
3✔
1693
        oldTx := r.ReplacedTx
3✔
1694
        newTx := r.Tx
3✔
1695

3✔
1696
        // Prepare a new record to replace the old one.
3✔
1697
        tr := &TxRecord{
3✔
1698
                Txid:    newTx.TxHash(),
3✔
1699
                FeeRate: uint64(r.FeeRate),
3✔
1700
                Fee:     uint64(r.Fee),
3✔
1701
        }
3✔
1702

3✔
1703
        // Get the old record for logging purpose.
3✔
1704
        oldTxid := oldTx.TxHash()
3✔
1705
        record, err := s.cfg.Store.GetTx(oldTxid)
3✔
1706
        if err != nil {
4✔
1707
                log.Errorf("Fetch tx record for %v: %v", oldTxid, err)
1✔
1708
                return err
1✔
1709
        }
1✔
1710

1711
        // Cancel the rebroadcasting of the replaced tx.
1712
        s.cfg.Wallet.CancelRebroadcast(oldTxid)
2✔
1713

2✔
1714
        log.Infof("RBFed tx=%v(fee=%v sats, feerate=%v sats/kw) with new "+
2✔
1715
                "tx=%v(fee=%v, "+"feerate=%v)", record.Txid, record.Fee,
2✔
1716
                record.FeeRate, tr.Txid, tr.Fee, tr.FeeRate)
2✔
1717

2✔
1718
        // The old sweeping tx has been replaced by a new one, we will update
2✔
1719
        // the tx record in the sweeper db.
2✔
1720
        //
2✔
1721
        // TODO(yy): we may also need to update the inputs in this tx to a new
2✔
1722
        // state. Suppose a replacing tx only spends a subset of the inputs
2✔
1723
        // here, we'd end up with the rest being marked as `Published` and
2✔
1724
        // won't be aggregated in the next sweep. Atm it's fine as we always
2✔
1725
        // RBF the same input set.
2✔
1726
        if err := s.cfg.Store.DeleteTx(oldTxid); err != nil {
3✔
1727
                log.Errorf("Delete tx record for %v: %v", oldTxid, err)
1✔
1728
                return err
1✔
1729
        }
1✔
1730

1731
        // Mark the inputs as published using the replacing tx.
1732
        return s.markInputsPublished(tr, resp.set)
1✔
1733
}
1734

1735
// handleBumpEventTxPublished handles the case where the sweeping tx has been
1736
// successfully published.
1737
func (s *UtxoSweeper) handleBumpEventTxPublished(resp *bumpResp) error {
1✔
1738
        r := resp.result
1✔
1739
        tx := r.Tx
1✔
1740
        tr := &TxRecord{
1✔
1741
                Txid:    tx.TxHash(),
1✔
1742
                FeeRate: uint64(r.FeeRate),
1✔
1743
                Fee:     uint64(r.Fee),
1✔
1744
        }
1✔
1745

1✔
1746
        // Inputs have been successfully published so we update their
1✔
1747
        // states.
1✔
1748
        err := s.markInputsPublished(tr, resp.set)
1✔
1749
        if err != nil {
1✔
1750
                return err
×
1751
        }
×
1752

1753
        log.Debugf("Published sweep tx %v, num_inputs=%v, height=%v",
1✔
1754
                tx.TxHash(), len(tx.TxIn), s.currentHeight)
1✔
1755

1✔
1756
        // If there's no error, remove the output script. Otherwise keep it so
1✔
1757
        // that it can be reused for the next transaction and causes no address
1✔
1758
        // inflation.
1✔
1759
        s.currentOutputScript = fn.None[lnwallet.AddrWithKey]()
1✔
1760

1✔
1761
        return nil
1✔
1762
}
1763

1764
// handleBumpEventTxFatal handles the case where there's an unexpected error
1765
// when creating or publishing the sweeping tx. In this case, the tx will be
1766
// removed from the sweeper store and the inputs will be marked as `Failed`,
1767
// which means they will not be retried.
1768
func (s *UtxoSweeper) handleBumpEventTxFatal(resp *bumpResp) error {
2✔
1769
        r := resp.result
2✔
1770

2✔
1771
        // Remove the tx from the sweeper store if there is one. Since this is
2✔
1772
        // a broadcast error, it's likely there isn't a tx here.
2✔
1773
        if r.Tx != nil {
4✔
1774
                txid := r.Tx.TxHash()
2✔
1775
                log.Infof("Tx=%v failed with unexpected error: %v", txid, r.Err)
2✔
1776

2✔
1777
                // Remove the tx from the sweeper db if it exists.
2✔
1778
                if err := s.cfg.Store.DeleteTx(txid); err != nil {
3✔
1779
                        return fmt.Errorf("delete tx record for %v: %w", txid,
1✔
1780
                                err)
1✔
1781
                }
1✔
1782
        }
1783

1784
        // Mark the inputs as failed.
1785
        s.markInputsFailed(resp.set, r.Err)
1✔
1786

1✔
1787
        return nil
1✔
1788
}
1789

1790
// markInputsFailed marks all inputs found in the tx as failed. It will also
1791
// notify all the subscribers of these inputs.
1792
func (s *UtxoSweeper) markInputsFailed(set InputSet, err error) {
2✔
1793
        for _, inp := range set.Inputs() {
9✔
1794
                outpoint := inp.OutPoint()
7✔
1795

7✔
1796
                input, ok := s.inputs[outpoint]
7✔
1797
                if !ok {
7✔
NEW
1798
                        // It's very likely that a spending tx contains inputs
×
NEW
1799
                        // that we don't know.
×
NEW
1800
                        log.Tracef("Skipped marking input as failed: %v not "+
×
NEW
1801
                                "found in pending inputs", outpoint)
×
NEW
1802

×
NEW
1803
                        continue
×
1804
                }
1805

1806
                // If the input is already in a terminal state, we don't want
1807
                // to rewrite it, which also indicates an error as we only get
1808
                // an error event during the initial broadcast.
1809
                if input.terminated() {
10✔
1810
                        log.Errorf("Skipped marking input=%v as failed due to "+
3✔
1811
                                "unexpected state=%v", outpoint, input.state)
3✔
1812

3✔
1813
                        continue
3✔
1814
                }
1815

1816
                s.markInputFailed(input, err)
4✔
1817
        }
1818
}
1819

1820
// handleBumpEvent handles the result sent from the bumper based on its event
1821
// type.
1822
//
1823
// NOTE: TxConfirmed event is not handled, since we already subscribe to the
1824
// input's spending event, we don't need to do anything here.
1825
func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
1✔
1826
        log.Debugf("Received bump result %v", r.result)
1✔
1827

1✔
1828
        switch r.result.Event {
1✔
1829
        // The tx has been published, we update the inputs' state and create a
1830
        // record to be stored in the sweeper db.
UNCOV
1831
        case TxPublished:
×
UNCOV
1832
                return s.handleBumpEventTxPublished(r)
×
1833

1834
        // The tx has failed, we update the inputs' state.
1835
        case TxFailed:
1✔
1836
                s.handleBumpEventTxFailed(r)
1✔
1837
                return nil
1✔
1838

1839
        // The tx has been replaced, we will remove the old tx and replace it
1840
        // with the new one.
UNCOV
1841
        case TxReplaced:
×
UNCOV
1842
                return s.handleBumpEventTxReplaced(r)
×
1843

1844
        // There's a fatal error in creating the tx, we will remove the tx from
1845
        // the sweeper db and mark the inputs as failed.
NEW
1846
        case TxFatal:
×
NEW
1847
                return s.handleBumpEventTxFatal(r)
×
1848
        }
1849

UNCOV
1850
        return nil
×
1851
}
1852

1853
// IsSweeperOutpoint determines whether the outpoint was created by the sweeper.
1854
//
1855
// NOTE: It is enough to check the txid because the sweeper will create
1856
// outpoints which solely belong to the internal LND wallet.
UNCOV
1857
func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool {
×
UNCOV
1858
        found, err := s.cfg.Store.IsOurTx(op.Hash)
×
UNCOV
1859
        // In case there is an error fetching the transaction details from the
×
UNCOV
1860
        // sweeper store we assume the outpoint is still used by the sweeper
×
UNCOV
1861
        // (worst case scenario).
×
UNCOV
1862
        //
×
UNCOV
1863
        // TODO(ziggie): Ensure that confirmed outpoints are deleted from the
×
UNCOV
1864
        // bucket.
×
UNCOV
1865
        if err != nil && !errors.Is(err, errNoTxHashesBucket) {
×
1866
                log.Errorf("failed to fetch info for outpoint(%v:%d) "+
×
1867
                        "with: %v, we assume it is still in use by the sweeper",
×
1868
                        op.Hash, op.Index, err)
×
1869

×
1870
                return true
×
1871
        }
×
1872

UNCOV
1873
        return found
×
1874
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc