• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12343072627

15 Dec 2024 11:09PM UTC coverage: 57.504% (-1.1%) from 58.636%
12343072627

Pull #9315

github

yyforyongyu
contractcourt: offer outgoing htlc one block earlier before its expiry

We need to offer the outgoing htlc one block earlier to make sure when
the expiry height hits, the sweeper will not miss sweeping it in the
same block. This also means the outgoing contest resolver now only does
one thing - watch for preimage spend till height expiry-1, which can
easily be moved into the timeout resolver instead in the future.
Pull Request #9315: Implement `blockbeat`

1445 of 2007 new or added lines in 26 files covered. (72.0%)

19246 existing lines in 249 files now uncovered.

102342 of 177975 relevant lines covered (57.5%)

24772.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.35
/sweep/sweeper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/davecgh/go-spew/spew"
13
        "github.com/lightningnetwork/lnd/chainio"
14
        "github.com/lightningnetwork/lnd/chainntnfs"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/input"
17
        "github.com/lightningnetwork/lnd/lnutils"
18
        "github.com/lightningnetwork/lnd/lnwallet"
19
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
20
)
21

22
// Sentinel errors returned by the sweeper, plus package-level defaults.
var (
23
        // ErrRemoteSpend is returned in case an output that we try to sweep is
24
        // confirmed in a tx of the remote party.
25
        ErrRemoteSpend = errors.New("remote party swept utxo")
26

27
        // ErrFeePreferenceTooLow is returned when the fee preference gives a
28
        // fee rate that's below the relay fee rate.
29
        ErrFeePreferenceTooLow = errors.New("fee preference too low")
30

31
        // ErrExclusiveGroupSpend is returned in case a different input of the
32
        // same exclusive group was spent.
33
        ErrExclusiveGroupSpend = errors.New("other member of exclusive group " +
34
                "was spent")
35

36
        // ErrSweeperShuttingDown is an error returned when a client attempts to
37
        // make a request to the UtxoSweeper, but it is unable to handle it as
38
        // it is/has already been stopped.
39
        ErrSweeperShuttingDown = errors.New("utxo sweeper shutting down")
40

41
        // DefaultDeadlineDelta defines a default deadline delta (1 week) to be
42
        // used when sweeping inputs with no deadline pressure. The value is
        // presumably a number of blocks: 1008 blocks is one week at roughly
        // ten minutes per block.
43
        DefaultDeadlineDelta = int32(1008)
44
)
45

46
// Params contains the parameters that control the sweeping process.
47
type Params struct {
48
        // ExclusiveGroup is an identifier that, if set, prevents other inputs
49
        // with the same identifier from being batched together. A nil pointer
        // means the input does not belong to any exclusive group.
50
        ExclusiveGroup *uint64
51

52
        // DeadlineHeight specifies an absolute block height that this input
53
        // should be confirmed by. This value is used by the fee bumper to
54
        // decide its urgency and adjust the fee rate it uses.
55
        DeadlineHeight fn.Option[int32]
56

57
        // Budget specifies the maximum amount of satoshis that can be spent on
58
        // fees for this sweep.
59
        Budget btcutil.Amount
60

61
        // Immediate indicates that the input should be swept immediately
62
        // without waiting for blocks to come to trigger the sweeping of
63
        // inputs.
64
        Immediate bool
65

66
        // StartingFeeRate is an optional parameter that can be used to specify
67
        // the initial fee rate to use for the fee function.
68
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
69
}
70

71
// String returns a human readable interpretation of the sweep parameters.
UNCOV
72
func (p Params) String() string {
×
UNCOV
73
        deadline := "none"
×
UNCOV
74
        p.DeadlineHeight.WhenSome(func(d int32) {
×
UNCOV
75
                deadline = fmt.Sprintf("%d", d)
×
UNCOV
76
        })
×
77

UNCOV
78
        exclusiveGroup := "none"
×
UNCOV
79
        if p.ExclusiveGroup != nil {
×
UNCOV
80
                exclusiveGroup = fmt.Sprintf("%d", *p.ExclusiveGroup)
×
UNCOV
81
        }
×
82

UNCOV
83
        return fmt.Sprintf("startingFeeRate=%v, immediate=%v, "+
×
UNCOV
84
                "exclusive_group=%v, budget=%v, deadline=%v", p.StartingFeeRate,
×
UNCOV
85
                p.Immediate, exclusiveGroup, p.Budget, deadline)
×
86
}
87

88
// SweepState represents the current state of a pending input.
//
//nolint:revive
type SweepState uint8

const (
	// Init is the state every pending input starts in. It is assigned when
	// a new sweeping request for the input is made.
	Init SweepState = iota

	// PendingPublish means the input has already been included in a
	// sweeping tx that has not been published yet. Inputs in this state
	// should not be used for grouping again.
	PendingPublish

	// Published means the input's sweeping tx has been published
	// successfully. Inputs in this state can only be updated via RBF.
	Published

	// PublishFailed means publishing the sweeping tx returned an error.
	// Inputs in this state can be re-grouped into a new sweeping tx.
	PublishFailed

	// Swept is the final state of a pending input, set once the input has
	// been swept successfully.
	Swept

	// Excluded marks a pending input that has been excluded and can no
	// longer be swept. For instance, when one of the three anchor sweeping
	// transactions confirmed, the remaining two will be excluded.
	Excluded

	// Failed marks a pending input that has had too many failed publish
	// attempts, or for which an unknown broadcast error was returned.
	Failed
)

// String gives a human readable text for the sweep states. Values outside
// the declared states are rendered as "Unknown".
func (s SweepState) String() string {
	// The table is indexed by the state's numeric value.
	names := [...]string{
		Init:           "Init",
		PendingPublish: "PendingPublish",
		Published:      "Published",
		PublishFailed:  "PublishFailed",
		Swept:          "Swept",
		Excluded:       "Excluded",
		Failed:         "Failed",
	}

	if int(s) >= len(names) {
		return "Unknown"
	}

	return names[s]
}
155

156
// RBFInfo stores the information required to perform an RBF (replace-by-fee)
157
// bump on a pending sweeping tx.
158
type RBFInfo struct {
159
        // Txid is the txid of the sweeping tx.
160
        Txid chainhash.Hash
161

162
        // FeeRate is the fee rate of the sweeping tx.
163
        FeeRate chainfee.SatPerKWeight
164

165
        // Fee is the total fee of the sweeping tx.
166
        Fee btcutil.Amount
167
}
168

169
// SweeperInput is created when an input reaches the main loop for the first
170
// time. It wraps the input and tracks all relevant state that is needed for
171
// sweeping.
172
type SweeperInput struct {
173
        // Input is the wrapped input; embedding it promotes its methods onto
        // SweeperInput.
        input.Input
174

175
        // state tracks the current state of the input.
176
        state SweepState
177

178
        // listeners is a list of channels over which the final outcome of the
179
        // sweep needs to be broadcasted.
180
        listeners []chan Result
181

182
        // ntfnRegCancel is populated with a function that cancels the chain
183
        // notifier spend registration.
184
        ntfnRegCancel func()
185

186
        // publishAttempts records the number of attempts that have already been
187
        // made to sweep this input.
188
        publishAttempts int
189

190
        // params contains the parameters that control the sweeping process.
191
        params Params
192

193
        // lastFeeRate is the most recent fee rate used for this input within a
194
        // transaction broadcast to the network.
195
        lastFeeRate chainfee.SatPerKWeight
196

197
        // rbf records the RBF constraints.
198
        rbf fn.Option[RBFInfo]
199

200
        // DeadlineHeight is the deadline height for this input. This is
201
        // different from the DeadlineHeight in its params as it's an actual
202
        // value rather than an option.
203
        DeadlineHeight int32
204
}
205

206
// String returns a human readable interpretation of the pending input.
207
func (p *SweeperInput) String() string {
26✔
208
        return fmt.Sprintf("%v (%v)", p.Input.OutPoint(), p.Input.WitnessType())
26✔
209
}
26✔
210

211
// terminated returns a boolean indicating whether the input has reached a
212
// final state.
213
func (p *SweeperInput) terminated() bool {
22✔
214
        switch p.state {
22✔
215
        // If the input has reached a final state, that it's either
216
        // been swept, or failed, or excluded, we will remove it from
217
        // our sweeper.
218
        case Failed, Swept, Excluded:
8✔
219
                return true
8✔
220

221
        default:
14✔
222
                return false
14✔
223
        }
224
}
225

226
// isMature returns a boolean indicating whether the input has a timelock that
227
// has been reached or not. The locktime found is also returned.
228
func (p *SweeperInput) isMature(currentHeight uint32) (bool, uint32) {
4✔
229
        locktime, _ := p.RequiredLockTime()
4✔
230
        if currentHeight < locktime {
5✔
231
                log.Debugf("Input %v has locktime=%v, current height is %v",
1✔
232
                        p.OutPoint(), locktime, currentHeight)
1✔
233

1✔
234
                return false, locktime
1✔
235
        }
1✔
236

237
        // If the input has a CSV that's not yet reached, we will skip
238
        // this input and wait for the expiry.
239
        //
240
        // NOTE: We need to consider whether this input can be included in the
241
        // next block or not, which means the CSV will be checked against the
242
        // currentHeight plus one.
243
        locktime = p.BlocksToMaturity() + p.HeightHint()
3✔
244
        if currentHeight+1 < locktime {
4✔
245
                log.Debugf("Input %v has CSV expiry=%v, current height is %v, "+
1✔
246
                        "skipped sweeping", p.OutPoint(), locktime,
1✔
247
                        currentHeight)
1✔
248

1✔
249
                return false, locktime
1✔
250
        }
1✔
251

252
        return true, locktime
2✔
253
}
254

255
// InputsMap is a type alias for a set of pending inputs, keyed by the
// outpoint of each input.
256
type InputsMap = map[wire.OutPoint]*SweeperInput
257

258
// pendingSweepsReq is an internal message we'll use to represent an external
259
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
260
// attempting to sweep.
261
type pendingSweepsReq struct {
262
        // respChan carries the pending inputs keyed by outpoint; presumably
        // it is written by the main loop when the request succeeds (the
        // handler is not shown here).
        respChan chan map[wire.OutPoint]*PendingInputResponse

        // errChan carries an error; presumably it is used when the request
        // cannot be served.
263
        errChan  chan error
264
}
265

266
// PendingInputResponse contains information about an input that is currently
267
// being swept by the UtxoSweeper.
268
type PendingInputResponse struct {
269
        // OutPoint is the identifying outpoint of the input being swept.
270
        OutPoint wire.OutPoint
271

272
        // WitnessType is the witness type of the input being swept.
273
        WitnessType input.WitnessType
274

275
        // Amount is the amount of the input being swept.
276
        Amount btcutil.Amount
277

278
        // LastFeeRate is the most recent fee rate used for the input being
279
        // swept within a transaction broadcast to the network.
280
        LastFeeRate chainfee.SatPerKWeight
281

282
        // BroadcastAttempts is the number of attempts we've made to sweep the
283
        // input.
284
        BroadcastAttempts int
285

286
        // Params contains the sweep parameters for this pending request.
287
        Params Params
288

289
        // DeadlineHeight records the deadline height of this input.
        //
        // NOTE: this field is unsigned, whereas SweeperInput.DeadlineHeight
        // is an int32.
290
        DeadlineHeight uint32
291
}
292

293
// updateReq is an internal message we'll use to represent an external caller's
294
// intent to update the sweep parameters of a given input.
295
type updateReq struct {
296
        // input is the outpoint of the input whose parameters are updated.
        input        wire.OutPoint

        // params holds the new sweep parameters. When params.Immediate is
        // set, the main loop triggers a sweep right after the update.
297
        params       Params

        // responseChan is where the main loop sends the updateResp for this
        // request.
298
        responseChan chan *updateResp
299
}
300

301
// updateResp is an internal message we'll use to hand off the response of an
302
// updateReq from the UtxoSweeper's main event loop back to the caller.
303
type updateResp struct {
304
        // resultChan is the result channel returned by the update handler.
        resultChan chan Result

        // err is the error returned by the update handler, if any.
305
        err        error
306
}
307

308
// UtxoSweeper is responsible for sweeping outputs back into the wallet.
309
type UtxoSweeper struct {
310
        started uint32 // To be used atomically.
311
        stopped uint32 // To be used atomically.
312

313
        // Embed the blockbeat consumer struct to get access to the method
314
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
315
        chainio.BeatConsumer
316

317
        // cfg holds the dependencies the sweeper was created with.
        cfg *UtxoSweeperConfig
318

319
        // newInputs delivers sweep requests from SweepInput to the main loop.
        newInputs chan *sweepInputMessage

        // spendChan delivers spend details of watched inputs to the main
        // loop.
320
        spendChan chan *chainntnfs.SpendDetail
321

322
        // pendingSweepsReqs is a channel that will be sent requests by external
323
        // callers in order to retrieve the set of pending inputs the
324
        // UtxoSweeper is attempting to sweep.
325
        pendingSweepsReqs chan *pendingSweepsReq
326

327
        // updateReqs is a channel that will be sent requests by external
328
        // callers who wish to bump the fee rate of a given input.
329
        updateReqs chan *updateReq
330

331
        // inputs is the total set of inputs the UtxoSweeper has been requested
332
        // to sweep.
333
        inputs InputsMap
334

335
        currentOutputScript fn.Option[lnwallet.AddrWithKey]
336

337
        // relayFeeRate is the relay fee rate read from the fee estimator in
        // Start.
        relayFeeRate chainfee.SatPerKWeight
338

339
        // quit is closed by Stop to signal shutdown; wg tracks the main loop
        // goroutine so Stop can wait for it to exit.
        quit chan struct{}
340
        wg   sync.WaitGroup
341

342
        // currentHeight is the best known height of the main chain. This is
343
        // set in Start and updated whenever a new blockbeat is received.
344
        currentHeight int32
345

346
        // bumpRespChan is a channel that receives broadcast results from the
347
        // TxPublisher.
348
        bumpRespChan chan *bumpResp
349
}
350

351
// Compile-time check for the chainio.Consumer interface.
352
var _ chainio.Consumer = (*UtxoSweeper)(nil)
353

354
// UtxoSweeperConfig contains dependencies of UtxoSweeper.
355
type UtxoSweeperConfig struct {
356
        // GenSweepScript generates a P2WKH script belonging to the wallet where
357
        // funds can be swept.
358
        GenSweepScript func() fn.Result[lnwallet.AddrWithKey]
359

360
        // FeeEstimator is used when crafting sweep transactions to estimate
361
        // the necessary fee relative to the expected size of the sweep
362
        // transaction.
363
        FeeEstimator chainfee.Estimator
364

365
        // Wallet contains the wallet functions that sweeper requires.
366
        Wallet Wallet
367

368
        // Notifier is an instance of a chain notifier we'll use to watch for
369
        // certain on-chain events.
370
        Notifier chainntnfs.ChainNotifier
371

372
        // Mempool is the mempool watcher that will be used to query whether a
373
        // given input is already being spent by a transaction in the mempool.
374
        Mempool chainntnfs.MempoolWatcher
375

376
        // Store stores the published sweeper txes.
377
        Store SweeperStore
378

379
        // Signer is used by the sweeper to generate valid witnesses at the
380
        // time the incubated outputs need to be spent.
381
        Signer input.Signer
382

383
        // MaxInputsPerTx specifies the default maximum number of inputs allowed
384
        // in a single sweep tx. If more need to be swept, multiple txes are
385
        // created and published.
386
        MaxInputsPerTx uint32
387

388
        // MaxFeeRate is the maximum fee rate allowed within the UtxoSweeper.
389
        MaxFeeRate chainfee.SatPerVByte
390

391
        // Aggregator is used to group inputs into clusters based on its
392
        // implementation-specific strategy.
393
        Aggregator UtxoAggregator
394

395
        // Publisher is used to publish the sweep tx crafted here and monitors
396
        // it for potential fee bumps.
397
        Publisher Bumper
398

399
        // NoDeadlineConfTarget is the conf target to use when sweeping
400
        // non-time-sensitive outputs.
401
        NoDeadlineConfTarget uint32
402
}
403

404
// Result is the struct that is pushed through the result channel. Callers can
405
// use this to be informed of the final sweep result. In case of a remote
406
// spend, Err will be ErrRemoteSpend.
407
type Result struct {
408
        // Err is the final result of the sweep. It is nil when the input is
409
        // swept successfully by us. ErrRemoteSpend is returned when another
410
        // party took the input, and ErrExclusiveGroupSpend when another input
        // of the same exclusive group was spent.
411
        Err error
412

413
        // Tx is the transaction that spent the input.
414
        Tx *wire.MsgTx
415
}
416

417
// sweepInputMessage structs are used in the internal channel between the
418
// SweepInput call and the sweeper main loop.
419
type sweepInputMessage struct {
420
        // input is the input the caller asked to sweep.
        input      input.Input

        // params holds the sweep parameters supplied with the request.
421
        params     Params

        // resultChan is the channel handed back to the SweepInput caller; it
        // is created there with a buffer of one.
422
        resultChan chan Result
423
}
424

425
// New returns a new Sweeper instance.
426
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
16✔
427
        s := &UtxoSweeper{
16✔
428
                cfg:               cfg,
16✔
429
                newInputs:         make(chan *sweepInputMessage),
16✔
430
                spendChan:         make(chan *chainntnfs.SpendDetail),
16✔
431
                updateReqs:        make(chan *updateReq),
16✔
432
                pendingSweepsReqs: make(chan *pendingSweepsReq),
16✔
433
                quit:              make(chan struct{}),
16✔
434
                inputs:            make(InputsMap),
16✔
435
                bumpRespChan:      make(chan *bumpResp, 100),
16✔
436
        }
16✔
437

16✔
438
        // Mount the block consumer.
16✔
439
        s.BeatConsumer = chainio.NewBeatConsumer(s.quit, s.Name())
16✔
440

16✔
441
        return s
16✔
442
}
16✔
443

444
// Start starts the process of constructing and publish sweep txes.
NEW
445
func (s *UtxoSweeper) Start(beat chainio.Blockbeat) error {
×
UNCOV
446
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
×
447
                return nil
×
448
        }
×
449

UNCOV
450
        log.Info("Sweeper starting")
×
UNCOV
451

×
UNCOV
452
        // Retrieve relay fee for dust limit calculation. Assume that this will
×
UNCOV
453
        // not change from here on.
×
UNCOV
454
        s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
×
UNCOV
455

×
NEW
456
        // Set the current height.
×
NEW
457
        s.currentHeight = beat.Height()
×
UNCOV
458

×
UNCOV
459
        // Start sweeper main loop.
×
UNCOV
460
        s.wg.Add(1)
×
NEW
461
        go s.collector()
×
UNCOV
462

×
UNCOV
463
        return nil
×
464
}
465

466
// RelayFeePerKW returns the minimum fee rate required for transactions to be
467
// relayed. The value returned is the one stored on the sweeper, which Start
// reads from the fee estimator.
468
func (s *UtxoSweeper) RelayFeePerKW() chainfee.SatPerKWeight {
×
469
        return s.relayFeeRate
×
470
}
×
471

472
// Stop stops sweeper from listening to block epochs and constructing sweep
473
// txes.
UNCOV
474
func (s *UtxoSweeper) Stop() error {
×
UNCOV
475
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
×
476
                return nil
×
477
        }
×
478

UNCOV
479
        log.Info("Sweeper shutting down...")
×
UNCOV
480
        defer log.Debug("Sweeper shutdown complete")
×
UNCOV
481

×
UNCOV
482
        close(s.quit)
×
UNCOV
483
        s.wg.Wait()
×
UNCOV
484

×
UNCOV
485
        return nil
×
486
}
487

488
// Name returns the sweeper's name, which New passes to the blockbeat consumer
// it mounts.
//
// NOTE: part of the `chainio.Consumer` interface.
489
func (s *UtxoSweeper) Name() string {
16✔
490
        return "UtxoSweeper"
16✔
491
}
16✔
492

493
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
494
// swept after the batch time window ends. A custom fee preference can be
495
// provided to determine what fee rate should be used for the input. Note that
496
// the input may not always be swept with this exact value, as its possible for
497
// it to be batched under the same transaction with other similar fee rate
498
// inputs.
499
//
500
// NOTE: Extreme care needs to be taken that input isn't changed externally.
501
// Because it is an interface and we don't know what is exactly behind it, we
502
// cannot make a local copy in sweeper.
503
//
504
// TODO(yy): make sure the caller is using the Result chan.
505
func (s *UtxoSweeper) SweepInput(inp input.Input,
UNCOV
506
        params Params) (chan Result, error) {
×
UNCOV
507

×
UNCOV
508
        if inp == nil || inp.OutPoint() == input.EmptyOutPoint ||
×
UNCOV
509
                inp.SignDesc() == nil {
×
510

×
511
                return nil, errors.New("nil input received")
×
512
        }
×
513

UNCOV
514
        absoluteTimeLock, _ := inp.RequiredLockTime()
×
NEW
515
        log.Debugf("Sweep request received: out_point=%v, witness_type=%v, "+
×
UNCOV
516
                "relative_time_lock=%v, absolute_time_lock=%v, amount=%v, "+
×
UNCOV
517
                "parent=(%v), params=(%v)", inp.OutPoint(), inp.WitnessType(),
×
UNCOV
518
                inp.BlocksToMaturity(), absoluteTimeLock,
×
UNCOV
519
                btcutil.Amount(inp.SignDesc().Output.Value),
×
UNCOV
520
                inp.UnconfParent(), params)
×
UNCOV
521

×
UNCOV
522
        sweeperInput := &sweepInputMessage{
×
UNCOV
523
                input:      inp,
×
UNCOV
524
                params:     params,
×
UNCOV
525
                resultChan: make(chan Result, 1),
×
UNCOV
526
        }
×
UNCOV
527

×
UNCOV
528
        // Deliver input to the main event loop.
×
UNCOV
529
        select {
×
UNCOV
530
        case s.newInputs <- sweeperInput:
×
531
        case <-s.quit:
×
532
                return nil, ErrSweeperShuttingDown
×
533
        }
534

UNCOV
535
        return sweeperInput.resultChan, nil
×
536
}
537

538
// removeConflictSweepDescendants removes any transactions from the wallet that
539
// spend outputs included in the passed outpoint set. This needs to be done in
540
// cases where we're not the only ones that can sweep an output, but there may
541
// exist unconfirmed spends that spend outputs created by a sweep transaction.
542
// The most common case for this is when someone sweeps our anchor outputs
543
// after 16 blocks. Moreover this is also needed for wallets which use neutrino
544
// as a backend when a channel is force closed and anchor cpfp txns are
545
// created to bump the initial commitment transaction. In this case an anchor
546
// cpfp is broadcasted for up to 3 commitment transactions (local,
547
// remote-dangling, remote). Using neutrino all of those transactions will be
548
// accepted (the commitment tx will be different in all of those cases) and have
549
// to be removed as soon as one of them confirmes (they do have the same
550
// ExclusiveGroup). For neutrino backends the corresponding BIP 157 serving full
551
// nodes do not signal invalid transactions anymore.
552
func (s *UtxoSweeper) removeConflictSweepDescendants(
UNCOV
553
        outpoints map[wire.OutPoint]struct{}) error {
×
UNCOV
554

×
UNCOV
555
        // Obtain all the past sweeps that we've done so far. We'll need these
×
UNCOV
556
        // to ensure that if the spendingTx spends any of the same inputs, then
×
UNCOV
557
        // we remove any transaction that may be spending those inputs from the
×
UNCOV
558
        // wallet.
×
UNCOV
559
        //
×
UNCOV
560
        // TODO(roasbeef): can be last sweep here if we remove anything confirmed
×
UNCOV
561
        // from the store?
×
UNCOV
562
        pastSweepHashes, err := s.cfg.Store.ListSweeps()
×
UNCOV
563
        if err != nil {
×
564
                return err
×
565
        }
×
566

567
        // We'll now go through each past transaction we published during this
568
        // epoch and cross reference the spent inputs. If there're any inputs
569
        // in common with the inputs the spendingTx spent, then we'll remove
570
        // those.
571
        //
572
        // TODO(roasbeef): need to start to remove all transaction hashes after
573
        // every N blocks (assumed point of no return)
UNCOV
574
        for _, sweepHash := range pastSweepHashes {
×
UNCOV
575
                sweepTx, err := s.cfg.Wallet.FetchTx(sweepHash)
×
UNCOV
576
                if err != nil {
×
577
                        return err
×
578
                }
×
579

580
                // Transaction wasn't found in the wallet, may have already
581
                // been replaced/removed.
UNCOV
582
                if sweepTx == nil {
×
UNCOV
583
                        // If it was removed, then we'll play it safe and mark
×
UNCOV
584
                        // it as no longer need to be rebroadcasted.
×
UNCOV
585
                        s.cfg.Wallet.CancelRebroadcast(sweepHash)
×
UNCOV
586
                        continue
×
587
                }
588

589
                // Check to see if this past sweep transaction spent any of the
590
                // same inputs as spendingTx.
UNCOV
591
                var isConflicting bool
×
UNCOV
592
                for _, txIn := range sweepTx.TxIn {
×
UNCOV
593
                        if _, ok := outpoints[txIn.PreviousOutPoint]; ok {
×
UNCOV
594
                                isConflicting = true
×
UNCOV
595
                                break
×
596
                        }
597
                }
598

UNCOV
599
                if !isConflicting {
×
UNCOV
600
                        continue
×
601
                }
602

603
                // If it is conflicting, then we'll signal the wallet to remove
604
                // all the transactions that are descendants of outputs created
605
                // by the sweepTx and the sweepTx itself.
UNCOV
606
                log.Debugf("Removing sweep txid=%v from wallet: %v",
×
UNCOV
607
                        sweepTx.TxHash(), spew.Sdump(sweepTx))
×
UNCOV
608

×
UNCOV
609
                err = s.cfg.Wallet.RemoveDescendants(sweepTx)
×
UNCOV
610
                if err != nil {
×
611
                        log.Warnf("Unable to remove descendants: %v", err)
×
612
                }
×
613

614
                // If this transaction was conflicting, then we'll stop
615
                // rebroadcasting it in the background.
UNCOV
616
                s.cfg.Wallet.CancelRebroadcast(sweepHash)
×
617
        }
618

UNCOV
619
        return nil
×
620
}
621

622
// collector is the sweeper main loop. It processes new inputs, spend
623
// notifications and counts down to publication of the sweep tx.
NEW
624
func (s *UtxoSweeper) collector() {
×
NEW
625
        defer s.wg.Done()
×
UNCOV
626

×
UNCOV
627
        for {
×
UNCOV
628
                // Clean inputs, which will remove inputs that are swept,
×
UNCOV
629
                // failed, or excluded from the sweeper and return inputs that
×
UNCOV
630
                // are either new or has been published but failed back, which
×
UNCOV
631
                // will be retried again here.
×
UNCOV
632
                s.updateSweeperInputs()
×
UNCOV
633

×
UNCOV
634
                select {
×
635
                // A new inputs is offered to the sweeper. We check to see if
636
                // we are already trying to sweep this input and if not, set up
637
                // a listener to spend and schedule a sweep.
UNCOV
638
                case input := <-s.newInputs:
×
UNCOV
639
                        err := s.handleNewInput(input)
×
UNCOV
640
                        if err != nil {
×
641
                                log.Criticalf("Unable to handle new input: %v",
×
642
                                        err)
×
643

×
644
                                return
×
645
                        }
×
646

647
                        // If this input is forced, we perform an sweep
648
                        // immediately.
649
                        //
650
                        // TODO(ziggie): Make sure when `immediate` is selected
651
                        // as a parameter that we only trigger the sweeping of
652
                        // this specific input rather than triggering the sweeps
653
                        // of all current pending inputs registered with the
654
                        // sweeper.
UNCOV
655
                        if input.params.Immediate {
×
UNCOV
656
                                inputs := s.updateSweeperInputs()
×
UNCOV
657
                                s.sweepPendingInputs(inputs)
×
UNCOV
658
                        }
×
659

660
                // A spend of one of our inputs is detected. Signal sweep
661
                // results to the caller(s).
UNCOV
662
                case spend := <-s.spendChan:
×
UNCOV
663
                        s.handleInputSpent(spend)
×
664

665
                // A new external request has been received to retrieve all of
666
                // the inputs we're currently attempting to sweep.
UNCOV
667
                case req := <-s.pendingSweepsReqs:
×
UNCOV
668
                        s.handlePendingSweepsReq(req)
×
669

670
                // A new external request has been received to bump the fee rate
671
                // of a given input.
UNCOV
672
                case req := <-s.updateReqs:
×
UNCOV
673
                        resultChan, err := s.handleUpdateReq(req)
×
UNCOV
674
                        req.responseChan <- &updateResp{
×
UNCOV
675
                                resultChan: resultChan,
×
UNCOV
676
                                err:        err,
×
UNCOV
677
                        }
×
UNCOV
678

×
UNCOV
679
                        // Perform an sweep immediately if asked.
×
UNCOV
680
                        if req.params.Immediate {
×
UNCOV
681
                                inputs := s.updateSweeperInputs()
×
UNCOV
682
                                s.sweepPendingInputs(inputs)
×
UNCOV
683
                        }
×
684

NEW
685
                case resp := <-s.bumpRespChan:
×
UNCOV
686
                        // Handle the bump event.
×
NEW
687
                        err := s.handleBumpEvent(resp)
×
UNCOV
688
                        if err != nil {
×
UNCOV
689
                                log.Errorf("Failed to handle bump event: %v",
×
UNCOV
690
                                        err)
×
UNCOV
691
                        }
×
692

693
                // A new block comes in, update the bestHeight, perform a check
694
                // over all pending inputs and publish sweeping txns if needed.
NEW
695
                case beat := <-s.BlockbeatChan:
×
UNCOV
696
                        // Update the sweeper to the best height.
×
NEW
697
                        s.currentHeight = beat.Height()
×
UNCOV
698

×
UNCOV
699
                        // Update the inputs with the latest height.
×
UNCOV
700
                        inputs := s.updateSweeperInputs()
×
UNCOV
701

×
UNCOV
702
                        log.Debugf("Received new block: height=%v, attempt "+
×
NEW
703
                                "sweeping %d inputs:\n%s",
×
NEW
704
                                s.currentHeight, len(inputs),
×
NEW
705
                                lnutils.NewLogClosure(func() string {
×
NEW
706
                                        inps := make(
×
NEW
707
                                                []input.Input, 0, len(inputs),
×
NEW
708
                                        )
×
NEW
709
                                        for _, in := range inputs {
×
NEW
710
                                                inps = append(inps, in)
×
NEW
711
                                        }
×
712

NEW
713
                                        return inputTypeSummary(inps)
×
714
                                }))
715

716
                        // Attempt to sweep any pending inputs.
UNCOV
717
                        s.sweepPendingInputs(inputs)
×
UNCOV
718

×
NEW
719
                        // Notify we've processed the block.
×
NEW
720
                        s.NotifyBlockProcessed(beat, nil)
×
721

UNCOV
722
                case <-s.quit:
×
UNCOV
723
                        return
×
724
                }
725
        }
726
}
727

728
// removeExclusiveGroup removes all inputs in the given exclusive group. This
729
// function is called when one of the exclusive group inputs has been spent. The
730
// other inputs won't ever be spendable and can be removed. This also prevents
731
// them from being part of future sweep transactions that would fail. In
732
// addition sweep transactions of those inputs will be removed from the wallet.
UNCOV
733
func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
×
UNCOV
734
        for outpoint, input := range s.inputs {
×
UNCOV
735
                outpoint := outpoint
×
UNCOV
736

×
UNCOV
737
                // Skip inputs that aren't exclusive.
×
UNCOV
738
                if input.params.ExclusiveGroup == nil {
×
UNCOV
739
                        continue
×
740
                }
741

742
                // Skip inputs from other exclusive groups.
UNCOV
743
                if *input.params.ExclusiveGroup != group {
×
744
                        continue
×
745
                }
746

747
                // Skip inputs that are already terminated.
UNCOV
748
                if input.terminated() {
×
UNCOV
749
                        log.Tracef("Skipped sending error result for "+
×
UNCOV
750
                                "input %v, state=%v", outpoint, input.state)
×
UNCOV
751

×
UNCOV
752
                        continue
×
753
                }
754

755
                // Signal result channels.
UNCOV
756
                s.signalResult(input, Result{
×
UNCOV
757
                        Err: ErrExclusiveGroupSpend,
×
UNCOV
758
                })
×
UNCOV
759

×
UNCOV
760
                // Update the input's state as it can no longer be swept.
×
UNCOV
761
                input.state = Excluded
×
UNCOV
762

×
UNCOV
763
                // Remove all unconfirmed transactions from the wallet which
×
UNCOV
764
                // spend the passed outpoint of the same exclusive group.
×
UNCOV
765
                outpoints := map[wire.OutPoint]struct{}{
×
UNCOV
766
                        outpoint: {},
×
UNCOV
767
                }
×
UNCOV
768
                err := s.removeConflictSweepDescendants(outpoints)
×
UNCOV
769
                if err != nil {
×
770
                        log.Warnf("Unable to remove conflicting sweep tx from "+
×
771
                                "wallet for outpoint %v : %v", outpoint, err)
×
772
                }
×
773
        }
774
}
775

776
// signalResult notifies the listeners of the final result of the input sweep.
777
// It also cancels any pending spend notification.
778
func (s *UtxoSweeper) signalResult(pi *SweeperInput, result Result) {
7✔
779
        op := pi.OutPoint()
7✔
780
        listeners := pi.listeners
7✔
781

7✔
782
        if result.Err == nil {
9✔
783
                log.Tracef("Dispatching sweep success for %v to %v listeners",
2✔
784
                        op, len(listeners),
2✔
785
                )
2✔
786
        } else {
7✔
787
                log.Tracef("Dispatching sweep error for %v to %v listeners: %v",
5✔
788
                        op, len(listeners), result.Err,
5✔
789
                )
5✔
790
        }
5✔
791

792
        // Signal all listeners. Channel is buffered. Because we only send once
793
        // on every channel, it should never block.
794
        for _, resultChan := range listeners {
7✔
UNCOV
795
                resultChan <- result
×
UNCOV
796
        }
×
797

798
        // Cancel spend notification with chain notifier. This is not necessary
799
        // in case of a success, except for that a reorg could still happen.
800
        if pi.ntfnRegCancel != nil {
7✔
UNCOV
801
                log.Debugf("Canceling spend ntfn for %v", op)
×
UNCOV
802

×
UNCOV
803
                pi.ntfnRegCancel()
×
UNCOV
804
        }
×
805
}
806

807
// sweep takes a set of preselected inputs, creates a sweep tx and publishes
808
// the tx. The output address is only marked as used if the publish succeeds.
809
func (s *UtxoSweeper) sweep(set InputSet) error {
2✔
810
        // Generate an output script if there isn't an unused script available.
2✔
811
        if s.currentOutputScript.IsNone() {
3✔
812
                addr, err := s.cfg.GenSweepScript().Unpack()
1✔
813
                if err != nil {
1✔
814
                        return fmt.Errorf("gen sweep script: %w", err)
×
815
                }
×
816
                s.currentOutputScript = fn.Some(addr)
1✔
817
        }
818

819
        sweepAddr, err := s.currentOutputScript.UnwrapOrErr(
2✔
820
                fmt.Errorf("none sweep script"),
2✔
821
        )
2✔
822
        if err != nil {
2✔
823
                return err
×
824
        }
×
825

826
        // Create a fee bump request and ask the publisher to broadcast it. The
827
        // publisher will then take over and start monitoring the tx for
828
        // potential fee bump.
829
        req := &BumpRequest{
2✔
830
                Inputs:          set.Inputs(),
2✔
831
                Budget:          set.Budget(),
2✔
832
                DeadlineHeight:  set.DeadlineHeight(),
2✔
833
                DeliveryAddress: sweepAddr,
2✔
834
                MaxFeeRate:      s.cfg.MaxFeeRate.FeePerKWeight(),
2✔
835
                StartingFeeRate: set.StartingFeeRate(),
2✔
836
                Immediate:       set.Immediate(),
2✔
837
                // TODO(yy): pass the strategy here.
2✔
838
        }
2✔
839

2✔
840
        // Reschedule the inputs that we just tried to sweep. This is done in
2✔
841
        // case the following publish fails, we'd like to update the inputs'
2✔
842
        // publish attempts and rescue them in the next sweep.
2✔
843
        s.markInputsPendingPublish(set)
2✔
844

2✔
845
        // Broadcast will return a read-only chan that we will listen to for
2✔
846
        // this publish result and future RBF attempt.
2✔
847
        resp := s.cfg.Publisher.Broadcast(req)
2✔
848

2✔
849
        // Successfully sent the broadcast attempt, we now handle the result by
2✔
850
        // subscribing to the result chan and listen for future updates about
2✔
851
        // this tx.
2✔
852
        s.wg.Add(1)
2✔
853
        go s.monitorFeeBumpResult(set, resp)
2✔
854

2✔
855
        return nil
2✔
856
}
857

858
// markInputsPendingPublish updates the pending inputs with the given tx
859
// inputs. It also increments the `publishAttempts`.
860
func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) {
3✔
861
        // Reschedule sweep.
3✔
862
        for _, input := range set.Inputs() {
6✔
863
                op := input.OutPoint()
3✔
864
                pi, ok := s.inputs[op]
3✔
865
                if !ok {
3✔
UNCOV
866
                        // It could be that this input is an additional wallet
×
UNCOV
867
                        // input that was attached. In that case there also
×
UNCOV
868
                        // isn't a pending input to update.
×
UNCOV
869
                        log.Tracef("Skipped marking input as pending "+
×
NEW
870
                                "published: %v not found in pending inputs", op)
×
UNCOV
871

×
UNCOV
872
                        continue
×
873
                }
874

875
                // If this input has already terminated, there's clearly
876
                // something wrong as it would have been removed. In this case
877
                // we log an error and skip marking this input as pending
878
                // publish.
879
                if pi.terminated() {
4✔
880
                        log.Errorf("Expect input %v to not have terminated "+
1✔
881
                                "state, instead it has %v", op, pi.state)
1✔
882

1✔
883
                        continue
1✔
884
                }
885

886
                // Update the input's state.
887
                pi.state = PendingPublish
2✔
888

2✔
889
                // Record another publish attempt.
2✔
890
                pi.publishAttempts++
2✔
891
        }
892
}
893

894
// markInputsPublished updates the sweeping tx in db and marks the list of
895
// inputs as published.
896
func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, set InputSet) error {
4✔
897
        // Mark this tx in db once successfully published.
4✔
898
        //
4✔
899
        // NOTE: this will behave as an overwrite, which is fine as the record
4✔
900
        // is small.
4✔
901
        tr.Published = true
4✔
902
        err := s.cfg.Store.StoreTx(tr)
4✔
903
        if err != nil {
5✔
904
                return fmt.Errorf("store tx: %w", err)
1✔
905
        }
1✔
906

907
        // Reschedule sweep.
908
        for _, input := range set.Inputs() {
7✔
909
                op := input.OutPoint()
4✔
910
                pi, ok := s.inputs[op]
4✔
911
                if !ok {
4✔
UNCOV
912
                        // It could be that this input is an additional wallet
×
UNCOV
913
                        // input that was attached. In that case there also
×
UNCOV
914
                        // isn't a pending input to update.
×
UNCOV
915
                        log.Tracef("Skipped marking input as published: %v "+
×
NEW
916
                                "not found in pending inputs", op)
×
UNCOV
917

×
UNCOV
918
                        continue
×
919
                }
920

921
                // Valdiate that the input is in an expected state.
922
                if pi.state != PendingPublish {
5✔
923
                        // We may get a Published if this is a replacement tx.
1✔
924
                        log.Debugf("Expect input %v to have %v, instead it "+
1✔
925
                                "has %v", op, PendingPublish, pi.state)
1✔
926

1✔
927
                        continue
1✔
928
                }
929

930
                // Update the input's state.
931
                pi.state = Published
3✔
932

3✔
933
                // Update the input's latest fee rate.
3✔
934
                pi.lastFeeRate = chainfee.SatPerKWeight(tr.FeeRate)
3✔
935
        }
936

937
        return nil
3✔
938
}
939

940
// markInputsPublishFailed marks the list of inputs as failed to be published.
941
func (s *UtxoSweeper) markInputsPublishFailed(set InputSet) {
2✔
942
        // Reschedule sweep.
2✔
943
        for _, inp := range set.Inputs() {
12✔
944
                op := inp.OutPoint()
10✔
945
                pi, ok := s.inputs[op]
10✔
946
                if !ok {
10✔
UNCOV
947
                        // It could be that this input is an additional wallet
×
UNCOV
948
                        // input that was attached. In that case there also
×
UNCOV
949
                        // isn't a pending input to update.
×
UNCOV
950
                        log.Tracef("Skipped marking input as publish failed: "+
×
UNCOV
951
                                "%v not found in pending inputs", op)
×
UNCOV
952

×
UNCOV
953
                        continue
×
954
                }
955

956
                // Valdiate that the input is in an expected state.
957
                if pi.state != PendingPublish && pi.state != Published {
15✔
958
                        log.Debugf("Expect input %v to have %v, instead it "+
5✔
959
                                "has %v", op, PendingPublish, pi.state)
5✔
960

5✔
961
                        continue
5✔
962
                }
963

964
                log.Warnf("Failed to publish input %v", op)
5✔
965

5✔
966
                // Update the input's state.
5✔
967
                pi.state = PublishFailed
5✔
968
        }
969
}
970

971
// monitorSpend registers a spend notification with the chain notifier. It
972
// returns a cancel function that can be used to cancel the registration.
973
func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint,
UNCOV
974
        script []byte, heightHint uint32) (func(), error) {
×
UNCOV
975

×
UNCOV
976
        log.Tracef("Wait for spend of %v at heightHint=%v",
×
UNCOV
977
                outpoint, heightHint)
×
UNCOV
978

×
UNCOV
979
        spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn(
×
UNCOV
980
                &outpoint, script, heightHint,
×
UNCOV
981
        )
×
UNCOV
982
        if err != nil {
×
983
                return nil, fmt.Errorf("register spend ntfn: %w", err)
×
984
        }
×
985

UNCOV
986
        s.wg.Add(1)
×
UNCOV
987
        go func() {
×
UNCOV
988
                defer s.wg.Done()
×
UNCOV
989

×
UNCOV
990
                select {
×
UNCOV
991
                case spend, ok := <-spendEvent.Spend:
×
UNCOV
992
                        if !ok {
×
UNCOV
993
                                log.Debugf("Spend ntfn for %v canceled",
×
UNCOV
994
                                        outpoint)
×
UNCOV
995
                                return
×
UNCOV
996
                        }
×
997

UNCOV
998
                        log.Debugf("Delivering spend ntfn for %v", outpoint)
×
UNCOV
999

×
UNCOV
1000
                        select {
×
UNCOV
1001
                        case s.spendChan <- spend:
×
UNCOV
1002
                                log.Debugf("Delivered spend ntfn for %v",
×
UNCOV
1003
                                        outpoint)
×
1004

UNCOV
1005
                        case <-s.quit:
×
1006
                        }
UNCOV
1007
                case <-s.quit:
×
1008
                }
1009
        }()
1010

UNCOV
1011
        return spendEvent.Cancel, nil
×
1012
}
1013

1014
// PendingInputs returns the set of inputs that the UtxoSweeper is currently
1015
// attempting to sweep.
1016
func (s *UtxoSweeper) PendingInputs() (
UNCOV
1017
        map[wire.OutPoint]*PendingInputResponse, error) {
×
UNCOV
1018

×
UNCOV
1019
        respChan := make(chan map[wire.OutPoint]*PendingInputResponse, 1)
×
UNCOV
1020
        errChan := make(chan error, 1)
×
UNCOV
1021
        select {
×
1022
        case s.pendingSweepsReqs <- &pendingSweepsReq{
1023
                respChan: respChan,
1024
                errChan:  errChan,
UNCOV
1025
        }:
×
1026
        case <-s.quit:
×
1027
                return nil, ErrSweeperShuttingDown
×
1028
        }
1029

UNCOV
1030
        select {
×
UNCOV
1031
        case pendingSweeps := <-respChan:
×
UNCOV
1032
                return pendingSweeps, nil
×
1033
        case err := <-errChan:
×
1034
                return nil, err
×
1035
        case <-s.quit:
×
1036
                return nil, ErrSweeperShuttingDown
×
1037
        }
1038
}
1039

1040
// handlePendingSweepsReq handles a request to retrieve all pending inputs the
1041
// UtxoSweeper is attempting to sweep.
1042
func (s *UtxoSweeper) handlePendingSweepsReq(
UNCOV
1043
        req *pendingSweepsReq) map[wire.OutPoint]*PendingInputResponse {
×
UNCOV
1044

×
UNCOV
1045
        resps := make(map[wire.OutPoint]*PendingInputResponse, len(s.inputs))
×
UNCOV
1046
        for _, inp := range s.inputs {
×
NEW
1047
                // Skip immature inputs for compatibility.
×
NEW
1048
                mature, _ := inp.isMature(uint32(s.currentHeight))
×
NEW
1049
                if !mature {
×
NEW
1050
                        continue
×
1051
                }
1052

1053
                // Only the exported fields are set, as we expect the response
1054
                // to only be consumed externally.
UNCOV
1055
                op := inp.OutPoint()
×
UNCOV
1056
                resps[op] = &PendingInputResponse{
×
UNCOV
1057
                        OutPoint:    op,
×
UNCOV
1058
                        WitnessType: inp.WitnessType(),
×
UNCOV
1059
                        Amount: btcutil.Amount(
×
UNCOV
1060
                                inp.SignDesc().Output.Value,
×
UNCOV
1061
                        ),
×
UNCOV
1062
                        LastFeeRate:       inp.lastFeeRate,
×
UNCOV
1063
                        BroadcastAttempts: inp.publishAttempts,
×
UNCOV
1064
                        Params:            inp.params,
×
UNCOV
1065
                        DeadlineHeight:    uint32(inp.DeadlineHeight),
×
UNCOV
1066
                }
×
1067
        }
1068

UNCOV
1069
        select {
×
UNCOV
1070
        case req.respChan <- resps:
×
1071
        case <-s.quit:
×
1072
                log.Debug("Skipped sending pending sweep response due to " +
×
1073
                        "UtxoSweeper shutting down")
×
1074
        }
1075

UNCOV
1076
        return resps
×
1077
}
1078

1079
// UpdateParams allows updating the sweep parameters of a pending input in the
1080
// UtxoSweeper. This function can be used to provide an updated fee preference
1081
// and force flag that will be used for a new sweep transaction of the input
1082
// that will act as a replacement transaction (RBF) of the original sweeping
1083
// transaction, if any. The exclusive group is left unchanged.
1084
//
1085
// NOTE: This currently doesn't do any fee rate validation to ensure that a bump
1086
// is actually successful. The responsibility of doing so should be handled by
1087
// the caller.
1088
func (s *UtxoSweeper) UpdateParams(input wire.OutPoint,
UNCOV
1089
        params Params) (chan Result, error) {
×
UNCOV
1090

×
UNCOV
1091
        responseChan := make(chan *updateResp, 1)
×
UNCOV
1092
        select {
×
1093
        case s.updateReqs <- &updateReq{
1094
                input:        input,
1095
                params:       params,
1096
                responseChan: responseChan,
UNCOV
1097
        }:
×
1098
        case <-s.quit:
×
1099
                return nil, ErrSweeperShuttingDown
×
1100
        }
1101

UNCOV
1102
        select {
×
UNCOV
1103
        case response := <-responseChan:
×
UNCOV
1104
                return response.resultChan, response.err
×
1105
        case <-s.quit:
×
1106
                return nil, ErrSweeperShuttingDown
×
1107
        }
1108
}
1109

1110
// handleUpdateReq handles an update request by simply updating the sweep
1111
// parameters of the pending input. Currently, no validation is done on the new
1112
// fee preference to ensure it will properly create a replacement transaction.
1113
//
1114
// TODO(wilmer):
1115
//   - Validate fee preference to ensure we'll create a valid replacement
1116
//     transaction to allow the new fee rate to propagate throughout the
1117
//     network.
1118
//   - Ensure we don't combine this input with any other unconfirmed inputs that
1119
//     did not exist in the original sweep transaction, resulting in an invalid
1120
//     replacement transaction.
1121
func (s *UtxoSweeper) handleUpdateReq(req *updateReq) (
UNCOV
1122
        chan Result, error) {
×
UNCOV
1123

×
UNCOV
1124
        // If the UtxoSweeper is already trying to sweep this input, then we can
×
UNCOV
1125
        // simply just increase its fee rate. This will allow the input to be
×
UNCOV
1126
        // batched with others which also have a similar fee rate, creating a
×
UNCOV
1127
        // higher fee rate transaction that replaces the original input's
×
UNCOV
1128
        // sweeping transaction.
×
UNCOV
1129
        sweeperInput, ok := s.inputs[req.input]
×
UNCOV
1130
        if !ok {
×
1131
                return nil, lnwallet.ErrNotMine
×
1132
        }
×
1133

1134
        // Create the updated parameters struct. Leave the exclusive group
1135
        // unchanged.
UNCOV
1136
        newParams := Params{
×
UNCOV
1137
                StartingFeeRate: req.params.StartingFeeRate,
×
UNCOV
1138
                Immediate:       req.params.Immediate,
×
UNCOV
1139
                Budget:          req.params.Budget,
×
UNCOV
1140
                DeadlineHeight:  req.params.DeadlineHeight,
×
UNCOV
1141
                ExclusiveGroup:  sweeperInput.params.ExclusiveGroup,
×
UNCOV
1142
        }
×
UNCOV
1143

×
UNCOV
1144
        log.Debugf("Updating parameters for %v(state=%v) from (%v) to (%v)",
×
UNCOV
1145
                req.input, sweeperInput.state, sweeperInput.params, newParams)
×
UNCOV
1146

×
UNCOV
1147
        sweeperInput.params = newParams
×
UNCOV
1148

×
UNCOV
1149
        // We need to reset the state so this input will be attempted again by
×
UNCOV
1150
        // our sweeper.
×
UNCOV
1151
        //
×
UNCOV
1152
        // TODO(yy): a dedicated state?
×
UNCOV
1153
        sweeperInput.state = Init
×
UNCOV
1154

×
UNCOV
1155
        // If the new input specifies a deadline, update the deadline height.
×
UNCOV
1156
        sweeperInput.DeadlineHeight = req.params.DeadlineHeight.UnwrapOr(
×
UNCOV
1157
                sweeperInput.DeadlineHeight,
×
UNCOV
1158
        )
×
UNCOV
1159

×
UNCOV
1160
        resultChan := make(chan Result, 1)
×
UNCOV
1161
        sweeperInput.listeners = append(sweeperInput.listeners, resultChan)
×
UNCOV
1162

×
UNCOV
1163
        return resultChan, nil
×
1164
}
1165

1166
// ListSweeps returns a list of the sweeps recorded by the sweep store.
UNCOV
1167
func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
×
UNCOV
1168
        return s.cfg.Store.ListSweeps()
×
UNCOV
1169
}
×
1170

1171
// mempoolLookup takes an input's outpoint and queries the mempool to see
1172
// whether it's already been spent in a transaction found in the mempool.
1173
// Returns the transaction if found.
1174
func (s *UtxoSweeper) mempoolLookup(op wire.OutPoint) fn.Option[wire.MsgTx] {
7✔
1175
        // For neutrino backend, there's no mempool available, so we exit
7✔
1176
        // early.
7✔
1177
        if s.cfg.Mempool == nil {
8✔
1178
                log.Debugf("Skipping mempool lookup for %v, no mempool ", op)
1✔
1179

1✔
1180
                return fn.None[wire.MsgTx]()
1✔
1181
        }
1✔
1182

1183
        // Query this input in the mempool. If this outpoint is already spent
1184
        // in mempool, we should get a spending event back immediately.
1185
        return s.cfg.Mempool.LookupInputMempoolSpend(op)
6✔
1186
}
1187

1188
// calculateDefaultDeadline calculates the default deadline height for a sweep
1189
// request that has no deadline height specified.
NEW
1190
func (s *UtxoSweeper) calculateDefaultDeadline(pi *SweeperInput) int32 {
×
UNCOV
1191
        // Create a default deadline height, which will be used when there's no
×
UNCOV
1192
        // DeadlineHeight specified for a given input.
×
UNCOV
1193
        defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget)
×
UNCOV
1194

×
NEW
1195
        // If the input is immature and has a locktime, we'll use the locktime
×
NEW
1196
        // height as the starting height.
×
NEW
1197
        matured, locktime := pi.isMature(uint32(s.currentHeight))
×
NEW
1198
        if !matured {
×
NEW
1199
                defaultDeadline = int32(locktime + s.cfg.NoDeadlineConfTarget)
×
NEW
1200
                log.Debugf("Input %v is immature, using locktime=%v instead "+
×
NEW
1201
                        "of current height=%d as starting height",
×
NEW
1202
                        pi.OutPoint(), locktime, s.currentHeight)
×
NEW
1203
        }
×
1204

NEW
1205
        return defaultDeadline
×
1206
}
1207

1208
// handleNewInput processes a new input by registering spend notification and
1209
// scheduling sweeping for it.
NEW
1210
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
×
UNCOV
1211
        outpoint := input.input.OutPoint()
×
UNCOV
1212
        pi, pending := s.inputs[outpoint]
×
UNCOV
1213
        if pending {
×
NEW
1214
                log.Infof("Already has pending input %v received, old params: "+
×
NEW
1215
                        "%v, new params %v", outpoint, pi.params, input.params)
×
UNCOV
1216

×
UNCOV
1217
                s.handleExistingInput(input, pi)
×
UNCOV
1218

×
UNCOV
1219
                return nil
×
UNCOV
1220
        }
×
1221

1222
        // This is a new input, and we want to query the mempool to see if this
1223
        // input has already been spent. If so, we'll start the input with
1224
        // state Published and attach the RBFInfo.
UNCOV
1225
        state, rbfInfo := s.decideStateAndRBFInfo(input.input.OutPoint())
×
UNCOV
1226

×
UNCOV
1227
        // Create a new pendingInput and initialize the listeners slice with
×
UNCOV
1228
        // the passed in result channel. If this input is offered for sweep
×
UNCOV
1229
        // again, the result channel will be appended to this slice.
×
UNCOV
1230
        pi = &SweeperInput{
×
UNCOV
1231
                state:     state,
×
UNCOV
1232
                listeners: []chan Result{input.resultChan},
×
UNCOV
1233
                Input:     input.input,
×
UNCOV
1234
                params:    input.params,
×
UNCOV
1235
                rbf:       rbfInfo,
×
UNCOV
1236
        }
×
UNCOV
1237

×
NEW
1238
        // Set the acutal deadline height.
×
NEW
1239
        pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
×
NEW
1240
                s.calculateDefaultDeadline(pi),
×
NEW
1241
        )
×
NEW
1242

×
UNCOV
1243
        s.inputs[outpoint] = pi
×
UNCOV
1244
        log.Tracef("input %v, state=%v, added to inputs", outpoint, pi.state)
×
UNCOV
1245

×
NEW
1246
        log.Infof("Registered sweep request at block %d: out_point=%v, "+
×
NEW
1247
                "witness_type=%v, amount=%v, deadline=%d, params=(%v)",
×
NEW
1248
                s.currentHeight, pi.OutPoint(), pi.WitnessType(),
×
NEW
1249
                btcutil.Amount(pi.SignDesc().Output.Value), pi.DeadlineHeight,
×
NEW
1250
                pi.params)
×
NEW
1251

×
UNCOV
1252
        // Start watching for spend of this input, either by us or the remote
×
UNCOV
1253
        // party.
×
UNCOV
1254
        cancel, err := s.monitorSpend(
×
UNCOV
1255
                outpoint, input.input.SignDesc().Output.PkScript,
×
UNCOV
1256
                input.input.HeightHint(),
×
UNCOV
1257
        )
×
UNCOV
1258
        if err != nil {
×
1259
                err := fmt.Errorf("wait for spend: %w", err)
×
1260
                s.markInputFailed(pi, err)
×
1261

×
1262
                return err
×
1263
        }
×
1264

UNCOV
1265
        pi.ntfnRegCancel = cancel
×
UNCOV
1266

×
UNCOV
1267
        return nil
×
1268
}
1269

1270
// decideStateAndRBFInfo queries the mempool to see whether the given input has
1271
// already been spent. If so, the state Published will be returned, otherwise
1272
// state Init. When spent, it will query the sweeper store to fetch the fee
1273
// info of the spending transction, and construct an RBFInfo based on it.
1274
// Suppose an error occurs, fn.None is returned.
1275
func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
1276
        SweepState, fn.Option[RBFInfo]) {
4✔
1277

4✔
1278
        // Check if we can find the spending tx of this input in mempool.
4✔
1279
        txOption := s.mempoolLookup(op)
4✔
1280

4✔
1281
        // Extract the spending tx from the option.
4✔
1282
        var tx *wire.MsgTx
4✔
1283
        txOption.WhenSome(func(t wire.MsgTx) {
7✔
1284
                tx = &t
3✔
1285
        })
3✔
1286

1287
        // Exit early if it's not found.
1288
        //
1289
        // NOTE: this is not accurate for backends that don't support mempool
1290
        // lookup:
1291
        // - for neutrino we don't have a mempool.
1292
        // - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
1293
        if tx == nil {
5✔
1294
                return Init, fn.None[RBFInfo]()
1✔
1295
        }
1✔
1296

1297
        // Otherwise the input is already spent in the mempool, so eventually
1298
        // we will return Published.
1299
        //
1300
        // We also need to update the RBF info for this input. If the sweeping
1301
        // transaction is broadcast by us, we can find the fee info in the
1302
        // sweeper store.
1303
        txid := tx.TxHash()
3✔
1304
        tr, err := s.cfg.Store.GetTx(txid)
3✔
1305

3✔
1306
        // If the tx is not found in the store, it means it's not broadcast by
3✔
1307
        // us, hence we can't find the fee info. This is fine as, later on when
3✔
1308
        // this tx is confirmed, we will remove the input from our inputs.
3✔
1309
        if errors.Is(err, ErrTxNotFound) {
4✔
1310
                log.Warnf("Spending tx %v not found in sweeper store", txid)
1✔
1311
                return Published, fn.None[RBFInfo]()
1✔
1312
        }
1✔
1313

1314
        // Exit if we get an db error.
1315
        if err != nil {
3✔
1316
                log.Errorf("Unable to get tx %v from sweeper store: %v",
1✔
1317
                        txid, err)
1✔
1318

1✔
1319
                return Published, fn.None[RBFInfo]()
1✔
1320
        }
1✔
1321

1322
        // Prepare the fee info and return it.
1323
        rbf := fn.Some(RBFInfo{
1✔
1324
                Txid:    txid,
1✔
1325
                Fee:     btcutil.Amount(tr.Fee),
1✔
1326
                FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
1✔
1327
        })
1✔
1328

1✔
1329
        return Published, rbf
1✔
1330
}
1331

1332
// handleExistingInput processes an input that is already known to the sweeper.
1333
// It will overwrite the params of the old input with the new ones.
1334
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
UNCOV
1335
        oldInput *SweeperInput) {
×
UNCOV
1336

×
UNCOV
1337
        // Before updating the input details, check if an exclusive group was
×
UNCOV
1338
        // set. In case the same input is registered again without an exclusive
×
UNCOV
1339
        // group set, the previous input and its sweep parameters are outdated
×
UNCOV
1340
        // hence need to be replaced. This scenario currently only happens for
×
UNCOV
1341
        // anchor outputs. When a channel is force closed, in the worst case 3
×
UNCOV
1342
        // different sweeps with the same exclusive group are registered with
×
UNCOV
1343
        // the sweeper to bump the closing transaction (cpfp) when its time
×
UNCOV
1344
        // critical. Receiving an input which was already registered with the
×
UNCOV
1345
        // sweeper but now without an exclusive group means non of the previous
×
UNCOV
1346
        // inputs were used as CPFP, so we need to make sure we update the
×
UNCOV
1347
        // sweep parameters but also remove all inputs with the same exclusive
×
UNCOV
1348
        // group because the are outdated too.
×
UNCOV
1349
        var prevExclGroup *uint64
×
UNCOV
1350
        if oldInput.params.ExclusiveGroup != nil &&
×
UNCOV
1351
                input.params.ExclusiveGroup == nil {
×
UNCOV
1352

×
UNCOV
1353
                prevExclGroup = new(uint64)
×
UNCOV
1354
                *prevExclGroup = *oldInput.params.ExclusiveGroup
×
UNCOV
1355
        }
×
1356

1357
        // Update input details and sweep parameters. The re-offered input
1358
        // details may contain a change to the unconfirmed parent tx info.
UNCOV
1359
        oldInput.params = input.params
×
UNCOV
1360
        oldInput.Input = input.input
×
UNCOV
1361

×
UNCOV
1362
        // If the new input specifies a deadline, update the deadline height.
×
UNCOV
1363
        oldInput.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
×
UNCOV
1364
                oldInput.DeadlineHeight,
×
UNCOV
1365
        )
×
UNCOV
1366

×
UNCOV
1367
        // Add additional result channel to signal spend of this input.
×
UNCOV
1368
        oldInput.listeners = append(oldInput.listeners, input.resultChan)
×
UNCOV
1369

×
UNCOV
1370
        if prevExclGroup != nil {
×
UNCOV
1371
                s.removeExclusiveGroup(*prevExclGroup)
×
UNCOV
1372
        }
×
1373
}
1374

1375
// handleInputSpent takes a spend event of our input and updates the sweeper's
1376
// internal state to remove the input.
UNCOV
1377
func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
×
UNCOV
1378
        // Query store to find out if we ever published this tx.
×
UNCOV
1379
        spendHash := *spend.SpenderTxHash
×
UNCOV
1380
        isOurTx, err := s.cfg.Store.IsOurTx(spendHash)
×
UNCOV
1381
        if err != nil {
×
1382
                log.Errorf("cannot determine if tx %v is ours: %v",
×
1383
                        spendHash, err)
×
1384
                return
×
1385
        }
×
1386

1387
        // If this isn't our transaction, it means someone else swept outputs
1388
        // that we were attempting to sweep. This can happen for anchor outputs
1389
        // as well as justice transactions. In this case, we'll notify the
1390
        // wallet to remove any spends that descent from this output.
UNCOV
1391
        if !isOurTx {
×
UNCOV
1392
                // Construct a map of the inputs this transaction spends.
×
UNCOV
1393
                spendingTx := spend.SpendingTx
×
UNCOV
1394
                inputsSpent := make(
×
UNCOV
1395
                        map[wire.OutPoint]struct{}, len(spendingTx.TxIn),
×
UNCOV
1396
                )
×
UNCOV
1397
                for _, txIn := range spendingTx.TxIn {
×
UNCOV
1398
                        inputsSpent[txIn.PreviousOutPoint] = struct{}{}
×
UNCOV
1399
                }
×
1400

UNCOV
1401
                log.Debugf("Attempting to remove descendant txns invalidated "+
×
UNCOV
1402
                        "by (txid=%v): %v", spendingTx.TxHash(),
×
UNCOV
1403
                        spew.Sdump(spendingTx))
×
UNCOV
1404

×
UNCOV
1405
                err := s.removeConflictSweepDescendants(inputsSpent)
×
UNCOV
1406
                if err != nil {
×
1407
                        log.Warnf("unable to remove descendant transactions "+
×
1408
                                "due to tx %v: ", spendHash)
×
1409
                }
×
1410

UNCOV
1411
                log.Debugf("Detected third party spend related to in flight "+
×
UNCOV
1412
                        "inputs (is_ours=%v): %v", isOurTx,
×
UNCOV
1413
                        lnutils.SpewLogClosure(spend.SpendingTx))
×
1414
        }
1415

1416
        // We now use the spending tx to update the state of the inputs.
UNCOV
1417
        s.markInputsSwept(spend.SpendingTx, isOurTx)
×
1418
}
1419

1420
// markInputsSwept marks all inputs swept by the spending transaction as swept.
1421
// It will also notify all the subscribers of this input.
1422
func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) {
1✔
1423
        for _, txIn := range tx.TxIn {
5✔
1424
                outpoint := txIn.PreviousOutPoint
4✔
1425

4✔
1426
                // Check if this input is known to us. It could probably be
4✔
1427
                // unknown if we canceled the registration, deleted from inputs
4✔
1428
                // map but the ntfn was in-flight already. Or this could be not
4✔
1429
                // one of our inputs.
4✔
1430
                input, ok := s.inputs[outpoint]
4✔
1431
                if !ok {
5✔
1432
                        // It's very likely that a spending tx contains inputs
1✔
1433
                        // that we don't know.
1✔
1434
                        log.Tracef("Skipped marking input as swept: %v not "+
1✔
1435
                                "found in pending inputs", outpoint)
1✔
1436

1✔
1437
                        continue
1✔
1438
                }
1439

1440
                // This input may already been marked as swept by a previous
1441
                // spend notification, which is likely to happen as one sweep
1442
                // transaction usually sweeps multiple inputs.
1443
                if input.terminated() {
4✔
1444
                        log.Debugf("Skipped marking input as swept: %v "+
1✔
1445
                                "state=%v", outpoint, input.state)
1✔
1446

1✔
1447
                        continue
1✔
1448
                }
1449

1450
                input.state = Swept
2✔
1451

2✔
1452
                // Return either a nil or a remote spend result.
2✔
1453
                var err error
2✔
1454
                if !isOurTx {
2✔
UNCOV
1455
                        log.Warnf("Input=%v was spent by remote or third "+
×
UNCOV
1456
                                "party in tx=%v", outpoint, tx.TxHash())
×
UNCOV
1457
                        err = ErrRemoteSpend
×
UNCOV
1458
                }
×
1459

1460
                // Signal result channels.
1461
                s.signalResult(input, Result{
2✔
1462
                        Tx:  tx,
2✔
1463
                        Err: err,
2✔
1464
                })
2✔
1465

2✔
1466
                // Remove all other inputs in this exclusive group.
2✔
1467
                if input.params.ExclusiveGroup != nil {
2✔
UNCOV
1468
                        s.removeExclusiveGroup(*input.params.ExclusiveGroup)
×
UNCOV
1469
                }
×
1470
        }
1471
}
1472

1473
// markInputFailed marks the given input as failed and won't be retried. It
1474
// will also notify all the subscribers of this input.
1475
func (s *UtxoSweeper) markInputFailed(pi *SweeperInput, err error) {
5✔
1476
        log.Errorf("Failed to sweep input: %v, error: %v", pi, err)
5✔
1477

5✔
1478
        pi.state = Failed
5✔
1479

5✔
1480
        s.signalResult(pi, Result{Err: err})
5✔
1481
}
5✔
1482

1483
// updateSweeperInputs updates the sweeper's internal state and returns a map
1484
// of inputs to be swept. It will remove the inputs that are in final states,
1485
// and returns a map of inputs that have either state Init or PublishFailed.
1486
func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
1✔
1487
        // Create a map of inputs to be swept.
1✔
1488
        inputs := make(InputsMap)
1✔
1489

1✔
1490
        // Iterate the pending inputs and update the sweeper's state.
1✔
1491
        //
1✔
1492
        // TODO(yy): sweeper is made to communicate via go channels, so no
1✔
1493
        // locks are needed to access the map. However, it'd be safer if we
1✔
1494
        // turn this inputs map into a SyncMap in case we wanna add concurrent
1✔
1495
        // access to the map in the future.
1✔
1496
        for op, input := range s.inputs {
10✔
1497
                log.Tracef("Checking input: %s, state=%v", input, input.state)
9✔
1498

9✔
1499
                // If the input has reached a final state, that it's either
9✔
1500
                // been swept, or failed, or excluded, we will remove it from
9✔
1501
                // our sweeper.
9✔
1502
                if input.terminated() {
12✔
1503
                        log.Debugf("Removing input(State=%v) %v from sweeper",
3✔
1504
                                input.state, op)
3✔
1505

3✔
1506
                        delete(s.inputs, op)
3✔
1507

3✔
1508
                        continue
3✔
1509
                }
1510

1511
                // If this input has been included in a sweep tx that's not
1512
                // published yet, we'd skip this input and wait for the sweep
1513
                // tx to be published.
1514
                if input.state == PendingPublish {
7✔
1515
                        continue
1✔
1516
                }
1517

1518
                // If this input has already been published, we will need to
1519
                // check the RBF condition before attempting another sweeping.
1520
                if input.state == Published {
6✔
1521
                        continue
1✔
1522
                }
1523

1524
                // If the input has a locktime that's not yet reached, we will
1525
                // skip this input and wait for the locktime to be reached.
1526
                mature, locktime := input.isMature(uint32(s.currentHeight))
4✔
1527
                if !mature {
6✔
1528
                        log.Debugf("Skipping input %v due to locktime=%v not "+
2✔
1529
                                "reached, current height is %v", op, locktime,
2✔
1530
                                s.currentHeight)
2✔
1531

2✔
1532
                        continue
2✔
1533
                }
1534

1535
                // If this input is new or has been failed to be published,
1536
                // we'd retry it. The assumption here is that when an error is
1537
                // returned from `PublishTransaction`, it means the tx has
1538
                // failed to meet the policy, hence it's not in the mempool.
1539
                inputs[op] = input
2✔
1540
        }
1541

1542
        return inputs
1✔
1543
}
1544

1545
// sweepPendingInputs is called when the ticker fires. It will create clusters
1546
// and attempt to create and publish the sweeping transactions.
1547
func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) {
1✔
1548
        log.Debugf("Sweeping %v inputs", len(inputs))
1✔
1549

1✔
1550
        // Cluster all of our inputs based on the specific Aggregator.
1✔
1551
        sets := s.cfg.Aggregator.ClusterInputs(inputs)
1✔
1552

1✔
1553
        // sweepWithLock is a helper closure that executes the sweep within a
1✔
1554
        // coin select lock to prevent the coins being selected for other
1✔
1555
        // transactions like funding of a channel.
1✔
1556
        sweepWithLock := func(set InputSet) error {
2✔
1557
                return s.cfg.Wallet.WithCoinSelectLock(func() error {
2✔
1558
                        // Try to add inputs from our wallet.
1✔
1559
                        err := set.AddWalletInputs(s.cfg.Wallet)
1✔
1560
                        if err != nil {
1✔
UNCOV
1561
                                return err
×
UNCOV
1562
                        }
×
1563

1564
                        // Create sweeping transaction for each set.
1565
                        err = s.sweep(set)
1✔
1566
                        if err != nil {
1✔
UNCOV
1567
                                return err
×
UNCOV
1568
                        }
×
1569

1570
                        return nil
1✔
1571
                })
1572
        }
1573

1574
        for _, set := range sets {
3✔
1575
                var err error
2✔
1576
                if set.NeedWalletInput() {
3✔
1577
                        // Sweep the set of inputs that need the wallet inputs.
1✔
1578
                        err = sweepWithLock(set)
1✔
1579
                } else {
2✔
1580
                        // Sweep the set of inputs that don't need the wallet
1✔
1581
                        // inputs.
1✔
1582
                        err = s.sweep(set)
1✔
1583
                }
1✔
1584

1585
                if err != nil {
2✔
UNCOV
1586
                        log.Errorf("Failed to sweep %v: %v", set, err)
×
UNCOV
1587
                }
×
1588
        }
1589
}
1590

1591
// bumpResp wraps the result of a bump attempt returned from the fee bumper and
1592
// the inputs being used.
1593
type bumpResp struct {
1594
        // result is the result of the bump attempt returned from the fee
1595
        // bumper.
1596
        result *BumpResult
1597

1598
        // set is the input set that was used in the bump attempt.
1599
        set InputSet
1600
}
1601

1602
// monitorFeeBumpResult subscribes to the passed result chan to listen for
1603
// future updates about the sweeping tx.
1604
//
1605
// NOTE: must run as a goroutine.
1606
func (s *UtxoSweeper) monitorFeeBumpResult(set InputSet,
1607
        resultChan <-chan *BumpResult) {
6✔
1608

6✔
1609
        defer s.wg.Done()
6✔
1610

6✔
1611
        for {
13✔
1612
                select {
7✔
1613
                case r := <-resultChan:
3✔
1614
                        // Validate the result is valid.
3✔
1615
                        if err := r.Validate(); err != nil {
3✔
1616
                                log.Errorf("Received invalid result: %v", err)
×
1617
                                continue
×
1618
                        }
1619

1620
                        resp := &bumpResp{
3✔
1621
                                result: r,
3✔
1622
                                set:    set,
3✔
1623
                        }
3✔
1624

3✔
1625
                        // Send the result back to the main event loop.
3✔
1626
                        select {
3✔
1627
                        case s.bumpRespChan <- resp:
3✔
UNCOV
1628
                        case <-s.quit:
×
UNCOV
1629
                                log.Debug("Sweeper shutting down, skip " +
×
UNCOV
1630
                                        "sending bump result")
×
UNCOV
1631

×
UNCOV
1632
                                return
×
1633
                        }
1634

1635
                        // The sweeping tx has been confirmed, we can exit the
1636
                        // monitor now.
1637
                        //
1638
                        // TODO(yy): can instead remove the spend subscription
1639
                        // in sweeper and rely solely on this event to mark
1640
                        // inputs as Swept?
1641
                        if r.Event == TxConfirmed || r.Event == TxFailed {
5✔
1642
                                // Exit if the tx is failed to be created.
2✔
1643
                                if r.Tx == nil {
2✔
NEW
1644
                                        log.Debugf("Received %v for nil tx, "+
×
NEW
1645
                                                "exit monitor", r.Event)
×
NEW
1646

×
NEW
1647
                                        return
×
NEW
1648
                                }
×
1649

1650
                                log.Debugf("Received %v for sweep tx %v, exit "+
2✔
1651
                                        "fee bump monitor", r.Event,
2✔
1652
                                        r.Tx.TxHash())
2✔
1653

2✔
1654
                                // Cancel the rebroadcasting of the failed tx.
2✔
1655
                                s.cfg.Wallet.CancelRebroadcast(r.Tx.TxHash())
2✔
1656

2✔
1657
                                return
2✔
1658
                        }
1659

1660
                case <-s.quit:
2✔
1661
                        log.Debugf("Sweeper shutting down, exit fee " +
2✔
1662
                                "bump handler")
2✔
1663

2✔
1664
                        return
2✔
1665
                }
1666
        }
1667
}
1668

1669
// handleBumpEventTxFailed handles the case where the tx has been failed to
1670
// publish.
1671
func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) {
1✔
1672
        r := resp.result
1✔
1673
        tx, err := r.Tx, r.Err
1✔
1674

1✔
1675
        if tx != nil {
2✔
1676
                log.Warnf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(),
1✔
1677
                        err)
1✔
1678
        }
1✔
1679

1680
        // NOTE: When marking the inputs as failed, we are using the input set
1681
        // instead of the inputs found in the tx. This is fine for current
1682
        // version of the sweeper because we always create a tx using ALL of
1683
        // the inputs specified by the set.
1684
        //
1685
        // TODO(yy): should we also remove the failed tx from db?
1686
        s.markInputsPublishFailed(resp.set)
1✔
1687
}
1688

1689
// handleBumpEventTxReplaced handles the case where the sweeping tx has been
1690
// replaced by a new one.
1691
func (s *UtxoSweeper) handleBumpEventTxReplaced(resp *bumpResp) error {
3✔
1692
        r := resp.result
3✔
1693
        oldTx := r.ReplacedTx
3✔
1694
        newTx := r.Tx
3✔
1695

3✔
1696
        // Prepare a new record to replace the old one.
3✔
1697
        tr := &TxRecord{
3✔
1698
                Txid:    newTx.TxHash(),
3✔
1699
                FeeRate: uint64(r.FeeRate),
3✔
1700
                Fee:     uint64(r.Fee),
3✔
1701
        }
3✔
1702

3✔
1703
        // Get the old record for logging purpose.
3✔
1704
        oldTxid := oldTx.TxHash()
3✔
1705
        record, err := s.cfg.Store.GetTx(oldTxid)
3✔
1706
        if err != nil {
4✔
1707
                log.Errorf("Fetch tx record for %v: %v", oldTxid, err)
1✔
1708
                return err
1✔
1709
        }
1✔
1710

1711
        // Cancel the rebroadcasting of the replaced tx.
1712
        s.cfg.Wallet.CancelRebroadcast(oldTxid)
2✔
1713

2✔
1714
        log.Infof("RBFed tx=%v(fee=%v sats, feerate=%v sats/kw) with new "+
2✔
1715
                "tx=%v(fee=%v, "+"feerate=%v)", record.Txid, record.Fee,
2✔
1716
                record.FeeRate, tr.Txid, tr.Fee, tr.FeeRate)
2✔
1717

2✔
1718
        // The old sweeping tx has been replaced by a new one, we will update
2✔
1719
        // the tx record in the sweeper db.
2✔
1720
        //
2✔
1721
        // TODO(yy): we may also need to update the inputs in this tx to a new
2✔
1722
        // state. Suppose a replacing tx only spends a subset of the inputs
2✔
1723
        // here, we'd end up with the rest being marked as `Published` and
2✔
1724
        // won't be aggregated in the next sweep. Atm it's fine as we always
2✔
1725
        // RBF the same input set.
2✔
1726
        if err := s.cfg.Store.DeleteTx(oldTxid); err != nil {
3✔
1727
                log.Errorf("Delete tx record for %v: %v", oldTxid, err)
1✔
1728
                return err
1✔
1729
        }
1✔
1730

1731
        // Mark the inputs as published using the replacing tx.
1732
        return s.markInputsPublished(tr, resp.set)
1✔
1733
}
1734

1735
// handleBumpEventTxPublished handles the case where the sweeping tx has been
1736
// successfully published.
1737
func (s *UtxoSweeper) handleBumpEventTxPublished(resp *bumpResp) error {
1✔
1738
        r := resp.result
1✔
1739
        tx := r.Tx
1✔
1740
        tr := &TxRecord{
1✔
1741
                Txid:    tx.TxHash(),
1✔
1742
                FeeRate: uint64(r.FeeRate),
1✔
1743
                Fee:     uint64(r.Fee),
1✔
1744
        }
1✔
1745

1✔
1746
        // Inputs have been successfully published so we update their
1✔
1747
        // states.
1✔
1748
        err := s.markInputsPublished(tr, resp.set)
1✔
1749
        if err != nil {
1✔
1750
                return err
×
1751
        }
×
1752

1753
        log.Debugf("Published sweep tx %v, num_inputs=%v, height=%v",
1✔
1754
                tx.TxHash(), len(tx.TxIn), s.currentHeight)
1✔
1755

1✔
1756
        // If there's no error, remove the output script. Otherwise keep it so
1✔
1757
        // that it can be reused for the next transaction and causes no address
1✔
1758
        // inflation.
1✔
1759
        s.currentOutputScript = fn.None[lnwallet.AddrWithKey]()
1✔
1760

1✔
1761
        return nil
1✔
1762
}
1763

1764
// handleBumpEventTxFatal handles the case where there's an unexpected error
1765
// when creating or publishing the sweeping tx. In this case, the tx will be
1766
// removed from the sweeper store and the inputs will be marked as `Failed`,
1767
// which means they will not be retried.
1768
func (s *UtxoSweeper) handleBumpEventTxFatal(resp *bumpResp) error {
2✔
1769
        r := resp.result
2✔
1770

2✔
1771
        // Remove the tx from the sweeper store if there is one. Since this is
2✔
1772
        // a broadcast error, it's likely there isn't a tx here.
2✔
1773
        if r.Tx != nil {
4✔
1774
                txid := r.Tx.TxHash()
2✔
1775
                log.Infof("Tx=%v failed with unexpected error: %v", txid, r.Err)
2✔
1776

2✔
1777
                // Remove the tx from the sweeper db if it exists.
2✔
1778
                if err := s.cfg.Store.DeleteTx(txid); err != nil {
3✔
1779
                        return fmt.Errorf("delete tx record for %v: %w", txid,
1✔
1780
                                err)
1✔
1781
                }
1✔
1782
        }
1783

1784
        // Mark the inputs as failed.
1785
        s.markInputsFailed(resp.set, r.Err)
1✔
1786

1✔
1787
        return nil
1✔
1788
}
1789

1790
// markInputsFailed marks all inputs found in the tx as failed. It will also
1791
// notify all the subscribers of these inputs.
1792
func (s *UtxoSweeper) markInputsFailed(set InputSet, err error) {
2✔
1793
        for _, inp := range set.Inputs() {
9✔
1794
                outpoint := inp.OutPoint()
7✔
1795

7✔
1796
                input, ok := s.inputs[outpoint]
7✔
1797
                if !ok {
7✔
NEW
1798
                        // It's very likely that a spending tx contains inputs
×
NEW
1799
                        // that we don't know.
×
NEW
1800
                        log.Tracef("Skipped marking input as failed: %v not "+
×
NEW
1801
                                "found in pending inputs", outpoint)
×
NEW
1802

×
NEW
1803
                        continue
×
1804
                }
1805

1806
                // If the input is already in a terminal state, we don't want
1807
                // to rewrite it, which also indicates an error as we only get
1808
                // an error event during the initial broadcast.
1809
                if input.terminated() {
10✔
1810
                        log.Errorf("Skipped marking input=%v as failed due to "+
3✔
1811
                                "unexpected state=%v", outpoint, input.state)
3✔
1812

3✔
1813
                        continue
3✔
1814
                }
1815

1816
                s.markInputFailed(input, err)
4✔
1817
        }
1818
}
1819

1820
// handleBumpEvent handles the result sent from the bumper based on its event
1821
// type.
1822
//
1823
// NOTE: TxConfirmed event is not handled, since we already subscribe to the
1824
// input's spending event, we don't need to do anything here.
1825
func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
1✔
1826
        log.Debugf("Received bump result %v", r.result)
1✔
1827

1✔
1828
        switch r.result.Event {
1✔
1829
        // The tx has been published, we update the inputs' state and create a
1830
        // record to be stored in the sweeper db.
UNCOV
1831
        case TxPublished:
×
UNCOV
1832
                return s.handleBumpEventTxPublished(r)
×
1833

1834
        // The tx has failed, we update the inputs' state.
1835
        case TxFailed:
1✔
1836
                s.handleBumpEventTxFailed(r)
1✔
1837
                return nil
1✔
1838

1839
        // The tx has been replaced, we will remove the old tx and replace it
1840
        // with the new one.
UNCOV
1841
        case TxReplaced:
×
UNCOV
1842
                return s.handleBumpEventTxReplaced(r)
×
1843

1844
        // There's a fatal error in creating the tx, we will remove the tx from
1845
        // the sweeper db and mark the inputs as failed.
NEW
1846
        case TxFatal:
×
NEW
1847
                return s.handleBumpEventTxFatal(r)
×
1848
        }
1849

UNCOV
1850
        return nil
×
1851
}
1852

1853
// IsSweeperOutpoint determines whether the outpoint was created by the sweeper.
1854
//
1855
// NOTE: It is enough to check the txid because the sweeper will create
1856
// outpoints which solely belong to the internal LND wallet.
UNCOV
1857
func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool {
×
UNCOV
1858
        found, err := s.cfg.Store.IsOurTx(op.Hash)
×
UNCOV
1859
        // In case there is an error fetching the transaction details from the
×
UNCOV
1860
        // sweeper store we assume the outpoint is still used by the sweeper
×
UNCOV
1861
        // (worst case scenario).
×
UNCOV
1862
        //
×
UNCOV
1863
        // TODO(ziggie): Ensure that confirmed outpoints are deleted from the
×
UNCOV
1864
        // bucket.
×
UNCOV
1865
        if err != nil && !errors.Is(err, errNoTxHashesBucket) {
×
1866
                log.Errorf("failed to fetch info for outpoint(%v:%d) "+
×
1867
                        "with: %v, we assume it is still in use by the sweeper",
×
1868
                        op.Hash, op.Index, err)
×
1869

×
1870
                return true
×
1871
        }
×
1872

UNCOV
1873
        return found
×
1874
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc