• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13035292482

29 Jan 2025 03:59PM UTC coverage: 49.3% (-9.5%) from 58.777%
13035292482

Pull #9456

github

mohamedawnallah
docs: update release-notes-0.19.0.md

In this commit, we warn users about the removal
of RPCs `SendToRoute`, `SendToRouteSync`, `SendPayment`,
and `SendPaymentSync` in the next release 0.20.
Pull Request #9456: lnrpc+docs: deprecate warning `SendToRoute`, `SendToRouteSync`, `SendPayment`, and `SendPaymentSync` in Release 0.19

100634 of 204126 relevant lines covered (49.3%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.76
/sweep/sweeper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/davecgh/go-spew/spew"
13
        "github.com/lightningnetwork/lnd/chainio"
14
        "github.com/lightningnetwork/lnd/chainntnfs"
15
        "github.com/lightningnetwork/lnd/fn/v2"
16
        "github.com/lightningnetwork/lnd/input"
17
        "github.com/lightningnetwork/lnd/lnutils"
18
        "github.com/lightningnetwork/lnd/lnwallet"
19
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
20
)
21

22
var (
23
        // ErrRemoteSpend is returned in case an output that we try to sweep is
24
        // confirmed in a tx of the remote party.
25
        ErrRemoteSpend = errors.New("remote party swept utxo")
26

27
        // ErrFeePreferenceTooLow is returned when the fee preference gives a
28
        // fee rate that's below the relay fee rate.
29
        ErrFeePreferenceTooLow = errors.New("fee preference too low")
30

31
        // ErrExclusiveGroupSpend is returned in case a different input of the
32
        // same exclusive group was spent.
33
        ErrExclusiveGroupSpend = errors.New("other member of exclusive group " +
34
                "was spent")
35

36
        // ErrSweeperShuttingDown is an error returned when a client attempts to
37
        // make a request to the UtxoSweeper, but it is unable to handle it as
38
        // it is/has already been stopped.
39
        ErrSweeperShuttingDown = errors.New("utxo sweeper shutting down")
40

41
        // DefaultDeadlineDelta defines a default deadline delta (1 week) to be
42
        // used when sweeping inputs with no deadline pressure.
43
        DefaultDeadlineDelta = int32(1008)
44
)
45

46
// Params contains the parameters that control the sweeping process.
47
type Params struct {
48
        // ExclusiveGroup is an identifier that, if set, prevents other inputs
49
        // with the same identifier from being batched together.
50
        ExclusiveGroup *uint64
51

52
        // DeadlineHeight specifies an absolute block height that this input
53
        // should be confirmed by. This value is used by the fee bumper to
54
        // decide its urgency and adjust its feerate used.
55
        DeadlineHeight fn.Option[int32]
56

57
        // Budget specifies the maximum amount of satoshis that can be spent on
58
        // fees for this sweep.
59
        Budget btcutil.Amount
60

61
        // Immediate indicates that the input should be swept immediately
62
        // without waiting for blocks to come to trigger the sweeping of
63
        // inputs.
64
        Immediate bool
65

66
        // StartingFeeRate is an optional parameter that can be used to specify
67
        // the initial fee rate to use for the fee function.
68
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
69
}
70

71
// String returns a human readable interpretation of the sweep parameters.
72
func (p Params) String() string {
3✔
73
        deadline := "none"
3✔
74
        p.DeadlineHeight.WhenSome(func(d int32) {
6✔
75
                deadline = fmt.Sprintf("%d", d)
3✔
76
        })
3✔
77

78
        exclusiveGroup := "none"
3✔
79
        if p.ExclusiveGroup != nil {
6✔
80
                exclusiveGroup = fmt.Sprintf("%d", *p.ExclusiveGroup)
3✔
81
        }
3✔
82

83
        return fmt.Sprintf("startingFeeRate=%v, immediate=%v, "+
3✔
84
                "exclusive_group=%v, budget=%v, deadline=%v", p.StartingFeeRate,
3✔
85
                p.Immediate, exclusiveGroup, p.Budget, deadline)
3✔
86
}
87

88
// SweepState represents the current state of a pending input.
89
//
90
//nolint:revive
91
type SweepState uint8
92

93
const (
94
        // Init is the initial state of a pending input. This is set when a new
95
        // sweeping request for a given input is made.
96
        Init SweepState = iota
97

98
        // PendingPublish specifies an input's state where it's already been
99
        // included in a sweeping tx but the tx is not published yet.  Inputs
100
        // in this state should not be used for grouping again.
101
        PendingPublish
102

103
        // Published is the state where the input's sweeping tx has
104
        // successfully been published. Inputs in this state can only be
105
        // updated via RBF.
106
        Published
107

108
        // PublishFailed is the state when an error is returned from publishing
109
        // the sweeping tx. Inputs in this state can be re-grouped in to a new
110
        // sweeping tx.
111
        PublishFailed
112

113
        // Swept is the final state of a pending input. This is set when the
114
        // input has been successfully swept.
115
        Swept
116

117
        // Excluded is the state of a pending input that has been excluded and
118
        // can no longer be swept. For instance, when one of the three anchor
119
        // sweeping transactions confirmed, the remaining two will be excluded.
120
        Excluded
121

122
        // Failed is the state when a pending input has too many failed publish
123
        // atttempts or unknown broadcast error is returned.
124
        Failed
125
)
126

127
// String gives a human readable text for the sweep states.
128
func (s SweepState) String() string {
3✔
129
        switch s {
3✔
130
        case Init:
3✔
131
                return "Init"
3✔
132

133
        case PendingPublish:
3✔
134
                return "PendingPublish"
3✔
135

136
        case Published:
3✔
137
                return "Published"
3✔
138

139
        case PublishFailed:
3✔
140
                return "PublishFailed"
3✔
141

142
        case Swept:
3✔
143
                return "Swept"
3✔
144

145
        case Excluded:
3✔
146
                return "Excluded"
3✔
147

148
        case Failed:
2✔
149
                return "Failed"
2✔
150

151
        default:
×
152
                return "Unknown"
×
153
        }
154
}
155

156
// RBFInfo stores the information required to perform a RBF bump on a pending
157
// sweeping tx.
158
type RBFInfo struct {
159
        // Txid is the txid of the sweeping tx.
160
        Txid chainhash.Hash
161

162
        // FeeRate is the fee rate of the sweeping tx.
163
        FeeRate chainfee.SatPerKWeight
164

165
        // Fee is the total fee of the sweeping tx.
166
        Fee btcutil.Amount
167
}
168

169
// SweeperInput is created when an input reaches the main loop for the first
170
// time. It wraps the input and tracks all relevant state that is needed for
171
// sweeping.
172
type SweeperInput struct {
173
        input.Input
174

175
        // state tracks the current state of the input.
176
        state SweepState
177

178
        // listeners is a list of channels over which the final outcome of the
179
        // sweep needs to be broadcasted.
180
        listeners []chan Result
181

182
        // ntfnRegCancel is populated with a function that cancels the chain
183
        // notifier spend registration.
184
        ntfnRegCancel func()
185

186
        // publishAttempts records the number of attempts that have already been
187
        // made to sweep this tx.
188
        publishAttempts int
189

190
        // params contains the parameters that control the sweeping process.
191
        params Params
192

193
        // lastFeeRate is the most recent fee rate used for this input within a
194
        // transaction broadcast to the network.
195
        lastFeeRate chainfee.SatPerKWeight
196

197
        // rbf records the RBF constraints.
198
        rbf fn.Option[RBFInfo]
199

200
        // DeadlineHeight is the deadline height for this input. This is
201
        // different from the DeadlineHeight in its params as it's an actual
202
        // value than an option.
203
        DeadlineHeight int32
204
}
205

206
// String returns a human readable interpretation of the pending input.
207
func (p *SweeperInput) String() string {
3✔
208
        return fmt.Sprintf("%v (%v)", p.Input.OutPoint(), p.Input.WitnessType())
3✔
209
}
3✔
210

211
// terminated returns a boolean indicating whether the input has reached a
212
// final state.
213
func (p *SweeperInput) terminated() bool {
3✔
214
        switch p.state {
3✔
215
        // If the input has reached a final state, that it's either
216
        // been swept, or failed, or excluded, we will remove it from
217
        // our sweeper.
218
        case Failed, Swept, Excluded:
3✔
219
                return true
3✔
220

221
        default:
3✔
222
                return false
3✔
223
        }
224
}
225

226
// isMature returns a boolean indicating whether the input has a timelock that
227
// has been reached or not. The locktime found is also returned.
228
func (p *SweeperInput) isMature(currentHeight uint32) (bool, uint32) {
3✔
229
        locktime, _ := p.RequiredLockTime()
3✔
230
        if currentHeight < locktime {
6✔
231
                log.Debugf("Input %v has locktime=%v, current height is %v",
3✔
232
                        p, locktime, currentHeight)
3✔
233

3✔
234
                return false, locktime
3✔
235
        }
3✔
236

237
        // If the input has a CSV that's not yet reached, we will skip
238
        // this input and wait for the expiry.
239
        //
240
        // NOTE: We need to consider whether this input can be included in the
241
        // next block or not, which means the CSV will be checked against the
242
        // currentHeight plus one.
243
        locktime = p.BlocksToMaturity() + p.HeightHint()
3✔
244
        if currentHeight+1 < locktime {
6✔
245
                log.Debugf("Input %v has CSV expiry=%v, current height is %v, "+
3✔
246
                        "skipped sweeping", p, locktime, currentHeight)
3✔
247

3✔
248
                return false, locktime
3✔
249
        }
3✔
250

251
        return true, locktime
3✔
252
}
253

254
// InputsMap is a type alias for a set of pending inputs.
255
type InputsMap = map[wire.OutPoint]*SweeperInput
256

257
// inputsMapToString returns a human readable interpretation of the pending
258
// inputs.
259
func inputsMapToString(inputs InputsMap) string {
3✔
260
        if len(inputs) == 0 {
6✔
261
                return ""
3✔
262
        }
3✔
263

264
        inps := make([]input.Input, 0, len(inputs))
3✔
265
        for _, in := range inputs {
6✔
266
                inps = append(inps, in)
3✔
267
        }
3✔
268

269
        return "\n" + inputTypeSummary(inps)
3✔
270
}
271

272
// pendingSweepsReq is an internal message we'll use to represent an external
273
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
274
// attempting to sweep.
275
type pendingSweepsReq struct {
276
        respChan chan map[wire.OutPoint]*PendingInputResponse
277
        errChan  chan error
278
}
279

280
// PendingInputResponse contains information about an input that is currently
281
// being swept by the UtxoSweeper.
282
type PendingInputResponse struct {
283
        // OutPoint is the identify outpoint of the input being swept.
284
        OutPoint wire.OutPoint
285

286
        // WitnessType is the witness type of the input being swept.
287
        WitnessType input.WitnessType
288

289
        // Amount is the amount of the input being swept.
290
        Amount btcutil.Amount
291

292
        // LastFeeRate is the most recent fee rate used for the input being
293
        // swept within a transaction broadcast to the network.
294
        LastFeeRate chainfee.SatPerKWeight
295

296
        // BroadcastAttempts is the number of attempts we've made to sweept the
297
        // input.
298
        BroadcastAttempts int
299

300
        // Params contains the sweep parameters for this pending request.
301
        Params Params
302

303
        // DeadlineHeight records the deadline height of this input.
304
        DeadlineHeight uint32
305
}
306

307
// updateReq is an internal message we'll use to represent an external caller's
308
// intent to update the sweep parameters of a given input.
309
type updateReq struct {
310
        input        wire.OutPoint
311
        params       Params
312
        responseChan chan *updateResp
313
}
314

315
// updateResp is an internal message we'll use to hand off the response of a
316
// updateReq from the UtxoSweeper's main event loop back to the caller.
317
type updateResp struct {
318
        resultChan chan Result
319
        err        error
320
}
321

322
// UtxoSweeper is responsible for sweeping outputs back into the wallet
323
type UtxoSweeper struct {
324
        started uint32 // To be used atomically.
325
        stopped uint32 // To be used atomically.
326

327
        // Embed the blockbeat consumer struct to get access to the method
328
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
329
        chainio.BeatConsumer
330

331
        cfg *UtxoSweeperConfig
332

333
        newInputs chan *sweepInputMessage
334
        spendChan chan *chainntnfs.SpendDetail
335

336
        // pendingSweepsReq is a channel that will be sent requests by external
337
        // callers in order to retrieve the set of pending inputs the
338
        // UtxoSweeper is attempting to sweep.
339
        pendingSweepsReqs chan *pendingSweepsReq
340

341
        // updateReqs is a channel that will be sent requests by external
342
        // callers who wish to bump the fee rate of a given input.
343
        updateReqs chan *updateReq
344

345
        // inputs is the total set of inputs the UtxoSweeper has been requested
346
        // to sweep.
347
        inputs InputsMap
348

349
        currentOutputScript fn.Option[lnwallet.AddrWithKey]
350

351
        relayFeeRate chainfee.SatPerKWeight
352

353
        quit chan struct{}
354
        wg   sync.WaitGroup
355

356
        // currentHeight is the best known height of the main chain. This is
357
        // updated whenever a new block epoch is received.
358
        currentHeight int32
359

360
        // bumpRespChan is a channel that receives broadcast results from the
361
        // TxPublisher.
362
        bumpRespChan chan *bumpResp
363
}
364

365
// Compile-time check for the chainio.Consumer interface.
366
var _ chainio.Consumer = (*UtxoSweeper)(nil)
367

368
// UtxoSweeperConfig contains dependencies of UtxoSweeper.
369
type UtxoSweeperConfig struct {
370
        // GenSweepScript generates a P2WKH script belonging to the wallet where
371
        // funds can be swept.
372
        GenSweepScript func() fn.Result[lnwallet.AddrWithKey]
373

374
        // FeeEstimator is used when crafting sweep transactions to estimate
375
        // the necessary fee relative to the expected size of the sweep
376
        // transaction.
377
        FeeEstimator chainfee.Estimator
378

379
        // Wallet contains the wallet functions that sweeper requires.
380
        Wallet Wallet
381

382
        // Notifier is an instance of a chain notifier we'll use to watch for
383
        // certain on-chain events.
384
        Notifier chainntnfs.ChainNotifier
385

386
        // Mempool is the mempool watcher that will be used to query whether a
387
        // given input is already being spent by a transaction in the mempool.
388
        Mempool chainntnfs.MempoolWatcher
389

390
        // Store stores the published sweeper txes.
391
        Store SweeperStore
392

393
        // Signer is used by the sweeper to generate valid witnesses at the
394
        // time the incubated outputs need to be spent.
395
        Signer input.Signer
396

397
        // MaxInputsPerTx specifies the default maximum number of inputs allowed
398
        // in a single sweep tx. If more need to be swept, multiple txes are
399
        // created and published.
400
        MaxInputsPerTx uint32
401

402
        // MaxFeeRate is the maximum fee rate allowed within the UtxoSweeper.
403
        MaxFeeRate chainfee.SatPerVByte
404

405
        // Aggregator is used to group inputs into clusters based on its
406
        // implemention-specific strategy.
407
        Aggregator UtxoAggregator
408

409
        // Publisher is used to publish the sweep tx crafted here and monitors
410
        // it for potential fee bumps.
411
        Publisher Bumper
412

413
        // NoDeadlineConfTarget is the conf target to use when sweeping
414
        // non-time-sensitive outputs.
415
        NoDeadlineConfTarget uint32
416
}
417

418
// Result is the struct that is pushed through the result channel. Callers can
419
// use this to be informed of the final sweep result. In case of a remote
420
// spend, Err will be ErrRemoteSpend.
421
type Result struct {
422
        // Err is the final result of the sweep. It is nil when the input is
423
        // swept successfully by us. ErrRemoteSpend is returned when another
424
        // party took the input.
425
        Err error
426

427
        // Tx is the transaction that spent the input.
428
        Tx *wire.MsgTx
429
}
430

431
// sweepInputMessage structs are used in the internal channel between the
432
// SweepInput call and the sweeper main loop.
433
type sweepInputMessage struct {
434
        input      input.Input
435
        params     Params
436
        resultChan chan Result
437
}
438

439
// New returns a new Sweeper instance.
440
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
3✔
441
        s := &UtxoSweeper{
3✔
442
                cfg:               cfg,
3✔
443
                newInputs:         make(chan *sweepInputMessage),
3✔
444
                spendChan:         make(chan *chainntnfs.SpendDetail),
3✔
445
                updateReqs:        make(chan *updateReq),
3✔
446
                pendingSweepsReqs: make(chan *pendingSweepsReq),
3✔
447
                quit:              make(chan struct{}),
3✔
448
                inputs:            make(InputsMap),
3✔
449
                bumpRespChan:      make(chan *bumpResp, 100),
3✔
450
        }
3✔
451

3✔
452
        // Mount the block consumer.
3✔
453
        s.BeatConsumer = chainio.NewBeatConsumer(s.quit, s.Name())
3✔
454

3✔
455
        return s
3✔
456
}
3✔
457

458
// Start starts the process of constructing and publish sweep txes.
459
func (s *UtxoSweeper) Start(beat chainio.Blockbeat) error {
3✔
460
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
3✔
461
                return nil
×
462
        }
×
463

464
        log.Info("Sweeper starting")
3✔
465

3✔
466
        // Retrieve relay fee for dust limit calculation. Assume that this will
3✔
467
        // not change from here on.
3✔
468
        s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
3✔
469

3✔
470
        // Set the current height.
3✔
471
        s.currentHeight = beat.Height()
3✔
472

3✔
473
        // Start sweeper main loop.
3✔
474
        s.wg.Add(1)
3✔
475
        go s.collector()
3✔
476

3✔
477
        return nil
3✔
478
}
479

480
// RelayFeePerKW returns the minimum fee rate required for transactions to be
481
// relayed.
482
func (s *UtxoSweeper) RelayFeePerKW() chainfee.SatPerKWeight {
×
483
        return s.relayFeeRate
×
484
}
×
485

486
// Stop stops sweeper from listening to block epochs and constructing sweep
487
// txes.
488
func (s *UtxoSweeper) Stop() error {
3✔
489
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
3✔
490
                return nil
×
491
        }
×
492

493
        log.Info("Sweeper shutting down...")
3✔
494
        defer log.Debug("Sweeper shutdown complete")
3✔
495

3✔
496
        close(s.quit)
3✔
497
        s.wg.Wait()
3✔
498

3✔
499
        return nil
3✔
500
}
501

502
// NOTE: part of the `chainio.Consumer` interface.
503
func (s *UtxoSweeper) Name() string {
3✔
504
        return "UtxoSweeper"
3✔
505
}
3✔
506

507
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
508
// swept after the batch time window ends. A custom fee preference can be
509
// provided to determine what fee rate should be used for the input. Note that
510
// the input may not always be swept with this exact value, as its possible for
511
// it to be batched under the same transaction with other similar fee rate
512
// inputs.
513
//
514
// NOTE: Extreme care needs to be taken that input isn't changed externally.
515
// Because it is an interface and we don't know what is exactly behind it, we
516
// cannot make a local copy in sweeper.
517
//
518
// TODO(yy): make sure the caller is using the Result chan.
519
func (s *UtxoSweeper) SweepInput(inp input.Input,
520
        params Params) (chan Result, error) {
3✔
521

3✔
522
        if inp == nil || inp.OutPoint() == input.EmptyOutPoint ||
3✔
523
                inp.SignDesc() == nil {
3✔
524

×
525
                return nil, errors.New("nil input received")
×
526
        }
×
527

528
        absoluteTimeLock, _ := inp.RequiredLockTime()
3✔
529
        log.Debugf("Sweep request received: out_point=%v, witness_type=%v, "+
3✔
530
                "relative_time_lock=%v, absolute_time_lock=%v, amount=%v, "+
3✔
531
                "parent=(%v), params=(%v)", inp.OutPoint(), inp.WitnessType(),
3✔
532
                inp.BlocksToMaturity(), absoluteTimeLock,
3✔
533
                btcutil.Amount(inp.SignDesc().Output.Value),
3✔
534
                inp.UnconfParent(), params)
3✔
535

3✔
536
        sweeperInput := &sweepInputMessage{
3✔
537
                input:      inp,
3✔
538
                params:     params,
3✔
539
                resultChan: make(chan Result, 1),
3✔
540
        }
3✔
541

3✔
542
        // Deliver input to the main event loop.
3✔
543
        select {
3✔
544
        case s.newInputs <- sweeperInput:
3✔
545
        case <-s.quit:
×
546
                return nil, ErrSweeperShuttingDown
×
547
        }
548

549
        return sweeperInput.resultChan, nil
3✔
550
}
551

552
// removeConflictSweepDescendants removes any transactions from the wallet that
553
// spend outputs included in the passed outpoint set. This needs to be done in
554
// cases where we're not the only ones that can sweep an output, but there may
555
// exist unconfirmed spends that spend outputs created by a sweep transaction.
556
// The most common case for this is when someone sweeps our anchor outputs
557
// after 16 blocks. Moreover this is also needed for wallets which use neutrino
558
// as a backend when a channel is force closed and anchor cpfp txns are
559
// created to bump the initial commitment transaction. In this case an anchor
560
// cpfp is broadcasted for up to 3 commitment transactions (local,
561
// remote-dangling, remote). Using neutrino all of those transactions will be
562
// accepted (the commitment tx will be different in all of those cases) and have
563
// to be removed as soon as one of them confirmes (they do have the same
564
// ExclusiveGroup). For neutrino backends the corresponding BIP 157 serving full
565
// nodes do not signal invalid transactions anymore.
566
func (s *UtxoSweeper) removeConflictSweepDescendants(
567
        outpoints map[wire.OutPoint]struct{}) error {
3✔
568

3✔
569
        // Obtain all the past sweeps that we've done so far. We'll need these
3✔
570
        // to ensure that if the spendingTx spends any of the same inputs, then
3✔
571
        // we remove any transaction that may be spending those inputs from the
3✔
572
        // wallet.
3✔
573
        //
3✔
574
        // TODO(roasbeef): can be last sweep here if we remove anything confirmed
3✔
575
        // from the store?
3✔
576
        pastSweepHashes, err := s.cfg.Store.ListSweeps()
3✔
577
        if err != nil {
3✔
578
                return err
×
579
        }
×
580

581
        // We'll now go through each past transaction we published during this
582
        // epoch and cross reference the spent inputs. If there're any inputs
583
        // in common with the inputs the spendingTx spent, then we'll remove
584
        // those.
585
        //
586
        // TODO(roasbeef): need to start to remove all transaction hashes after
587
        // every N blocks (assumed point of no return)
588
        for _, sweepHash := range pastSweepHashes {
6✔
589
                sweepTx, err := s.cfg.Wallet.FetchTx(sweepHash)
3✔
590
                if err != nil {
3✔
591
                        return err
×
592
                }
×
593

594
                // Transaction wasn't found in the wallet, may have already
595
                // been replaced/removed.
596
                if sweepTx == nil {
5✔
597
                        // If it was removed, then we'll play it safe and mark
2✔
598
                        // it as no longer need to be rebroadcasted.
2✔
599
                        s.cfg.Wallet.CancelRebroadcast(sweepHash)
2✔
600
                        continue
2✔
601
                }
602

603
                // Check to see if this past sweep transaction spent any of the
604
                // same inputs as spendingTx.
605
                var isConflicting bool
3✔
606
                for _, txIn := range sweepTx.TxIn {
6✔
607
                        if _, ok := outpoints[txIn.PreviousOutPoint]; ok {
6✔
608
                                isConflicting = true
3✔
609
                                break
3✔
610
                        }
611
                }
612

613
                if !isConflicting {
6✔
614
                        continue
3✔
615
                }
616

617
                // If it is conflicting, then we'll signal the wallet to remove
618
                // all the transactions that are descendants of outputs created
619
                // by the sweepTx and the sweepTx itself.
620
                log.Debugf("Removing sweep txid=%v from wallet: %v",
3✔
621
                        sweepTx.TxHash(), spew.Sdump(sweepTx))
3✔
622

3✔
623
                err = s.cfg.Wallet.RemoveDescendants(sweepTx)
3✔
624
                if err != nil {
3✔
625
                        log.Warnf("Unable to remove descendants: %v", err)
×
626
                }
×
627

628
                // If this transaction was conflicting, then we'll stop
629
                // rebroadcasting it in the background.
630
                s.cfg.Wallet.CancelRebroadcast(sweepHash)
3✔
631
        }
632

633
        return nil
3✔
634
}
635

636
// collector is the sweeper main loop. It processes new inputs, spend
637
// notifications and counts down to publication of the sweep tx.
638
func (s *UtxoSweeper) collector() {
3✔
639
        defer s.wg.Done()
3✔
640

3✔
641
        for {
6✔
642
                // Clean inputs, which will remove inputs that are swept,
3✔
643
                // failed, or excluded from the sweeper and return inputs that
3✔
644
                // are either new or has been published but failed back, which
3✔
645
                // will be retried again here.
3✔
646
                s.updateSweeperInputs()
3✔
647

3✔
648
                select {
3✔
649
                // A new inputs is offered to the sweeper. We check to see if
650
                // we are already trying to sweep this input and if not, set up
651
                // a listener to spend and schedule a sweep.
652
                case input := <-s.newInputs:
3✔
653
                        err := s.handleNewInput(input)
3✔
654
                        if err != nil {
3✔
655
                                log.Criticalf("Unable to handle new input: %v",
×
656
                                        err)
×
657

×
658
                                return
×
659
                        }
×
660

661
                        // If this input is forced, we perform an sweep
662
                        // immediately.
663
                        //
664
                        // TODO(ziggie): Make sure when `immediate` is selected
665
                        // as a parameter that we only trigger the sweeping of
666
                        // this specific input rather than triggering the sweeps
667
                        // of all current pending inputs registered with the
668
                        // sweeper.
669
                        if input.params.Immediate {
5✔
670
                                inputs := s.updateSweeperInputs()
2✔
671
                                s.sweepPendingInputs(inputs)
2✔
672
                        }
2✔
673

674
                // A spend of one of our inputs is detected. Signal sweep
675
                // results to the caller(s).
676
                case spend := <-s.spendChan:
3✔
677
                        s.handleInputSpent(spend)
3✔
678

679
                // A new external request has been received to retrieve all of
680
                // the inputs we're currently attempting to sweep.
681
                case req := <-s.pendingSweepsReqs:
3✔
682
                        s.handlePendingSweepsReq(req)
3✔
683

684
                // A new external request has been received to bump the fee rate
685
                // of a given input.
686
                case req := <-s.updateReqs:
3✔
687
                        resultChan, err := s.handleUpdateReq(req)
3✔
688
                        req.responseChan <- &updateResp{
3✔
689
                                resultChan: resultChan,
3✔
690
                                err:        err,
3✔
691
                        }
3✔
692

3✔
693
                        // Perform an sweep immediately if asked.
3✔
694
                        if req.params.Immediate {
6✔
695
                                inputs := s.updateSweeperInputs()
3✔
696
                                s.sweepPendingInputs(inputs)
3✔
697
                        }
3✔
698

699
                case resp := <-s.bumpRespChan:
3✔
700
                        // Handle the bump event.
3✔
701
                        err := s.handleBumpEvent(resp)
3✔
702
                        if err != nil {
4✔
703
                                log.Errorf("Failed to handle bump event: %v",
1✔
704
                                        err)
1✔
705
                        }
1✔
706

707
                // A new block comes in, update the bestHeight, perform a check
708
                // over all pending inputs and publish sweeping txns if needed.
709
                case beat := <-s.BlockbeatChan:
3✔
710
                        // Update the sweeper to the best height.
3✔
711
                        s.currentHeight = beat.Height()
3✔
712

3✔
713
                        // Update the inputs with the latest height.
3✔
714
                        inputs := s.updateSweeperInputs()
3✔
715

3✔
716
                        log.Debugf("Received new block: height=%v, attempt "+
3✔
717
                                "sweeping %d inputs:%s", s.currentHeight,
3✔
718
                                len(inputs),
3✔
719
                                lnutils.NewLogClosure(func() string {
6✔
720
                                        return inputsMapToString(inputs)
3✔
721
                                }))
3✔
722

723
                        // Attempt to sweep any pending inputs.
724
                        s.sweepPendingInputs(inputs)
3✔
725

3✔
726
                        // Notify we've processed the block.
3✔
727
                        s.NotifyBlockProcessed(beat, nil)
3✔
728

729
                case <-s.quit:
3✔
730
                        return
3✔
731
                }
732
        }
733
}
734

735
// removeExclusiveGroup removes all inputs in the given exclusive group. This
736
// function is called when one of the exclusive group inputs has been spent. The
737
// other inputs won't ever be spendable and can be removed. This also prevents
738
// them from being part of future sweep transactions that would fail. In
739
// addition sweep transactions of those inputs will be removed from the wallet.
740
func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
3✔
741
        for outpoint, input := range s.inputs {
6✔
742
                outpoint := outpoint
3✔
743

3✔
744
                // Skip inputs that aren't exclusive.
3✔
745
                if input.params.ExclusiveGroup == nil {
6✔
746
                        continue
3✔
747
                }
748

749
                // Skip inputs from other exclusive groups.
750
                if *input.params.ExclusiveGroup != group {
3✔
751
                        continue
×
752
                }
753

754
                // Skip inputs that are already terminated.
755
                if input.terminated() {
6✔
756
                        log.Tracef("Skipped sending error result for "+
3✔
757
                                "input %v, state=%v", outpoint, input.state)
3✔
758

3✔
759
                        continue
3✔
760
                }
761

762
                // Signal result channels.
763
                s.signalResult(input, Result{
3✔
764
                        Err: ErrExclusiveGroupSpend,
3✔
765
                })
3✔
766

3✔
767
                // Update the input's state as it can no longer be swept.
3✔
768
                input.state = Excluded
3✔
769

3✔
770
                // Remove all unconfirmed transactions from the wallet which
3✔
771
                // spend the passed outpoint of the same exclusive group.
3✔
772
                outpoints := map[wire.OutPoint]struct{}{
3✔
773
                        outpoint: {},
3✔
774
                }
3✔
775
                err := s.removeConflictSweepDescendants(outpoints)
3✔
776
                if err != nil {
3✔
777
                        log.Warnf("Unable to remove conflicting sweep tx from "+
×
778
                                "wallet for outpoint %v : %v", outpoint, err)
×
779
                }
×
780
        }
781
}
782

783
// signalResult notifies the listeners of the final result of the input sweep.
784
// It also cancels any pending spend notification.
785
func (s *UtxoSweeper) signalResult(pi *SweeperInput, result Result) {
3✔
786
        op := pi.OutPoint()
3✔
787
        listeners := pi.listeners
3✔
788

3✔
789
        if result.Err == nil {
6✔
790
                log.Tracef("Dispatching sweep success for %v to %v listeners",
3✔
791
                        op, len(listeners),
3✔
792
                )
3✔
793
        } else {
6✔
794
                log.Tracef("Dispatching sweep error for %v to %v listeners: %v",
3✔
795
                        op, len(listeners), result.Err,
3✔
796
                )
3✔
797
        }
3✔
798

799
        // Signal all listeners. Channel is buffered. Because we only send once
800
        // on every channel, it should never block.
801
        for _, resultChan := range listeners {
6✔
802
                resultChan <- result
3✔
803
        }
3✔
804

805
        // Cancel spend notification with chain notifier. This is not necessary
806
        // in case of a success, except for that a reorg could still happen.
807
        if pi.ntfnRegCancel != nil {
6✔
808
                log.Debugf("Canceling spend ntfn for %v", op)
3✔
809

3✔
810
                pi.ntfnRegCancel()
3✔
811
        }
3✔
812
}
813

814
// sweep takes a set of preselected inputs, creates a sweep tx and publishes
815
// the tx. The output address is only marked as used if the publish succeeds.
816
func (s *UtxoSweeper) sweep(set InputSet) error {
3✔
817
        // Generate an output script if there isn't an unused script available.
3✔
818
        if s.currentOutputScript.IsNone() {
6✔
819
                addr, err := s.cfg.GenSweepScript().Unpack()
3✔
820
                if err != nil {
3✔
821
                        return fmt.Errorf("gen sweep script: %w", err)
×
822
                }
×
823
                s.currentOutputScript = fn.Some(addr)
3✔
824
        }
825

826
        sweepAddr, err := s.currentOutputScript.UnwrapOrErr(
3✔
827
                fmt.Errorf("none sweep script"),
3✔
828
        )
3✔
829
        if err != nil {
3✔
830
                return err
×
831
        }
×
832

833
        // Create a fee bump request and ask the publisher to broadcast it. The
834
        // publisher will then take over and start monitoring the tx for
835
        // potential fee bump.
836
        req := &BumpRequest{
3✔
837
                Inputs:          set.Inputs(),
3✔
838
                Budget:          set.Budget(),
3✔
839
                DeadlineHeight:  set.DeadlineHeight(),
3✔
840
                DeliveryAddress: sweepAddr,
3✔
841
                MaxFeeRate:      s.cfg.MaxFeeRate.FeePerKWeight(),
3✔
842
                StartingFeeRate: set.StartingFeeRate(),
3✔
843
                Immediate:       set.Immediate(),
3✔
844
                // TODO(yy): pass the strategy here.
3✔
845
        }
3✔
846

3✔
847
        // Reschedule the inputs that we just tried to sweep. This is done in
3✔
848
        // case the following publish fails, we'd like to update the inputs'
3✔
849
        // publish attempts and rescue them in the next sweep.
3✔
850
        s.markInputsPendingPublish(set)
3✔
851

3✔
852
        // Broadcast will return a read-only chan that we will listen to for
3✔
853
        // this publish result and future RBF attempt.
3✔
854
        resp := s.cfg.Publisher.Broadcast(req)
3✔
855

3✔
856
        // Successfully sent the broadcast attempt, we now handle the result by
3✔
857
        // subscribing to the result chan and listen for future updates about
3✔
858
        // this tx.
3✔
859
        s.wg.Add(1)
3✔
860
        go s.monitorFeeBumpResult(set, resp)
3✔
861

3✔
862
        return nil
3✔
863
}
864

865
// markInputsPendingPublish updates the pending inputs with the given tx
866
// inputs. It also increments the `publishAttempts`.
867
func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) {
3✔
868
        // Reschedule sweep.
3✔
869
        for _, input := range set.Inputs() {
6✔
870
                op := input.OutPoint()
3✔
871
                pi, ok := s.inputs[op]
3✔
872
                if !ok {
6✔
873
                        // It could be that this input is an additional wallet
3✔
874
                        // input that was attached. In that case there also
3✔
875
                        // isn't a pending input to update.
3✔
876
                        log.Tracef("Skipped marking input as pending "+
3✔
877
                                "published: %v not found in pending inputs", op)
3✔
878

3✔
879
                        continue
3✔
880
                }
881

882
                // If this input has already terminated, there's clearly
883
                // something wrong as it would have been removed. In this case
884
                // we log an error and skip marking this input as pending
885
                // publish.
886
                if pi.terminated() {
3✔
887
                        log.Errorf("Expect input %v to not have terminated "+
×
888
                                "state, instead it has %v", op, pi.state)
×
889

×
890
                        continue
×
891
                }
892

893
                // Update the input's state.
894
                pi.state = PendingPublish
3✔
895

3✔
896
                // Record another publish attempt.
3✔
897
                pi.publishAttempts++
3✔
898
        }
899
}
900

901
// markInputsPublished updates the sweeping tx in db and marks the list of
902
// inputs as published.
903
func (s *UtxoSweeper) markInputsPublished(tr *TxRecord, set InputSet) error {
3✔
904
        // Mark this tx in db once successfully published.
3✔
905
        //
3✔
906
        // NOTE: this will behave as an overwrite, which is fine as the record
3✔
907
        // is small.
3✔
908
        tr.Published = true
3✔
909
        err := s.cfg.Store.StoreTx(tr)
3✔
910
        if err != nil {
3✔
911
                return fmt.Errorf("store tx: %w", err)
×
912
        }
×
913

914
        // Reschedule sweep.
915
        for _, input := range set.Inputs() {
6✔
916
                op := input.OutPoint()
3✔
917
                pi, ok := s.inputs[op]
3✔
918
                if !ok {
6✔
919
                        // It could be that this input is an additional wallet
3✔
920
                        // input that was attached. In that case there also
3✔
921
                        // isn't a pending input to update.
3✔
922
                        log.Tracef("Skipped marking input as published: %v "+
3✔
923
                                "not found in pending inputs", op)
3✔
924

3✔
925
                        continue
3✔
926
                }
927

928
                // Valdiate that the input is in an expected state.
929
                if pi.state != PendingPublish {
6✔
930
                        // We may get a Published if this is a replacement tx.
3✔
931
                        log.Debugf("Expect input %v to have %v, instead it "+
3✔
932
                                "has %v", op, PendingPublish, pi.state)
3✔
933

3✔
934
                        continue
3✔
935
                }
936

937
                // Update the input's state.
938
                pi.state = Published
3✔
939

3✔
940
                // Update the input's latest fee rate.
3✔
941
                pi.lastFeeRate = chainfee.SatPerKWeight(tr.FeeRate)
3✔
942
        }
943

944
        return nil
3✔
945
}
946

947
// markInputsPublishFailed marks the list of inputs as failed to be published.
948
func (s *UtxoSweeper) markInputsPublishFailed(set InputSet) {
3✔
949
        // Reschedule sweep.
3✔
950
        for _, inp := range set.Inputs() {
6✔
951
                op := inp.OutPoint()
3✔
952
                pi, ok := s.inputs[op]
3✔
953
                if !ok {
6✔
954
                        // It could be that this input is an additional wallet
3✔
955
                        // input that was attached. In that case there also
3✔
956
                        // isn't a pending input to update.
3✔
957
                        log.Tracef("Skipped marking input as publish failed: "+
3✔
958
                                "%v not found in pending inputs", op)
3✔
959

3✔
960
                        continue
3✔
961
                }
962

963
                // Valdiate that the input is in an expected state.
964
                if pi.state != PendingPublish && pi.state != Published {
3✔
965
                        log.Debugf("Expect input %v to have %v, instead it "+
×
966
                                "has %v", op, PendingPublish, pi.state)
×
967

×
968
                        continue
×
969
                }
970

971
                log.Warnf("Failed to publish input %v", op)
3✔
972

3✔
973
                // Update the input's state.
3✔
974
                pi.state = PublishFailed
3✔
975
        }
976
}
977

978
// monitorSpend registers a spend notification with the chain notifier. It
979
// returns a cancel function that can be used to cancel the registration.
980
func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint,
981
        script []byte, heightHint uint32) (func(), error) {
3✔
982

3✔
983
        log.Tracef("Wait for spend of %v at heightHint=%v",
3✔
984
                outpoint, heightHint)
3✔
985

3✔
986
        spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn(
3✔
987
                &outpoint, script, heightHint,
3✔
988
        )
3✔
989
        if err != nil {
3✔
990
                return nil, fmt.Errorf("register spend ntfn: %w", err)
×
991
        }
×
992

993
        s.wg.Add(1)
3✔
994
        go func() {
6✔
995
                defer s.wg.Done()
3✔
996

3✔
997
                select {
3✔
998
                case spend, ok := <-spendEvent.Spend:
3✔
999
                        if !ok {
6✔
1000
                                log.Debugf("Spend ntfn for %v canceled",
3✔
1001
                                        outpoint)
3✔
1002
                                return
3✔
1003
                        }
3✔
1004

1005
                        log.Debugf("Delivering spend ntfn for %v", outpoint)
3✔
1006

3✔
1007
                        select {
3✔
1008
                        case s.spendChan <- spend:
3✔
1009
                                log.Debugf("Delivered spend ntfn for %v",
3✔
1010
                                        outpoint)
3✔
1011

1012
                        case <-s.quit:
×
1013
                        }
1014
                case <-s.quit:
3✔
1015
                }
1016
        }()
1017

1018
        return spendEvent.Cancel, nil
3✔
1019
}
1020

1021
// PendingInputs returns the set of inputs that the UtxoSweeper is currently
1022
// attempting to sweep.
1023
func (s *UtxoSweeper) PendingInputs() (
1024
        map[wire.OutPoint]*PendingInputResponse, error) {
3✔
1025

3✔
1026
        respChan := make(chan map[wire.OutPoint]*PendingInputResponse, 1)
3✔
1027
        errChan := make(chan error, 1)
3✔
1028
        select {
3✔
1029
        case s.pendingSweepsReqs <- &pendingSweepsReq{
1030
                respChan: respChan,
1031
                errChan:  errChan,
1032
        }:
3✔
1033
        case <-s.quit:
×
1034
                return nil, ErrSweeperShuttingDown
×
1035
        }
1036

1037
        select {
3✔
1038
        case pendingSweeps := <-respChan:
3✔
1039
                return pendingSweeps, nil
3✔
1040
        case err := <-errChan:
×
1041
                return nil, err
×
1042
        case <-s.quit:
×
1043
                return nil, ErrSweeperShuttingDown
×
1044
        }
1045
}
1046

1047
// handlePendingSweepsReq handles a request to retrieve all pending inputs the
1048
// UtxoSweeper is attempting to sweep.
1049
func (s *UtxoSweeper) handlePendingSweepsReq(
1050
        req *pendingSweepsReq) map[wire.OutPoint]*PendingInputResponse {
3✔
1051

3✔
1052
        resps := make(map[wire.OutPoint]*PendingInputResponse, len(s.inputs))
3✔
1053
        for _, inp := range s.inputs {
6✔
1054
                // Skip immature inputs for compatibility.
3✔
1055
                mature, _ := inp.isMature(uint32(s.currentHeight))
3✔
1056
                if !mature {
6✔
1057
                        continue
3✔
1058
                }
1059

1060
                // Only the exported fields are set, as we expect the response
1061
                // to only be consumed externally.
1062
                op := inp.OutPoint()
3✔
1063
                resps[op] = &PendingInputResponse{
3✔
1064
                        OutPoint:    op,
3✔
1065
                        WitnessType: inp.WitnessType(),
3✔
1066
                        Amount: btcutil.Amount(
3✔
1067
                                inp.SignDesc().Output.Value,
3✔
1068
                        ),
3✔
1069
                        LastFeeRate:       inp.lastFeeRate,
3✔
1070
                        BroadcastAttempts: inp.publishAttempts,
3✔
1071
                        Params:            inp.params,
3✔
1072
                        DeadlineHeight:    uint32(inp.DeadlineHeight),
3✔
1073
                }
3✔
1074
        }
1075

1076
        select {
3✔
1077
        case req.respChan <- resps:
3✔
1078
        case <-s.quit:
×
1079
                log.Debug("Skipped sending pending sweep response due to " +
×
1080
                        "UtxoSweeper shutting down")
×
1081
        }
1082

1083
        return resps
3✔
1084
}
1085

1086
// UpdateParams allows updating the sweep parameters of a pending input in the
1087
// UtxoSweeper. This function can be used to provide an updated fee preference
1088
// and force flag that will be used for a new sweep transaction of the input
1089
// that will act as a replacement transaction (RBF) of the original sweeping
1090
// transaction, if any. The exclusive group is left unchanged.
1091
//
1092
// NOTE: This currently doesn't do any fee rate validation to ensure that a bump
1093
// is actually successful. The responsibility of doing so should be handled by
1094
// the caller.
1095
func (s *UtxoSweeper) UpdateParams(input wire.OutPoint,
1096
        params Params) (chan Result, error) {
3✔
1097

3✔
1098
        responseChan := make(chan *updateResp, 1)
3✔
1099
        select {
3✔
1100
        case s.updateReqs <- &updateReq{
1101
                input:        input,
1102
                params:       params,
1103
                responseChan: responseChan,
1104
        }:
3✔
1105
        case <-s.quit:
×
1106
                return nil, ErrSweeperShuttingDown
×
1107
        }
1108

1109
        select {
3✔
1110
        case response := <-responseChan:
3✔
1111
                return response.resultChan, response.err
3✔
1112
        case <-s.quit:
×
1113
                return nil, ErrSweeperShuttingDown
×
1114
        }
1115
}
1116

1117
// handleUpdateReq handles an update request by simply updating the sweep
1118
// parameters of the pending input. Currently, no validation is done on the new
1119
// fee preference to ensure it will properly create a replacement transaction.
1120
//
1121
// TODO(wilmer):
1122
//   - Validate fee preference to ensure we'll create a valid replacement
1123
//     transaction to allow the new fee rate to propagate throughout the
1124
//     network.
1125
//   - Ensure we don't combine this input with any other unconfirmed inputs that
1126
//     did not exist in the original sweep transaction, resulting in an invalid
1127
//     replacement transaction.
1128
func (s *UtxoSweeper) handleUpdateReq(req *updateReq) (
1129
        chan Result, error) {
3✔
1130

3✔
1131
        // If the UtxoSweeper is already trying to sweep this input, then we can
3✔
1132
        // simply just increase its fee rate. This will allow the input to be
3✔
1133
        // batched with others which also have a similar fee rate, creating a
3✔
1134
        // higher fee rate transaction that replaces the original input's
3✔
1135
        // sweeping transaction.
3✔
1136
        sweeperInput, ok := s.inputs[req.input]
3✔
1137
        if !ok {
3✔
1138
                return nil, lnwallet.ErrNotMine
×
1139
        }
×
1140

1141
        // Create the updated parameters struct. Leave the exclusive group
1142
        // unchanged.
1143
        newParams := Params{
3✔
1144
                StartingFeeRate: req.params.StartingFeeRate,
3✔
1145
                Immediate:       req.params.Immediate,
3✔
1146
                Budget:          req.params.Budget,
3✔
1147
                DeadlineHeight:  req.params.DeadlineHeight,
3✔
1148
                ExclusiveGroup:  sweeperInput.params.ExclusiveGroup,
3✔
1149
        }
3✔
1150

3✔
1151
        log.Debugf("Updating parameters for %v(state=%v) from (%v) to (%v)",
3✔
1152
                req.input, sweeperInput.state, sweeperInput.params, newParams)
3✔
1153

3✔
1154
        sweeperInput.params = newParams
3✔
1155

3✔
1156
        // We need to reset the state so this input will be attempted again by
3✔
1157
        // our sweeper.
3✔
1158
        //
3✔
1159
        // TODO(yy): a dedicated state?
3✔
1160
        sweeperInput.state = Init
3✔
1161

3✔
1162
        // If the new input specifies a deadline, update the deadline height.
3✔
1163
        sweeperInput.DeadlineHeight = req.params.DeadlineHeight.UnwrapOr(
3✔
1164
                sweeperInput.DeadlineHeight,
3✔
1165
        )
3✔
1166

3✔
1167
        resultChan := make(chan Result, 1)
3✔
1168
        sweeperInput.listeners = append(sweeperInput.listeners, resultChan)
3✔
1169

3✔
1170
        return resultChan, nil
3✔
1171
}
1172

1173
// ListSweeps returns a list of the sweeps recorded by the sweep store.
1174
func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
3✔
1175
        return s.cfg.Store.ListSweeps()
3✔
1176
}
3✔
1177

1178
// mempoolLookup takes an input's outpoint and queries the mempool to see
1179
// whether it's already been spent in a transaction found in the mempool.
1180
// Returns the transaction if found.
1181
func (s *UtxoSweeper) mempoolLookup(op wire.OutPoint) fn.Option[wire.MsgTx] {
3✔
1182
        // For neutrino backend, there's no mempool available, so we exit
3✔
1183
        // early.
3✔
1184
        if s.cfg.Mempool == nil {
4✔
1185
                log.Debugf("Skipping mempool lookup for %v, no mempool ", op)
1✔
1186

1✔
1187
                return fn.None[wire.MsgTx]()
1✔
1188
        }
1✔
1189

1190
        // Query this input in the mempool. If this outpoint is already spent
1191
        // in mempool, we should get a spending event back immediately.
1192
        return s.cfg.Mempool.LookupInputMempoolSpend(op)
2✔
1193
}
1194

1195
// calculateDefaultDeadline calculates the default deadline height for a sweep
1196
// request that has no deadline height specified.
1197
func (s *UtxoSweeper) calculateDefaultDeadline(pi *SweeperInput) int32 {
3✔
1198
        // Create a default deadline height, which will be used when there's no
3✔
1199
        // DeadlineHeight specified for a given input.
3✔
1200
        defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget)
3✔
1201

3✔
1202
        // If the input is immature and has a locktime, we'll use the locktime
3✔
1203
        // height as the starting height.
3✔
1204
        matured, locktime := pi.isMature(uint32(s.currentHeight))
3✔
1205
        if !matured {
6✔
1206
                defaultDeadline = int32(locktime + s.cfg.NoDeadlineConfTarget)
3✔
1207
                log.Debugf("Input %v is immature, using locktime=%v instead "+
3✔
1208
                        "of current height=%d as starting height",
3✔
1209
                        pi.OutPoint(), locktime, s.currentHeight)
3✔
1210
        }
3✔
1211

1212
        return defaultDeadline
3✔
1213
}
1214

1215
// handleNewInput processes a new input by registering spend notification and
1216
// scheduling sweeping for it.
1217
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
3✔
1218
        outpoint := input.input.OutPoint()
3✔
1219
        pi, pending := s.inputs[outpoint]
3✔
1220
        if pending {
6✔
1221
                log.Infof("Already has pending input %v received, old params: "+
3✔
1222
                        "%v, new params %v", outpoint, pi.params, input.params)
3✔
1223

3✔
1224
                s.handleExistingInput(input, pi)
3✔
1225

3✔
1226
                return nil
3✔
1227
        }
3✔
1228

1229
        // This is a new input, and we want to query the mempool to see if this
1230
        // input has already been spent. If so, we'll start the input with
1231
        // state Published and attach the RBFInfo.
1232
        state, rbfInfo := s.decideStateAndRBFInfo(input.input.OutPoint())
3✔
1233

3✔
1234
        // Create a new pendingInput and initialize the listeners slice with
3✔
1235
        // the passed in result channel. If this input is offered for sweep
3✔
1236
        // again, the result channel will be appended to this slice.
3✔
1237
        pi = &SweeperInput{
3✔
1238
                state:     state,
3✔
1239
                listeners: []chan Result{input.resultChan},
3✔
1240
                Input:     input.input,
3✔
1241
                params:    input.params,
3✔
1242
                rbf:       rbfInfo,
3✔
1243
        }
3✔
1244

3✔
1245
        // Set the acutal deadline height.
3✔
1246
        pi.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
3✔
1247
                s.calculateDefaultDeadline(pi),
3✔
1248
        )
3✔
1249

3✔
1250
        s.inputs[outpoint] = pi
3✔
1251
        log.Tracef("input %v, state=%v, added to inputs", outpoint, pi.state)
3✔
1252

3✔
1253
        log.Infof("Registered sweep request at block %d: out_point=%v, "+
3✔
1254
                "witness_type=%v, amount=%v, deadline=%d, state=%v, "+
3✔
1255
                "params=(%v)", s.currentHeight, pi.OutPoint(), pi.WitnessType(),
3✔
1256
                btcutil.Amount(pi.SignDesc().Output.Value), pi.DeadlineHeight,
3✔
1257
                pi.state, pi.params)
3✔
1258

3✔
1259
        // Start watching for spend of this input, either by us or the remote
3✔
1260
        // party.
3✔
1261
        cancel, err := s.monitorSpend(
3✔
1262
                outpoint, input.input.SignDesc().Output.PkScript,
3✔
1263
                input.input.HeightHint(),
3✔
1264
        )
3✔
1265
        if err != nil {
3✔
1266
                err := fmt.Errorf("wait for spend: %w", err)
×
1267
                s.markInputFailed(pi, err)
×
1268

×
1269
                return err
×
1270
        }
×
1271

1272
        pi.ntfnRegCancel = cancel
3✔
1273

3✔
1274
        return nil
3✔
1275
}
1276

1277
// decideStateAndRBFInfo queries the mempool to see whether the given input has
1278
// already been spent. If so, the state Published will be returned, otherwise
1279
// state Init. When spent, it will query the sweeper store to fetch the fee
1280
// info of the spending transction, and construct an RBFInfo based on it.
1281
// Suppose an error occurs, fn.None is returned.
1282
func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
1283
        SweepState, fn.Option[RBFInfo]) {
3✔
1284

3✔
1285
        // Check if we can find the spending tx of this input in mempool.
3✔
1286
        txOption := s.mempoolLookup(op)
3✔
1287

3✔
1288
        // Extract the spending tx from the option.
3✔
1289
        var tx *wire.MsgTx
3✔
1290
        txOption.WhenSome(func(t wire.MsgTx) {
5✔
1291
                tx = &t
2✔
1292
        })
2✔
1293

1294
        // Exit early if it's not found.
1295
        //
1296
        // NOTE: this is not accurate for backends that don't support mempool
1297
        // lookup:
1298
        // - for neutrino we don't have a mempool.
1299
        // - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
1300
        if tx == nil {
6✔
1301
                return Init, fn.None[RBFInfo]()
3✔
1302
        }
3✔
1303

1304
        // Otherwise the input is already spent in the mempool, so eventually
1305
        // we will return Published.
1306
        //
1307
        // We also need to update the RBF info for this input. If the sweeping
1308
        // transaction is broadcast by us, we can find the fee info in the
1309
        // sweeper store.
1310
        txid := tx.TxHash()
2✔
1311
        tr, err := s.cfg.Store.GetTx(txid)
2✔
1312

2✔
1313
        // If the tx is not found in the store, it means it's not broadcast by
2✔
1314
        // us, hence we can't find the fee info. This is fine as, later on when
2✔
1315
        // this tx is confirmed, we will remove the input from our inputs.
2✔
1316
        if errors.Is(err, ErrTxNotFound) {
4✔
1317
                log.Warnf("Spending tx %v not found in sweeper store", txid)
2✔
1318
                return Published, fn.None[RBFInfo]()
2✔
1319
        }
2✔
1320

1321
        // Exit if we get an db error.
1322
        if err != nil {
2✔
1323
                log.Errorf("Unable to get tx %v from sweeper store: %v",
×
1324
                        txid, err)
×
1325

×
1326
                return Published, fn.None[RBFInfo]()
×
1327
        }
×
1328

1329
        // Prepare the fee info and return it.
1330
        rbf := fn.Some(RBFInfo{
2✔
1331
                Txid:    txid,
2✔
1332
                Fee:     btcutil.Amount(tr.Fee),
2✔
1333
                FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
2✔
1334
        })
2✔
1335

2✔
1336
        return Published, rbf
2✔
1337
}
1338

1339
// handleExistingInput processes an input that is already known to the sweeper.
1340
// It will overwrite the params of the old input with the new ones.
1341
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
1342
        oldInput *SweeperInput) {
3✔
1343

3✔
1344
        // Before updating the input details, check if an exclusive group was
3✔
1345
        // set. In case the same input is registered again without an exclusive
3✔
1346
        // group set, the previous input and its sweep parameters are outdated
3✔
1347
        // hence need to be replaced. This scenario currently only happens for
3✔
1348
        // anchor outputs. When a channel is force closed, in the worst case 3
3✔
1349
        // different sweeps with the same exclusive group are registered with
3✔
1350
        // the sweeper to bump the closing transaction (cpfp) when its time
3✔
1351
        // critical. Receiving an input which was already registered with the
3✔
1352
        // sweeper but now without an exclusive group means non of the previous
3✔
1353
        // inputs were used as CPFP, so we need to make sure we update the
3✔
1354
        // sweep parameters but also remove all inputs with the same exclusive
3✔
1355
        // group because the are outdated too.
3✔
1356
        var prevExclGroup *uint64
3✔
1357
        if oldInput.params.ExclusiveGroup != nil &&
3✔
1358
                input.params.ExclusiveGroup == nil {
6✔
1359

3✔
1360
                prevExclGroup = new(uint64)
3✔
1361
                *prevExclGroup = *oldInput.params.ExclusiveGroup
3✔
1362
        }
3✔
1363

1364
        // Update input details and sweep parameters. The re-offered input
1365
        // details may contain a change to the unconfirmed parent tx info.
1366
        oldInput.params = input.params
3✔
1367
        oldInput.Input = input.input
3✔
1368

3✔
1369
        // If the new input specifies a deadline, update the deadline height.
3✔
1370
        oldInput.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
3✔
1371
                oldInput.DeadlineHeight,
3✔
1372
        )
3✔
1373

3✔
1374
        // Add additional result channel to signal spend of this input.
3✔
1375
        oldInput.listeners = append(oldInput.listeners, input.resultChan)
3✔
1376

3✔
1377
        if prevExclGroup != nil {
6✔
1378
                s.removeExclusiveGroup(*prevExclGroup)
3✔
1379
        }
3✔
1380
}
1381

1382
// handleInputSpent takes a spend event of our input and updates the sweeper's
1383
// internal state to remove the input.
1384
func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
3✔
1385
        // Query store to find out if we ever published this tx.
3✔
1386
        spendHash := *spend.SpenderTxHash
3✔
1387
        isOurTx, err := s.cfg.Store.IsOurTx(spendHash)
3✔
1388
        if err != nil {
3✔
1389
                log.Errorf("cannot determine if tx %v is ours: %v",
×
1390
                        spendHash, err)
×
1391
                return
×
1392
        }
×
1393

1394
        // If this isn't our transaction, it means someone else swept outputs
1395
        // that we were attempting to sweep. This can happen for anchor outputs
1396
        // as well as justice transactions. In this case, we'll notify the
1397
        // wallet to remove any spends that descent from this output.
1398
        if !isOurTx {
6✔
1399
                // Construct a map of the inputs this transaction spends.
3✔
1400
                spendingTx := spend.SpendingTx
3✔
1401
                inputsSpent := make(
3✔
1402
                        map[wire.OutPoint]struct{}, len(spendingTx.TxIn),
3✔
1403
                )
3✔
1404
                for _, txIn := range spendingTx.TxIn {
6✔
1405
                        inputsSpent[txIn.PreviousOutPoint] = struct{}{}
3✔
1406
                }
3✔
1407

1408
                log.Debugf("Attempting to remove descendant txns invalidated "+
3✔
1409
                        "by (txid=%v): %v", spendingTx.TxHash(),
3✔
1410
                        spew.Sdump(spendingTx))
3✔
1411

3✔
1412
                err := s.removeConflictSweepDescendants(inputsSpent)
3✔
1413
                if err != nil {
3✔
1414
                        log.Warnf("unable to remove descendant transactions "+
×
1415
                                "due to tx %v: ", spendHash)
×
1416
                }
×
1417

1418
                log.Debugf("Detected third party spend related to in flight "+
3✔
1419
                        "inputs (is_ours=%v): %v", isOurTx,
3✔
1420
                        lnutils.SpewLogClosure(spend.SpendingTx))
3✔
1421
        }
1422

1423
        // We now use the spending tx to update the state of the inputs.
1424
        s.markInputsSwept(spend.SpendingTx, isOurTx)
3✔
1425
}
1426

1427
// markInputsSwept marks all inputs swept by the spending transaction as swept.
1428
// It will also notify all the subscribers of this input.
1429
func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) {
3✔
1430
        for _, txIn := range tx.TxIn {
6✔
1431
                outpoint := txIn.PreviousOutPoint
3✔
1432

3✔
1433
                // Check if this input is known to us. It could probably be
3✔
1434
                // unknown if we canceled the registration, deleted from inputs
3✔
1435
                // map but the ntfn was in-flight already. Or this could be not
3✔
1436
                // one of our inputs.
3✔
1437
                input, ok := s.inputs[outpoint]
3✔
1438
                if !ok {
6✔
1439
                        // It's very likely that a spending tx contains inputs
3✔
1440
                        // that we don't know.
3✔
1441
                        log.Tracef("Skipped marking input as swept: %v not "+
3✔
1442
                                "found in pending inputs", outpoint)
3✔
1443

3✔
1444
                        continue
3✔
1445
                }
1446

1447
                // This input may already been marked as swept by a previous
1448
                // spend notification, which is likely to happen as one sweep
1449
                // transaction usually sweeps multiple inputs.
1450
                if input.terminated() {
3✔
1451
                        log.Debugf("Skipped marking input as swept: %v "+
×
1452
                                "state=%v", outpoint, input.state)
×
1453

×
1454
                        continue
×
1455
                }
1456

1457
                input.state = Swept
3✔
1458

3✔
1459
                // Return either a nil or a remote spend result.
3✔
1460
                var err error
3✔
1461
                if !isOurTx {
6✔
1462
                        log.Warnf("Input=%v was spent by remote or third "+
3✔
1463
                                "party in tx=%v", outpoint, tx.TxHash())
3✔
1464
                        err = ErrRemoteSpend
3✔
1465
                }
3✔
1466

1467
                // Signal result channels.
1468
                s.signalResult(input, Result{
3✔
1469
                        Tx:  tx,
3✔
1470
                        Err: err,
3✔
1471
                })
3✔
1472

3✔
1473
                // Remove all other inputs in this exclusive group.
3✔
1474
                if input.params.ExclusiveGroup != nil {
6✔
1475
                        s.removeExclusiveGroup(*input.params.ExclusiveGroup)
3✔
1476
                }
3✔
1477
        }
1478
}
1479

1480
// markInputFailed marks the given input as failed and won't be retried. It
1481
// will also notify all the subscribers of this input.
1482
func (s *UtxoSweeper) markInputFailed(pi *SweeperInput, err error) {
2✔
1483
        log.Errorf("Failed to sweep input: %v, error: %v", pi, err)
2✔
1484

2✔
1485
        pi.state = Failed
2✔
1486

2✔
1487
        s.signalResult(pi, Result{Err: err})
2✔
1488
}
2✔
1489

1490
// updateSweeperInputs updates the sweeper's internal state and returns a map
1491
// of inputs to be swept. It will remove the inputs that are in final states,
1492
// and returns a map of inputs that have either state Init or PublishFailed.
1493
func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
3✔
1494
        // Create a map of inputs to be swept.
3✔
1495
        inputs := make(InputsMap)
3✔
1496

3✔
1497
        // Iterate the pending inputs and update the sweeper's state.
3✔
1498
        //
3✔
1499
        // TODO(yy): sweeper is made to communicate via go channels, so no
3✔
1500
        // locks are needed to access the map. However, it'd be safer if we
3✔
1501
        // turn this inputs map into a SyncMap in case we wanna add concurrent
3✔
1502
        // access to the map in the future.
3✔
1503
        for op, input := range s.inputs {
6✔
1504
                log.Tracef("Checking input: %s, state=%v", input, input.state)
3✔
1505

3✔
1506
                // If the input has reached a final state, that it's either
3✔
1507
                // been swept, or failed, or excluded, we will remove it from
3✔
1508
                // our sweeper.
3✔
1509
                if input.terminated() {
6✔
1510
                        log.Debugf("Removing input(State=%v) %v from sweeper",
3✔
1511
                                input.state, op)
3✔
1512

3✔
1513
                        delete(s.inputs, op)
3✔
1514

3✔
1515
                        continue
3✔
1516
                }
1517

1518
                // If this input has been included in a sweep tx that's not
1519
                // published yet, we'd skip this input and wait for the sweep
1520
                // tx to be published.
1521
                if input.state == PendingPublish {
6✔
1522
                        continue
3✔
1523
                }
1524

1525
                // If this input has already been published, we will need to
1526
                // check the RBF condition before attempting another sweeping.
1527
                if input.state == Published {
6✔
1528
                        continue
3✔
1529
                }
1530

1531
                // If the input has a locktime that's not yet reached, we will
1532
                // skip this input and wait for the locktime to be reached.
1533
                mature, _ := input.isMature(uint32(s.currentHeight))
3✔
1534
                if !mature {
6✔
1535
                        continue
3✔
1536
                }
1537

1538
                // If this input is new or has been failed to be published,
1539
                // we'd retry it. The assumption here is that when an error is
1540
                // returned from `PublishTransaction`, it means the tx has
1541
                // failed to meet the policy, hence it's not in the mempool.
1542
                inputs[op] = input
3✔
1543
        }
1544

1545
        return inputs
3✔
1546
}
1547

1548
// sweepPendingInputs is called when the ticker fires. It will create clusters
1549
// and attempt to create and publish the sweeping transactions.
1550
func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) {
3✔
1551
        log.Debugf("Sweeping %v inputs", len(inputs))
3✔
1552

3✔
1553
        // Cluster all of our inputs based on the specific Aggregator.
3✔
1554
        sets := s.cfg.Aggregator.ClusterInputs(inputs)
3✔
1555

3✔
1556
        // sweepWithLock is a helper closure that executes the sweep within a
3✔
1557
        // coin select lock to prevent the coins being selected for other
3✔
1558
        // transactions like funding of a channel.
3✔
1559
        sweepWithLock := func(set InputSet) error {
6✔
1560
                return s.cfg.Wallet.WithCoinSelectLock(func() error {
6✔
1561
                        // Try to add inputs from our wallet.
3✔
1562
                        err := set.AddWalletInputs(s.cfg.Wallet)
3✔
1563
                        if err != nil {
6✔
1564
                                return err
3✔
1565
                        }
3✔
1566

1567
                        // Create sweeping transaction for each set.
1568
                        err = s.sweep(set)
3✔
1569
                        if err != nil {
3✔
1570
                                return err
×
1571
                        }
×
1572

1573
                        return nil
3✔
1574
                })
1575
        }
1576

1577
        for _, set := range sets {
6✔
1578
                var err error
3✔
1579
                if set.NeedWalletInput() {
6✔
1580
                        // Sweep the set of inputs that need the wallet inputs.
3✔
1581
                        err = sweepWithLock(set)
3✔
1582
                } else {
6✔
1583
                        // Sweep the set of inputs that don't need the wallet
3✔
1584
                        // inputs.
3✔
1585
                        err = s.sweep(set)
3✔
1586
                }
3✔
1587

1588
                if err != nil {
6✔
1589
                        log.Errorf("Failed to sweep %v: %v", set, err)
3✔
1590
                }
3✔
1591
        }
1592
}
1593

1594
// bumpResp wraps the result of a bump attempt returned from the fee bumper and
1595
// the inputs being used.
1596
type bumpResp struct {
1597
        // result is the result of the bump attempt returned from the fee
1598
        // bumper.
1599
        result *BumpResult
1600

1601
        // set is the input set that was used in the bump attempt.
1602
        set InputSet
1603
}
1604

1605
// monitorFeeBumpResult subscribes to the passed result chan to listen for
1606
// future updates about the sweeping tx.
1607
//
1608
// NOTE: must run as a goroutine.
1609
func (s *UtxoSweeper) monitorFeeBumpResult(set InputSet,
1610
        resultChan <-chan *BumpResult) {
3✔
1611

3✔
1612
        defer s.wg.Done()
3✔
1613

3✔
1614
        for {
6✔
1615
                select {
3✔
1616
                case r := <-resultChan:
3✔
1617
                        // Validate the result is valid.
3✔
1618
                        if err := r.Validate(); err != nil {
3✔
1619
                                log.Errorf("Received invalid result: %v", err)
×
1620
                                continue
×
1621
                        }
1622

1623
                        resp := &bumpResp{
3✔
1624
                                result: r,
3✔
1625
                                set:    set,
3✔
1626
                        }
3✔
1627

3✔
1628
                        // Send the result back to the main event loop.
3✔
1629
                        select {
3✔
1630
                        case s.bumpRespChan <- resp:
3✔
1631
                        case <-s.quit:
×
1632
                                log.Debug("Sweeper shutting down, skip " +
×
1633
                                        "sending bump result")
×
1634

×
1635
                                return
×
1636
                        }
1637

1638
                        // The sweeping tx has been confirmed, we can exit the
1639
                        // monitor now.
1640
                        //
1641
                        // TODO(yy): can instead remove the spend subscription
1642
                        // in sweeper and rely solely on this event to mark
1643
                        // inputs as Swept?
1644
                        if r.Event == TxConfirmed || r.Event == TxFailed {
6✔
1645
                                // Exit if the tx is failed to be created.
3✔
1646
                                if r.Tx == nil {
6✔
1647
                                        log.Debugf("Received %v for nil tx, "+
3✔
1648
                                                "exit monitor", r.Event)
3✔
1649

3✔
1650
                                        return
3✔
1651
                                }
3✔
1652

1653
                                log.Debugf("Received %v for sweep tx %v, exit "+
3✔
1654
                                        "fee bump monitor", r.Event,
3✔
1655
                                        r.Tx.TxHash())
3✔
1656

3✔
1657
                                // Cancel the rebroadcasting of the failed tx.
3✔
1658
                                s.cfg.Wallet.CancelRebroadcast(r.Tx.TxHash())
3✔
1659

3✔
1660
                                return
3✔
1661
                        }
1662

1663
                case <-s.quit:
3✔
1664
                        log.Debugf("Sweeper shutting down, exit fee " +
3✔
1665
                                "bump handler")
3✔
1666

3✔
1667
                        return
3✔
1668
                }
1669
        }
1670
}
1671

1672
// handleBumpEventTxFailed handles the case where the tx has been failed to
1673
// publish.
1674
func (s *UtxoSweeper) handleBumpEventTxFailed(resp *bumpResp) {
3✔
1675
        r := resp.result
3✔
1676
        tx, err := r.Tx, r.Err
3✔
1677

3✔
1678
        if tx != nil {
6✔
1679
                log.Warnf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(),
3✔
1680
                        err)
3✔
1681
        }
3✔
1682

1683
        // NOTE: When marking the inputs as failed, we are using the input set
1684
        // instead of the inputs found in the tx. This is fine for current
1685
        // version of the sweeper because we always create a tx using ALL of
1686
        // the inputs specified by the set.
1687
        //
1688
        // TODO(yy): should we also remove the failed tx from db?
1689
        s.markInputsPublishFailed(resp.set)
3✔
1690
}
1691

1692
// handleBumpEventTxReplaced handles the case where the sweeping tx has been
1693
// replaced by a new one.
1694
func (s *UtxoSweeper) handleBumpEventTxReplaced(resp *bumpResp) error {
3✔
1695
        r := resp.result
3✔
1696
        oldTx := r.ReplacedTx
3✔
1697
        newTx := r.Tx
3✔
1698

3✔
1699
        // Prepare a new record to replace the old one.
3✔
1700
        tr := &TxRecord{
3✔
1701
                Txid:    newTx.TxHash(),
3✔
1702
                FeeRate: uint64(r.FeeRate),
3✔
1703
                Fee:     uint64(r.Fee),
3✔
1704
        }
3✔
1705

3✔
1706
        // Get the old record for logging purpose.
3✔
1707
        oldTxid := oldTx.TxHash()
3✔
1708
        record, err := s.cfg.Store.GetTx(oldTxid)
3✔
1709
        if err != nil {
4✔
1710
                log.Errorf("Fetch tx record for %v: %v", oldTxid, err)
1✔
1711
                return err
1✔
1712
        }
1✔
1713

1714
        // Cancel the rebroadcasting of the replaced tx.
1715
        s.cfg.Wallet.CancelRebroadcast(oldTxid)
3✔
1716

3✔
1717
        log.Infof("RBFed tx=%v(fee=%v sats, feerate=%v sats/kw) with new "+
3✔
1718
                "tx=%v(fee=%v, "+"feerate=%v)", record.Txid, record.Fee,
3✔
1719
                record.FeeRate, tr.Txid, tr.Fee, tr.FeeRate)
3✔
1720

3✔
1721
        // The old sweeping tx has been replaced by a new one, we will update
3✔
1722
        // the tx record in the sweeper db.
3✔
1723
        //
3✔
1724
        // TODO(yy): we may also need to update the inputs in this tx to a new
3✔
1725
        // state. Suppose a replacing tx only spends a subset of the inputs
3✔
1726
        // here, we'd end up with the rest being marked as `Published` and
3✔
1727
        // won't be aggregated in the next sweep. Atm it's fine as we always
3✔
1728
        // RBF the same input set.
3✔
1729
        if err := s.cfg.Store.DeleteTx(oldTxid); err != nil {
3✔
1730
                log.Errorf("Delete tx record for %v: %v", oldTxid, err)
×
1731
                return err
×
1732
        }
×
1733

1734
        // Mark the inputs as published using the replacing tx.
1735
        return s.markInputsPublished(tr, resp.set)
3✔
1736
}
1737

1738
// handleBumpEventTxPublished handles the case where the sweeping tx has been
1739
// successfully published.
1740
func (s *UtxoSweeper) handleBumpEventTxPublished(resp *bumpResp) error {
3✔
1741
        r := resp.result
3✔
1742
        tx := r.Tx
3✔
1743
        tr := &TxRecord{
3✔
1744
                Txid:    tx.TxHash(),
3✔
1745
                FeeRate: uint64(r.FeeRate),
3✔
1746
                Fee:     uint64(r.Fee),
3✔
1747
        }
3✔
1748

3✔
1749
        // Inputs have been successfully published so we update their
3✔
1750
        // states.
3✔
1751
        err := s.markInputsPublished(tr, resp.set)
3✔
1752
        if err != nil {
3✔
1753
                return err
×
1754
        }
×
1755

1756
        log.Debugf("Published sweep tx %v, num_inputs=%v, height=%v",
3✔
1757
                tx.TxHash(), len(tx.TxIn), s.currentHeight)
3✔
1758

3✔
1759
        // If there's no error, remove the output script. Otherwise keep it so
3✔
1760
        // that it can be reused for the next transaction and causes no address
3✔
1761
        // inflation.
3✔
1762
        s.currentOutputScript = fn.None[lnwallet.AddrWithKey]()
3✔
1763

3✔
1764
        return nil
3✔
1765
}
1766

1767
// handleBumpEventTxFatal handles the case where there's an unexpected error
1768
// when creating or publishing the sweeping tx. In this case, the tx will be
1769
// removed from the sweeper store and the inputs will be marked as `Failed`,
1770
// which means they will not be retried.
1771
func (s *UtxoSweeper) handleBumpEventTxFatal(resp *bumpResp) error {
2✔
1772
        r := resp.result
2✔
1773

2✔
1774
        // Remove the tx from the sweeper store if there is one. Since this is
2✔
1775
        // a broadcast error, it's likely there isn't a tx here.
2✔
1776
        if r.Tx != nil {
2✔
1777
                txid := r.Tx.TxHash()
×
1778
                log.Infof("Tx=%v failed with unexpected error: %v", txid, r.Err)
×
1779

×
1780
                // Remove the tx from the sweeper db if it exists.
×
1781
                if err := s.cfg.Store.DeleteTx(txid); err != nil {
×
1782
                        return fmt.Errorf("delete tx record for %v: %w", txid,
×
1783
                                err)
×
1784
                }
×
1785
        }
1786

1787
        // Mark the inputs as failed.
1788
        s.markInputsFailed(resp.set, r.Err)
2✔
1789

2✔
1790
        return nil
2✔
1791
}
1792

1793
// markInputsFailed marks all inputs found in the tx as failed. It will also
1794
// notify all the subscribers of these inputs.
1795
func (s *UtxoSweeper) markInputsFailed(set InputSet, err error) {
2✔
1796
        for _, inp := range set.Inputs() {
4✔
1797
                outpoint := inp.OutPoint()
2✔
1798

2✔
1799
                input, ok := s.inputs[outpoint]
2✔
1800
                if !ok {
4✔
1801
                        // It's very likely that a spending tx contains inputs
2✔
1802
                        // that we don't know.
2✔
1803
                        log.Tracef("Skipped marking input as failed: %v not "+
2✔
1804
                                "found in pending inputs", outpoint)
2✔
1805

2✔
1806
                        continue
2✔
1807
                }
1808

1809
                // If the input is already in a terminal state, we don't want
1810
                // to rewrite it, which also indicates an error as we only get
1811
                // an error event during the initial broadcast.
1812
                if input.terminated() {
2✔
1813
                        log.Errorf("Skipped marking input=%v as failed due to "+
×
1814
                                "unexpected state=%v", outpoint, input.state)
×
1815

×
1816
                        continue
×
1817
                }
1818

1819
                s.markInputFailed(input, err)
2✔
1820
        }
1821
}
1822

1823
// handleBumpEvent handles the result sent from the bumper based on its event
1824
// type.
1825
//
1826
// NOTE: TxConfirmed event is not handled, since we already subscribe to the
1827
// input's spending event, we don't need to do anything here.
1828
func (s *UtxoSweeper) handleBumpEvent(r *bumpResp) error {
3✔
1829
        log.Debugf("Received bump result %v", r.result)
3✔
1830

3✔
1831
        switch r.result.Event {
3✔
1832
        // The tx has been published, we update the inputs' state and create a
1833
        // record to be stored in the sweeper db.
1834
        case TxPublished:
3✔
1835
                return s.handleBumpEventTxPublished(r)
3✔
1836

1837
        // The tx has failed, we update the inputs' state.
1838
        case TxFailed:
3✔
1839
                s.handleBumpEventTxFailed(r)
3✔
1840
                return nil
3✔
1841

1842
        // The tx has been replaced, we will remove the old tx and replace it
1843
        // with the new one.
1844
        case TxReplaced:
3✔
1845
                return s.handleBumpEventTxReplaced(r)
3✔
1846

1847
        // There's a fatal error in creating the tx, we will remove the tx from
1848
        // the sweeper db and mark the inputs as failed.
1849
        case TxFatal:
2✔
1850
                return s.handleBumpEventTxFatal(r)
2✔
1851
        }
1852

1853
        return nil
3✔
1854
}
1855

1856
// IsSweeperOutpoint determines whether the outpoint was created by the sweeper.
1857
//
1858
// NOTE: It is enough to check the txid because the sweeper will create
1859
// outpoints which solely belong to the internal LND wallet.
1860
func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool {
3✔
1861
        found, err := s.cfg.Store.IsOurTx(op.Hash)
3✔
1862
        // In case there is an error fetching the transaction details from the
3✔
1863
        // sweeper store we assume the outpoint is still used by the sweeper
3✔
1864
        // (worst case scenario).
3✔
1865
        //
3✔
1866
        // TODO(ziggie): Ensure that confirmed outpoints are deleted from the
3✔
1867
        // bucket.
3✔
1868
        if err != nil && !errors.Is(err, errNoTxHashesBucket) {
3✔
1869
                log.Errorf("failed to fetch info for outpoint(%v:%d) "+
×
1870
                        "with: %v, we assume it is still in use by the sweeper",
×
1871
                        op.Hash, op.Index, err)
×
1872

×
1873
                return true
×
1874
        }
×
1875

1876
        return found
3✔
1877
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc