• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.34
/sweep/sweeper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/davecgh/go-spew/spew"
13
        "github.com/lightningnetwork/lnd/chainntnfs"
14
        "github.com/lightningnetwork/lnd/fn"
15
        "github.com/lightningnetwork/lnd/input"
16
        "github.com/lightningnetwork/lnd/lnutils"
17
        "github.com/lightningnetwork/lnd/lnwallet"
18
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
19
)
20

21
var (
22
        // ErrRemoteSpend is returned in case an output that we try to sweep is
23
        // confirmed in a tx of the remote party.
24
        ErrRemoteSpend = errors.New("remote party swept utxo")
25

26
        // ErrFeePreferenceTooLow is returned when the fee preference gives a
27
        // fee rate that's below the relay fee rate.
28
        ErrFeePreferenceTooLow = errors.New("fee preference too low")
29

30
        // ErrExclusiveGroupSpend is returned in case a different input of the
31
        // same exclusive group was spent.
32
        ErrExclusiveGroupSpend = errors.New("other member of exclusive group " +
33
                "was spent")
34

35
        // ErrSweeperShuttingDown is an error returned when a client attempts to
36
        // make a request to the UtxoSweeper, but it is unable to handle it as
37
        // it is/has already been stopped.
38
        ErrSweeperShuttingDown = errors.New("utxo sweeper shutting down")
39

40
        // DefaultDeadlineDelta defines a default deadline delta (1 week) to be
41
        // used when sweeping inputs with no deadline pressure.
42
        DefaultDeadlineDelta = int32(1008)
43
)
44

45
// Params contains the parameters that control the sweeping process.
46
type Params struct {
47
        // ExclusiveGroup is an identifier that, if set, prevents other inputs
48
        // with the same identifier from being batched together.
49
        ExclusiveGroup *uint64
50

51
        // DeadlineHeight specifies an absolute block height that this input
52
        // should be confirmed by. This value is used by the fee bumper to
53
        // decide its urgency and adjust its feerate used.
54
        DeadlineHeight fn.Option[int32]
55

56
        // Budget specifies the maximum amount of satoshis that can be spent on
57
        // fees for this sweep.
58
        Budget btcutil.Amount
59

60
        // Immediate indicates that the input should be swept immediately
61
        // without waiting for blocks to come to trigger the sweeping of
62
        // inputs.
63
        Immediate bool
64

65
        // StartingFeeRate is an optional parameter that can be used to specify
66
        // the initial fee rate to use for the fee function.
67
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
68
}
69

70
// String returns a human readable interpretation of the sweep parameters.
UNCOV
71
func (p Params) String() string {
×
UNCOV
72
        deadline := "none"
×
UNCOV
73
        p.DeadlineHeight.WhenSome(func(d int32) {
×
UNCOV
74
                deadline = fmt.Sprintf("%d", d)
×
UNCOV
75
        })
×
76

UNCOV
77
        exclusiveGroup := "none"
×
UNCOV
78
        if p.ExclusiveGroup != nil {
×
UNCOV
79
                exclusiveGroup = fmt.Sprintf("%d", *p.ExclusiveGroup)
×
UNCOV
80
        }
×
81

UNCOV
82
        return fmt.Sprintf("startingFeeRate=%v, immediate=%v, "+
×
UNCOV
83
                "exclusive_group=%v, budget=%v, deadline=%v", p.StartingFeeRate,
×
UNCOV
84
                p.Immediate, exclusiveGroup, p.Budget, deadline)
×
85
}
86

87
// SweepState represents the current state of a pending input.
88
//
89
//nolint:revive
90
type SweepState uint8
91

92
const (
93
        // Init is the initial state of a pending input. This is set when a new
94
        // sweeping request for a given input is made.
95
        Init SweepState = iota
96

97
        // PendingPublish specifies an input's state where it's already been
98
        // included in a sweeping tx but the tx is not published yet.  Inputs
99
        // in this state should not be used for grouping again.
100
        PendingPublish
101

102
        // Published is the state where the input's sweeping tx has
103
        // successfully been published. Inputs in this state can only be
104
        // updated via RBF.
105
        Published
106

107
        // PublishFailed is the state when an error is returned from publishing
108
        // the sweeping tx. Inputs in this state can be re-grouped in to a new
109
        // sweeping tx.
110
        PublishFailed
111

112
        // Swept is the final state of a pending input. This is set when the
113
        // input has been successfully swept.
114
        Swept
115

116
        // Excluded is the state of a pending input that has been excluded and
117
        // can no longer be swept. For instance, when one of the three anchor
118
        // sweeping transactions confirmed, the remaining two will be excluded.
119
        Excluded
120

121
        // Failed is the state when a pending input has too many failed publish
122
        // atttempts or unknown broadcast error is returned.
123
        Failed
124
)
125

126
// String gives a human readable text for the sweep states.
UNCOV
127
func (s SweepState) String() string {
×
UNCOV
128
        switch s {
×
UNCOV
129
        case Init:
×
UNCOV
130
                return "Init"
×
131

UNCOV
132
        case PendingPublish:
×
UNCOV
133
                return "PendingPublish"
×
134

UNCOV
135
        case Published:
×
UNCOV
136
                return "Published"
×
137

UNCOV
138
        case PublishFailed:
×
UNCOV
139
                return "PublishFailed"
×
140

UNCOV
141
        case Swept:
×
UNCOV
142
                return "Swept"
×
143

UNCOV
144
        case Excluded:
×
UNCOV
145
                return "Excluded"
×
146

147
        case Failed:
×
148
                return "Failed"
×
149

150
        default:
×
151
                return "Unknown"
×
152
        }
153
}
154

155
// RBFInfo stores the information required to perform a RBF bump on a pending
156
// sweeping tx.
157
type RBFInfo struct {
158
        // Txid is the txid of the sweeping tx.
159
        Txid chainhash.Hash
160

161
        // FeeRate is the fee rate of the sweeping tx.
162
        FeeRate chainfee.SatPerKWeight
163

164
        // Fee is the total fee of the sweeping tx.
165
        Fee btcutil.Amount
166
}
167

168
// SweeperInput is created when an input reaches the main loop for the first
169
// time. It wraps the input and tracks all relevant state that is needed for
170
// sweeping.
171
type SweeperInput struct {
172
        input.Input
173

174
        // state tracks the current state of the input.
175
        state SweepState
176

177
        // listeners is a list of channels over which the final outcome of the
178
        // sweep needs to be broadcasted.
179
        listeners []chan Result
180

181
        // ntfnRegCancel is populated with a function that cancels the chain
182
        // notifier spend registration.
183
        ntfnRegCancel func()
184

185
        // publishAttempts records the number of attempts that have already been
186
        // made to sweep this tx.
187
        publishAttempts int
188

189
        // params contains the parameters that control the sweeping process.
190
        params Params
191

192
        // lastFeeRate is the most recent fee rate used for this input within a
193
        // transaction broadcast to the network.
194
        lastFeeRate chainfee.SatPerKWeight
195

196
        // rbf records the RBF constraints.
197
        rbf fn.Option[RBFInfo]
198

199
        // DeadlineHeight is the deadline height for this input. This is
200
        // different from the DeadlineHeight in its params as it's an actual
201
        // value than an option.
202
        DeadlineHeight int32
203
}
204

205
// String returns a human readable interpretation of the pending input.
206
func (p *SweeperInput) String() string {
26✔
207
        return fmt.Sprintf("%v (%v)", p.Input.OutPoint(), p.Input.WitnessType())
26✔
208
}
26✔
209

210
// terminated returns a boolean indicating whether the input has reached a
211
// final state.
212
func (p *SweeperInput) terminated() bool {
15✔
213
        switch p.state {
15✔
214
        // If the input has reached a final state, that it's either
215
        // been swept, or failed, or excluded, we will remove it from
216
        // our sweeper.
217
        case Failed, Swept, Excluded:
5✔
218
                return true
5✔
219

220
        default:
10✔
221
                return false
10✔
222
        }
223
}
224

225
// InputsMap is a type alias for a set of pending inputs.
226
type InputsMap = map[wire.OutPoint]*SweeperInput
227

228
// pendingSweepsReq is an internal message we'll use to represent an external
229
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
230
// attempting to sweep.
231
type pendingSweepsReq struct {
232
        respChan chan map[wire.OutPoint]*PendingInputResponse
233
        errChan  chan error
234
}
235

236
// PendingInputResponse contains information about an input that is currently
237
// being swept by the UtxoSweeper.
238
type PendingInputResponse struct {
239
        // OutPoint is the identify outpoint of the input being swept.
240
        OutPoint wire.OutPoint
241

242
        // WitnessType is the witness type of the input being swept.
243
        WitnessType input.WitnessType
244

245
        // Amount is the amount of the input being swept.
246
        Amount btcutil.Amount
247

248
        // LastFeeRate is the most recent fee rate used for the input being
249
        // swept within a transaction broadcast to the network.
250
        LastFeeRate chainfee.SatPerKWeight
251

252
        // BroadcastAttempts is the number of attempts we've made to sweept the
253
        // input.
254
        BroadcastAttempts int
255

256
        // Params contains the sweep parameters for this pending request.
257
        Params Params
258

259
        // DeadlineHeight records the deadline height of this input.
260
        DeadlineHeight uint32
261
}
262

263
// updateReq is an internal message we'll use to represent an external caller's
264
// intent to update the sweep parameters of a given input.
265
type updateReq struct {
266
        input        wire.OutPoint
267
        params       Params
268
        responseChan chan *updateResp
269
}
270

271
// updateResp is an internal message we'll use to hand off the response of a
272
// updateReq from the UtxoSweeper's main event loop back to the caller.
273
type updateResp struct {
274
        resultChan chan Result
275
        err        error
276
}
277

278
// UtxoSweeper is responsible for sweeping outputs back into the wallet
279
type UtxoSweeper struct {
280
        started uint32 // To be used atomically.
281
        stopped uint32 // To be used atomically.
282

283
        cfg *UtxoSweeperConfig
284

285
        newInputs chan *sweepInputMessage
286
        spendChan chan *chainntnfs.SpendDetail
287

288
        // pendingSweepsReq is a channel that will be sent requests by external
289
        // callers in order to retrieve the set of pending inputs the
290
        // UtxoSweeper is attempting to sweep.
291
        pendingSweepsReqs chan *pendingSweepsReq
292

293
        // updateReqs is a channel that will be sent requests by external
294
        // callers who wish to bump the fee rate of a given input.
295
        updateReqs chan *updateReq
296

297
        // inputs is the total set of inputs the UtxoSweeper has been requested
298
        // to sweep.
299
        inputs InputsMap
300

301
        currentOutputScript []byte
302

303
        relayFeeRate chainfee.SatPerKWeight
304

305
        quit chan struct{}
306
        wg   sync.WaitGroup
307

308
        // currentHeight is the best known height of the main chain. This is
309
        // updated whenever a new block epoch is received.
310
        currentHeight int32
311

312
        // bumpResultChan is a channel that receives broadcast results from the
313
        // TxPublisher.
314
        bumpResultChan chan *BumpResult
315
}
316

317
// UtxoSweeperConfig contains dependencies of UtxoSweeper.
318
type UtxoSweeperConfig struct {
319
        // GenSweepScript generates a P2WKH script belonging to the wallet where
320
        // funds can be swept.
321
        GenSweepScript func() ([]byte, error)
322

323
        // FeeEstimator is used when crafting sweep transactions to estimate
324
        // the necessary fee relative to the expected size of the sweep
325
        // transaction.
326
        FeeEstimator chainfee.Estimator
327

328
        // Wallet contains the wallet functions that sweeper requires.
329
        Wallet Wallet
330

331
        // Notifier is an instance of a chain notifier we'll use to watch for
332
        // certain on-chain events.
333
        Notifier chainntnfs.ChainNotifier
334

335
        // Mempool is the mempool watcher that will be used to query whether a
336
        // given input is already being spent by a transaction in the mempool.
337
        Mempool chainntnfs.MempoolWatcher
338

339
        // Store stores the published sweeper txes.
340
        Store SweeperStore
341

342
        // Signer is used by the sweeper to generate valid witnesses at the
343
        // time the incubated outputs need to be spent.
344
        Signer input.Signer
345

346
        // MaxInputsPerTx specifies the default maximum number of inputs allowed
347
        // in a single sweep tx. If more need to be swept, multiple txes are
348
        // created and published.
349
        MaxInputsPerTx uint32
350

351
        // MaxFeeRate is the maximum fee rate allowed within the UtxoSweeper.
352
        MaxFeeRate chainfee.SatPerVByte
353

354
        // Aggregator is used to group inputs into clusters based on its
355
        // implemention-specific strategy.
356
        Aggregator UtxoAggregator
357

358
        // Publisher is used to publish the sweep tx crafted here and monitors
359
        // it for potential fee bumps.
360
        Publisher Bumper
361

362
        // NoDeadlineConfTarget is the conf target to use when sweeping
363
        // non-time-sensitive outputs.
364
        NoDeadlineConfTarget uint32
365
}
366

367
// Result is the struct that is pushed through the result channel. Callers can
368
// use this to be informed of the final sweep result. In case of a remote
369
// spend, Err will be ErrRemoteSpend.
370
type Result struct {
371
        // Err is the final result of the sweep. It is nil when the input is
372
        // swept successfully by us. ErrRemoteSpend is returned when another
373
        // party took the input.
374
        Err error
375

376
        // Tx is the transaction that spent the input.
377
        Tx *wire.MsgTx
378
}
379

380
// sweepInputMessage structs are used in the internal channel between the
381
// SweepInput call and the sweeper main loop.
382
type sweepInputMessage struct {
383
        input      input.Input
384
        params     Params
385
        resultChan chan Result
386
}
387

388
// New returns a new Sweeper instance.
389
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
14✔
390
        return &UtxoSweeper{
14✔
391
                cfg:               cfg,
14✔
392
                newInputs:         make(chan *sweepInputMessage),
14✔
393
                spendChan:         make(chan *chainntnfs.SpendDetail),
14✔
394
                updateReqs:        make(chan *updateReq),
14✔
395
                pendingSweepsReqs: make(chan *pendingSweepsReq),
14✔
396
                quit:              make(chan struct{}),
14✔
397
                inputs:            make(InputsMap),
14✔
398
                bumpResultChan:    make(chan *BumpResult, 100),
14✔
399
        }
14✔
400
}
14✔
401

402
// Start starts the process of constructing and publish sweep txes.
UNCOV
403
func (s *UtxoSweeper) Start() error {
×
UNCOV
404
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
×
405
                return nil
×
406
        }
×
407

UNCOV
408
        log.Info("Sweeper starting")
×
UNCOV
409

×
UNCOV
410
        // Retrieve relay fee for dust limit calculation. Assume that this will
×
UNCOV
411
        // not change from here on.
×
UNCOV
412
        s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
×
UNCOV
413

×
UNCOV
414
        // We need to register for block epochs and retry sweeping every block.
×
UNCOV
415
        // We should get a notification with the current best block immediately
×
UNCOV
416
        // if we don't provide any epoch. We'll wait for that in the collector.
×
UNCOV
417
        blockEpochs, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
×
UNCOV
418
        if err != nil {
×
419
                return fmt.Errorf("register block epoch ntfn: %w", err)
×
420
        }
×
421

422
        // Start sweeper main loop.
UNCOV
423
        s.wg.Add(1)
×
UNCOV
424
        go func() {
×
UNCOV
425
                defer blockEpochs.Cancel()
×
UNCOV
426
                defer s.wg.Done()
×
UNCOV
427

×
UNCOV
428
                s.collector(blockEpochs.Epochs)
×
UNCOV
429

×
UNCOV
430
                // The collector exited and won't longer handle incoming
×
UNCOV
431
                // requests. This can happen on shutdown, when the block
×
UNCOV
432
                // notifier shuts down before the sweeper and its clients. In
×
UNCOV
433
                // order to not deadlock the clients waiting for their requests
×
UNCOV
434
                // being handled, we handle them here and immediately return an
×
UNCOV
435
                // error. When the sweeper finally is shut down we can exit as
×
UNCOV
436
                // the clients will be notified.
×
UNCOV
437
                for {
×
UNCOV
438
                        select {
×
439
                        case inp := <-s.newInputs:
×
440
                                inp.resultChan <- Result{
×
441
                                        Err: ErrSweeperShuttingDown,
×
442
                                }
×
443

444
                        case req := <-s.pendingSweepsReqs:
×
445
                                req.errChan <- ErrSweeperShuttingDown
×
446

447
                        case req := <-s.updateReqs:
×
448
                                req.responseChan <- &updateResp{
×
449
                                        err: ErrSweeperShuttingDown,
×
450
                                }
×
451

UNCOV
452
                        case <-s.quit:
×
UNCOV
453
                                return
×
454
                        }
455
                }
456
        }()
457

UNCOV
458
        return nil
×
459
}
460

461
// RelayFeePerKW returns the minimum fee rate required for transactions to be
462
// relayed.
463
func (s *UtxoSweeper) RelayFeePerKW() chainfee.SatPerKWeight {
×
464
        return s.relayFeeRate
×
465
}
×
466

467
// Stop stops sweeper from listening to block epochs and constructing sweep
468
// txes.
UNCOV
469
func (s *UtxoSweeper) Stop() error {
×
UNCOV
470
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
×
471
                return nil
×
472
        }
×
473

UNCOV
474
        log.Info("Sweeper shutting down...")
×
UNCOV
475
        defer log.Debug("Sweeper shutdown complete")
×
UNCOV
476

×
UNCOV
477
        close(s.quit)
×
UNCOV
478
        s.wg.Wait()
×
UNCOV
479

×
UNCOV
480
        return nil
×
481
}
482

483
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
484
// swept after the batch time window ends. A custom fee preference can be
485
// provided to determine what fee rate should be used for the input. Note that
486
// the input may not always be swept with this exact value, as its possible for
487
// it to be batched under the same transaction with other similar fee rate
488
// inputs.
489
//
490
// NOTE: Extreme care needs to be taken that input isn't changed externally.
491
// Because it is an interface and we don't know what is exactly behind it, we
492
// cannot make a local copy in sweeper.
493
//
494
// TODO(yy): make sure the caller is using the Result chan.
495
func (s *UtxoSweeper) SweepInput(inp input.Input,
UNCOV
496
        params Params) (chan Result, error) {
×
UNCOV
497

×
UNCOV
498
        if inp == nil || inp.OutPoint() == input.EmptyOutPoint ||
×
UNCOV
499
                inp.SignDesc() == nil {
×
500

×
501
                return nil, errors.New("nil input received")
×
502
        }
×
503

UNCOV
504
        absoluteTimeLock, _ := inp.RequiredLockTime()
×
UNCOV
505
        log.Infof("Sweep request received: out_point=%v, witness_type=%v, "+
×
UNCOV
506
                "relative_time_lock=%v, absolute_time_lock=%v, amount=%v, "+
×
UNCOV
507
                "parent=(%v), params=(%v)", inp.OutPoint(), inp.WitnessType(),
×
UNCOV
508
                inp.BlocksToMaturity(), absoluteTimeLock,
×
UNCOV
509
                btcutil.Amount(inp.SignDesc().Output.Value),
×
UNCOV
510
                inp.UnconfParent(), params)
×
UNCOV
511

×
UNCOV
512
        sweeperInput := &sweepInputMessage{
×
UNCOV
513
                input:      inp,
×
UNCOV
514
                params:     params,
×
UNCOV
515
                resultChan: make(chan Result, 1),
×
UNCOV
516
        }
×
UNCOV
517

×
UNCOV
518
        // Deliver input to the main event loop.
×
UNCOV
519
        select {
×
UNCOV
520
        case s.newInputs <- sweeperInput:
×
521
        case <-s.quit:
×
522
                return nil, ErrSweeperShuttingDown
×
523
        }
524

UNCOV
525
        return sweeperInput.resultChan, nil
×
526
}
527

528
// removeConflictSweepDescendants removes any transactions from the wallet that
529
// spend outputs included in the passed outpoint set. This needs to be done in
530
// cases where we're not the only ones that can sweep an output, but there may
531
// exist unconfirmed spends that spend outputs created by a sweep transaction.
532
// The most common case for this is when someone sweeps our anchor outputs
533
// after 16 blocks. Moreover this is also needed for wallets which use neutrino
534
// as a backend when a channel is force closed and anchor cpfp txns are
535
// created to bump the initial commitment transaction. In this case an anchor
536
// cpfp is broadcasted for up to 3 commitment transactions (local,
537
// remote-dangling, remote). Using neutrino all of those transactions will be
538
// accepted (the commitment tx will be different in all of those cases) and have
539
// to be removed as soon as one of them confirmes (they do have the same
540
// ExclusiveGroup). For neutrino backends the corresponding BIP 157 serving full
541
// nodes do not signal invalid transactions anymore.
542
func (s *UtxoSweeper) removeConflictSweepDescendants(
UNCOV
543
        outpoints map[wire.OutPoint]struct{}) error {
×
UNCOV
544

×
UNCOV
545
        // Obtain all the past sweeps that we've done so far. We'll need these
×
UNCOV
546
        // to ensure that if the spendingTx spends any of the same inputs, then
×
UNCOV
547
        // we remove any transaction that may be spending those inputs from the
×
UNCOV
548
        // wallet.
×
UNCOV
549
        //
×
UNCOV
550
        // TODO(roasbeef): can be last sweep here if we remove anything confirmed
×
UNCOV
551
        // from the store?
×
UNCOV
552
        pastSweepHashes, err := s.cfg.Store.ListSweeps()
×
UNCOV
553
        if err != nil {
×
554
                return err
×
555
        }
×
556

557
        // We'll now go through each past transaction we published during this
558
        // epoch and cross reference the spent inputs. If there're any inputs
559
        // in common with the inputs the spendingTx spent, then we'll remove
560
        // those.
561
        //
562
        // TODO(roasbeef): need to start to remove all transaction hashes after
563
        // every N blocks (assumed point of no return)
UNCOV
564
        for _, sweepHash := range pastSweepHashes {
×
UNCOV
565
                sweepTx, err := s.cfg.Wallet.FetchTx(sweepHash)
×
UNCOV
566
                if err != nil {
×
567
                        return err
×
568
                }
×
569

570
                // Transaction wasn't found in the wallet, may have already
571
                // been replaced/removed.
UNCOV
572
                if sweepTx == nil {
×
UNCOV
573
                        // If it was removed, then we'll play it safe and mark
×
UNCOV
574
                        // it as no longer need to be rebroadcasted.
×
UNCOV
575
                        s.cfg.Wallet.CancelRebroadcast(sweepHash)
×
UNCOV
576
                        continue
×
577
                }
578

579
                // Check to see if this past sweep transaction spent any of the
580
                // same inputs as spendingTx.
UNCOV
581
                var isConflicting bool
×
UNCOV
582
                for _, txIn := range sweepTx.TxIn {
×
UNCOV
583
                        if _, ok := outpoints[txIn.PreviousOutPoint]; ok {
×
UNCOV
584
                                isConflicting = true
×
UNCOV
585
                                break
×
586
                        }
587
                }
588

UNCOV
589
                if !isConflicting {
×
UNCOV
590
                        continue
×
591
                }
592

593
                // If it is conflicting, then we'll signal the wallet to remove
594
                // all the transactions that are descendants of outputs created
595
                // by the sweepTx and the sweepTx itself.
UNCOV
596
                log.Debugf("Removing sweep txid=%v from wallet: %v",
×
UNCOV
597
                        sweepTx.TxHash(), spew.Sdump(sweepTx))
×
UNCOV
598

×
UNCOV
599
                err = s.cfg.Wallet.RemoveDescendants(sweepTx)
×
UNCOV
600
                if err != nil {
×
601
                        log.Warnf("Unable to remove descendants: %v", err)
×
602
                }
×
603

604
                // If this transaction was conflicting, then we'll stop
605
                // rebroadcasting it in the background.
UNCOV
606
                s.cfg.Wallet.CancelRebroadcast(sweepHash)
×
607
        }
608

UNCOV
609
        return nil
×
610
}
611

612
// collector is the sweeper main loop. It processes new inputs, spend
613
// notifications and counts down to publication of the sweep tx.
UNCOV
614
func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
×
UNCOV
615
        // We registered for the block epochs with a nil request. The notifier
×
UNCOV
616
        // should send us the current best block immediately. So we need to wait
×
UNCOV
617
        // for it here because we need to know the current best height.
×
UNCOV
618
        select {
×
UNCOV
619
        case bestBlock := <-blockEpochs:
×
UNCOV
620
                s.currentHeight = bestBlock.Height
×
621

622
        case <-s.quit:
×
623
                return
×
624
        }
625

UNCOV
626
        for {
×
UNCOV
627
                // Clean inputs, which will remove inputs that are swept,
×
UNCOV
628
                // failed, or excluded from the sweeper and return inputs that
×
UNCOV
629
                // are either new or has been published but failed back, which
×
UNCOV
630
                // will be retried again here.
×
UNCOV
631
                s.updateSweeperInputs()
×
UNCOV
632

×
UNCOV
633
                select {
×
634
                // A new inputs is offered to the sweeper. We check to see if
635
                // we are already trying to sweep this input and if not, set up
636
                // a listener to spend and schedule a sweep.
UNCOV
637
                case input := <-s.newInputs:
×
UNCOV
638
                        err := s.handleNewInput(input)
×
UNCOV
639
                        if err != nil {
×
640
                                log.Criticalf("Unable to handle new input: %v",
×
641
                                        err)
×
642

×
643
                                return
×
644
                        }
×
645

646
                        // If this input is forced, we perform an sweep
647
                        // immediately.
648
                        //
649
                        // TODO(ziggie): Make sure when `immediate` is selected
650
                        // as a parameter that we only trigger the sweeping of
651
                        // this specific input rather than triggering the sweeps
652
                        // of all current pending inputs registered with the
653
                        // sweeper.
UNCOV
654
                        if input.params.Immediate {
×
UNCOV
655
                                inputs := s.updateSweeperInputs()
×
UNCOV
656
                                s.sweepPendingInputs(inputs)
×
UNCOV
657
                        }
×
658

659
                // A spend of one of our inputs is detected. Signal sweep
660
                // results to the caller(s).
UNCOV
661
                case spend := <-s.spendChan:
×
UNCOV
662
                        s.handleInputSpent(spend)
×
663

664
                // A new external request has been received to retrieve all of
665
                // the inputs we're currently attempting to sweep.
UNCOV
666
                case req := <-s.pendingSweepsReqs:
×
UNCOV
667
                        s.handlePendingSweepsReq(req)
×
668

669
                // A new external request has been received to bump the fee rate
670
                // of a given input.
UNCOV
671
                case req := <-s.updateReqs:
×
UNCOV
672
                        resultChan, err := s.handleUpdateReq(req)
×
UNCOV
673
                        req.responseChan <- &updateResp{
×
UNCOV
674
                                resultChan: resultChan,
×
UNCOV
675
                                err:        err,
×
UNCOV
676
                        }
×
UNCOV
677

×
UNCOV
678
                        // Perform an sweep immediately if asked.
×
UNCOV
679
                        if req.params.Immediate {
×
UNCOV
680
                                inputs := s.updateSweeperInputs()
×
UNCOV
681
                                s.sweepPendingInputs(inputs)
×
UNCOV
682
                        }
×
683

UNCOV
684
                case result := <-s.bumpResultChan:
×
UNCOV
685
                        // Handle the bump event.
×
UNCOV
686
                        err := s.handleBumpEvent(result)
×
UNCOV
687
                        if err != nil {
×
UNCOV
688
                                log.Errorf("Failed to handle bump event: %v",
×
UNCOV
689
                                        err)
×
UNCOV
690
                        }
×
691

692
                // A new block comes in, update the bestHeight, perform a check
693
                // over all pending inputs and publish sweeping txns if needed.
UNCOV
694
                case epoch, ok := <-blockEpochs:
×
UNCOV
695
                        if !ok {
×
696
                                // We should stop the sweeper before stopping
×
697
                                // the chain service. Otherwise it indicates an
×
698
                                // error.
×
699
                                log.Error("Block epoch channel closed")
×
700

×
701
                                return
×
702
                        }
×
703

704
                        // Update the sweeper to the best height.
UNCOV
705
                        s.currentHeight = epoch.Height
×
UNCOV
706

×
UNCOV
707
                        // Update the inputs with the latest height.
×
UNCOV
708
                        inputs := s.updateSweeperInputs()
×
UNCOV
709

×
UNCOV
710
                        log.Debugf("Received new block: height=%v, attempt "+
×
UNCOV
711
                                "sweeping %d inputs", epoch.Height, len(inputs))
×
UNCOV
712

×
UNCOV
713
                        // Attempt to sweep any pending inputs.
×
UNCOV
714
                        s.sweepPendingInputs(inputs)
×
715

UNCOV
716
                case <-s.quit:
×
UNCOV
717
                        return
×
718
                }
719
        }
720
}
721

722
// removeExclusiveGroup removes all inputs in the given exclusive group. This
723
// function is called when one of the exclusive group inputs has been spent. The
724
// other inputs won't ever be spendable and can be removed. This also prevents
725
// them from being part of future sweep transactions that would fail. In
726
// addition sweep transactions of those inputs will be removed from the wallet.
UNCOV
727
func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
×
UNCOV
728
        for outpoint, input := range s.inputs {
×
UNCOV
729
                outpoint := outpoint
×
UNCOV
730

×
UNCOV
731
                // Skip inputs that aren't exclusive.
×
UNCOV
732
                if input.params.ExclusiveGroup == nil {
×
UNCOV
733
                        continue
×
734
                }
735

736
                // Skip inputs from other exclusive groups.
UNCOV
737
                if *input.params.ExclusiveGroup != group {
×
738
                        continue
×
739
                }
740

741
                // Skip inputs that are already terminated.
UNCOV
742
                if input.terminated() {
×
UNCOV
743
                        log.Tracef("Skipped sending error result for "+
×
UNCOV
744
                                "input %v, state=%v", outpoint, input.state)
×
UNCOV
745

×
UNCOV
746
                        continue
×
747
                }
748

749
                // Signal result channels.
UNCOV
750
                s.signalResult(input, Result{
×
UNCOV
751
                        Err: ErrExclusiveGroupSpend,
×
UNCOV
752
                })
×
UNCOV
753

×
UNCOV
754
                // Update the input's state as it can no longer be swept.
×
UNCOV
755
                input.state = Excluded
×
UNCOV
756

×
UNCOV
757
                // Remove all unconfirmed transactions from the wallet which
×
UNCOV
758
                // spend the passed outpoint of the same exclusive group.
×
UNCOV
759
                outpoints := map[wire.OutPoint]struct{}{
×
UNCOV
760
                        outpoint: {},
×
UNCOV
761
                }
×
UNCOV
762
                err := s.removeConflictSweepDescendants(outpoints)
×
UNCOV
763
                if err != nil {
×
764
                        log.Warnf("Unable to remove conflicting sweep tx from "+
×
765
                                "wallet for outpoint %v : %v", outpoint, err)
×
766
                }
×
767
        }
768
}
769

770
// signalResult notifies the listeners of the final result of the input sweep.
771
// It also cancels any pending spend notification.
772
func (s *UtxoSweeper) signalResult(pi *SweeperInput, result Result) {
3✔
773
        op := pi.OutPoint()
3✔
774
        listeners := pi.listeners
3✔
775

3✔
776
        if result.Err == nil {
5✔
777
                log.Tracef("Dispatching sweep success for %v to %v listeners",
2✔
778
                        op, len(listeners),
2✔
779
                )
2✔
780
        } else {
3✔
781
                log.Tracef("Dispatching sweep error for %v to %v listeners: %v",
1✔
782
                        op, len(listeners), result.Err,
1✔
783
                )
1✔
784
        }
1✔
785

786
        // Signal all listeners. Channel is buffered. Because we only send once
787
        // on every channel, it should never block.
788
        for _, resultChan := range listeners {
3✔
UNCOV
789
                resultChan <- result
×
UNCOV
790
        }
×
791

792
        // Cancel spend notification with chain notifier. This is not necessary
793
        // in case of a success, except for that a reorg could still happen.
794
        if pi.ntfnRegCancel != nil {
3✔
UNCOV
795
                log.Debugf("Canceling spend ntfn for %v", op)
×
UNCOV
796

×
UNCOV
797
                pi.ntfnRegCancel()
×
UNCOV
798
        }
×
799
}
800

801
// sweep takes a set of preselected inputs, creates a sweep tx and publishes
802
// the tx. The output address is only marked as used if the publish succeeds.
803
func (s *UtxoSweeper) sweep(set InputSet) error {
2✔
804
        // Generate an output script if there isn't an unused script available.
2✔
805
        if s.currentOutputScript == nil {
3✔
806
                pkScript, err := s.cfg.GenSweepScript()
1✔
807
                if err != nil {
1✔
808
                        return fmt.Errorf("gen sweep script: %w", err)
×
809
                }
×
810
                s.currentOutputScript = pkScript
1✔
811
        }
812

813
        // Create a fee bump request and ask the publisher to broadcast it. The
2✔
814
        // publisher will then take over and start monitoring the tx for
2✔
815
        // potential fee bump.
2✔
816
        req := &BumpRequest{
2✔
UNCOV
817
                Inputs:          set.Inputs(),
×
UNCOV
818
                Budget:          set.Budget(),
×
819
                DeadlineHeight:  set.DeadlineHeight(),
820
                DeliveryAddress: s.currentOutputScript,
821
                MaxFeeRate:      s.cfg.MaxFeeRate.FeePerKWeight(),
822
                StartingFeeRate: set.StartingFeeRate(),
823
                // TODO(yy): pass the strategy here.
2✔
824
        }
2✔
825

2✔
826
        // Reschedule the inputs that we just tried to sweep. This is done in
2✔
827
        // case the following publish fails, we'd like to update the inputs'
2✔
828
        // publish attempts and rescue them in the next sweep.
2✔
829
        s.markInputsPendingPublish(set)
2✔
830

2✔
831
        // Broadcast will return a read-only chan that we will listen to for
2✔
832
        // this publish result and future RBF attempt.
2✔
833
        resp, err := s.cfg.Publisher.Broadcast(req)
2✔
834
        if err != nil {
2✔
835
                outpoints := make([]wire.OutPoint, len(set.Inputs()))
2✔
836
                for i, inp := range set.Inputs() {
2✔
837
                        outpoints[i] = inp.OutPoint()
2✔
838
                }
2✔
839

2✔
840
                log.Errorf("Initial broadcast failed: %v, inputs=\n%v", err,
2✔
841
                        inputTypeSummary(set.Inputs()))
4✔
842

2✔
843
                // TODO(yy): find out which input is causing the failure.
2✔
UNCOV
844
                s.markInputsPublishFailed(outpoints)
×
UNCOV
845

×
846
                return err
847
        }
2✔
848

2✔
849
        // Successfully sent the broadcast attempt, we now handle the result by
2✔
850
        // subscribing to the result chan and listen for future updates about
2✔
851
        // this tx.
2✔
852
        s.wg.Add(1)
2✔
853
        go s.monitorFeeBumpResult(resp)
2✔
854

855
        return nil
856
}
857

858
// markInputsPendingPublish updates the pending inputs with the given tx
UNCOV
859
// inputs. It also increments the `publishAttempts`.
×
UNCOV
860
func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) {
×
UNCOV
861
        // Reschedule sweep.
×
UNCOV
862
        for _, input := range set.Inputs() {
×
863
                pi, ok := s.inputs[input.OutPoint()]
864
                if !ok {
865
                        // It could be that this input is an additional wallet
866
                        // input that was attached. In that case there also
867
                        // isn't a pending input to update.
3✔
868
                        log.Tracef("Skipped marking input as pending "+
3✔
869
                                "published: %v not found in pending inputs",
7✔
870
                                input.OutPoint())
4✔
871

5✔
872
                        continue
1✔
873
                }
1✔
874

1✔
875
                // If this input has already terminated, there's clearly
1✔
876
                // something wrong as it would have been removed. In this case
1✔
877
                // we log an error and skip marking this input as pending
1✔
878
                // publish.
1✔
879
                if pi.terminated() {
1✔
880
                        log.Errorf("Expect input %v to not have terminated "+
881
                                "state, instead it has %v",
882
                                input.OutPoint, pi.state)
883

884
                        continue
885
                }
886

4✔
887
                // Update the input's state.
1✔
888
                pi.state = PendingPublish
1✔
889

1✔
890
                // Record another publish attempt.
1✔
891
                pi.publishAttempts++
1✔
892
        }
893
}
894

895
// markInputsPublished updates the sweeping tx in db and marks the list of
2✔
896
// inputs as published.
2✔
897
func (s *UtxoSweeper) markInputsPublished(tr *TxRecord,
2✔
898
        inputs []*wire.TxIn) error {
2✔
899

900
        // Mark this tx in db once successfully published.
901
        //
902
        // NOTE: this will behave as an overwrite, which is fine as the record
903
        // is small.
904
        tr.Published = true
905
        err := s.cfg.Store.StoreTx(tr)
4✔
906
        if err != nil {
4✔
907
                return fmt.Errorf("store tx: %w", err)
4✔
908
        }
4✔
909

4✔
910
        // Reschedule sweep.
4✔
911
        for _, input := range inputs {
4✔
912
                pi, ok := s.inputs[input.PreviousOutPoint]
4✔
913
                if !ok {
5✔
914
                        // It could be that this input is an additional wallet
1✔
915
                        // input that was attached. In that case there also
1✔
916
                        // isn't a pending input to update.
917
                        log.Tracef("Skipped marking input as published: %v "+
918
                                "not found in pending inputs",
8✔
919
                                input.PreviousOutPoint)
5✔
920

6✔
921
                        continue
1✔
922
                }
1✔
923

1✔
924
                // Valdiate that the input is in an expected state.
1✔
925
                if pi.state != PendingPublish {
1✔
926
                        // We may get a Published if this is a replacement tx.
1✔
927
                        log.Debugf("Expect input %v to have %v, instead it "+
1✔
928
                                "has %v", input.PreviousOutPoint,
1✔
929
                                PendingPublish, pi.state)
930

931
                        continue
932
                }
5✔
933

1✔
934
                // Update the input's state.
1✔
935
                pi.state = Published
1✔
936

1✔
937
                // Update the input's latest fee rate.
1✔
938
                pi.lastFeeRate = chainfee.SatPerKWeight(tr.FeeRate)
1✔
939
        }
940

941
        return nil
942
}
3✔
943

3✔
944
// markInputsPublishFailed marks the list of inputs as failed to be published.
3✔
945
func (s *UtxoSweeper) markInputsPublishFailed(outpoints []wire.OutPoint) {
3✔
946
        // Reschedule sweep.
947
        for _, op := range outpoints {
948
                pi, ok := s.inputs[op]
3✔
949
                if !ok {
950
                        // It could be that this input is an additional wallet
951
                        // input that was attached. In that case there also
952
                        // isn't a pending input to update.
4✔
953
                        log.Tracef("Skipped marking input as publish failed: "+
4✔
954
                                "%v not found in pending inputs", op)
15✔
955

11✔
956
                        continue
13✔
957
                }
2✔
958

2✔
959
                // Valdiate that the input is in an expected state.
2✔
960
                if pi.state != PendingPublish && pi.state != Published {
2✔
961
                        log.Debugf("Expect input %v to have %v, instead it "+
2✔
962
                                "has %v", op, PendingPublish, pi.state)
2✔
963

2✔
964
                        continue
965
                }
966

967
                log.Warnf("Failed to publish input %v", op)
14✔
968

5✔
969
                // Update the input's state.
5✔
970
                pi.state = PublishFailed
5✔
971
        }
5✔
972
}
973

974
// monitorSpend registers a spend notification with the chain notifier. It
4✔
975
// returns a cancel function that can be used to cancel the registration.
4✔
976
func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint,
4✔
977
        script []byte, heightHint uint32) (func(), error) {
4✔
978

979
        log.Tracef("Wait for spend of %v at heightHint=%v",
980
                outpoint, heightHint)
981

982
        spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn(
983
                &outpoint, script, heightHint,
UNCOV
984
        )
×
UNCOV
985
        if err != nil {
×
986
                return nil, fmt.Errorf("register spend ntfn: %w", err)
×
987
        }
×
UNCOV
988

×
UNCOV
989
        s.wg.Add(1)
×
UNCOV
990
        go func() {
×
UNCOV
991
                defer s.wg.Done()
×
UNCOV
992

×
UNCOV
993
                select {
×
UNCOV
994
                case spend, ok := <-spendEvent.Spend:
×
995
                        if !ok {
UNCOV
996
                                log.Debugf("Spend ntfn for %v canceled",
×
UNCOV
997
                                        outpoint)
×
UNCOV
998
                                return
×
UNCOV
999
                        }
×
UNCOV
1000

×
UNCOV
1001
                        log.Debugf("Delivering spend ntfn for %v", outpoint)
×
UNCOV
1002

×
UNCOV
1003
                        select {
×
UNCOV
1004
                        case s.spendChan <- spend:
×
UNCOV
1005
                                log.Debugf("Delivered spend ntfn for %v",
×
UNCOV
1006
                                        outpoint)
×
1007

UNCOV
1008
                        case <-s.quit:
×
UNCOV
1009
                        }
×
UNCOV
1010
                case <-s.quit:
×
UNCOV
1011
                }
×
UNCOV
1012
        }()
×
UNCOV
1013

×
1014
        return spendEvent.Cancel, nil
UNCOV
1015
}
×
1016

UNCOV
1017
// PendingInputs returns the set of inputs that the UtxoSweeper is currently
×
1018
// attempting to sweep.
1019
func (s *UtxoSweeper) PendingInputs() (
1020
        map[wire.OutPoint]*PendingInputResponse, error) {
UNCOV
1021

×
1022
        respChan := make(chan map[wire.OutPoint]*PendingInputResponse, 1)
1023
        errChan := make(chan error, 1)
1024
        select {
1025
        case s.pendingSweepsReqs <- &pendingSweepsReq{
1026
                respChan: respChan,
UNCOV
1027
                errChan:  errChan,
×
UNCOV
1028
        }:
×
1029
        case <-s.quit:
×
1030
                return nil, ErrSweeperShuttingDown
×
UNCOV
1031
        }
×
1032

1033
        select {
1034
        case pendingSweeps := <-respChan:
UNCOV
1035
                return pendingSweeps, nil
×
1036
        case err := <-errChan:
×
1037
                return nil, err
×
1038
        case <-s.quit:
1039
                return nil, ErrSweeperShuttingDown
UNCOV
1040
        }
×
UNCOV
1041
}
×
UNCOV
1042

×
UNCOV
1043
// handlePendingSweepsReq handles a request to retrieve all pending inputs the
×
UNCOV
1044
// UtxoSweeper is attempting to sweep.
×
UNCOV
1045
func (s *UtxoSweeper) handlePendingSweepsReq(
×
UNCOV
1046
        req *pendingSweepsReq) map[wire.OutPoint]*PendingInputResponse {
×
1047

1048
        resps := make(map[wire.OutPoint]*PendingInputResponse, len(s.inputs))
1049
        for _, inp := range s.inputs {
1050
                // Only the exported fields are set, as we expect the response
1051
                // to only be consumed externally.
1052
                op := inp.OutPoint()
UNCOV
1053
                resps[op] = &PendingInputResponse{
×
UNCOV
1054
                        OutPoint:    op,
×
UNCOV
1055
                        WitnessType: inp.WitnessType(),
×
UNCOV
1056
                        Amount: btcutil.Amount(
×
UNCOV
1057
                                inp.SignDesc().Output.Value,
×
UNCOV
1058
                        ),
×
UNCOV
1059
                        LastFeeRate:       inp.lastFeeRate,
×
UNCOV
1060
                        BroadcastAttempts: inp.publishAttempts,
×
UNCOV
1061
                        Params:            inp.params,
×
UNCOV
1062
                        DeadlineHeight:    uint32(inp.DeadlineHeight),
×
UNCOV
1063
                }
×
UNCOV
1064
        }
×
UNCOV
1065

×
UNCOV
1066
        select {
×
UNCOV
1067
        case req.respChan <- resps:
×
1068
        case <-s.quit:
×
1069
                log.Debug("Skipped sending pending sweep response due to " +
×
1070
                        "UtxoSweeper shutting down")
×
UNCOV
1071
        }
×
1072

UNCOV
1073
        return resps
×
UNCOV
1074
}
×
UNCOV
1075

×
UNCOV
1076
// UpdateParams allows updating the sweep parameters of a pending input in the
×
UNCOV
1077
// UtxoSweeper. This function can be used to provide an updated fee preference
×
1078
// and force flag that will be used for a new sweep transaction of the input
1079
// that will act as a replacement transaction (RBF) of the original sweeping
UNCOV
1080
// transaction, if any. The exclusive group is left unchanged.
×
1081
//
1082
// NOTE: This currently doesn't do any fee rate validation to ensure that a bump
1083
// is actually successful. The responsibility of doing so should be handled by
1084
// the caller.
1085
func (s *UtxoSweeper) UpdateParams(input wire.OutPoint,
1086
        params Params) (chan Result, error) {
1087

1088
        responseChan := make(chan *updateResp, 1)
1089
        select {
1090
        case s.updateReqs <- &updateReq{
1091
                input:        input,
1092
                params:       params,
UNCOV
1093
                responseChan: responseChan,
×
UNCOV
1094
        }:
×
1095
        case <-s.quit:
×
1096
                return nil, ErrSweeperShuttingDown
×
1097
        }
1098

1099
        select {
1100
        case response := <-responseChan:
UNCOV
1101
                return response.resultChan, response.err
×
1102
        case <-s.quit:
×
1103
                return nil, ErrSweeperShuttingDown
×
1104
        }
1105
}
UNCOV
1106

×
UNCOV
1107
// handleUpdateReq handles an update request by simply updating the sweep
×
UNCOV
1108
// parameters of the pending input. Currently, no validation is done on the new
×
UNCOV
1109
// fee preference to ensure it will properly create a replacement transaction.
×
UNCOV
1110
//
×
1111
// TODO(wilmer):
1112
//   - Validate fee preference to ensure we'll create a valid replacement
1113
//     transaction to allow the new fee rate to propagate throughout the
1114
//     network.
1115
//   - Ensure we don't combine this input with any other unconfirmed inputs that
1116
//     did not exist in the original sweep transaction, resulting in an invalid
1117
//     replacement transaction.
1118
func (s *UtxoSweeper) handleUpdateReq(req *updateReq) (
1119
        chan Result, error) {
1120

1121
        // If the UtxoSweeper is already trying to sweep this input, then we can
1122
        // simply just increase its fee rate. This will allow the input to be
1123
        // batched with others which also have a similar fee rate, creating a
1124
        // higher fee rate transaction that replaces the original input's
1125
        // sweeping transaction.
UNCOV
1126
        sweeperInput, ok := s.inputs[req.input]
×
UNCOV
1127
        if !ok {
×
1128
                return nil, lnwallet.ErrNotMine
×
1129
        }
×
UNCOV
1130

×
UNCOV
1131
        // Create the updated parameters struct. Leave the exclusive group
×
UNCOV
1132
        // unchanged.
×
UNCOV
1133
        newParams := Params{
×
UNCOV
1134
                StartingFeeRate: req.params.StartingFeeRate,
×
UNCOV
1135
                Immediate:       req.params.Immediate,
×
UNCOV
1136
                Budget:          req.params.Budget,
×
1137
                DeadlineHeight:  req.params.DeadlineHeight,
1138
                ExclusiveGroup:  sweeperInput.params.ExclusiveGroup,
1139
        }
UNCOV
1140

×
UNCOV
1141
        log.Debugf("Updating parameters for %v(state=%v) from (%v) to (%v)",
×
UNCOV
1142
                req.input, sweeperInput.state, sweeperInput.params, newParams)
×
UNCOV
1143

×
UNCOV
1144
        sweeperInput.params = newParams
×
UNCOV
1145

×
UNCOV
1146
        // We need to reset the state so this input will be attempted again by
×
UNCOV
1147
        // our sweeper.
×
UNCOV
1148
        //
×
UNCOV
1149
        // TODO(yy): a dedicated state?
×
UNCOV
1150
        sweeperInput.state = Init
×
UNCOV
1151

×
UNCOV
1152
        // If the new input specifies a deadline, update the deadline height.
×
UNCOV
1153
        sweeperInput.DeadlineHeight = req.params.DeadlineHeight.UnwrapOr(
×
UNCOV
1154
                sweeperInput.DeadlineHeight,
×
UNCOV
1155
        )
×
UNCOV
1156

×
UNCOV
1157
        resultChan := make(chan Result, 1)
×
UNCOV
1158
        sweeperInput.listeners = append(sweeperInput.listeners, resultChan)
×
UNCOV
1159

×
UNCOV
1160
        return resultChan, nil
×
UNCOV
1161
}
×
UNCOV
1162

×
UNCOV
1163
// ListSweeps returns a list of the sweeps recorded by the sweep store.
×
UNCOV
1164
func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
×
UNCOV
1165
        return s.cfg.Store.ListSweeps()
×
UNCOV
1166
}
×
UNCOV
1167

×
1168
// mempoolLookup takes an input's outpoint and queries the mempool to see
1169
// whether it's already been spent in a transaction found in the mempool.
1170
// Returns the transaction if found.
UNCOV
1171
func (s *UtxoSweeper) mempoolLookup(op wire.OutPoint) fn.Option[wire.MsgTx] {
×
UNCOV
1172
        // For neutrino backend, there's no mempool available, so we exit
×
UNCOV
1173
        // early.
×
1174
        if s.cfg.Mempool == nil {
1175
                log.Debugf("Skipping mempool lookup for %v, no mempool ", op)
1176

1177
                return fn.None[wire.MsgTx]()
1178
        }
7✔
1179

7✔
1180
        // Query this input in the mempool. If this outpoint is already spent
7✔
1181
        // in mempool, we should get a spending event back immediately.
8✔
1182
        return s.cfg.Mempool.LookupInputMempoolSpend(op)
1✔
1183
}
1✔
1184

1✔
1185
// handleNewInput processes a new input by registering spend notification and
1✔
1186
// scheduling sweeping for it.
1187
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
1188
        // Create a default deadline height, which will be used when there's no
1189
        // DeadlineHeight specified for a given input.
6✔
1190
        defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget)
1191

1192
        outpoint := input.input.OutPoint()
1193
        pi, pending := s.inputs[outpoint]
UNCOV
1194
        if pending {
×
UNCOV
1195
                log.Debugf("Already has pending input %v received", outpoint)
×
UNCOV
1196

×
UNCOV
1197
                s.handleExistingInput(input, pi)
×
UNCOV
1198

×
UNCOV
1199
                return nil
×
UNCOV
1200
        }
×
UNCOV
1201

×
UNCOV
1202
        // This is a new input, and we want to query the mempool to see if this
×
UNCOV
1203
        // input has already been spent. If so, we'll start the input with
×
UNCOV
1204
        // state Published and attach the RBFInfo.
×
UNCOV
1205
        state, rbfInfo := s.decideStateAndRBFInfo(input.input.OutPoint())
×
UNCOV
1206

×
UNCOV
1207
        // Create a new pendingInput and initialize the listeners slice with
×
1208
        // the passed in result channel. If this input is offered for sweep
1209
        // again, the result channel will be appended to this slice.
1210
        pi = &SweeperInput{
1211
                state:     state,
UNCOV
1212
                listeners: []chan Result{input.resultChan},
×
UNCOV
1213
                Input:     input.input,
×
UNCOV
1214
                params:    input.params,
×
UNCOV
1215
                rbf:       rbfInfo,
×
UNCOV
1216
                // Set the acutal deadline height.
×
UNCOV
1217
                DeadlineHeight: input.params.DeadlineHeight.UnwrapOr(
×
UNCOV
1218
                        defaultDeadline,
×
UNCOV
1219
                ),
×
UNCOV
1220
        }
×
UNCOV
1221

×
UNCOV
1222
        s.inputs[outpoint] = pi
×
UNCOV
1223
        log.Tracef("input %v, state=%v, added to inputs", outpoint, pi.state)
×
UNCOV
1224

×
UNCOV
1225
        // Start watching for spend of this input, either by us or the remote
×
UNCOV
1226
        // party.
×
UNCOV
1227
        cancel, err := s.monitorSpend(
×
UNCOV
1228
                outpoint, input.input.SignDesc().Output.PkScript,
×
UNCOV
1229
                input.input.HeightHint(),
×
UNCOV
1230
        )
×
UNCOV
1231
        if err != nil {
×
1232
                err := fmt.Errorf("wait for spend: %w", err)
×
1233
                s.markInputFailed(pi, err)
×
1234

×
1235
                return err
×
1236
        }
×
UNCOV
1237

×
UNCOV
1238
        pi.ntfnRegCancel = cancel
×
UNCOV
1239

×
UNCOV
1240
        return nil
×
UNCOV
1241
}
×
UNCOV
1242

×
UNCOV
1243
// decideStateAndRBFInfo queries the mempool to see whether the given input has
×
1244
// already been spent. If so, the state Published will be returned, otherwise
UNCOV
1245
// state Init. When spent, it will query the sweeper store to fetch the fee
×
UNCOV
1246
// info of the spending transction, and construct an RBFInfo based on it.
×
UNCOV
1247
// Suppose an error occurs, fn.None is returned.
×
1248
func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
1249
        SweepState, fn.Option[RBFInfo]) {
1250

1251
        // Check if we can find the spending tx of this input in mempool.
1252
        txOption := s.mempoolLookup(op)
1253

1254
        // Extract the spending tx from the option.
1255
        var tx *wire.MsgTx
1256
        txOption.WhenSome(func(t wire.MsgTx) {
4✔
1257
                tx = &t
4✔
1258
        })
4✔
1259

4✔
1260
        // Exit early if it's not found.
4✔
1261
        //
4✔
1262
        // NOTE: this is not accurate for backends that don't support mempool
4✔
1263
        // lookup:
7✔
1264
        // - for neutrino we don't have a mempool.
3✔
1265
        // - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
3✔
1266
        if tx == nil {
1267
                return Init, fn.None[RBFInfo]()
1268
        }
1269

1270
        // Otherwise the input is already spent in the mempool, so eventually
1271
        // we will return Published.
1272
        //
1273
        // We also need to update the RBF info for this input. If the sweeping
5✔
1274
        // transaction is broadcast by us, we can find the fee info in the
1✔
1275
        // sweeper store.
1✔
1276
        txid := tx.TxHash()
1277
        tr, err := s.cfg.Store.GetTx(txid)
1278

1279
        // If the tx is not found in the store, it means it's not broadcast by
1280
        // us, hence we can't find the fee info. This is fine as, later on when
1281
        // this tx is confirmed, we will remove the input from our inputs.
1282
        if errors.Is(err, ErrTxNotFound) {
1283
                log.Warnf("Spending tx %v not found in sweeper store", txid)
3✔
1284
                return Published, fn.None[RBFInfo]()
3✔
1285
        }
3✔
1286

3✔
1287
        // Exit if we get an db error.
3✔
1288
        if err != nil {
3✔
1289
                log.Errorf("Unable to get tx %v from sweeper store: %v",
4✔
1290
                        txid, err)
1✔
1291

1✔
1292
                return Published, fn.None[RBFInfo]()
1✔
1293
        }
1294

1295
        // Prepare the fee info and return it.
3✔
1296
        rbf := fn.Some(RBFInfo{
1✔
1297
                Txid:    txid,
1✔
1298
                Fee:     btcutil.Amount(tr.Fee),
1✔
1299
                FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
1✔
1300
        })
1✔
1301

1302
        return Published, rbf
1303
}
1✔
1304

1✔
1305
// handleExistingInput processes an input that is already known to the sweeper.
1✔
1306
// It will overwrite the params of the old input with the new ones.
1✔
1307
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
1✔
1308
        oldInput *SweeperInput) {
1✔
1309

1✔
1310
        // Before updating the input details, check if an exclusive group was
1311
        // set. In case the same input is registered again without an exclusive
1312
        // group set, the previous input and its sweep parameters are outdated
1313
        // hence need to be replaced. This scenario currently only happens for
1314
        // anchor outputs. When a channel is force closed, in the worst case 3
UNCOV
1315
        // different sweeps with the same exclusive group are registered with
×
UNCOV
1316
        // the sweeper to bump the closing transaction (cpfp) when its time
×
UNCOV
1317
        // critical. Receiving an input which was already registered with the
×
UNCOV
1318
        // sweeper but now without an exclusive group means non of the previous
×
UNCOV
1319
        // inputs were used as CPFP, so we need to make sure we update the
×
UNCOV
1320
        // sweep parameters but also remove all inputs with the same exclusive
×
UNCOV
1321
        // group because the are outdated too.
×
UNCOV
1322
        var prevExclGroup *uint64
×
UNCOV
1323
        if oldInput.params.ExclusiveGroup != nil &&
×
UNCOV
1324
                input.params.ExclusiveGroup == nil {
×
UNCOV
1325

×
UNCOV
1326
                prevExclGroup = new(uint64)
×
UNCOV
1327
                *prevExclGroup = *oldInput.params.ExclusiveGroup
×
UNCOV
1328
        }
×
UNCOV
1329

×
UNCOV
1330
        // Update input details and sweep parameters. The re-offered input
×
UNCOV
1331
        // details may contain a change to the unconfirmed parent tx info.
×
UNCOV
1332
        oldInput.params = input.params
×
UNCOV
1333
        oldInput.Input = input.input
×
UNCOV
1334

×
UNCOV
1335
        // If the new input specifies a deadline, update the deadline height.
×
1336
        oldInput.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
1337
                oldInput.DeadlineHeight,
1338
        )
UNCOV
1339

×
UNCOV
1340
        // Add additional result channel to signal spend of this input.
×
UNCOV
1341
        oldInput.listeners = append(oldInput.listeners, input.resultChan)
×
UNCOV
1342

×
UNCOV
1343
        if prevExclGroup != nil {
×
UNCOV
1344
                s.removeExclusiveGroup(*prevExclGroup)
×
UNCOV
1345
        }
×
UNCOV
1346
}
×
UNCOV
1347

×
UNCOV
1348
// handleInputSpent takes a spend event of our input and updates the sweeper's
×
UNCOV
1349
// internal state to remove the input.
×
UNCOV
1350
func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
×
UNCOV
1351
        // Query store to find out if we ever published this tx.
×
UNCOV
1352
        spendHash := *spend.SpenderTxHash
×
1353
        isOurTx, err := s.cfg.Store.IsOurTx(spendHash)
1354
        if err != nil {
1355
                log.Errorf("cannot determine if tx %v is ours: %v",
1356
                        spendHash, err)
1357
                return
×
1358
        }
×
UNCOV
1359

×
UNCOV
1360
        // If this isn't our transaction, it means someone else swept outputs
×
UNCOV
1361
        // that we were attempting to sweep. This can happen for anchor outputs
×
UNCOV
1362
        // as well as justice transactions. In this case, we'll notify the
×
UNCOV
1363
        // wallet to remove any spends that descent from this output.
×
UNCOV
1364
        if !isOurTx {
×
UNCOV
1365
                // Construct a map of the inputs this transaction spends.
×
1366
                spendingTx := spend.SpendingTx
1367
                inputsSpent := make(
1368
                        map[wire.OutPoint]struct{}, len(spendingTx.TxIn),
1369
                )
1370
                for _, txIn := range spendingTx.TxIn {
UNCOV
1371
                        inputsSpent[txIn.PreviousOutPoint] = struct{}{}
×
UNCOV
1372
                }
×
UNCOV
1373

×
UNCOV
1374
                log.Debugf("Attempting to remove descendant txns invalidated "+
×
UNCOV
1375
                        "by (txid=%v): %v", spendingTx.TxHash(),
×
UNCOV
1376
                        spew.Sdump(spendingTx))
×
UNCOV
1377

×
UNCOV
1378
                err := s.removeConflictSweepDescendants(inputsSpent)
×
UNCOV
1379
                if err != nil {
×
1380
                        log.Warnf("unable to remove descendant transactions "+
1381
                                "due to tx %v: ", spendHash)
×
1382
                }
×
UNCOV
1383

×
UNCOV
1384
                log.Debugf("Detected third party spend related to in flight "+
×
UNCOV
1385
                        "inputs (is_ours=%v): %v", isOurTx,
×
UNCOV
1386
                        lnutils.SpewLogClosure(spend.SpendingTx))
×
UNCOV
1387
        }
×
UNCOV
1388

×
UNCOV
1389
        // We now use the spending tx to update the state of the inputs.
×
1390
        s.markInputsSwept(spend.SpendingTx, isOurTx)
UNCOV
1391
}
×
UNCOV
1392

×
UNCOV
1393
// markInputsSwept marks all inputs swept by the spending transaction as swept.
×
1394
// It will also notify all the subscribers of this input.
1395
func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) {
1396
        for _, txIn := range tx.TxIn {
UNCOV
1397
                outpoint := txIn.PreviousOutPoint
×
1398

1399
                // Check if this input is known to us. It could probably be
1400
                // unknown if we canceled the registration, deleted from inputs
1401
                // map but the ntfn was in-flight already. Or this could be not
1402
                // one of our inputs.
1✔
1403
                input, ok := s.inputs[outpoint]
5✔
1404
                if !ok {
4✔
1405
                        // It's very likely that a spending tx contains inputs
4✔
1406
                        // that we don't know.
4✔
1407
                        log.Tracef("Skipped marking input as swept: %v not "+
4✔
1408
                                "found in pending inputs", outpoint)
4✔
1409

4✔
1410
                        continue
4✔
1411
                }
5✔
1412

1✔
1413
                // This input may already been marked as swept by a previous
1✔
1414
                // spend notification, which is likely to happen as one sweep
1✔
1415
                // transaction usually sweeps multiple inputs.
1✔
1416
                if input.terminated() {
1✔
1417
                        log.Debugf("Skipped marking input as swept: %v "+
1✔
1418
                                "state=%v", outpoint, input.state)
1419

1420
                        continue
1421
                }
1422

1423
                input.state = Swept
4✔
1424

1✔
1425
                // Return either a nil or a remote spend result.
1✔
1426
                var err error
1✔
1427
                if !isOurTx {
1✔
1428
                        log.Warnf("Input=%v was spent by remote or third "+
1429
                                "party in tx=%v", outpoint, tx.TxHash())
1430
                        err = ErrRemoteSpend
2✔
1431
                }
2✔
1432

2✔
1433
                // Signal result channels.
2✔
1434
                s.signalResult(input, Result{
2✔
UNCOV
1435
                        Tx:  tx,
×
UNCOV
1436
                        Err: err,
×
UNCOV
1437
                })
×
UNCOV
1438

×
1439
                // Remove all other inputs in this exclusive group.
1440
                if input.params.ExclusiveGroup != nil {
1441
                        s.removeExclusiveGroup(*input.params.ExclusiveGroup)
2✔
1442
                }
2✔
1443
        }
2✔
1444
}
2✔
1445

2✔
1446
// markInputFailed marks the given input as failed and won't be retried. It
2✔
1447
// will also notify all the subscribers of this input.
2✔
UNCOV
1448
func (s *UtxoSweeper) markInputFailed(pi *SweeperInput, err error) {
×
UNCOV
1449
        log.Errorf("Failed to sweep input: %v, error: %v", pi, err)
×
1450

1451
        pi.state = Failed
1452

1453
        // Remove all other inputs in this exclusive group.
1454
        if pi.params.ExclusiveGroup != nil {
1455
                s.removeExclusiveGroup(*pi.params.ExclusiveGroup)
1✔
1456
        }
1✔
1457

1✔
1458
        s.signalResult(pi, Result{Err: err})
1✔
1459
}
1✔
1460

1✔
1461
// updateSweeperInputs updates the sweeper's internal state and returns a map
1✔
UNCOV
1462
// of inputs to be swept. It will remove the inputs that are in final states,
×
UNCOV
1463
// and returns a map of inputs that have either state Init or PublishFailed.
×
1464
func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
1465
        // Create a map of inputs to be swept.
1✔
1466
        inputs := make(InputsMap)
1467

1468
        // Iterate the pending inputs and update the sweeper's state.
1469
        //
1470
        // TODO(yy): sweeper is made to communicate via go channels, so no
1471
        // locks are needed to access the map. However, it'd be safer if we
1✔
1472
        // turn this inputs map into a SyncMap in case we wanna add concurrent
1✔
1473
        // access to the map in the future.
1✔
1474
        for op, input := range s.inputs {
1✔
1475
                // If the input has reached a final state, that it's either
1✔
1476
                // been swept, or failed, or excluded, we will remove it from
1✔
1477
                // our sweeper.
1✔
1478
                if input.terminated() {
1✔
1479
                        log.Debugf("Removing input(State=%v) %v from sweeper",
1✔
1480
                                input.state, op)
1✔
1481

10✔
1482
                        delete(s.inputs, op)
9✔
1483

9✔
1484
                        continue
9✔
1485
                }
12✔
1486

3✔
1487
                // If this input has been included in a sweep tx that's not
3✔
1488
                // published yet, we'd skip this input and wait for the sweep
3✔
1489
                // tx to be published.
3✔
1490
                if input.state == PendingPublish {
3✔
1491
                        continue
3✔
1492
                }
1493

1494
                // If this input has already been published, we will need to
1495
                // check the RBF condition before attempting another sweeping.
1496
                if input.state == Published {
1497
                        continue
7✔
1498
                }
1✔
1499

1500
                // If the input has a locktime that's not yet reached, we will
1501
                // skip this input and wait for the locktime to be reached.
1502
                locktime, _ := input.RequiredLockTime()
1503
                if uint32(s.currentHeight) < locktime {
6✔
1504
                        log.Warnf("Skipping input %v due to locktime=%v not "+
1✔
1505
                                "reached, current height is %v", op, locktime,
1506
                                s.currentHeight)
1507

1508
                        continue
1509
                }
4✔
1510

5✔
1511
                // If the input has a CSV that's not yet reached, we will skip
1✔
1512
                // this input and wait for the expiry.
1✔
1513
                locktime = input.BlocksToMaturity() + input.HeightHint()
1✔
1514
                if s.currentHeight < int32(locktime)-1 {
1✔
1515
                        log.Infof("Skipping input %v due to CSV expiry=%v not "+
1✔
1516
                                "reached, current height is %v", op, locktime,
1517
                                s.currentHeight)
1518

1519
                        continue
1520
                }
3✔
1521

4✔
1522
                // If this input is new or has been failed to be published,
1✔
1523
                // we'd retry it. The assumption here is that when an error is
1✔
1524
                // returned from `PublishTransaction`, it means the tx has
1✔
1525
                // failed to meet the policy, hence it's not in the mempool.
1✔
1526
                inputs[op] = input
1✔
1527
        }
1528

1529
        return inputs
1530
}
1531

1532
// sweepPendingInputs is called when the ticker fires. It will create clusters
1533
// and attempt to create and publish the sweeping transactions.
2✔
1534
func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) {
1535
        // Cluster all of our inputs based on the specific Aggregator.
1536
        sets := s.cfg.Aggregator.ClusterInputs(inputs)
1✔
1537

1538
        // sweepWithLock is a helper closure that executes the sweep within a
1539
        // coin select lock to prevent the coins being selected for other
1540
        // transactions like funding of a channel.
1541
        sweepWithLock := func(set InputSet) error {
1✔
1542
                return s.cfg.Wallet.WithCoinSelectLock(func() error {
1✔
1543
                        // Try to add inputs from our wallet.
1✔
1544
                        err := set.AddWalletInputs(s.cfg.Wallet)
1✔
1545
                        if err != nil {
1✔
1546
                                return err
1✔
1547
                        }
1✔
1548

2✔
1549
                        // Create sweeping transaction for each set.
2✔
1550
                        err = s.sweep(set)
1✔
1551
                        if err != nil {
1✔
1552
                                return err
1✔
UNCOV
1553
                        }
×
UNCOV
1554

×
1555
                        return nil
1556
                })
1557
        }
1✔
1558

2✔
1559
        for _, set := range sets {
1✔
1560
                var err error
1✔
1561
                if set.NeedWalletInput() {
UNCOV
1562
                        // Sweep the set of inputs that need the wallet inputs.
×
1563
                        err = sweepWithLock(set)
1564
                } else {
1565
                        // Sweep the set of inputs that don't need the wallet
1566
                        // inputs.
3✔
1567
                        err = s.sweep(set)
2✔
1568
                }
3✔
1569

1✔
1570
                if err != nil {
1✔
1571
                        log.Errorf("Failed to sweep %v: %v", set, err)
2✔
1572
                }
1✔
1573
        }
1✔
1574
}
1✔
1575

1✔
1576
// monitorFeeBumpResult subscribes to the passed result chan to listen for
1577
// future updates about the sweeping tx.
4✔
1578
//
2✔
1579
// NOTE: must run as a goroutine.
2✔
1580
func (s *UtxoSweeper) monitorFeeBumpResult(resultChan <-chan *BumpResult) {
1581
        defer s.wg.Done()
1582

1583
        for {
1584
                select {
1585
                case r := <-resultChan:
1586
                        // Validate the result is valid.
1587
                        if err := r.Validate(); err != nil {
4✔
1588
                                log.Errorf("Received invalid result: %v", err)
4✔
1589
                                continue
4✔
1590
                        }
9✔
1591

5✔
1592
                        // Send the result back to the main event loop.
3✔
1593
                        select {
3✔
1594
                        case s.bumpResultChan <- r:
3✔
UNCOV
1595
                        case <-s.quit:
×
UNCOV
1596
                                log.Debug("Sweeper shutting down, skip " +
×
1597
                                        "sending bump result")
1598

1599
                                return
1600
                        }
3✔
1601

3✔
UNCOV
1602
                        // The sweeping tx has been confirmed, we can exit the
×
UNCOV
1603
                        // monitor now.
×
UNCOV
1604
                        //
×
UNCOV
1605
                        // TODO(yy): can instead remove the spend subscription
×
UNCOV
1606
                        // in sweeper and rely solely on this event to mark
×
1607
                        // inputs as Swept?
1608
                        if r.Event == TxConfirmed || r.Event == TxFailed {
1609
                                log.Debugf("Received %v for sweep tx %v, exit "+
1610
                                        "fee bump monitor", r.Event,
1611
                                        r.Tx.TxHash())
1612

1613
                                // Cancel the rebroadcasting of the failed tx.
1614
                                s.cfg.Wallet.CancelRebroadcast(r.Tx.TxHash())
1615

5✔
1616
                                return
2✔
1617
                        }
2✔
1618

2✔
1619
                case <-s.quit:
2✔
1620
                        log.Debugf("Sweeper shutting down, exit fee " +
2✔
1621
                                "bump handler")
2✔
1622

2✔
1623
                        return
2✔
1624
                }
2✔
1625
        }
1626
}
2✔
1627

2✔
1628
// handleBumpEventTxFailed handles the case where the tx has been failed to
2✔
1629
// publish.
2✔
1630
func (s *UtxoSweeper) handleBumpEventTxFailed(r *BumpResult) error {
2✔
1631
        tx, err := r.Tx, r.Err
1632

1633
        log.Errorf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(), err)
1634

1635
        outpoints := make([]wire.OutPoint, 0, len(tx.TxIn))
1636
        for _, inp := range tx.TxIn {
1637
                outpoints = append(outpoints, inp.PreviousOutPoint)
1✔
1638
        }
1✔
1639

1✔
1640
        // TODO(yy): should we also remove the failed tx from db?
1✔
1641
        s.markInputsPublishFailed(outpoints)
1✔
1642

1✔
1643
        return err
4✔
1644
}
3✔
1645

3✔
1646
// handleBumpEventTxReplaced handles the case where the sweeping tx has been
1647
// replaced by a new one.
1648
func (s *UtxoSweeper) handleBumpEventTxReplaced(r *BumpResult) error {
1✔
1649
        oldTx := r.ReplacedTx
1✔
1650
        newTx := r.Tx
1✔
1651

1652
        // Prepare a new record to replace the old one.
1653
        tr := &TxRecord{
1654
                Txid:    newTx.TxHash(),
1655
                FeeRate: uint64(r.FeeRate),
3✔
1656
                Fee:     uint64(r.Fee),
3✔
1657
        }
3✔
1658

3✔
1659
        // Get the old record for logging purpose.
3✔
1660
        oldTxid := oldTx.TxHash()
3✔
1661
        record, err := s.cfg.Store.GetTx(oldTxid)
3✔
1662
        if err != nil {
3✔
1663
                log.Errorf("Fetch tx record for %v: %v", oldTxid, err)
3✔
1664
                return err
3✔
1665
        }
3✔
1666

3✔
1667
        // Cancel the rebroadcasting of the replaced tx.
3✔
1668
        s.cfg.Wallet.CancelRebroadcast(oldTxid)
3✔
1669

4✔
1670
        log.Infof("RBFed tx=%v(fee=%v sats, feerate=%v sats/kw) with new "+
1✔
1671
                "tx=%v(fee=%v, "+"feerate=%v)", record.Txid, record.Fee,
1✔
1672
                record.FeeRate, tr.Txid, tr.Fee, tr.FeeRate)
1✔
1673

1674
        // The old sweeping tx has been replaced by a new one, we will update
1675
        // the tx record in the sweeper db.
2✔
1676
        //
2✔
1677
        // TODO(yy): we may also need to update the inputs in this tx to a new
2✔
1678
        // state. Suppose a replacing tx only spends a subset of the inputs
2✔
1679
        // here, we'd end up with the rest being marked as `Published` and
2✔
1680
        // won't be aggregated in the next sweep. Atm it's fine as we always
2✔
1681
        // RBF the same input set.
2✔
1682
        if err := s.cfg.Store.DeleteTx(oldTxid); err != nil {
2✔
1683
                log.Errorf("Delete tx record for %v: %v", oldTxid, err)
2✔
1684
                return err
2✔
1685
        }
2✔
1686

2✔
1687
        // Mark the inputs as published using the replacing tx.
2✔
1688
        return s.markInputsPublished(tr, r.Tx.TxIn)
2✔
1689
}
3✔
1690

1✔
1691
// handleBumpEventTxPublished handles the case where the sweeping tx has been
1✔
1692
// successfully published.
1✔
1693
func (s *UtxoSweeper) handleBumpEventTxPublished(r *BumpResult) error {
1694
        tx := r.Tx
1695
        tr := &TxRecord{
1✔
1696
                Txid:    tx.TxHash(),
1697
                FeeRate: uint64(r.FeeRate),
1698
                Fee:     uint64(r.Fee),
1699
        }
1700

1✔
1701
        // Inputs have been successfully published so we update their
1✔
1702
        // states.
1✔
1703
        err := s.markInputsPublished(tr, tx.TxIn)
1✔
1704
        if err != nil {
1✔
1705
                return err
1✔
1706
        }
1✔
1707

1✔
1708
        log.Debugf("Published sweep tx %v, num_inputs=%v, height=%v",
1✔
1709
                tx.TxHash(), len(tx.TxIn), s.currentHeight)
1✔
1710

1✔
1711
        // If there's no error, remove the output script. Otherwise
1✔
UNCOV
1712
        // keep it so that it can be reused for the next transaction
×
UNCOV
1713
        // and causes no address inflation.
×
1714
        s.currentOutputScript = nil
1715

1✔
1716
        return nil
1✔
1717
}
1✔
1718

1✔
1719
// handleBumpEvent handles the result sent from the bumper based on its event
1✔
1720
// type.
1✔
1721
//
1✔
1722
// NOTE: TxConfirmed event is not handled, since we already subscribe to the
1✔
1723
// input's spending event, we don't need to do anything here.
1✔
1724
func (s *UtxoSweeper) handleBumpEvent(r *BumpResult) error {
1725
        log.Debugf("Received bump event [%v] for tx %v", r.Event, r.Tx.TxHash())
1726

1727
        switch r.Event {
1728
        // The tx has been published, we update the inputs' state and create a
1729
        // record to be stored in the sweeper db.
1730
        case TxPublished:
1731
                return s.handleBumpEventTxPublished(r)
1✔
1732

1✔
1733
        // The tx has failed, we update the inputs' state.
1✔
1734
        case TxFailed:
1✔
1735
                return s.handleBumpEventTxFailed(r)
1736

UNCOV
1737
        // The tx has been replaced, we will remove the old tx and replace it
×
UNCOV
1738
        // with the new one.
×
1739
        case TxReplaced:
1740
                return s.handleBumpEventTxReplaced(r)
1741
        }
1✔
1742

1✔
1743
        return nil
1744
}
1745

UNCOV
1746
// IsSweeperOutpoint determines whether the outpoint was created by the sweeper.
×
UNCOV
1747
//
×
1748
// NOTE: It is enough to check the txid because the sweeper will create
1749
// outpoints which solely belong to the internal LND wallet.
UNCOV
1750
func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool {
×
1751
        found, err := s.cfg.Store.IsOurTx(op.Hash)
1752
        // In case there is an error fetching the transaction details from the
1753
        // sweeper store we assume the outpoint is still used by the sweeper
1754
        // (worst case scenario).
1755
        //
1756
        // TODO(ziggie): Ensure that confirmed outpoints are deleted from the
UNCOV
1757
        // bucket.
×
UNCOV
1758
        if err != nil && !errors.Is(err, errNoTxHashesBucket) {
×
1759
                log.Errorf("failed to fetch info for outpoint(%v:%d) "+
×
1760
                        "with: %v, we assume it is still in use by the sweeper",
×
1761
                        op.Hash, op.Index, err)
×
1762

×
1763
                return true
×
1764
        }
×
UNCOV
1765

×
UNCOV
1766
        return found
×
UNCOV
1767
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc