• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12280337084

11 Dec 2024 04:09PM UTC coverage: 49.495% (-0.05%) from 49.54%
12280337084

Pull #9343

github

ellemouton
fn: rework the ContextGuard and add tests

In this commit, the ContextGuard struct is re-worked such that the
context that its new main WithCtx method provides is cancelled in sync
with a parent context being cancelled or with it's quit channel being
cancelled. Tests are added to assert the behaviour. In order for the
close of the quit channel to be consistent with the cancelling of the
derived context, the quit channel _must_ be contained internal to the
ContextGuard so that callers are only able to close the channel via the
exposed Quit method which will then take care to first cancel any
derived context that depend on the quit channel before returning.
Pull Request #9343: fn: expand the ContextGuard and add tests

100285 of 202617 relevant lines covered (49.49%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.23
/sweep/sweeper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/wire"
12
        "github.com/davecgh/go-spew/spew"
13
        "github.com/lightningnetwork/lnd/chainntnfs"
14
        "github.com/lightningnetwork/lnd/fn/v2"
15
        "github.com/lightningnetwork/lnd/input"
16
        "github.com/lightningnetwork/lnd/lnutils"
17
        "github.com/lightningnetwork/lnd/lnwallet"
18
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
19
)
20

21
var (
22
        // ErrRemoteSpend is returned in case an output that we try to sweep is
23
        // confirmed in a tx of the remote party.
24
        ErrRemoteSpend = errors.New("remote party swept utxo")
25

26
        // ErrFeePreferenceTooLow is returned when the fee preference gives a
27
        // fee rate that's below the relay fee rate.
28
        ErrFeePreferenceTooLow = errors.New("fee preference too low")
29

30
        // ErrExclusiveGroupSpend is returned in case a different input of the
31
        // same exclusive group was spent.
32
        ErrExclusiveGroupSpend = errors.New("other member of exclusive group " +
33
                "was spent")
34

35
        // ErrSweeperShuttingDown is an error returned when a client attempts to
36
        // make a request to the UtxoSweeper, but it is unable to handle it as
37
        // it is/has already been stopped.
38
        ErrSweeperShuttingDown = errors.New("utxo sweeper shutting down")
39

40
        // DefaultDeadlineDelta defines a default deadline delta (1 week) to be
41
        // used when sweeping inputs with no deadline pressure.
42
        DefaultDeadlineDelta = int32(1008)
43
)
44

45
// Params contains the parameters that control the sweeping process.
46
type Params struct {
47
        // ExclusiveGroup is an identifier that, if set, prevents other inputs
48
        // with the same identifier from being batched together.
49
        ExclusiveGroup *uint64
50

51
        // DeadlineHeight specifies an absolute block height that this input
52
        // should be confirmed by. This value is used by the fee bumper to
53
        // decide its urgency and adjust its feerate used.
54
        DeadlineHeight fn.Option[int32]
55

56
        // Budget specifies the maximum amount of satoshis that can be spent on
57
        // fees for this sweep.
58
        Budget btcutil.Amount
59

60
        // Immediate indicates that the input should be swept immediately
61
        // without waiting for blocks to come to trigger the sweeping of
62
        // inputs.
63
        Immediate bool
64

65
        // StartingFeeRate is an optional parameter that can be used to specify
66
        // the initial fee rate to use for the fee function.
67
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
68
}
69

70
// String returns a human readable interpretation of the sweep parameters.
71
func (p Params) String() string {
3✔
72
        deadline := "none"
3✔
73
        p.DeadlineHeight.WhenSome(func(d int32) {
6✔
74
                deadline = fmt.Sprintf("%d", d)
3✔
75
        })
3✔
76

77
        exclusiveGroup := "none"
3✔
78
        if p.ExclusiveGroup != nil {
6✔
79
                exclusiveGroup = fmt.Sprintf("%d", *p.ExclusiveGroup)
3✔
80
        }
3✔
81

82
        return fmt.Sprintf("startingFeeRate=%v, immediate=%v, "+
3✔
83
                "exclusive_group=%v, budget=%v, deadline=%v", p.StartingFeeRate,
3✔
84
                p.Immediate, exclusiveGroup, p.Budget, deadline)
3✔
85
}
86

87
// SweepState represents the current state of a pending input.
88
//
89
//nolint:revive
90
type SweepState uint8
91

92
const (
93
        // Init is the initial state of a pending input. This is set when a new
94
        // sweeping request for a given input is made.
95
        Init SweepState = iota
96

97
        // PendingPublish specifies an input's state where it's already been
98
        // included in a sweeping tx but the tx is not published yet.  Inputs
99
        // in this state should not be used for grouping again.
100
        PendingPublish
101

102
        // Published is the state where the input's sweeping tx has
103
        // successfully been published. Inputs in this state can only be
104
        // updated via RBF.
105
        Published
106

107
        // PublishFailed is the state when an error is returned from publishing
108
        // the sweeping tx. Inputs in this state can be re-grouped in to a new
109
        // sweeping tx.
110
        PublishFailed
111

112
        // Swept is the final state of a pending input. This is set when the
113
        // input has been successfully swept.
114
        Swept
115

116
        // Excluded is the state of a pending input that has been excluded and
117
        // can no longer be swept. For instance, when one of the three anchor
118
        // sweeping transactions confirmed, the remaining two will be excluded.
119
        Excluded
120

121
        // Failed is the state when a pending input has too many failed publish
122
        // atttempts or unknown broadcast error is returned.
123
        Failed
124
)
125

126
// String gives a human readable text for the sweep states.
127
func (s SweepState) String() string {
3✔
128
        switch s {
3✔
129
        case Init:
2✔
130
                return "Init"
2✔
131

132
        case PendingPublish:
3✔
133
                return "PendingPublish"
3✔
134

135
        case Published:
3✔
136
                return "Published"
3✔
137

138
        case PublishFailed:
3✔
139
                return "PublishFailed"
3✔
140

141
        case Swept:
3✔
142
                return "Swept"
3✔
143

144
        case Excluded:
3✔
145
                return "Excluded"
3✔
146

147
        case Failed:
×
148
                return "Failed"
×
149

150
        default:
×
151
                return "Unknown"
×
152
        }
153
}
154

155
// RBFInfo stores the information required to perform a RBF bump on a pending
156
// sweeping tx.
157
type RBFInfo struct {
158
        // Txid is the txid of the sweeping tx.
159
        Txid chainhash.Hash
160

161
        // FeeRate is the fee rate of the sweeping tx.
162
        FeeRate chainfee.SatPerKWeight
163

164
        // Fee is the total fee of the sweeping tx.
165
        Fee btcutil.Amount
166
}
167

168
// SweeperInput is created when an input reaches the main loop for the first
169
// time. It wraps the input and tracks all relevant state that is needed for
170
// sweeping.
171
type SweeperInput struct {
172
        input.Input
173

174
        // state tracks the current state of the input.
175
        state SweepState
176

177
        // listeners is a list of channels over which the final outcome of the
178
        // sweep needs to be broadcasted.
179
        listeners []chan Result
180

181
        // ntfnRegCancel is populated with a function that cancels the chain
182
        // notifier spend registration.
183
        ntfnRegCancel func()
184

185
        // publishAttempts records the number of attempts that have already been
186
        // made to sweep this tx.
187
        publishAttempts int
188

189
        // params contains the parameters that control the sweeping process.
190
        params Params
191

192
        // lastFeeRate is the most recent fee rate used for this input within a
193
        // transaction broadcast to the network.
194
        lastFeeRate chainfee.SatPerKWeight
195

196
        // rbf records the RBF constraints.
197
        rbf fn.Option[RBFInfo]
198

199
        // DeadlineHeight is the deadline height for this input. This is
200
        // different from the DeadlineHeight in its params as it's an actual
201
        // value than an option.
202
        DeadlineHeight int32
203
}
204

205
// String returns a human readable interpretation of the pending input.
206
func (p *SweeperInput) String() string {
3✔
207
        return fmt.Sprintf("%v (%v)", p.Input.OutPoint(), p.Input.WitnessType())
3✔
208
}
3✔
209

210
// terminated returns a boolean indicating whether the input has reached a
211
// final state.
212
func (p *SweeperInput) terminated() bool {
3✔
213
        switch p.state {
3✔
214
        // If the input has reached a final state, that it's either
215
        // been swept, or failed, or excluded, we will remove it from
216
        // our sweeper.
217
        case Failed, Swept, Excluded:
3✔
218
                return true
3✔
219

220
        default:
3✔
221
                return false
3✔
222
        }
223
}
224

225
// InputsMap is a type alias for a set of pending inputs.
226
type InputsMap = map[wire.OutPoint]*SweeperInput
227

228
// pendingSweepsReq is an internal message we'll use to represent an external
229
// caller's intent to retrieve all of the pending inputs the UtxoSweeper is
230
// attempting to sweep.
231
type pendingSweepsReq struct {
232
        respChan chan map[wire.OutPoint]*PendingInputResponse
233
        errChan  chan error
234
}
235

236
// PendingInputResponse contains information about an input that is currently
237
// being swept by the UtxoSweeper.
238
type PendingInputResponse struct {
239
        // OutPoint is the identify outpoint of the input being swept.
240
        OutPoint wire.OutPoint
241

242
        // WitnessType is the witness type of the input being swept.
243
        WitnessType input.WitnessType
244

245
        // Amount is the amount of the input being swept.
246
        Amount btcutil.Amount
247

248
        // LastFeeRate is the most recent fee rate used for the input being
249
        // swept within a transaction broadcast to the network.
250
        LastFeeRate chainfee.SatPerKWeight
251

252
        // BroadcastAttempts is the number of attempts we've made to sweept the
253
        // input.
254
        BroadcastAttempts int
255

256
        // Params contains the sweep parameters for this pending request.
257
        Params Params
258

259
        // DeadlineHeight records the deadline height of this input.
260
        DeadlineHeight uint32
261
}
262

263
// updateReq is an internal message we'll use to represent an external caller's
264
// intent to update the sweep parameters of a given input.
265
type updateReq struct {
266
        input        wire.OutPoint
267
        params       Params
268
        responseChan chan *updateResp
269
}
270

271
// updateResp is an internal message we'll use to hand off the response of a
272
// updateReq from the UtxoSweeper's main event loop back to the caller.
273
type updateResp struct {
274
        resultChan chan Result
275
        err        error
276
}
277

278
// UtxoSweeper is responsible for sweeping outputs back into the wallet
279
type UtxoSweeper struct {
280
        started uint32 // To be used atomically.
281
        stopped uint32 // To be used atomically.
282

283
        cfg *UtxoSweeperConfig
284

285
        newInputs chan *sweepInputMessage
286
        spendChan chan *chainntnfs.SpendDetail
287

288
        // pendingSweepsReq is a channel that will be sent requests by external
289
        // callers in order to retrieve the set of pending inputs the
290
        // UtxoSweeper is attempting to sweep.
291
        pendingSweepsReqs chan *pendingSweepsReq
292

293
        // updateReqs is a channel that will be sent requests by external
294
        // callers who wish to bump the fee rate of a given input.
295
        updateReqs chan *updateReq
296

297
        // inputs is the total set of inputs the UtxoSweeper has been requested
298
        // to sweep.
299
        inputs InputsMap
300

301
        currentOutputScript fn.Option[lnwallet.AddrWithKey]
302

303
        relayFeeRate chainfee.SatPerKWeight
304

305
        quit chan struct{}
306
        wg   sync.WaitGroup
307

308
        // currentHeight is the best known height of the main chain. This is
309
        // updated whenever a new block epoch is received.
310
        currentHeight int32
311

312
        // bumpResultChan is a channel that receives broadcast results from the
313
        // TxPublisher.
314
        bumpResultChan chan *BumpResult
315
}
316

317
// UtxoSweeperConfig contains dependencies of UtxoSweeper.
318
type UtxoSweeperConfig struct {
319
        // GenSweepScript generates a P2WKH script belonging to the wallet where
320
        // funds can be swept.
321
        GenSweepScript func() fn.Result[lnwallet.AddrWithKey]
322

323
        // FeeEstimator is used when crafting sweep transactions to estimate
324
        // the necessary fee relative to the expected size of the sweep
325
        // transaction.
326
        FeeEstimator chainfee.Estimator
327

328
        // Wallet contains the wallet functions that sweeper requires.
329
        Wallet Wallet
330

331
        // Notifier is an instance of a chain notifier we'll use to watch for
332
        // certain on-chain events.
333
        Notifier chainntnfs.ChainNotifier
334

335
        // Mempool is the mempool watcher that will be used to query whether a
336
        // given input is already being spent by a transaction in the mempool.
337
        Mempool chainntnfs.MempoolWatcher
338

339
        // Store stores the published sweeper txes.
340
        Store SweeperStore
341

342
        // Signer is used by the sweeper to generate valid witnesses at the
343
        // time the incubated outputs need to be spent.
344
        Signer input.Signer
345

346
        // MaxInputsPerTx specifies the default maximum number of inputs allowed
347
        // in a single sweep tx. If more need to be swept, multiple txes are
348
        // created and published.
349
        MaxInputsPerTx uint32
350

351
        // MaxFeeRate is the maximum fee rate allowed within the UtxoSweeper.
352
        MaxFeeRate chainfee.SatPerVByte
353

354
        // Aggregator is used to group inputs into clusters based on its
355
        // implemention-specific strategy.
356
        Aggregator UtxoAggregator
357

358
        // Publisher is used to publish the sweep tx crafted here and monitors
359
        // it for potential fee bumps.
360
        Publisher Bumper
361

362
        // NoDeadlineConfTarget is the conf target to use when sweeping
363
        // non-time-sensitive outputs.
364
        NoDeadlineConfTarget uint32
365
}
366

367
// Result is the struct that is pushed through the result channel. Callers can
368
// use this to be informed of the final sweep result. In case of a remote
369
// spend, Err will be ErrRemoteSpend.
370
type Result struct {
371
        // Err is the final result of the sweep. It is nil when the input is
372
        // swept successfully by us. ErrRemoteSpend is returned when another
373
        // party took the input.
374
        Err error
375

376
        // Tx is the transaction that spent the input.
377
        Tx *wire.MsgTx
378
}
379

380
// sweepInputMessage structs are used in the internal channel between the
381
// SweepInput call and the sweeper main loop.
382
type sweepInputMessage struct {
383
        input      input.Input
384
        params     Params
385
        resultChan chan Result
386
}
387

388
// New returns a new Sweeper instance.
389
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
3✔
390
        return &UtxoSweeper{
3✔
391
                cfg:               cfg,
3✔
392
                newInputs:         make(chan *sweepInputMessage),
3✔
393
                spendChan:         make(chan *chainntnfs.SpendDetail),
3✔
394
                updateReqs:        make(chan *updateReq),
3✔
395
                pendingSweepsReqs: make(chan *pendingSweepsReq),
3✔
396
                quit:              make(chan struct{}),
3✔
397
                inputs:            make(InputsMap),
3✔
398
                bumpResultChan:    make(chan *BumpResult, 100),
3✔
399
        }
3✔
400
}
3✔
401

402
// Start starts the process of constructing and publish sweep txes.
403
func (s *UtxoSweeper) Start() error {
3✔
404
        if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
3✔
405
                return nil
×
406
        }
×
407

408
        log.Info("Sweeper starting")
3✔
409

3✔
410
        // Retrieve relay fee for dust limit calculation. Assume that this will
3✔
411
        // not change from here on.
3✔
412
        s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
3✔
413

3✔
414
        // We need to register for block epochs and retry sweeping every block.
3✔
415
        // We should get a notification with the current best block immediately
3✔
416
        // if we don't provide any epoch. We'll wait for that in the collector.
3✔
417
        blockEpochs, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
3✔
418
        if err != nil {
3✔
419
                return fmt.Errorf("register block epoch ntfn: %w", err)
×
420
        }
×
421

422
        // Start sweeper main loop.
423
        s.wg.Add(1)
3✔
424
        go func() {
6✔
425
                defer blockEpochs.Cancel()
3✔
426
                defer s.wg.Done()
3✔
427

3✔
428
                s.collector(blockEpochs.Epochs)
3✔
429

3✔
430
                // The collector exited and won't longer handle incoming
3✔
431
                // requests. This can happen on shutdown, when the block
3✔
432
                // notifier shuts down before the sweeper and its clients. In
3✔
433
                // order to not deadlock the clients waiting for their requests
3✔
434
                // being handled, we handle them here and immediately return an
3✔
435
                // error. When the sweeper finally is shut down we can exit as
3✔
436
                // the clients will be notified.
3✔
437
                for {
6✔
438
                        select {
3✔
439
                        case inp := <-s.newInputs:
×
440
                                inp.resultChan <- Result{
×
441
                                        Err: ErrSweeperShuttingDown,
×
442
                                }
×
443

444
                        case req := <-s.pendingSweepsReqs:
×
445
                                req.errChan <- ErrSweeperShuttingDown
×
446

447
                        case req := <-s.updateReqs:
×
448
                                req.responseChan <- &updateResp{
×
449
                                        err: ErrSweeperShuttingDown,
×
450
                                }
×
451

452
                        case <-s.quit:
3✔
453
                                return
3✔
454
                        }
455
                }
456
        }()
457

458
        return nil
3✔
459
}
460

461
// RelayFeePerKW returns the minimum fee rate required for transactions to be
462
// relayed.
463
func (s *UtxoSweeper) RelayFeePerKW() chainfee.SatPerKWeight {
×
464
        return s.relayFeeRate
×
465
}
×
466

467
// Stop stops sweeper from listening to block epochs and constructing sweep
468
// txes.
469
func (s *UtxoSweeper) Stop() error {
3✔
470
        if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
3✔
471
                return nil
×
472
        }
×
473

474
        log.Info("Sweeper shutting down...")
3✔
475
        defer log.Debug("Sweeper shutdown complete")
3✔
476

3✔
477
        close(s.quit)
3✔
478
        s.wg.Wait()
3✔
479

3✔
480
        return nil
3✔
481
}
482

483
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
484
// swept after the batch time window ends. A custom fee preference can be
485
// provided to determine what fee rate should be used for the input. Note that
486
// the input may not always be swept with this exact value, as its possible for
487
// it to be batched under the same transaction with other similar fee rate
488
// inputs.
489
//
490
// NOTE: Extreme care needs to be taken that input isn't changed externally.
491
// Because it is an interface and we don't know what is exactly behind it, we
492
// cannot make a local copy in sweeper.
493
//
494
// TODO(yy): make sure the caller is using the Result chan.
495
func (s *UtxoSweeper) SweepInput(inp input.Input,
496
        params Params) (chan Result, error) {
3✔
497

3✔
498
        if inp == nil || inp.OutPoint() == input.EmptyOutPoint ||
3✔
499
                inp.SignDesc() == nil {
3✔
500

×
501
                return nil, errors.New("nil input received")
×
502
        }
×
503

504
        absoluteTimeLock, _ := inp.RequiredLockTime()
3✔
505
        log.Infof("Sweep request received: out_point=%v, witness_type=%v, "+
3✔
506
                "relative_time_lock=%v, absolute_time_lock=%v, amount=%v, "+
3✔
507
                "parent=(%v), params=(%v)", inp.OutPoint(), inp.WitnessType(),
3✔
508
                inp.BlocksToMaturity(), absoluteTimeLock,
3✔
509
                btcutil.Amount(inp.SignDesc().Output.Value),
3✔
510
                inp.UnconfParent(), params)
3✔
511

3✔
512
        sweeperInput := &sweepInputMessage{
3✔
513
                input:      inp,
3✔
514
                params:     params,
3✔
515
                resultChan: make(chan Result, 1),
3✔
516
        }
3✔
517

3✔
518
        // Deliver input to the main event loop.
3✔
519
        select {
3✔
520
        case s.newInputs <- sweeperInput:
3✔
521
        case <-s.quit:
×
522
                return nil, ErrSweeperShuttingDown
×
523
        }
524

525
        return sweeperInput.resultChan, nil
3✔
526
}
527

528
// removeConflictSweepDescendants removes any transactions from the wallet that
529
// spend outputs included in the passed outpoint set. This needs to be done in
530
// cases where we're not the only ones that can sweep an output, but there may
531
// exist unconfirmed spends that spend outputs created by a sweep transaction.
532
// The most common case for this is when someone sweeps our anchor outputs
533
// after 16 blocks. Moreover this is also needed for wallets which use neutrino
534
// as a backend when a channel is force closed and anchor cpfp txns are
535
// created to bump the initial commitment transaction. In this case an anchor
536
// cpfp is broadcasted for up to 3 commitment transactions (local,
537
// remote-dangling, remote). Using neutrino all of those transactions will be
538
// accepted (the commitment tx will be different in all of those cases) and have
539
// to be removed as soon as one of them confirmes (they do have the same
540
// ExclusiveGroup). For neutrino backends the corresponding BIP 157 serving full
541
// nodes do not signal invalid transactions anymore.
542
func (s *UtxoSweeper) removeConflictSweepDescendants(
543
        outpoints map[wire.OutPoint]struct{}) error {
3✔
544

3✔
545
        // Obtain all the past sweeps that we've done so far. We'll need these
3✔
546
        // to ensure that if the spendingTx spends any of the same inputs, then
3✔
547
        // we remove any transaction that may be spending those inputs from the
3✔
548
        // wallet.
3✔
549
        //
3✔
550
        // TODO(roasbeef): can be last sweep here if we remove anything confirmed
3✔
551
        // from the store?
3✔
552
        pastSweepHashes, err := s.cfg.Store.ListSweeps()
3✔
553
        if err != nil {
3✔
554
                return err
×
555
        }
×
556

557
        // We'll now go through each past transaction we published during this
558
        // epoch and cross reference the spent inputs. If there're any inputs
559
        // in common with the inputs the spendingTx spent, then we'll remove
560
        // those.
561
        //
562
        // TODO(roasbeef): need to start to remove all transaction hashes after
563
        // every N blocks (assumed point of no return)
564
        for _, sweepHash := range pastSweepHashes {
6✔
565
                sweepTx, err := s.cfg.Wallet.FetchTx(sweepHash)
3✔
566
                if err != nil {
3✔
567
                        return err
×
568
                }
×
569

570
                // Transaction wasn't found in the wallet, may have already
571
                // been replaced/removed.
572
                if sweepTx == nil {
6✔
573
                        // If it was removed, then we'll play it safe and mark
3✔
574
                        // it as no longer need to be rebroadcasted.
3✔
575
                        s.cfg.Wallet.CancelRebroadcast(sweepHash)
3✔
576
                        continue
3✔
577
                }
578

579
                // Check to see if this past sweep transaction spent any of the
580
                // same inputs as spendingTx.
581
                var isConflicting bool
3✔
582
                for _, txIn := range sweepTx.TxIn {
6✔
583
                        if _, ok := outpoints[txIn.PreviousOutPoint]; ok {
6✔
584
                                isConflicting = true
3✔
585
                                break
3✔
586
                        }
587
                }
588

589
                if !isConflicting {
6✔
590
                        continue
3✔
591
                }
592

593
                // If it is conflicting, then we'll signal the wallet to remove
594
                // all the transactions that are descendants of outputs created
595
                // by the sweepTx and the sweepTx itself.
596
                log.Debugf("Removing sweep txid=%v from wallet: %v",
3✔
597
                        sweepTx.TxHash(), spew.Sdump(sweepTx))
3✔
598

3✔
599
                err = s.cfg.Wallet.RemoveDescendants(sweepTx)
3✔
600
                if err != nil {
3✔
601
                        log.Warnf("Unable to remove descendants: %v", err)
×
602
                }
×
603

604
                // If this transaction was conflicting, then we'll stop
605
                // rebroadcasting it in the background.
606
                s.cfg.Wallet.CancelRebroadcast(sweepHash)
3✔
607
        }
608

609
        return nil
3✔
610
}
611

612
// collector is the sweeper main loop. It processes new inputs, spend
613
// notifications and counts down to publication of the sweep tx.
614
func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
3✔
615
        // We registered for the block epochs with a nil request. The notifier
3✔
616
        // should send us the current best block immediately. So we need to wait
3✔
617
        // for it here because we need to know the current best height.
3✔
618
        select {
3✔
619
        case bestBlock := <-blockEpochs:
3✔
620
                s.currentHeight = bestBlock.Height
3✔
621

622
        case <-s.quit:
×
623
                return
×
624
        }
625

626
        for {
6✔
627
                // Clean inputs, which will remove inputs that are swept,
3✔
628
                // failed, or excluded from the sweeper and return inputs that
3✔
629
                // are either new or has been published but failed back, which
3✔
630
                // will be retried again here.
3✔
631
                s.updateSweeperInputs()
3✔
632

3✔
633
                select {
3✔
634
                // A new inputs is offered to the sweeper. We check to see if
635
                // we are already trying to sweep this input and if not, set up
636
                // a listener to spend and schedule a sweep.
637
                case input := <-s.newInputs:
3✔
638
                        err := s.handleNewInput(input)
3✔
639
                        if err != nil {
3✔
640
                                log.Criticalf("Unable to handle new input: %v",
×
641
                                        err)
×
642

×
643
                                return
×
644
                        }
×
645

646
                        // If this input is forced, we perform an sweep
647
                        // immediately.
648
                        //
649
                        // TODO(ziggie): Make sure when `immediate` is selected
650
                        // as a parameter that we only trigger the sweeping of
651
                        // this specific input rather than triggering the sweeps
652
                        // of all current pending inputs registered with the
653
                        // sweeper.
654
                        if input.params.Immediate {
6✔
655
                                inputs := s.updateSweeperInputs()
3✔
656
                                s.sweepPendingInputs(inputs)
3✔
657
                        }
3✔
658

659
                // A spend of one of our inputs is detected. Signal sweep
660
                // results to the caller(s).
661
                case spend := <-s.spendChan:
3✔
662
                        s.handleInputSpent(spend)
3✔
663

664
                // A new external request has been received to retrieve all of
665
                // the inputs we're currently attempting to sweep.
666
                case req := <-s.pendingSweepsReqs:
3✔
667
                        s.handlePendingSweepsReq(req)
3✔
668

669
                // A new external request has been received to bump the fee rate
670
                // of a given input.
671
                case req := <-s.updateReqs:
3✔
672
                        resultChan, err := s.handleUpdateReq(req)
3✔
673
                        req.responseChan <- &updateResp{
3✔
674
                                resultChan: resultChan,
3✔
675
                                err:        err,
3✔
676
                        }
3✔
677

3✔
678
                        // Perform an sweep immediately if asked.
3✔
679
                        if req.params.Immediate {
6✔
680
                                inputs := s.updateSweeperInputs()
3✔
681
                                s.sweepPendingInputs(inputs)
3✔
682
                        }
3✔
683

684
                case result := <-s.bumpResultChan:
3✔
685
                        // Handle the bump event.
3✔
686
                        err := s.handleBumpEvent(result)
3✔
687
                        if err != nil {
6✔
688
                                log.Errorf("Failed to handle bump event: %v",
3✔
689
                                        err)
3✔
690
                        }
3✔
691

692
                // A new block comes in, update the bestHeight, perform a check
693
                // over all pending inputs and publish sweeping txns if needed.
694
                case epoch, ok := <-blockEpochs:
3✔
695
                        if !ok {
3✔
696
                                // We should stop the sweeper before stopping
×
697
                                // the chain service. Otherwise it indicates an
×
698
                                // error.
×
699
                                log.Error("Block epoch channel closed")
×
700

×
701
                                return
×
702
                        }
×
703

704
                        // Update the sweeper to the best height.
705
                        s.currentHeight = epoch.Height
3✔
706

3✔
707
                        // Update the inputs with the latest height.
3✔
708
                        inputs := s.updateSweeperInputs()
3✔
709

3✔
710
                        log.Debugf("Received new block: height=%v, attempt "+
3✔
711
                                "sweeping %d inputs", epoch.Height, len(inputs))
3✔
712

3✔
713
                        // Attempt to sweep any pending inputs.
3✔
714
                        s.sweepPendingInputs(inputs)
3✔
715

716
                case <-s.quit:
3✔
717
                        return
3✔
718
                }
719
        }
720
}
721

722
// removeExclusiveGroup removes all inputs in the given exclusive group. This
723
// function is called when one of the exclusive group inputs has been spent. The
724
// other inputs won't ever be spendable and can be removed. This also prevents
725
// them from being part of future sweep transactions that would fail. In
726
// addition sweep transactions of those inputs will be removed from the wallet.
727
func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
3✔
728
        for outpoint, input := range s.inputs {
6✔
729
                outpoint := outpoint
3✔
730

3✔
731
                // Skip inputs that aren't exclusive.
3✔
732
                if input.params.ExclusiveGroup == nil {
6✔
733
                        continue
3✔
734
                }
735

736
                // Skip inputs from other exclusive groups.
737
                if *input.params.ExclusiveGroup != group {
3✔
738
                        continue
×
739
                }
740

741
                // Skip inputs that are already terminated.
742
                if input.terminated() {
6✔
743
                        log.Tracef("Skipped sending error result for "+
3✔
744
                                "input %v, state=%v", outpoint, input.state)
3✔
745

3✔
746
                        continue
3✔
747
                }
748

749
                // Signal result channels.
750
                s.signalResult(input, Result{
3✔
751
                        Err: ErrExclusiveGroupSpend,
3✔
752
                })
3✔
753

3✔
754
                // Update the input's state as it can no longer be swept.
3✔
755
                input.state = Excluded
3✔
756

3✔
757
                // Remove all unconfirmed transactions from the wallet which
3✔
758
                // spend the passed outpoint of the same exclusive group.
3✔
759
                outpoints := map[wire.OutPoint]struct{}{
3✔
760
                        outpoint: {},
3✔
761
                }
3✔
762
                err := s.removeConflictSweepDescendants(outpoints)
3✔
763
                if err != nil {
3✔
764
                        log.Warnf("Unable to remove conflicting sweep tx from "+
×
765
                                "wallet for outpoint %v : %v", outpoint, err)
×
766
                }
×
767
        }
768
}
769

770
// signalResult notifies the listeners of the final result of the input sweep.
771
// It also cancels any pending spend notification.
772
func (s *UtxoSweeper) signalResult(pi *SweeperInput, result Result) {
3✔
773
        op := pi.OutPoint()
3✔
774
        listeners := pi.listeners
3✔
775

3✔
776
        if result.Err == nil {
6✔
777
                log.Tracef("Dispatching sweep success for %v to %v listeners",
3✔
778
                        op, len(listeners),
3✔
779
                )
3✔
780
        } else {
6✔
781
                log.Tracef("Dispatching sweep error for %v to %v listeners: %v",
3✔
782
                        op, len(listeners), result.Err,
3✔
783
                )
3✔
784
        }
3✔
785

786
        // Signal all listeners. Channel is buffered. Because we only send once
787
        // on every channel, it should never block.
788
        for _, resultChan := range listeners {
6✔
789
                resultChan <- result
3✔
790
        }
3✔
791

792
        // Cancel spend notification with chain notifier. This is not necessary
793
        // in case of a success, except for that a reorg could still happen.
794
        if pi.ntfnRegCancel != nil {
6✔
795
                log.Debugf("Canceling spend ntfn for %v", op)
3✔
796

3✔
797
                pi.ntfnRegCancel()
3✔
798
        }
3✔
799
}
800

801
// sweep takes a set of preselected inputs, creates a sweep tx and publishes
802
// the tx. The output address is only marked as used if the publish succeeds.
803
func (s *UtxoSweeper) sweep(set InputSet) error {
3✔
804
        // Generate an output script if there isn't an unused script available.
3✔
805
        if s.currentOutputScript.IsNone() {
6✔
806
                addr, err := s.cfg.GenSweepScript().Unpack()
3✔
807
                if err != nil {
3✔
808
                        return fmt.Errorf("gen sweep script: %w", err)
×
809
                }
×
810
                s.currentOutputScript = fn.Some(addr)
3✔
811
        }
812

813
        sweepAddr, err := s.currentOutputScript.UnwrapOrErr(
3✔
814
                fmt.Errorf("none sweep script"),
3✔
815
        )
3✔
816
        if err != nil {
3✔
817
                return err
×
818
        }
×
819

820
        // Create a fee bump request and ask the publisher to broadcast it. The
821
        // publisher will then take over and start monitoring the tx for
822
        // potential fee bump.
823
        req := &BumpRequest{
3✔
824
                Inputs:          set.Inputs(),
3✔
825
                Budget:          set.Budget(),
3✔
826
                DeadlineHeight:  set.DeadlineHeight(),
3✔
827
                DeliveryAddress: sweepAddr,
3✔
828
                MaxFeeRate:      s.cfg.MaxFeeRate.FeePerKWeight(),
3✔
829
                StartingFeeRate: set.StartingFeeRate(),
3✔
830
                // TODO(yy): pass the strategy here.
3✔
831
        }
3✔
832

3✔
833
        // Reschedule the inputs that we just tried to sweep. This is done in
3✔
834
        // case the following publish fails, we'd like to update the inputs'
3✔
835
        // publish attempts and rescue them in the next sweep.
3✔
836
        s.markInputsPendingPublish(set)
3✔
837

3✔
838
        // Broadcast will return a read-only chan that we will listen to for
3✔
839
        // this publish result and future RBF attempt.
3✔
840
        resp, err := s.cfg.Publisher.Broadcast(req)
3✔
841
        if err != nil {
6✔
842
                outpoints := make([]wire.OutPoint, len(set.Inputs()))
3✔
843
                for i, inp := range set.Inputs() {
6✔
844
                        outpoints[i] = inp.OutPoint()
3✔
845
                }
3✔
846

847
                log.Errorf("Initial broadcast failed: %v, inputs=\n%v", err,
3✔
848
                        inputTypeSummary(set.Inputs()))
3✔
849

3✔
850
                // TODO(yy): find out which input is causing the failure.
3✔
851
                s.markInputsPublishFailed(outpoints)
3✔
852

3✔
853
                return err
3✔
854
        }
855

856
        // Successfully sent the broadcast attempt, we now handle the result by
857
        // subscribing to the result chan and listen for future updates about
858
        // this tx.
859
        s.wg.Add(1)
3✔
860
        go s.monitorFeeBumpResult(resp)
3✔
861

3✔
862
        return nil
3✔
863
}
864

865
// markInputsPendingPublish updates the pending inputs with the given tx
866
// inputs. It also increments the `publishAttempts`.
867
func (s *UtxoSweeper) markInputsPendingPublish(set InputSet) {
3✔
868
        // Reschedule sweep.
3✔
869
        for _, input := range set.Inputs() {
6✔
870
                pi, ok := s.inputs[input.OutPoint()]
3✔
871
                if !ok {
6✔
872
                        // It could be that this input is an additional wallet
3✔
873
                        // input that was attached. In that case there also
3✔
874
                        // isn't a pending input to update.
3✔
875
                        log.Tracef("Skipped marking input as pending "+
3✔
876
                                "published: %v not found in pending inputs",
3✔
877
                                input.OutPoint())
3✔
878

3✔
879
                        continue
3✔
880
                }
881

882
                // If this input has already terminated, there's clearly
883
                // something wrong as it would have been removed. In this case
884
                // we log an error and skip marking this input as pending
885
                // publish.
886
                if pi.terminated() {
3✔
887
                        log.Errorf("Expect input %v to not have terminated "+
×
888
                                "state, instead it has %v",
×
889
                                input.OutPoint, pi.state)
×
890

×
891
                        continue
×
892
                }
893

894
                // Update the input's state.
895
                pi.state = PendingPublish
3✔
896

3✔
897
                // Record another publish attempt.
3✔
898
                pi.publishAttempts++
3✔
899
        }
900
}
901

902
// markInputsPublished updates the sweeping tx in db and marks the list of
903
// inputs as published.
904
func (s *UtxoSweeper) markInputsPublished(tr *TxRecord,
905
        inputs []*wire.TxIn) error {
3✔
906

3✔
907
        // Mark this tx in db once successfully published.
3✔
908
        //
3✔
909
        // NOTE: this will behave as an overwrite, which is fine as the record
3✔
910
        // is small.
3✔
911
        tr.Published = true
3✔
912
        err := s.cfg.Store.StoreTx(tr)
3✔
913
        if err != nil {
3✔
914
                return fmt.Errorf("store tx: %w", err)
×
915
        }
×
916

917
        // Reschedule sweep.
918
        for _, input := range inputs {
6✔
919
                pi, ok := s.inputs[input.PreviousOutPoint]
3✔
920
                if !ok {
6✔
921
                        // It could be that this input is an additional wallet
3✔
922
                        // input that was attached. In that case there also
3✔
923
                        // isn't a pending input to update.
3✔
924
                        log.Tracef("Skipped marking input as published: %v "+
3✔
925
                                "not found in pending inputs",
3✔
926
                                input.PreviousOutPoint)
3✔
927

3✔
928
                        continue
3✔
929
                }
930

931
                // Valdiate that the input is in an expected state.
932
                if pi.state != PendingPublish {
6✔
933
                        // We may get a Published if this is a replacement tx.
3✔
934
                        log.Debugf("Expect input %v to have %v, instead it "+
3✔
935
                                "has %v", input.PreviousOutPoint,
3✔
936
                                PendingPublish, pi.state)
3✔
937

3✔
938
                        continue
3✔
939
                }
940

941
                // Update the input's state.
942
                pi.state = Published
3✔
943

3✔
944
                // Update the input's latest fee rate.
3✔
945
                pi.lastFeeRate = chainfee.SatPerKWeight(tr.FeeRate)
3✔
946
        }
947

948
        return nil
3✔
949
}
950

951
// markInputsPublishFailed marks the list of inputs as failed to be published.
952
func (s *UtxoSweeper) markInputsPublishFailed(outpoints []wire.OutPoint) {
3✔
953
        // Reschedule sweep.
3✔
954
        for _, op := range outpoints {
6✔
955
                pi, ok := s.inputs[op]
3✔
956
                if !ok {
6✔
957
                        // It could be that this input is an additional wallet
3✔
958
                        // input that was attached. In that case there also
3✔
959
                        // isn't a pending input to update.
3✔
960
                        log.Tracef("Skipped marking input as publish failed: "+
3✔
961
                                "%v not found in pending inputs", op)
3✔
962

3✔
963
                        continue
3✔
964
                }
965

966
                // Valdiate that the input is in an expected state.
967
                if pi.state != PendingPublish && pi.state != Published {
3✔
968
                        log.Debugf("Expect input %v to have %v, instead it "+
×
969
                                "has %v", op, PendingPublish, pi.state)
×
970

×
971
                        continue
×
972
                }
973

974
                log.Warnf("Failed to publish input %v", op)
3✔
975

3✔
976
                // Update the input's state.
3✔
977
                pi.state = PublishFailed
3✔
978
        }
979
}
980

981
// monitorSpend registers a spend notification with the chain notifier. It
982
// returns a cancel function that can be used to cancel the registration.
983
func (s *UtxoSweeper) monitorSpend(outpoint wire.OutPoint,
984
        script []byte, heightHint uint32) (func(), error) {
3✔
985

3✔
986
        log.Tracef("Wait for spend of %v at heightHint=%v",
3✔
987
                outpoint, heightHint)
3✔
988

3✔
989
        spendEvent, err := s.cfg.Notifier.RegisterSpendNtfn(
3✔
990
                &outpoint, script, heightHint,
3✔
991
        )
3✔
992
        if err != nil {
3✔
993
                return nil, fmt.Errorf("register spend ntfn: %w", err)
×
994
        }
×
995

996
        s.wg.Add(1)
3✔
997
        go func() {
6✔
998
                defer s.wg.Done()
3✔
999

3✔
1000
                select {
3✔
1001
                case spend, ok := <-spendEvent.Spend:
3✔
1002
                        if !ok {
6✔
1003
                                log.Debugf("Spend ntfn for %v canceled",
3✔
1004
                                        outpoint)
3✔
1005
                                return
3✔
1006
                        }
3✔
1007

1008
                        log.Debugf("Delivering spend ntfn for %v", outpoint)
3✔
1009

3✔
1010
                        select {
3✔
1011
                        case s.spendChan <- spend:
3✔
1012
                                log.Debugf("Delivered spend ntfn for %v",
3✔
1013
                                        outpoint)
3✔
1014

1015
                        case <-s.quit:
1✔
1016
                        }
1017
                case <-s.quit:
3✔
1018
                }
1019
        }()
1020

1021
        return spendEvent.Cancel, nil
3✔
1022
}
1023

1024
// PendingInputs returns the set of inputs that the UtxoSweeper is currently
1025
// attempting to sweep.
1026
func (s *UtxoSweeper) PendingInputs() (
1027
        map[wire.OutPoint]*PendingInputResponse, error) {
3✔
1028

3✔
1029
        respChan := make(chan map[wire.OutPoint]*PendingInputResponse, 1)
3✔
1030
        errChan := make(chan error, 1)
3✔
1031
        select {
3✔
1032
        case s.pendingSweepsReqs <- &pendingSweepsReq{
1033
                respChan: respChan,
1034
                errChan:  errChan,
1035
        }:
3✔
1036
        case <-s.quit:
×
1037
                return nil, ErrSweeperShuttingDown
×
1038
        }
1039

1040
        select {
3✔
1041
        case pendingSweeps := <-respChan:
3✔
1042
                return pendingSweeps, nil
3✔
1043
        case err := <-errChan:
×
1044
                return nil, err
×
1045
        case <-s.quit:
×
1046
                return nil, ErrSweeperShuttingDown
×
1047
        }
1048
}
1049

1050
// handlePendingSweepsReq handles a request to retrieve all pending inputs the
1051
// UtxoSweeper is attempting to sweep.
1052
func (s *UtxoSweeper) handlePendingSweepsReq(
1053
        req *pendingSweepsReq) map[wire.OutPoint]*PendingInputResponse {
3✔
1054

3✔
1055
        resps := make(map[wire.OutPoint]*PendingInputResponse, len(s.inputs))
3✔
1056
        for _, inp := range s.inputs {
6✔
1057
                // Only the exported fields are set, as we expect the response
3✔
1058
                // to only be consumed externally.
3✔
1059
                op := inp.OutPoint()
3✔
1060
                resps[op] = &PendingInputResponse{
3✔
1061
                        OutPoint:    op,
3✔
1062
                        WitnessType: inp.WitnessType(),
3✔
1063
                        Amount: btcutil.Amount(
3✔
1064
                                inp.SignDesc().Output.Value,
3✔
1065
                        ),
3✔
1066
                        LastFeeRate:       inp.lastFeeRate,
3✔
1067
                        BroadcastAttempts: inp.publishAttempts,
3✔
1068
                        Params:            inp.params,
3✔
1069
                        DeadlineHeight:    uint32(inp.DeadlineHeight),
3✔
1070
                }
3✔
1071
        }
3✔
1072

1073
        select {
3✔
1074
        case req.respChan <- resps:
3✔
1075
        case <-s.quit:
×
1076
                log.Debug("Skipped sending pending sweep response due to " +
×
1077
                        "UtxoSweeper shutting down")
×
1078
        }
1079

1080
        return resps
3✔
1081
}
1082

1083
// UpdateParams allows updating the sweep parameters of a pending input in the
1084
// UtxoSweeper. This function can be used to provide an updated fee preference
1085
// and force flag that will be used for a new sweep transaction of the input
1086
// that will act as a replacement transaction (RBF) of the original sweeping
1087
// transaction, if any. The exclusive group is left unchanged.
1088
//
1089
// NOTE: This currently doesn't do any fee rate validation to ensure that a bump
1090
// is actually successful. The responsibility of doing so should be handled by
1091
// the caller.
1092
func (s *UtxoSweeper) UpdateParams(input wire.OutPoint,
1093
        params Params) (chan Result, error) {
3✔
1094

3✔
1095
        responseChan := make(chan *updateResp, 1)
3✔
1096
        select {
3✔
1097
        case s.updateReqs <- &updateReq{
1098
                input:        input,
1099
                params:       params,
1100
                responseChan: responseChan,
1101
        }:
3✔
1102
        case <-s.quit:
×
1103
                return nil, ErrSweeperShuttingDown
×
1104
        }
1105

1106
        select {
3✔
1107
        case response := <-responseChan:
3✔
1108
                return response.resultChan, response.err
3✔
1109
        case <-s.quit:
×
1110
                return nil, ErrSweeperShuttingDown
×
1111
        }
1112
}
1113

1114
// handleUpdateReq handles an update request by simply updating the sweep
1115
// parameters of the pending input. Currently, no validation is done on the new
1116
// fee preference to ensure it will properly create a replacement transaction.
1117
//
1118
// TODO(wilmer):
1119
//   - Validate fee preference to ensure we'll create a valid replacement
1120
//     transaction to allow the new fee rate to propagate throughout the
1121
//     network.
1122
//   - Ensure we don't combine this input with any other unconfirmed inputs that
1123
//     did not exist in the original sweep transaction, resulting in an invalid
1124
//     replacement transaction.
1125
func (s *UtxoSweeper) handleUpdateReq(req *updateReq) (
1126
        chan Result, error) {
3✔
1127

3✔
1128
        // If the UtxoSweeper is already trying to sweep this input, then we can
3✔
1129
        // simply just increase its fee rate. This will allow the input to be
3✔
1130
        // batched with others which also have a similar fee rate, creating a
3✔
1131
        // higher fee rate transaction that replaces the original input's
3✔
1132
        // sweeping transaction.
3✔
1133
        sweeperInput, ok := s.inputs[req.input]
3✔
1134
        if !ok {
3✔
1135
                return nil, lnwallet.ErrNotMine
×
1136
        }
×
1137

1138
        // Create the updated parameters struct. Leave the exclusive group
1139
        // unchanged.
1140
        newParams := Params{
3✔
1141
                StartingFeeRate: req.params.StartingFeeRate,
3✔
1142
                Immediate:       req.params.Immediate,
3✔
1143
                Budget:          req.params.Budget,
3✔
1144
                DeadlineHeight:  req.params.DeadlineHeight,
3✔
1145
                ExclusiveGroup:  sweeperInput.params.ExclusiveGroup,
3✔
1146
        }
3✔
1147

3✔
1148
        log.Debugf("Updating parameters for %v(state=%v) from (%v) to (%v)",
3✔
1149
                req.input, sweeperInput.state, sweeperInput.params, newParams)
3✔
1150

3✔
1151
        sweeperInput.params = newParams
3✔
1152

3✔
1153
        // We need to reset the state so this input will be attempted again by
3✔
1154
        // our sweeper.
3✔
1155
        //
3✔
1156
        // TODO(yy): a dedicated state?
3✔
1157
        sweeperInput.state = Init
3✔
1158

3✔
1159
        // If the new input specifies a deadline, update the deadline height.
3✔
1160
        sweeperInput.DeadlineHeight = req.params.DeadlineHeight.UnwrapOr(
3✔
1161
                sweeperInput.DeadlineHeight,
3✔
1162
        )
3✔
1163

3✔
1164
        resultChan := make(chan Result, 1)
3✔
1165
        sweeperInput.listeners = append(sweeperInput.listeners, resultChan)
3✔
1166

3✔
1167
        return resultChan, nil
3✔
1168
}
1169

1170
// ListSweeps returns a list of the sweeps recorded by the sweep store.
1171
func (s *UtxoSweeper) ListSweeps() ([]chainhash.Hash, error) {
3✔
1172
        return s.cfg.Store.ListSweeps()
3✔
1173
}
3✔
1174

1175
// mempoolLookup takes an input's outpoint and queries the mempool to see
1176
// whether it's already been spent in a transaction found in the mempool.
1177
// Returns the transaction if found.
1178
func (s *UtxoSweeper) mempoolLookup(op wire.OutPoint) fn.Option[wire.MsgTx] {
3✔
1179
        // For neutrino backend, there's no mempool available, so we exit
3✔
1180
        // early.
3✔
1181
        if s.cfg.Mempool == nil {
4✔
1182
                log.Debugf("Skipping mempool lookup for %v, no mempool ", op)
1✔
1183

1✔
1184
                return fn.None[wire.MsgTx]()
1✔
1185
        }
1✔
1186

1187
        // Query this input in the mempool. If this outpoint is already spent
1188
        // in mempool, we should get a spending event back immediately.
1189
        return s.cfg.Mempool.LookupInputMempoolSpend(op)
2✔
1190
}
1191

1192
// handleNewInput processes a new input by registering spend notification and
1193
// scheduling sweeping for it.
1194
func (s *UtxoSweeper) handleNewInput(input *sweepInputMessage) error {
3✔
1195
        // Create a default deadline height, which will be used when there's no
3✔
1196
        // DeadlineHeight specified for a given input.
3✔
1197
        defaultDeadline := s.currentHeight + int32(s.cfg.NoDeadlineConfTarget)
3✔
1198

3✔
1199
        outpoint := input.input.OutPoint()
3✔
1200
        pi, pending := s.inputs[outpoint]
3✔
1201
        if pending {
6✔
1202
                log.Debugf("Already has pending input %v received", outpoint)
3✔
1203

3✔
1204
                s.handleExistingInput(input, pi)
3✔
1205

3✔
1206
                return nil
3✔
1207
        }
3✔
1208

1209
        // This is a new input, and we want to query the mempool to see if this
1210
        // input has already been spent. If so, we'll start the input with
1211
        // state Published and attach the RBFInfo.
1212
        state, rbfInfo := s.decideStateAndRBFInfo(input.input.OutPoint())
3✔
1213

3✔
1214
        // Create a new pendingInput and initialize the listeners slice with
3✔
1215
        // the passed in result channel. If this input is offered for sweep
3✔
1216
        // again, the result channel will be appended to this slice.
3✔
1217
        pi = &SweeperInput{
3✔
1218
                state:     state,
3✔
1219
                listeners: []chan Result{input.resultChan},
3✔
1220
                Input:     input.input,
3✔
1221
                params:    input.params,
3✔
1222
                rbf:       rbfInfo,
3✔
1223
                // Set the acutal deadline height.
3✔
1224
                DeadlineHeight: input.params.DeadlineHeight.UnwrapOr(
3✔
1225
                        defaultDeadline,
3✔
1226
                ),
3✔
1227
        }
3✔
1228

3✔
1229
        s.inputs[outpoint] = pi
3✔
1230
        log.Tracef("input %v, state=%v, added to inputs", outpoint, pi.state)
3✔
1231

3✔
1232
        // Start watching for spend of this input, either by us or the remote
3✔
1233
        // party.
3✔
1234
        cancel, err := s.monitorSpend(
3✔
1235
                outpoint, input.input.SignDesc().Output.PkScript,
3✔
1236
                input.input.HeightHint(),
3✔
1237
        )
3✔
1238
        if err != nil {
3✔
1239
                err := fmt.Errorf("wait for spend: %w", err)
×
1240
                s.markInputFailed(pi, err)
×
1241

×
1242
                return err
×
1243
        }
×
1244

1245
        pi.ntfnRegCancel = cancel
3✔
1246

3✔
1247
        return nil
3✔
1248
}
1249

1250
// decideStateAndRBFInfo queries the mempool to see whether the given input has
1251
// already been spent. If so, the state Published will be returned, otherwise
1252
// state Init. When spent, it will query the sweeper store to fetch the fee
1253
// info of the spending transction, and construct an RBFInfo based on it.
1254
// Suppose an error occurs, fn.None is returned.
1255
func (s *UtxoSweeper) decideStateAndRBFInfo(op wire.OutPoint) (
1256
        SweepState, fn.Option[RBFInfo]) {
3✔
1257

3✔
1258
        // Check if we can find the spending tx of this input in mempool.
3✔
1259
        txOption := s.mempoolLookup(op)
3✔
1260

3✔
1261
        // Extract the spending tx from the option.
3✔
1262
        var tx *wire.MsgTx
3✔
1263
        txOption.WhenSome(func(t wire.MsgTx) {
5✔
1264
                tx = &t
2✔
1265
        })
2✔
1266

1267
        // Exit early if it's not found.
1268
        //
1269
        // NOTE: this is not accurate for backends that don't support mempool
1270
        // lookup:
1271
        // - for neutrino we don't have a mempool.
1272
        // - for btcd below v0.24.1 we don't have `gettxspendingprevout`.
1273
        if tx == nil {
6✔
1274
                return Init, fn.None[RBFInfo]()
3✔
1275
        }
3✔
1276

1277
        // Otherwise the input is already spent in the mempool, so eventually
1278
        // we will return Published.
1279
        //
1280
        // We also need to update the RBF info for this input. If the sweeping
1281
        // transaction is broadcast by us, we can find the fee info in the
1282
        // sweeper store.
1283
        txid := tx.TxHash()
2✔
1284
        tr, err := s.cfg.Store.GetTx(txid)
2✔
1285

2✔
1286
        // If the tx is not found in the store, it means it's not broadcast by
2✔
1287
        // us, hence we can't find the fee info. This is fine as, later on when
2✔
1288
        // this tx is confirmed, we will remove the input from our inputs.
2✔
1289
        if errors.Is(err, ErrTxNotFound) {
4✔
1290
                log.Warnf("Spending tx %v not found in sweeper store", txid)
2✔
1291
                return Published, fn.None[RBFInfo]()
2✔
1292
        }
2✔
1293

1294
        // Exit if we get an db error.
1295
        if err != nil {
2✔
1296
                log.Errorf("Unable to get tx %v from sweeper store: %v",
×
1297
                        txid, err)
×
1298

×
1299
                return Published, fn.None[RBFInfo]()
×
1300
        }
×
1301

1302
        // Prepare the fee info and return it.
1303
        rbf := fn.Some(RBFInfo{
2✔
1304
                Txid:    txid,
2✔
1305
                Fee:     btcutil.Amount(tr.Fee),
2✔
1306
                FeeRate: chainfee.SatPerKWeight(tr.FeeRate),
2✔
1307
        })
2✔
1308

2✔
1309
        return Published, rbf
2✔
1310
}
1311

1312
// handleExistingInput processes an input that is already known to the sweeper.
1313
// It will overwrite the params of the old input with the new ones.
1314
func (s *UtxoSweeper) handleExistingInput(input *sweepInputMessage,
1315
        oldInput *SweeperInput) {
3✔
1316

3✔
1317
        // Before updating the input details, check if an exclusive group was
3✔
1318
        // set. In case the same input is registered again without an exclusive
3✔
1319
        // group set, the previous input and its sweep parameters are outdated
3✔
1320
        // hence need to be replaced. This scenario currently only happens for
3✔
1321
        // anchor outputs. When a channel is force closed, in the worst case 3
3✔
1322
        // different sweeps with the same exclusive group are registered with
3✔
1323
        // the sweeper to bump the closing transaction (cpfp) when its time
3✔
1324
        // critical. Receiving an input which was already registered with the
3✔
1325
        // sweeper but now without an exclusive group means non of the previous
3✔
1326
        // inputs were used as CPFP, so we need to make sure we update the
3✔
1327
        // sweep parameters but also remove all inputs with the same exclusive
3✔
1328
        // group because the are outdated too.
3✔
1329
        var prevExclGroup *uint64
3✔
1330
        if oldInput.params.ExclusiveGroup != nil &&
3✔
1331
                input.params.ExclusiveGroup == nil {
6✔
1332

3✔
1333
                prevExclGroup = new(uint64)
3✔
1334
                *prevExclGroup = *oldInput.params.ExclusiveGroup
3✔
1335
        }
3✔
1336

1337
        // Update input details and sweep parameters. The re-offered input
1338
        // details may contain a change to the unconfirmed parent tx info.
1339
        oldInput.params = input.params
3✔
1340
        oldInput.Input = input.input
3✔
1341

3✔
1342
        // If the new input specifies a deadline, update the deadline height.
3✔
1343
        oldInput.DeadlineHeight = input.params.DeadlineHeight.UnwrapOr(
3✔
1344
                oldInput.DeadlineHeight,
3✔
1345
        )
3✔
1346

3✔
1347
        // Add additional result channel to signal spend of this input.
3✔
1348
        oldInput.listeners = append(oldInput.listeners, input.resultChan)
3✔
1349

3✔
1350
        if prevExclGroup != nil {
6✔
1351
                s.removeExclusiveGroup(*prevExclGroup)
3✔
1352
        }
3✔
1353
}
1354

1355
// handleInputSpent takes a spend event of our input and updates the sweeper's
1356
// internal state to remove the input.
1357
func (s *UtxoSweeper) handleInputSpent(spend *chainntnfs.SpendDetail) {
3✔
1358
        // Query store to find out if we ever published this tx.
3✔
1359
        spendHash := *spend.SpenderTxHash
3✔
1360
        isOurTx, err := s.cfg.Store.IsOurTx(spendHash)
3✔
1361
        if err != nil {
3✔
1362
                log.Errorf("cannot determine if tx %v is ours: %v",
×
1363
                        spendHash, err)
×
1364
                return
×
1365
        }
×
1366

1367
        // If this isn't our transaction, it means someone else swept outputs
1368
        // that we were attempting to sweep. This can happen for anchor outputs
1369
        // as well as justice transactions. In this case, we'll notify the
1370
        // wallet to remove any spends that descent from this output.
1371
        if !isOurTx {
6✔
1372
                // Construct a map of the inputs this transaction spends.
3✔
1373
                spendingTx := spend.SpendingTx
3✔
1374
                inputsSpent := make(
3✔
1375
                        map[wire.OutPoint]struct{}, len(spendingTx.TxIn),
3✔
1376
                )
3✔
1377
                for _, txIn := range spendingTx.TxIn {
6✔
1378
                        inputsSpent[txIn.PreviousOutPoint] = struct{}{}
3✔
1379
                }
3✔
1380

1381
                log.Debugf("Attempting to remove descendant txns invalidated "+
3✔
1382
                        "by (txid=%v): %v", spendingTx.TxHash(),
3✔
1383
                        spew.Sdump(spendingTx))
3✔
1384

3✔
1385
                err := s.removeConflictSweepDescendants(inputsSpent)
3✔
1386
                if err != nil {
3✔
1387
                        log.Warnf("unable to remove descendant transactions "+
×
1388
                                "due to tx %v: ", spendHash)
×
1389
                }
×
1390

1391
                log.Debugf("Detected third party spend related to in flight "+
3✔
1392
                        "inputs (is_ours=%v): %v", isOurTx,
3✔
1393
                        lnutils.SpewLogClosure(spend.SpendingTx))
3✔
1394
        }
1395

1396
        // We now use the spending tx to update the state of the inputs.
1397
        s.markInputsSwept(spend.SpendingTx, isOurTx)
3✔
1398
}
1399

1400
// markInputsSwept marks all inputs swept by the spending transaction as swept.
1401
// It will also notify all the subscribers of this input.
1402
func (s *UtxoSweeper) markInputsSwept(tx *wire.MsgTx, isOurTx bool) {
3✔
1403
        for _, txIn := range tx.TxIn {
6✔
1404
                outpoint := txIn.PreviousOutPoint
3✔
1405

3✔
1406
                // Check if this input is known to us. It could probably be
3✔
1407
                // unknown if we canceled the registration, deleted from inputs
3✔
1408
                // map but the ntfn was in-flight already. Or this could be not
3✔
1409
                // one of our inputs.
3✔
1410
                input, ok := s.inputs[outpoint]
3✔
1411
                if !ok {
6✔
1412
                        // It's very likely that a spending tx contains inputs
3✔
1413
                        // that we don't know.
3✔
1414
                        log.Tracef("Skipped marking input as swept: %v not "+
3✔
1415
                                "found in pending inputs", outpoint)
3✔
1416

3✔
1417
                        continue
3✔
1418
                }
1419

1420
                // This input may already been marked as swept by a previous
1421
                // spend notification, which is likely to happen as one sweep
1422
                // transaction usually sweeps multiple inputs.
1423
                if input.terminated() {
3✔
1424
                        log.Debugf("Skipped marking input as swept: %v "+
×
1425
                                "state=%v", outpoint, input.state)
×
1426

×
1427
                        continue
×
1428
                }
1429

1430
                input.state = Swept
3✔
1431

3✔
1432
                // Return either a nil or a remote spend result.
3✔
1433
                var err error
3✔
1434
                if !isOurTx {
6✔
1435
                        log.Warnf("Input=%v was spent by remote or third "+
3✔
1436
                                "party in tx=%v", outpoint, tx.TxHash())
3✔
1437
                        err = ErrRemoteSpend
3✔
1438
                }
3✔
1439

1440
                // Signal result channels.
1441
                s.signalResult(input, Result{
3✔
1442
                        Tx:  tx,
3✔
1443
                        Err: err,
3✔
1444
                })
3✔
1445

3✔
1446
                // Remove all other inputs in this exclusive group.
3✔
1447
                if input.params.ExclusiveGroup != nil {
6✔
1448
                        s.removeExclusiveGroup(*input.params.ExclusiveGroup)
3✔
1449
                }
3✔
1450
        }
1451
}
1452

1453
// markInputFailed marks the given input as failed and won't be retried. It
1454
// will also notify all the subscribers of this input.
1455
func (s *UtxoSweeper) markInputFailed(pi *SweeperInput, err error) {
×
1456
        log.Errorf("Failed to sweep input: %v, error: %v", pi, err)
×
1457

×
1458
        pi.state = Failed
×
1459

×
1460
        // Remove all other inputs in this exclusive group.
×
1461
        if pi.params.ExclusiveGroup != nil {
×
1462
                s.removeExclusiveGroup(*pi.params.ExclusiveGroup)
×
1463
        }
×
1464

1465
        s.signalResult(pi, Result{Err: err})
×
1466
}
1467

1468
// updateSweeperInputs updates the sweeper's internal state and returns a map
1469
// of inputs to be swept. It will remove the inputs that are in final states,
1470
// and returns a map of inputs that have either state Init or PublishFailed.
1471
func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
3✔
1472
        // Create a map of inputs to be swept.
3✔
1473
        inputs := make(InputsMap)
3✔
1474

3✔
1475
        // Iterate the pending inputs and update the sweeper's state.
3✔
1476
        //
3✔
1477
        // TODO(yy): sweeper is made to communicate via go channels, so no
3✔
1478
        // locks are needed to access the map. However, it'd be safer if we
3✔
1479
        // turn this inputs map into a SyncMap in case we wanna add concurrent
3✔
1480
        // access to the map in the future.
3✔
1481
        for op, input := range s.inputs {
6✔
1482
                // If the input has reached a final state, that it's either
3✔
1483
                // been swept, or failed, or excluded, we will remove it from
3✔
1484
                // our sweeper.
3✔
1485
                if input.terminated() {
6✔
1486
                        log.Debugf("Removing input(State=%v) %v from sweeper",
3✔
1487
                                input.state, op)
3✔
1488

3✔
1489
                        delete(s.inputs, op)
3✔
1490

3✔
1491
                        continue
3✔
1492
                }
1493

1494
                // If this input has been included in a sweep tx that's not
1495
                // published yet, we'd skip this input and wait for the sweep
1496
                // tx to be published.
1497
                if input.state == PendingPublish {
6✔
1498
                        continue
3✔
1499
                }
1500

1501
                // If this input has already been published, we will need to
1502
                // check the RBF condition before attempting another sweeping.
1503
                if input.state == Published {
6✔
1504
                        continue
3✔
1505
                }
1506

1507
                // If the input has a locktime that's not yet reached, we will
1508
                // skip this input and wait for the locktime to be reached.
1509
                locktime, _ := input.RequiredLockTime()
3✔
1510
                if uint32(s.currentHeight) < locktime {
6✔
1511
                        log.Warnf("Skipping input %v due to locktime=%v not "+
3✔
1512
                                "reached, current height is %v", op, locktime,
3✔
1513
                                s.currentHeight)
3✔
1514

3✔
1515
                        continue
3✔
1516
                }
1517

1518
                // If the input has a CSV that's not yet reached, we will skip
1519
                // this input and wait for the expiry.
1520
                locktime = input.BlocksToMaturity() + input.HeightHint()
3✔
1521
                if s.currentHeight < int32(locktime)-1 {
6✔
1522
                        log.Infof("Skipping input %v due to CSV expiry=%v not "+
3✔
1523
                                "reached, current height is %v", op, locktime,
3✔
1524
                                s.currentHeight)
3✔
1525

3✔
1526
                        continue
3✔
1527
                }
1528

1529
                // If this input is new or has been failed to be published,
1530
                // we'd retry it. The assumption here is that when an error is
1531
                // returned from `PublishTransaction`, it means the tx has
1532
                // failed to meet the policy, hence it's not in the mempool.
1533
                inputs[op] = input
3✔
1534
        }
1535

1536
        return inputs
3✔
1537
}
1538

1539
// sweepPendingInputs is called when the ticker fires. It will create clusters
1540
// and attempt to create and publish the sweeping transactions.
1541
func (s *UtxoSweeper) sweepPendingInputs(inputs InputsMap) {
3✔
1542
        // Cluster all of our inputs based on the specific Aggregator.
3✔
1543
        sets := s.cfg.Aggregator.ClusterInputs(inputs)
3✔
1544

3✔
1545
        // sweepWithLock is a helper closure that executes the sweep within a
3✔
1546
        // coin select lock to prevent the coins being selected for other
3✔
1547
        // transactions like funding of a channel.
3✔
1548
        sweepWithLock := func(set InputSet) error {
6✔
1549
                return s.cfg.Wallet.WithCoinSelectLock(func() error {
6✔
1550
                        // Try to add inputs from our wallet.
3✔
1551
                        err := set.AddWalletInputs(s.cfg.Wallet)
3✔
1552
                        if err != nil {
6✔
1553
                                return err
3✔
1554
                        }
3✔
1555

1556
                        // Create sweeping transaction for each set.
1557
                        err = s.sweep(set)
3✔
1558
                        if err != nil {
5✔
1559
                                return err
2✔
1560
                        }
2✔
1561

1562
                        return nil
3✔
1563
                })
1564
        }
1565

1566
        for _, set := range sets {
6✔
1567
                var err error
3✔
1568
                if set.NeedWalletInput() {
6✔
1569
                        // Sweep the set of inputs that need the wallet inputs.
3✔
1570
                        err = sweepWithLock(set)
3✔
1571
                } else {
6✔
1572
                        // Sweep the set of inputs that don't need the wallet
3✔
1573
                        // inputs.
3✔
1574
                        err = s.sweep(set)
3✔
1575
                }
3✔
1576

1577
                if err != nil {
6✔
1578
                        log.Errorf("Failed to sweep %v: %v", set, err)
3✔
1579
                }
3✔
1580
        }
1581
}
1582

1583
// monitorFeeBumpResult subscribes to the passed result chan to listen for
1584
// future updates about the sweeping tx.
1585
//
1586
// NOTE: must run as a goroutine.
1587
func (s *UtxoSweeper) monitorFeeBumpResult(resultChan <-chan *BumpResult) {
3✔
1588
        defer s.wg.Done()
3✔
1589

3✔
1590
        for {
6✔
1591
                select {
3✔
1592
                case r := <-resultChan:
3✔
1593
                        // Validate the result is valid.
3✔
1594
                        if err := r.Validate(); err != nil {
3✔
1595
                                log.Errorf("Received invalid result: %v", err)
×
1596
                                continue
×
1597
                        }
1598

1599
                        // Send the result back to the main event loop.
1600
                        select {
3✔
1601
                        case s.bumpResultChan <- r:
3✔
1602
                        case <-s.quit:
×
1603
                                log.Debug("Sweeper shutting down, skip " +
×
1604
                                        "sending bump result")
×
1605

×
1606
                                return
×
1607
                        }
1608

1609
                        // The sweeping tx has been confirmed, we can exit the
1610
                        // monitor now.
1611
                        //
1612
                        // TODO(yy): can instead remove the spend subscription
1613
                        // in sweeper and rely solely on this event to mark
1614
                        // inputs as Swept?
1615
                        if r.Event == TxConfirmed || r.Event == TxFailed {
6✔
1616
                                log.Debugf("Received %v for sweep tx %v, exit "+
3✔
1617
                                        "fee bump monitor", r.Event,
3✔
1618
                                        r.Tx.TxHash())
3✔
1619

3✔
1620
                                // Cancel the rebroadcasting of the failed tx.
3✔
1621
                                s.cfg.Wallet.CancelRebroadcast(r.Tx.TxHash())
3✔
1622

3✔
1623
                                return
3✔
1624
                        }
3✔
1625

1626
                case <-s.quit:
3✔
1627
                        log.Debugf("Sweeper shutting down, exit fee " +
3✔
1628
                                "bump handler")
3✔
1629

3✔
1630
                        return
3✔
1631
                }
1632
        }
1633
}
1634

1635
// handleBumpEventTxFailed handles the case where the tx has been failed to
1636
// publish.
1637
func (s *UtxoSweeper) handleBumpEventTxFailed(r *BumpResult) error {
3✔
1638
        tx, err := r.Tx, r.Err
3✔
1639

3✔
1640
        log.Errorf("Fee bump attempt failed for tx=%v: %v", tx.TxHash(), err)
3✔
1641

3✔
1642
        outpoints := make([]wire.OutPoint, 0, len(tx.TxIn))
3✔
1643
        for _, inp := range tx.TxIn {
6✔
1644
                outpoints = append(outpoints, inp.PreviousOutPoint)
3✔
1645
        }
3✔
1646

1647
        // TODO(yy): should we also remove the failed tx from db?
1648
        s.markInputsPublishFailed(outpoints)
3✔
1649

3✔
1650
        return err
3✔
1651
}
1652

1653
// handleBumpEventTxReplaced handles the case where the sweeping tx has been
1654
// replaced by a new one.
1655
func (s *UtxoSweeper) handleBumpEventTxReplaced(r *BumpResult) error {
3✔
1656
        oldTx := r.ReplacedTx
3✔
1657
        newTx := r.Tx
3✔
1658

3✔
1659
        // Prepare a new record to replace the old one.
3✔
1660
        tr := &TxRecord{
3✔
1661
                Txid:    newTx.TxHash(),
3✔
1662
                FeeRate: uint64(r.FeeRate),
3✔
1663
                Fee:     uint64(r.Fee),
3✔
1664
        }
3✔
1665

3✔
1666
        // Get the old record for logging purpose.
3✔
1667
        oldTxid := oldTx.TxHash()
3✔
1668
        record, err := s.cfg.Store.GetTx(oldTxid)
3✔
1669
        if err != nil {
4✔
1670
                log.Errorf("Fetch tx record for %v: %v", oldTxid, err)
1✔
1671
                return err
1✔
1672
        }
1✔
1673

1674
        // Cancel the rebroadcasting of the replaced tx.
1675
        s.cfg.Wallet.CancelRebroadcast(oldTxid)
3✔
1676

3✔
1677
        log.Infof("RBFed tx=%v(fee=%v sats, feerate=%v sats/kw) with new "+
3✔
1678
                "tx=%v(fee=%v, "+"feerate=%v)", record.Txid, record.Fee,
3✔
1679
                record.FeeRate, tr.Txid, tr.Fee, tr.FeeRate)
3✔
1680

3✔
1681
        // The old sweeping tx has been replaced by a new one, we will update
3✔
1682
        // the tx record in the sweeper db.
3✔
1683
        //
3✔
1684
        // TODO(yy): we may also need to update the inputs in this tx to a new
3✔
1685
        // state. Suppose a replacing tx only spends a subset of the inputs
3✔
1686
        // here, we'd end up with the rest being marked as `Published` and
3✔
1687
        // won't be aggregated in the next sweep. Atm it's fine as we always
3✔
1688
        // RBF the same input set.
3✔
1689
        if err := s.cfg.Store.DeleteTx(oldTxid); err != nil {
3✔
1690
                log.Errorf("Delete tx record for %v: %v", oldTxid, err)
×
1691
                return err
×
1692
        }
×
1693

1694
        // Mark the inputs as published using the replacing tx.
1695
        return s.markInputsPublished(tr, r.Tx.TxIn)
3✔
1696
}
1697

1698
// handleBumpEventTxPublished handles the case where the sweeping tx has been
1699
// successfully published.
1700
func (s *UtxoSweeper) handleBumpEventTxPublished(r *BumpResult) error {
3✔
1701
        tx := r.Tx
3✔
1702
        tr := &TxRecord{
3✔
1703
                Txid:    tx.TxHash(),
3✔
1704
                FeeRate: uint64(r.FeeRate),
3✔
1705
                Fee:     uint64(r.Fee),
3✔
1706
        }
3✔
1707

3✔
1708
        // Inputs have been successfully published so we update their
3✔
1709
        // states.
3✔
1710
        err := s.markInputsPublished(tr, tx.TxIn)
3✔
1711
        if err != nil {
3✔
1712
                return err
×
1713
        }
×
1714

1715
        log.Debugf("Published sweep tx %v, num_inputs=%v, height=%v",
3✔
1716
                tx.TxHash(), len(tx.TxIn), s.currentHeight)
3✔
1717

3✔
1718
        // If there's no error, remove the output script. Otherwise keep it so
3✔
1719
        // that it can be reused for the next transaction and causes no address
3✔
1720
        // inflation.
3✔
1721
        s.currentOutputScript = fn.None[lnwallet.AddrWithKey]()
3✔
1722

3✔
1723
        return nil
3✔
1724
}
1725

1726
// handleBumpEvent handles the result sent from the bumper based on its event
1727
// type.
1728
//
1729
// NOTE: TxConfirmed event is not handled, since we already subscribe to the
1730
// input's spending event, we don't need to do anything here.
1731
func (s *UtxoSweeper) handleBumpEvent(r *BumpResult) error {
3✔
1732
        log.Debugf("Received bump event [%v] for tx %v", r.Event, r.Tx.TxHash())
3✔
1733

3✔
1734
        switch r.Event {
3✔
1735
        // The tx has been published, we update the inputs' state and create a
1736
        // record to be stored in the sweeper db.
1737
        case TxPublished:
3✔
1738
                return s.handleBumpEventTxPublished(r)
3✔
1739

1740
        // The tx has failed, we update the inputs' state.
1741
        case TxFailed:
3✔
1742
                return s.handleBumpEventTxFailed(r)
3✔
1743

1744
        // The tx has been replaced, we will remove the old tx and replace it
1745
        // with the new one.
1746
        case TxReplaced:
3✔
1747
                return s.handleBumpEventTxReplaced(r)
3✔
1748
        }
1749

1750
        return nil
3✔
1751
}
1752

1753
// IsSweeperOutpoint determines whether the outpoint was created by the sweeper.
1754
//
1755
// NOTE: It is enough to check the txid because the sweeper will create
1756
// outpoints which solely belong to the internal LND wallet.
1757
func (s *UtxoSweeper) IsSweeperOutpoint(op wire.OutPoint) bool {
3✔
1758
        found, err := s.cfg.Store.IsOurTx(op.Hash)
3✔
1759
        // In case there is an error fetching the transaction details from the
3✔
1760
        // sweeper store we assume the outpoint is still used by the sweeper
3✔
1761
        // (worst case scenario).
3✔
1762
        //
3✔
1763
        // TODO(ziggie): Ensure that confirmed outpoints are deleted from the
3✔
1764
        // bucket.
3✔
1765
        if err != nil && !errors.Is(err, errNoTxHashesBucket) {
3✔
1766
                log.Errorf("failed to fetch info for outpoint(%v:%d) "+
×
1767
                        "with: %v, we assume it is still in use by the sweeper",
×
1768
                        op.Hash, op.Index, err)
×
1769

×
1770
                return true
×
1771
        }
×
1772

1773
        return found
3✔
1774
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc