• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16181619122

09 Jul 2025 10:33PM UTC coverage: 55.326% (-2.3%) from 57.611%
16181619122

Pull #10060

github

web-flow
Merge d15e8671f into 0e830da9d
Pull Request #10060: sweep: fix expected spending events being missed

9 of 26 new or added lines in 2 files covered. (34.62%)

23695 existing lines in 280 files now uncovered.

108518 of 196143 relevant lines covered (55.33%)

22354.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.91
/sweep/fee_bumper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcutil"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/rpcclient"
13
        "github.com/btcsuite/btcd/txscript"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/btcsuite/btcwallet/chain"
16
        "github.com/lightningnetwork/lnd/chainio"
17
        "github.com/lightningnetwork/lnd/chainntnfs"
18
        "github.com/lightningnetwork/lnd/fn/v2"
19
        "github.com/lightningnetwork/lnd/input"
20
        "github.com/lightningnetwork/lnd/labels"
21
        "github.com/lightningnetwork/lnd/lntypes"
22
        "github.com/lightningnetwork/lnd/lnutils"
23
        "github.com/lightningnetwork/lnd/lnwallet"
24
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
25
        "github.com/lightningnetwork/lnd/tlv"
26
)
27

// spentNotificationTimeout defines the time to wait for a spending event from
// `RegisterSpendNtfn` when an immediate response is expected.
const spentNotificationTimeout = 1 * time.Second
// Sentinel errors returned by the fee bumper. Callers should match them with
// `errors.Is` as they are usually wrapped with extra context.
var (
	// ErrInvalidBumpResult is returned when the bump result is invalid.
	ErrInvalidBumpResult = errors.New("invalid bump result")

	// ErrNotEnoughBudget is returned when the fee bumper decides the
	// current budget cannot cover the fee.
	ErrNotEnoughBudget = errors.New("not enough budget")

	// ErrLocktimeImmature is returned when sweeping an input whose
	// locktime is not reached.
	ErrLocktimeImmature = errors.New("immature input")

	// ErrTxNoOutput is returned when an output cannot be created during tx
	// preparation, usually due to the output being dust.
	ErrTxNoOutput = errors.New("tx has no output")

	// ErrUnknownSpent is returned when an unknown tx has spent an input in
	// the sweeping tx.
	ErrUnknownSpent = errors.New("unknown spend of input")

	// ErrInputMissing is returned when a given input no longer exists,
	// e.g., spending from an orphan tx.
	ErrInputMissing = errors.New("input no longer exists")
)
var (
	// dummyChangePkScript is a dummy tapscript change script that's used
	// when we don't need a real address, just something that can be used
	// for fee estimation. It is 34 bytes long: the two bytes 0x51 0x20
	// followed by 32 zero bytes.
	dummyChangePkScript = []byte{
		0x51, 0x20,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
	}
)
70
// Bumper defines an interface that can be used by other subsystems for fee
71
// bumping.
72
type Bumper interface {
73
        // Broadcast is used to publish the tx created from the given inputs
74
        // specified in the request. It handles the tx creation, broadcasts it,
75
        // and monitors its confirmation status for potential fee bumping. It
76
        // returns a chan that the caller can use to receive updates about the
77
        // broadcast result and potential RBF attempts.
78
        Broadcast(req *BumpRequest) <-chan *BumpResult
79
}
80

// BumpEvent represents the event of a fee bumping attempt.
type BumpEvent uint8

const (
	// TxPublished is sent when the broadcast attempt is finished.
	TxPublished BumpEvent = iota

	// TxFailed is sent when the tx has encountered a fee-related error
	// during its creation or broadcast, or an internal error from the fee
	// bumper. In either case the inputs in this tx should be retried with
	// either a different grouping strategy or an increased budget.
	//
	// TODO(yy): Remove the above usage once we remove sweeping non-CPFP
	// anchors.
	TxFailed

	// TxReplaced is sent when the original tx is replaced by a new one.
	TxReplaced

	// TxConfirmed is sent when the tx is confirmed.
	TxConfirmed

	// TxUnknownSpend is sent when at least one of the inputs is spent but
	// not by the current sweeping tx, this can happen when,
	// - a remote party has replaced our sweeping tx by spending the
	//   input(s), e.g., via the direct preimage spend on our outgoing HTLC.
	// - a third party has replaced our sweeping tx, e.g., the anchor output
	//   after 16 blocks.
	// - A previous sweeping tx has confirmed but the fee bumper is not
	//   aware of it, e.g., a restart happens right after the sweeping tx is
	//   broadcast and confirmed.
	TxUnknownSpend

	// TxFatal is sent when the inputs in this tx cannot be retried. Txns
	// will end up in this state if they have encountered a non-fee related
	// error, which means they cannot be retried with increased budget.
	TxFatal

	// sentinelEvent is used to check if an event is unknown. It must stay
	// the last entry so every valid event compares below it.
	sentinelEvent
)

// String returns a human-readable string for the event. Any value that is not
// one of the declared events is rendered as "Unknown".
func (e BumpEvent) String() string {
	switch e {
	case TxPublished:
		return "Published"
	case TxFailed:
		return "Failed"
	case TxReplaced:
		return "Replaced"
	case TxConfirmed:
		return "Confirmed"
	case TxUnknownSpend:
		return "UnknownSpend"
	case TxFatal:
		return "Fatal"
	default:
		return "Unknown"
	}
}

// Unknown returns true if the event is unknown, i.e., its value is at or
// beyond sentinelEvent.
func (e BumpEvent) Unknown() bool {
	return e >= sentinelEvent
}
148
// BumpRequest is used by the caller to give the Bumper the necessary info to
149
// create and manage potential fee bumps for a set of inputs.
150
type BumpRequest struct {
151
        // Budget gives the total amount that can be used as fees by these
152
        // inputs.
153
        Budget btcutil.Amount
154

155
        // Inputs is the set of inputs to sweep.
156
        Inputs []input.Input
157

158
        // DeadlineHeight is the block height at which the tx should be
159
        // confirmed.
160
        DeadlineHeight int32
161

162
        // DeliveryAddress is the script to send the change output to.
163
        DeliveryAddress lnwallet.AddrWithKey
164

165
        // MaxFeeRate is the maximum fee rate that can be used for fee bumping.
166
        MaxFeeRate chainfee.SatPerKWeight
167

168
        // StartingFeeRate is an optional parameter that can be used to specify
169
        // the initial fee rate to use for the fee function.
170
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
171

172
        // ExtraTxOut tracks if this bump request has an optional set of extra
173
        // outputs to add to the transaction.
174
        ExtraTxOut fn.Option[SweepOutput]
175

176
        // Immediate is used to specify that the tx should be broadcast
177
        // immediately.
178
        Immediate bool
179
}
180

181
// MaxFeeRateAllowed returns the maximum fee rate allowed for the given
182
// request. It calculates the feerate using the supplied budget and the weight,
183
// compares it with the specified MaxFeeRate, and returns the smaller of the
184
// two.
185
func (r *BumpRequest) MaxFeeRateAllowed() (chainfee.SatPerKWeight, error) {
11✔
186
        // We'll want to know if we have any blobs, as we need to factor this
11✔
187
        // into the max fee rate for this bump request.
11✔
188
        hasBlobs := fn.Any(r.Inputs, func(i input.Input) bool {
21✔
189
                return fn.MapOptionZ(
10✔
190
                        i.ResolutionBlob(), func(b tlv.Blob) bool {
10✔
UNCOV
191
                                return len(b) > 0
×
UNCOV
192
                        },
×
193
                )
194
        })
195

196
        sweepAddrs := [][]byte{
11✔
197
                r.DeliveryAddress.DeliveryAddress,
11✔
198
        }
11✔
199

11✔
200
        // If we have blobs, then we'll add an extra sweep addr for the size
11✔
201
        // estimate below. We know that these blobs will also always be based on
11✔
202
        // p2tr addrs.
11✔
203
        if hasBlobs {
11✔
204
                // We need to pass in a real address, so we'll use a dummy
×
205
                // tapscript change script that's used elsewhere for tests.
×
206
                sweepAddrs = append(sweepAddrs, dummyChangePkScript)
×
207
        }
×
208

209
        // Get the size of the sweep tx, which will be used to calculate the
210
        // budget fee rate.
211
        size, err := calcSweepTxWeight(
11✔
212
                r.Inputs, sweepAddrs,
11✔
213
        )
11✔
214
        if err != nil {
12✔
215
                return 0, err
1✔
216
        }
1✔
217

218
        // Use the budget and MaxFeeRate to decide the max allowed fee rate.
219
        // This is needed as, when the input has a large value and the user
220
        // sets the budget to be proportional to the input value, the fee rate
221
        // can be very high and we need to make sure it doesn't exceed the max
222
        // fee rate.
223
        maxFeeRateAllowed := chainfee.NewSatPerKWeight(r.Budget, size)
10✔
224
        if maxFeeRateAllowed > r.MaxFeeRate {
13✔
225
                log.Debugf("Budget feerate %v exceeds MaxFeeRate %v, use "+
3✔
226
                        "MaxFeeRate instead, txWeight=%v", maxFeeRateAllowed,
3✔
227
                        r.MaxFeeRate, size)
3✔
228

3✔
229
                return r.MaxFeeRate, nil
3✔
230
        }
3✔
231

232
        log.Debugf("Budget feerate %v below MaxFeeRate %v, use budget feerate "+
7✔
233
                "instead, txWeight=%v", maxFeeRateAllowed, r.MaxFeeRate, size)
7✔
234

7✔
235
        return maxFeeRateAllowed, nil
7✔
236
}
237

238
// calcSweepTxWeight calculates the weight of the sweep tx. It assumes a
239
// sweeping tx always has a single output(change).
240
func calcSweepTxWeight(inputs []input.Input,
241
        outputPkScript [][]byte) (lntypes.WeightUnit, error) {
14✔
242

14✔
243
        // Use a const fee rate as we only use the weight estimator to
14✔
244
        // calculate the size.
14✔
245
        const feeRate = 1
14✔
246

14✔
247
        // Initialize the tx weight estimator with,
14✔
248
        // - nil outputs as we only have one single change output.
14✔
249
        // - const fee rate as we don't care about the fees here.
14✔
250
        // - 0 maxfeerate as we don't care about fees here.
14✔
251
        //
14✔
252
        // TODO(yy): we should refactor the weight estimator to not require a
14✔
253
        // fee rate and max fee rate and make it a pure tx weight calculator.
14✔
254
        _, estimator, err := getWeightEstimate(
14✔
255
                inputs, nil, feeRate, 0, outputPkScript,
14✔
256
        )
14✔
257
        if err != nil {
16✔
258
                return 0, err
2✔
259
        }
2✔
260

261
        return estimator.weight(), nil
12✔
262
}
263

264
// BumpResult is used by the Bumper to send updates about the tx being
265
// broadcast.
266
type BumpResult struct {
267
        // Event is the type of event that the result is for.
268
        Event BumpEvent
269

270
        // Tx is the tx being broadcast.
271
        Tx *wire.MsgTx
272

273
        // ReplacedTx is the old, replaced tx if a fee bump is attempted.
274
        ReplacedTx *wire.MsgTx
275

276
        // FeeRate is the fee rate used for the new tx.
277
        FeeRate chainfee.SatPerKWeight
278

279
        // Fee is the fee paid by the new tx.
280
        Fee btcutil.Amount
281

282
        // Err is the error that occurred during the broadcast.
283
        Err error
284

285
        // SpentInputs are the inputs spent by another tx which caused the
286
        // current tx to be failed.
287
        SpentInputs map[wire.OutPoint]*wire.MsgTx
288

289
        // requestID is the ID of the request that created this record.
290
        requestID uint64
291
}
292

293
// String returns a human-readable string for the result.
UNCOV
294
func (b *BumpResult) String() string {
×
UNCOV
295
        desc := fmt.Sprintf("Event=%v", b.Event)
×
UNCOV
296
        if b.Tx != nil {
×
UNCOV
297
                desc += fmt.Sprintf(", Tx=%v", b.Tx.TxHash())
×
UNCOV
298
        }
×
299

UNCOV
300
        return fmt.Sprintf("[%s]", desc)
×
301
}
302

303
// Validate validates the BumpResult so it's safe to use.
304
func (b *BumpResult) Validate() error {
12✔
305
        isFailureEvent := b.Event == TxFailed || b.Event == TxFatal ||
12✔
306
                b.Event == TxUnknownSpend
12✔
307

12✔
308
        // Every result must have a tx except the fatal or failed case.
12✔
309
        if b.Tx == nil && !isFailureEvent {
13✔
310
                return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult)
1✔
311
        }
1✔
312

313
        // Every result must have a known event.
314
        if b.Event.Unknown() {
12✔
315
                return fmt.Errorf("%w: unknown event", ErrInvalidBumpResult)
1✔
316
        }
1✔
317

318
        // If it's a replacing event, it must have a replaced tx.
319
        if b.Event == TxReplaced && b.ReplacedTx == nil {
11✔
320
                return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult)
1✔
321
        }
1✔
322

323
        // If it's a failed or fatal event, it must have an error.
324
        if isFailureEvent && b.Err == nil {
11✔
325
                return fmt.Errorf("%w: nil error", ErrInvalidBumpResult)
2✔
326
        }
2✔
327

328
        // If it's a confirmed event, it must have a fee rate and fee.
329
        if b.Event == TxConfirmed && (b.FeeRate == 0 || b.Fee == 0) {
8✔
330
                return fmt.Errorf("%w: missing fee rate or fee",
1✔
331
                        ErrInvalidBumpResult)
1✔
332
        }
1✔
333

334
        return nil
6✔
335
}
336

337
// TxPublisherConfig is the config used to create a new TxPublisher.
338
type TxPublisherConfig struct {
339
        // Signer is used to create the tx signature.
340
        Signer input.Signer
341

342
        // Wallet is used primarily to publish the tx.
343
        Wallet Wallet
344

345
        // Estimator is used to estimate the fee rate for the new tx based on
346
        // its deadline conf target.
347
        Estimator chainfee.Estimator
348

349
        // Notifier is used to monitor the confirmation status of the tx.
350
        Notifier chainntnfs.ChainNotifier
351

352
        // ChainIO represents an abstraction over a source that can query the
353
        // blockchain.
354
        ChainIO lnwallet.BlockChainIO
355

356
        // AuxSweeper is an optional interface that can be used to modify the
357
        // way sweep transaction are generated.
358
        AuxSweeper fn.Option[AuxSweeper]
359
}
360

361
// TxPublisher is an implementation of the Bumper interface. It utilizes the
362
// `testmempoolaccept` RPC to bump the fee of txns it created based on
363
// different fee function selected or configed by the caller. Its purpose is to
364
// take a list of inputs specified, and create a tx that spends them to a
365
// specified output. It will then monitor the confirmation status of the tx,
366
// and if it's not confirmed within a certain time frame, it will attempt to
367
// bump the fee of the tx by creating a new tx that spends the same inputs to
368
// the same output, but with a higher fee rate. It will continue to do this
369
// until the tx is confirmed or the fee rate reaches the maximum fee rate
370
// specified by the caller.
371
type TxPublisher struct {
372
        started atomic.Bool
373
        stopped atomic.Bool
374

375
        // Embed the blockbeat consumer struct to get access to the method
376
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
377
        chainio.BeatConsumer
378

379
        wg sync.WaitGroup
380

381
        // cfg specifies the configuration of the TxPublisher.
382
        cfg *TxPublisherConfig
383

384
        // currentHeight is the current block height.
385
        currentHeight atomic.Int32
386

387
        // records is a map keyed by the requestCounter and the value is the tx
388
        // being monitored.
389
        records lnutils.SyncMap[uint64, *monitorRecord]
390

391
        // requestCounter is a monotonically increasing counter used to keep
392
        // track of how many requests have been made.
393
        requestCounter atomic.Uint64
394

395
        // subscriberChans is a map keyed by the requestCounter, each item is
396
        // the chan that the publisher sends the fee bump result to.
397
        subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]
398

399
        // quit is used to signal the publisher to stop.
400
        quit chan struct{}
401
}
402

403
// Compile-time constraint to ensure TxPublisher implements Bumper.
404
var _ Bumper = (*TxPublisher)(nil)
405

406
// Compile-time check for the chainio.Consumer interface.
407
var _ chainio.Consumer = (*TxPublisher)(nil)
408

409
// NewTxPublisher creates a new TxPublisher.
410
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
21✔
411
        tp := &TxPublisher{
21✔
412
                cfg:             &cfg,
21✔
413
                records:         lnutils.SyncMap[uint64, *monitorRecord]{},
21✔
414
                subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
21✔
415
                quit:            make(chan struct{}),
21✔
416
        }
21✔
417

21✔
418
        // Mount the block consumer.
21✔
419
        tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())
21✔
420

21✔
421
        return tp
21✔
422
}
21✔
423

424
// isNeutrinoBackend checks if the wallet backend is neutrino.
425
func (t *TxPublisher) isNeutrinoBackend() bool {
×
426
        return t.cfg.Wallet.BackEnd() == "neutrino"
×
427
}
×
428

429
// Broadcast is used to publish the tx created from the given inputs. It will
430
// register the broadcast request and return a chan to the caller to subscribe
431
// the broadcast result. The initial broadcast is guaranteed to be
432
// RBF-compliant unless the budget specified cannot cover the fee.
433
//
434
// NOTE: part of the Bumper interface.
435
func (t *TxPublisher) Broadcast(req *BumpRequest) <-chan *BumpResult {
5✔
436
        log.Tracef("Received broadcast request: %s",
5✔
437
                lnutils.SpewLogClosure(req))
5✔
438

5✔
439
        // Store the request.
5✔
440
        record := t.storeInitialRecord(req)
5✔
441

5✔
442
        // Create a chan to send the result to the caller.
5✔
443
        subscriber := make(chan *BumpResult, 1)
5✔
444
        t.subscriberChans.Store(record.requestID, subscriber)
5✔
445

5✔
446
        // Publish the tx immediately if specified.
5✔
447
        if req.Immediate {
6✔
448
                t.handleInitialBroadcast(record)
1✔
449
        }
1✔
450

451
        return subscriber
5✔
452
}
453

454
// storeInitialRecord initializes a monitor record and saves it in the map.
455
func (t *TxPublisher) storeInitialRecord(req *BumpRequest) *monitorRecord {
5✔
456
        // Increase the request counter.
5✔
457
        //
5✔
458
        // NOTE: this is the only place where we increase the counter.
5✔
459
        requestID := t.requestCounter.Add(1)
5✔
460

5✔
461
        // Register the record.
5✔
462
        record := &monitorRecord{
5✔
463
                requestID: requestID,
5✔
464
                req:       req,
5✔
465
        }
5✔
466
        t.records.Store(requestID, record)
5✔
467

5✔
468
        return record
5✔
469
}
5✔
470

471
// updateRecord updates the given record's tx and fee, and saves it in the
472
// records map.
473
func (t *TxPublisher) updateRecord(r *monitorRecord,
474
        sweepCtx *sweepTxCtx) *monitorRecord {
19✔
475

19✔
476
        r.tx = sweepCtx.tx
19✔
477
        r.fee = sweepCtx.fee
19✔
478
        r.outpointToTxIndex = sweepCtx.outpointToTxIndex
19✔
479

19✔
480
        // Register the record.
19✔
481
        t.records.Store(r.requestID, r)
19✔
482

19✔
483
        return r
19✔
484
}
19✔
485

486
// NOTE: part of the `chainio.Consumer` interface.
487
func (t *TxPublisher) Name() string {
21✔
488
        return "TxPublisher"
21✔
489
}
21✔
490

491
// initializeTx initializes a fee function and creates an RBF-compliant tx. If
492
// succeeded, the initial tx is stored in the records map.
493
func (t *TxPublisher) initializeTx(r *monitorRecord) (*monitorRecord, error) {
5✔
494
        // Create a fee bumping algorithm to be used for future RBF.
5✔
495
        feeAlgo, err := t.initializeFeeFunction(r.req)
5✔
496
        if err != nil {
6✔
497
                return nil, fmt.Errorf("init fee function: %w", err)
1✔
498
        }
1✔
499

500
        // Attach the newly created fee function.
501
        //
502
        // TODO(yy): current we'd initialize a monitorRecord before creating the
503
        // fee function, while we could instead create the fee function first
504
        // then save it to the record. To make this happen we need to change the
505
        // conf target calculation below since we would be initializing the fee
506
        // function one block before.
507
        r.feeFunction = feeAlgo
4✔
508

4✔
509
        // Create the initial tx to be broadcasted. This tx is guaranteed to
4✔
510
        // comply with the RBF restrictions.
4✔
511
        record, err := t.createRBFCompliantTx(r)
4✔
512
        if err != nil {
5✔
513
                return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
1✔
514
        }
1✔
515

516
        return record, nil
3✔
517
}
518

519
// initializeFeeFunction initializes a fee function to be used for this request
520
// for future fee bumping.
521
func (t *TxPublisher) initializeFeeFunction(
522
        req *BumpRequest) (FeeFunction, error) {
8✔
523

8✔
524
        // Get the max allowed feerate.
8✔
525
        maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
8✔
526
        if err != nil {
8✔
527
                return nil, err
×
528
        }
×
529

530
        // Get the initial conf target.
531
        confTarget := calcCurrentConfTarget(
8✔
532
                t.currentHeight.Load(), req.DeadlineHeight,
8✔
533
        )
8✔
534

8✔
535
        log.Debugf("Initializing fee function with conf target=%v, budget=%v, "+
8✔
536
                "maxFeeRateAllowed=%v", confTarget, req.Budget,
8✔
537
                maxFeeRateAllowed)
8✔
538

8✔
539
        // Initialize the fee function and return it.
8✔
540
        //
8✔
541
        // TODO(yy): return based on differet req.Strategy?
8✔
542
        return NewLinearFeeFunction(
8✔
543
                maxFeeRateAllowed, confTarget, t.cfg.Estimator,
8✔
544
                req.StartingFeeRate,
8✔
545
        )
8✔
546
}
547

548
// createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
549
// so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee
550
// and redo the process until the tx is valid, or return an error when non-RBF
551
// related errors occur or the budget has been used up.
552
func (t *TxPublisher) createRBFCompliantTx(
553
        r *monitorRecord) (*monitorRecord, error) {
10✔
554

10✔
555
        f := r.feeFunction
10✔
556

10✔
557
        for {
23✔
558
                // Create a new tx with the given fee rate and check its
13✔
559
                // mempool acceptance.
13✔
560
                sweepCtx, err := t.createAndCheckTx(r)
13✔
561

13✔
562
                switch {
13✔
563
                case err == nil:
7✔
564
                        // The tx is valid, store it.
7✔
565
                        record := t.updateRecord(r, sweepCtx)
7✔
566

7✔
567
                        log.Infof("Created initial sweep tx=%v for %v inputs: "+
7✔
568
                                "feerate=%v, fee=%v, inputs:\n%v",
7✔
569
                                sweepCtx.tx.TxHash(), len(r.req.Inputs),
7✔
570
                                f.FeeRate(), sweepCtx.fee,
7✔
571
                                inputTypeSummary(r.req.Inputs))
7✔
572

7✔
573
                        return record, nil
7✔
574

575
                // If the error indicates the fees paid is not enough, we will
576
                // ask the fee function to increase the fee rate and retry.
577
                case errors.Is(err, lnwallet.ErrMempoolFee):
2✔
578
                        // We should at least start with a feerate above the
2✔
579
                        // mempool min feerate, so if we get this error, it
2✔
580
                        // means something is wrong earlier in the pipeline.
2✔
581
                        log.Errorf("Current fee=%v, feerate=%v, %v",
2✔
582
                                sweepCtx.fee, f.FeeRate(), err)
2✔
583

2✔
584
                        fallthrough
2✔
585

586
                // We are not paying enough fees so we increase it.
587
                case errors.Is(err, chain.ErrInsufficientFee):
4✔
588
                        increased := false
4✔
589

4✔
590
                        // Keep calling the fee function until the fee rate is
4✔
591
                        // increased or maxed out.
4✔
592
                        for !increased {
9✔
593
                                log.Debugf("Increasing fee for next round, "+
5✔
594
                                        "current fee=%v, feerate=%v",
5✔
595
                                        sweepCtx.fee, f.FeeRate())
5✔
596

5✔
597
                                // If the fee function tells us that we have
5✔
598
                                // used up the budget, we will return an error
5✔
599
                                // indicating this tx cannot be made. The
5✔
600
                                // sweeper should handle this error and try to
5✔
601
                                // cluster these inputs differently.
5✔
602
                                increased, err = f.Increment()
5✔
603
                                if err != nil {
6✔
604
                                        return nil, err
1✔
605
                                }
1✔
606
                        }
607

608
                // TODO(yy): suppose there's only one bad input, we can do a
609
                // binary search to find out which input is causing this error
610
                // by recreating a tx using half of the inputs and check its
611
                // mempool acceptance.
612
                default:
2✔
613
                        log.Debugf("Failed to create RBF-compliant tx: %v", err)
2✔
614
                        return nil, err
2✔
615
                }
616
        }
617
}
618

619
// createAndCheckTx creates a tx based on the given inputs, change output
620
// script, and the fee rate. In addition, it validates the tx's mempool
621
// acceptance before returning a tx that can be published directly, along with
622
// its fee.
623
func (t *TxPublisher) createAndCheckTx(r *monitorRecord) (*sweepTxCtx, error) {
23✔
624
        req := r.req
23✔
625
        f := r.feeFunction
23✔
626

23✔
627
        // Create the sweep tx with max fee rate of 0 as the fee function
23✔
628
        // guarantees the fee rate used here won't exceed the max fee rate.
23✔
629
        sweepCtx, err := t.createSweepTx(
23✔
630
                req.Inputs, req.DeliveryAddress, f.FeeRate(),
23✔
631
        )
23✔
632
        if err != nil {
23✔
UNCOV
633
                return sweepCtx, fmt.Errorf("create sweep tx: %w", err)
×
UNCOV
634
        }
×
635

636
        // Sanity check the budget still covers the fee.
637
        if sweepCtx.fee > req.Budget {
25✔
638
                return sweepCtx, fmt.Errorf("%w: budget=%v, fee=%v",
2✔
639
                        ErrNotEnoughBudget, req.Budget, sweepCtx.fee)
2✔
640
        }
2✔
641

642
        // If we had an extra txOut, then we'll update the result to include
643
        // it.
644
        req.ExtraTxOut = sweepCtx.extraTxOut
21✔
645

21✔
646
        // Validate the tx's mempool acceptance.
21✔
647
        err = t.cfg.Wallet.CheckMempoolAcceptance(sweepCtx.tx)
21✔
648

21✔
649
        // Exit early if the tx is valid.
21✔
650
        if err == nil {
33✔
651
                return sweepCtx, nil
12✔
652
        }
12✔
653

654
        // Print an error log if the chain backend doesn't support the mempool
655
        // acceptance test RPC.
656
        if errors.Is(err, rpcclient.ErrBackendVersion) {
9✔
657
                log.Errorf("TestMempoolAccept not supported by backend, " +
×
658
                        "consider upgrading it to a newer version")
×
659
                return sweepCtx, nil
×
660
        }
×
661

662
        // We are running on a backend that doesn't implement the RPC
663
        // testmempoolaccept, eg, neutrino, so we'll skip the check.
664
        if errors.Is(err, chain.ErrUnimplemented) {
9✔
UNCOV
665
                log.Debug("Skipped testmempoolaccept due to not implemented")
×
UNCOV
666
                return sweepCtx, nil
×
UNCOV
667
        }
×
668

669
        // If the inputs are spent by another tx, we will exit with the latest
670
        // sweepCtx and an error.
671
        if errors.Is(err, chain.ErrMissingInputs) {
9✔
UNCOV
672
                log.Debugf("Tx %v missing inputs, it's likely the input has "+
×
UNCOV
673
                        "been spent by others", sweepCtx.tx.TxHash())
×
UNCOV
674

×
UNCOV
675
                // Make sure to update the record with the latest attempt.
×
UNCOV
676
                t.updateRecord(r, sweepCtx)
×
UNCOV
677

×
UNCOV
678
                return sweepCtx, ErrInputMissing
×
UNCOV
679
        }
×
680

681
        return sweepCtx, fmt.Errorf("tx=%v failed mempool check: %w",
9✔
682
                sweepCtx.tx.TxHash(), err)
9✔
683
}
684

685
// handleMissingInputs handles the case when the chain backend reports back a
686
// missing inputs error, which could happen when one of the input has been spent
687
// in another tx, or the input is referencing an orphan. When the input is
688
// spent, it will be handled via the TxUnknownSpend flow by creating a
689
// TxUnknownSpend bump result, otherwise, a TxFatal bump result is returned.
UNCOV
690
func (t *TxPublisher) handleMissingInputs(r *monitorRecord) *BumpResult {
×
UNCOV
691
        // Get the spending txns.
×
UNCOV
692
        spends := t.getSpentInputs(r)
×
UNCOV
693

×
UNCOV
694
        // Attach the spending txns.
×
UNCOV
695
        r.spentInputs = spends
×
UNCOV
696

×
UNCOV
697
        // If there are no spending txns found and the input is missing, the
×
UNCOV
698
        // input is referencing an orphan tx that's no longer valid, e.g., the
×
UNCOV
699
        // spending the anchor output from the remote commitment after the local
×
UNCOV
700
        // commitment has confirmed. In this case we will mark it as fatal and
×
UNCOV
701
        // exit.
×
UNCOV
702
        if len(spends) == 0 {
×
UNCOV
703
                log.Warnf("Failing record=%v: found orphan inputs: %v\n",
×
UNCOV
704
                        r.requestID, inputTypeSummary(r.req.Inputs))
×
UNCOV
705

×
UNCOV
706
                // Create a result that will be sent to the resultChan which is
×
UNCOV
707
                // listened by the caller.
×
UNCOV
708
                result := &BumpResult{
×
UNCOV
709
                        Event:     TxFatal,
×
UNCOV
710
                        Tx:        r.tx,
×
UNCOV
711
                        requestID: r.requestID,
×
UNCOV
712
                        Err:       ErrInputMissing,
×
UNCOV
713
                }
×
UNCOV
714

×
UNCOV
715
                return result
×
UNCOV
716
        }
×
717

718
        // Check that the spending tx matches the sweeping tx - given that the
719
        // current sweeping tx has been failed due to missing inputs, the
720
        // spending tx must be a different tx, thus it should NOT be matched. We
721
        // perform a sanity check here to catch the unexpected state.
UNCOV
722
        if !t.isUnknownSpent(r, spends) {
×
723
                log.Errorf("Sweeping tx %v has missing inputs, yet the "+
×
724
                        "spending tx is the sweeping tx itself: %v",
×
725
                        r.tx.TxHash(), r.spentInputs)
×
726
        }
×
727

UNCOV
728
        return t.createUnknownSpentBumpResult(r)
×
729
}
730

731
// broadcast takes a monitored tx and publishes it to the network. Prior to the
732
// broadcast, it will subscribe the tx's confirmation notification and attach
733
// the event channel to the record. Any broadcast-related errors will not be
734
// returned here, instead, they will be put inside the `BumpResult` and
735
// returned to the caller.
736
func (t *TxPublisher) broadcast(record *monitorRecord) (*BumpResult, error) {
9✔
737
        txid := record.tx.TxHash()
9✔
738

9✔
739
        tx := record.tx
9✔
740
        log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
9✔
741
                txid, len(tx.TxIn), t.currentHeight.Load())
9✔
742

9✔
743
        // Before we go to broadcast, we'll notify the aux sweeper, if it's
9✔
744
        // present of this new broadcast attempt.
9✔
745
        err := fn.MapOptionZ(t.cfg.AuxSweeper, func(aux AuxSweeper) error {
18✔
746
                return aux.NotifyBroadcast(
9✔
747
                        record.req, tx, record.fee, record.outpointToTxIndex,
9✔
748
                )
9✔
749
        })
9✔
750
        if err != nil {
9✔
751
                return nil, fmt.Errorf("unable to notify aux sweeper: %w", err)
×
752
        }
×
753

754
        // Set the event, and change it to TxFailed if the wallet fails to
755
        // publish it.
756
        event := TxPublished
9✔
757

9✔
758
        // Publish the sweeping tx with customized label. If the publish fails,
9✔
759
        // this error will be saved in the `BumpResult` and it will be removed
9✔
760
        // from being monitored.
9✔
761
        err = t.cfg.Wallet.PublishTransaction(
9✔
762
                tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
9✔
763
        )
9✔
764
        if err != nil {
12✔
765
                // NOTE: we decide to attach this error to the result instead
3✔
766
                // of returning it here because by the time the tx reaches
3✔
767
                // here, it should have passed the mempool acceptance check. If
3✔
768
                // it still fails to be broadcast, it's likely a non-RBF
3✔
769
                // related error happened. So we send this error back to the
3✔
770
                // caller so that it can handle it properly.
3✔
771
                //
3✔
772
                // TODO(yy): find out which input is causing the failure.
3✔
773
                log.Errorf("Failed to publish tx %v: %v", txid, err)
3✔
774
                event = TxFailed
3✔
775
        }
3✔
776

777
        result := &BumpResult{
9✔
778
                Event:     event,
9✔
779
                Tx:        record.tx,
9✔
780
                Fee:       record.fee,
9✔
781
                FeeRate:   record.feeFunction.FeeRate(),
9✔
782
                Err:       err,
9✔
783
                requestID: record.requestID,
9✔
784
        }
9✔
785

9✔
786
        return result, nil
9✔
787
}
788

789
// notifyResult sends the result to the resultChan specified by the requestID.
790
// This channel is expected to be read by the caller.
791
func (t *TxPublisher) notifyResult(result *BumpResult) {
14✔
792
        id := result.requestID
14✔
793
        subscriber, ok := t.subscriberChans.Load(id)
14✔
794
        if !ok {
14✔
UNCOV
795
                log.Errorf("Result chan for id=%v not found", id)
×
UNCOV
796
                return
×
UNCOV
797
        }
×
798

799
        log.Debugf("Sending result %v for requestID=%v", result, id)
14✔
800

14✔
801
        select {
14✔
802
        // Send the result to the subscriber.
803
        //
804
        // TODO(yy): Add timeout in case it's blocking?
805
        case subscriber <- result:
12✔
806
        case <-t.quit:
2✔
807
                log.Debug("Fee bumper stopped")
2✔
808
        }
809
}
810

811
// removeResult removes the tracking of the result if the result contains a
812
// non-nil error, or the tx is confirmed, the record will be removed from the
813
// maps.
814
func (t *TxPublisher) removeResult(result *BumpResult) {
14✔
815
        id := result.requestID
14✔
816

14✔
817
        var txid chainhash.Hash
14✔
818
        if result.Tx != nil {
25✔
819
                txid = result.Tx.TxHash()
11✔
820
        }
11✔
821

822
        // Remove the record from the maps if there's an error or the tx is
823
        // confirmed. When there's an error, it means this tx has failed its
824
        // broadcast and cannot be retried. There are two cases it may fail,
825
        // - when the budget cannot cover the increased fee calculated by the
826
        //   fee function, hence the budget is used up.
827
        // - when a non-fee related error returned from PublishTransaction.
828
        switch result.Event {
14✔
829
        case TxFailed:
2✔
830
                log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
2✔
831
                        id, txid, result.Err)
2✔
832

833
        case TxConfirmed:
3✔
834
                // Remove the record if the tx is confirmed.
3✔
835
                log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
3✔
836
                        txid)
3✔
837

838
        case TxFatal:
2✔
839
                // Remove the record if there's an error.
2✔
840
                log.Debugf("Removing monitor record=%v due to fatal err: %v",
2✔
841
                        id, result.Err)
2✔
842

843
        case TxUnknownSpend:
2✔
844
                // Remove the record if there's an unknown spend.
2✔
845
                log.Debugf("Removing monitor record=%v due unknown spent: "+
2✔
846
                        "%v", id, result.Err)
2✔
847

848
        // Do nothing if it's neither failed or confirmed.
849
        default:
5✔
850
                log.Tracef("Skipping record removal for id=%v, event=%v", id,
5✔
851
                        result.Event)
5✔
852

5✔
853
                return
5✔
854
        }
855

856
        t.records.Delete(id)
9✔
857
        t.subscriberChans.Delete(id)
9✔
858
}
859

860
// handleResult handles the result of a tx broadcast. It will notify the
861
// subscriber and remove the record if the tx is confirmed or failed to be
862
// broadcast.
863
func (t *TxPublisher) handleResult(result *BumpResult) {
11✔
864
        // Notify the subscriber.
11✔
865
        t.notifyResult(result)
11✔
866

11✔
867
        // Remove the record if it's failed or confirmed.
11✔
868
        t.removeResult(result)
11✔
869
}
11✔
870

871
// monitorRecord is used to keep track of the tx being monitored by the
872
// publisher internally.
873
type monitorRecord struct {
874
        // requestID is the ID of the request that created this record.
875
        requestID uint64
876

877
        // tx is the tx being monitored.
878
        tx *wire.MsgTx
879

880
        // req is the original request.
881
        req *BumpRequest
882

883
        // feeFunction is the fee bumping algorithm used by the publisher.
884
        feeFunction FeeFunction
885

886
        // fee is the fee paid by the tx.
887
        fee btcutil.Amount
888

889
        // outpointToTxIndex is a map of outpoint to tx index.
890
        outpointToTxIndex map[wire.OutPoint]int
891

892
        // spentInputs are the inputs spent by another tx which caused the
893
        // current tx failed.
894
        spentInputs map[wire.OutPoint]*wire.MsgTx
895
}
896

897
// Start starts the publisher by subscribing to block epoch updates and kicking
898
// off the monitor loop.
UNCOV
899
func (t *TxPublisher) Start(beat chainio.Blockbeat) error {
×
UNCOV
900
        log.Info("TxPublisher starting...")
×
UNCOV
901

×
UNCOV
902
        if t.started.Swap(true) {
×
903
                return fmt.Errorf("TxPublisher started more than once")
×
904
        }
×
905

906
        // Set the current height.
UNCOV
907
        t.currentHeight.Store(beat.Height())
×
UNCOV
908

×
UNCOV
909
        t.wg.Add(1)
×
UNCOV
910
        go t.monitor()
×
UNCOV
911

×
UNCOV
912
        log.Debugf("TxPublisher started")
×
UNCOV
913

×
UNCOV
914
        return nil
×
915
}
916

917
// Stop stops the publisher and waits for the monitor loop to exit.
UNCOV
918
func (t *TxPublisher) Stop() error {
×
UNCOV
919
        log.Info("TxPublisher stopping...")
×
UNCOV
920

×
UNCOV
921
        if t.stopped.Swap(true) {
×
922
                return fmt.Errorf("TxPublisher stopped more than once")
×
923
        }
×
924

UNCOV
925
        close(t.quit)
×
UNCOV
926
        t.wg.Wait()
×
UNCOV
927

×
UNCOV
928
        log.Debug("TxPublisher stopped")
×
UNCOV
929

×
UNCOV
930
        return nil
×
931
}
932

933
// monitor is the main loop driven by new blocks. Whevenr a new block arrives,
934
// it will examine all the txns being monitored, and check if any of them needs
935
// to be bumped. If so, it will attempt to bump the fee of the tx.
936
//
937
// NOTE: Must be run as a goroutine.
UNCOV
938
func (t *TxPublisher) monitor() {
×
UNCOV
939
        defer t.wg.Done()
×
UNCOV
940

×
UNCOV
941
        for {
×
UNCOV
942
                select {
×
UNCOV
943
                case beat := <-t.BlockbeatChan:
×
UNCOV
944
                        height := beat.Height()
×
UNCOV
945
                        log.Debugf("TxPublisher received new block: %v", height)
×
UNCOV
946

×
UNCOV
947
                        // Update the best known height for the publisher.
×
UNCOV
948
                        t.currentHeight.Store(height)
×
UNCOV
949

×
UNCOV
950
                        // Check all monitored txns to see if any of them needs
×
UNCOV
951
                        // to be bumped.
×
UNCOV
952
                        t.processRecords()
×
UNCOV
953

×
UNCOV
954
                        // Notify we've processed the block.
×
UNCOV
955
                        t.NotifyBlockProcessed(beat, nil)
×
956

UNCOV
957
                case <-t.quit:
×
UNCOV
958
                        log.Debug("Fee bumper stopped, exit monitor")
×
UNCOV
959
                        return
×
960
                }
961
        }
962
}
963

964
// processRecords checks all the txns being monitored, and checks if any of
965
// them needs to be bumped. If so, it will attempt to bump the fee of the tx.
966
func (t *TxPublisher) processRecords() {
5✔
967
        // confirmedRecords stores a map of the records which have been
5✔
968
        // confirmed.
5✔
969
        confirmedRecords := make(map[uint64]*monitorRecord)
5✔
970

5✔
971
        // feeBumpRecords stores a map of records which need to be bumped.
5✔
972
        feeBumpRecords := make(map[uint64]*monitorRecord)
5✔
973

5✔
974
        // failedRecords stores a map of records which has inputs being spent
5✔
975
        // by a third party.
5✔
976
        failedRecords := make(map[uint64]*monitorRecord)
5✔
977

5✔
978
        // initialRecords stores a map of records which are being created and
5✔
979
        // published for the first time.
5✔
980
        initialRecords := make(map[uint64]*monitorRecord)
5✔
981

5✔
982
        // visitor is a helper closure that visits each record and divides them
5✔
983
        // into two groups.
5✔
984
        visitor := func(requestID uint64, r *monitorRecord) error {
10✔
985
                log.Tracef("Checking monitor recordID=%v", requestID)
5✔
986

5✔
987
                // Check whether the inputs have already been spent.
5✔
988
                spends := t.getSpentInputs(r)
5✔
989

5✔
990
                // If the any of the inputs has been spent, the record will be
5✔
991
                // marked as failed or confirmed.
5✔
992
                if len(spends) != 0 {
8✔
993
                        // Attach the spending txns.
3✔
994
                        r.spentInputs = spends
3✔
995

3✔
996
                        // When tx is nil, it means we haven't tried the initial
3✔
997
                        // broadcast yet the input is already spent. This could
3✔
998
                        // happen when the node shuts down, a previous sweeping
3✔
999
                        // tx confirmed, then the node comes back online and
3✔
1000
                        // reoffers the inputs. Another case is the remote node
3✔
1001
                        // spends the input quickly before we even attempt the
3✔
1002
                        // sweep. In either case we will fail the record and let
3✔
1003
                        // the sweeper handles it.
3✔
1004
                        if r.tx == nil {
4✔
1005
                                failedRecords[requestID] = r
1✔
1006
                                return nil
1✔
1007
                        }
1✔
1008

1009
                        // Check whether the inputs has been spent by a unknown
1010
                        // tx.
1011
                        if t.isUnknownSpent(r, spends) {
3✔
1012
                                failedRecords[requestID] = r
1✔
1013

1✔
1014
                                // Move to the next record.
1✔
1015
                                return nil
1✔
1016
                        }
1✔
1017

1018
                        // The tx is ours, we can move it to the confirmed queue
1019
                        // and stop monitoring it.
1020
                        confirmedRecords[requestID] = r
1✔
1021

1✔
1022
                        // Move to the next record.
1✔
1023
                        return nil
1✔
1024
                }
1025

1026
                // This is the first time we see this record, so we put it in
1027
                // the initial queue.
1028
                if r.tx == nil {
3✔
1029
                        initialRecords[requestID] = r
1✔
1030

1✔
1031
                        return nil
1✔
1032
                }
1✔
1033

1034
                // We can only get here when the inputs are not spent and a
1035
                // previous sweeping tx has been attempted. In this case we will
1036
                // perform an RBF on it in the current block.
1037
                feeBumpRecords[requestID] = r
1✔
1038

1✔
1039
                // Return nil to move to the next record.
1✔
1040
                return nil
1✔
1041
        }
1042

1043
        // Iterate through all the records and divide them into four groups.
1044
        t.records.ForEach(visitor)
5✔
1045

5✔
1046
        // Handle the initial broadcast.
5✔
1047
        for _, r := range initialRecords {
6✔
1048
                t.handleInitialBroadcast(r)
1✔
1049
        }
1✔
1050

1051
        // For records that are confirmed, we'll notify the caller about this
1052
        // result.
1053
        for _, r := range confirmedRecords {
6✔
1054
                t.wg.Add(1)
1✔
1055
                go t.handleTxConfirmed(r)
1✔
1056
        }
1✔
1057

1058
        // Get the current height to be used in the following goroutines.
1059
        currentHeight := t.currentHeight.Load()
5✔
1060

5✔
1061
        // For records that are not confirmed, we perform a fee bump if needed.
5✔
1062
        for _, r := range feeBumpRecords {
6✔
1063
                t.wg.Add(1)
1✔
1064
                go t.handleFeeBumpTx(r, currentHeight)
1✔
1065
        }
1✔
1066

1067
        // For records that are failed, we'll notify the caller about this
1068
        // result.
1069
        for _, r := range failedRecords {
7✔
1070
                t.wg.Add(1)
2✔
1071
                go t.handleUnknownSpent(r)
2✔
1072
        }
2✔
1073
}
1074

1075
// handleTxConfirmed is called when a monitored tx is confirmed. It will
1076
// notify the subscriber then remove the record from the maps .
1077
//
1078
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1079
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) {
2✔
1080
        defer t.wg.Done()
2✔
1081

2✔
1082
        log.Debugf("Record %v is spent in tx=%v", r.requestID, r.tx.TxHash())
2✔
1083

2✔
1084
        // Create a result that will be sent to the resultChan which is
2✔
1085
        // listened by the caller.
2✔
1086
        result := &BumpResult{
2✔
1087
                Event:     TxConfirmed,
2✔
1088
                Tx:        r.tx,
2✔
1089
                requestID: r.requestID,
2✔
1090
                Fee:       r.fee,
2✔
1091
                FeeRate:   r.feeFunction.FeeRate(),
2✔
1092
        }
2✔
1093

2✔
1094
        // Notify that this tx is confirmed and remove the record from the map.
2✔
1095
        t.handleResult(result)
2✔
1096
}
2✔
1097

1098
// handleInitialTxError takes the error from `initializeTx` and decides the
1099
// bump event. It will construct a BumpResult and handles it.
1100
func (t *TxPublisher) handleInitialTxError(r *monitorRecord, err error) {
2✔
1101
        // Create a bump result to be sent to the sweeper.
2✔
1102
        result := &BumpResult{
2✔
1103
                Err:       err,
2✔
1104
                requestID: r.requestID,
2✔
1105
        }
2✔
1106

2✔
1107
        // We now decide what type of event to send.
2✔
1108
        switch {
2✔
1109
        // When the error is due to a dust output, we'll send a TxFailed so
1110
        // these inputs can be retried with a different group in the next
1111
        // block.
UNCOV
1112
        case errors.Is(err, ErrTxNoOutput):
×
UNCOV
1113
                result.Event = TxFailed
×
1114

1115
        // When the error is due to zero fee rate delta, we'll send a TxFailed
1116
        // so these inputs can be retried in the next block.
UNCOV
1117
        case errors.Is(err, ErrZeroFeeRateDelta):
×
UNCOV
1118
                result.Event = TxFailed
×
1119

1120
        // When the error is due to budget being used up, we'll send a TxFailed
1121
        // so these inputs can be retried with a different group in the next
1122
        // block.
UNCOV
1123
        case errors.Is(err, ErrMaxPosition):
×
UNCOV
1124
                fallthrough
×
1125

1126
        // If the tx doesn't not have enough budget, or if the inputs amounts
1127
        // are not sufficient to cover the budget, we will return a TxFailed
1128
        // event so the sweeper can handle it by re-clustering the utxos.
1129
        case errors.Is(err, ErrNotEnoughInputs),
UNCOV
1130
                errors.Is(err, ErrNotEnoughBudget):
×
UNCOV
1131

×
UNCOV
1132
                result.Event = TxFailed
×
UNCOV
1133

×
UNCOV
1134
                // Calculate the starting fee rate to be used when retry
×
UNCOV
1135
                // sweeping these inputs.
×
UNCOV
1136
                feeRate, err := t.calculateRetryFeeRate(r)
×
UNCOV
1137
                if err != nil {
×
1138
                        result.Event = TxFatal
×
1139
                        result.Err = err
×
1140
                }
×
1141

1142
                // Attach the new fee rate.
UNCOV
1143
                result.FeeRate = feeRate
×
1144

1145
        // When there are missing inputs, we'll create a TxUnknownSpend bump
1146
        // result here so the rest of the inputs can be retried.
UNCOV
1147
        case errors.Is(err, ErrInputMissing):
×
UNCOV
1148
                result = t.handleMissingInputs(r)
×
1149

1150
        // Otherwise this is not a fee-related error and the tx cannot be
1151
        // retried. In that case we will fail ALL the inputs in this tx, which
1152
        // means they will be removed from the sweeper and never be tried
1153
        // again.
1154
        //
1155
        // TODO(yy): Find out which input is causing the failure and fail that
1156
        // one only.
1157
        default:
2✔
1158
                result.Event = TxFatal
2✔
1159
        }
1160

1161
        t.handleResult(result)
2✔
1162
}
1163

1164
// handleInitialBroadcast is called when a new request is received. It will
1165
// handle the initial tx creation and broadcast. In details,
1166
// 1. init a fee function based on the given strategy.
1167
// 2. create an RBF-compliant tx and monitor it for confirmation.
1168
// 3. notify the initial broadcast result back to the caller.
1169
func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) {
5✔
1170
        log.Debugf("Initial broadcast for requestID=%v", r.requestID)
5✔
1171

5✔
1172
        var (
5✔
1173
                result *BumpResult
5✔
1174
                err    error
5✔
1175
        )
5✔
1176

5✔
1177
        // Attempt an initial broadcast which is guaranteed to comply with the
5✔
1178
        // RBF rules.
5✔
1179
        //
5✔
1180
        // Create the initial tx to be broadcasted.
5✔
1181
        record, err := t.initializeTx(r)
5✔
1182
        if err != nil {
7✔
1183
                log.Errorf("Initial broadcast failed: %v", err)
2✔
1184

2✔
1185
                // We now handle the initialization error and exit.
2✔
1186
                t.handleInitialTxError(r, err)
2✔
1187

2✔
1188
                return
2✔
1189
        }
2✔
1190

1191
        // Successfully created the first tx, now broadcast it.
1192
        result, err = t.broadcast(record)
3✔
1193
        if err != nil {
3✔
1194
                // The broadcast failed, which can only happen if the tx record
×
1195
                // cannot be found or the aux sweeper returns an error. In
×
1196
                // either case, we will send back a TxFail event so these
×
1197
                // inputs can be retried.
×
1198
                result = &BumpResult{
×
1199
                        Event:     TxFailed,
×
1200
                        Err:       err,
×
1201
                        requestID: r.requestID,
×
1202
                }
×
1203
        }
×
1204

1205
        t.handleResult(result)
3✔
1206
}
1207

1208
// handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
1209
// attempt to bump the fee of the tx.
1210
//
1211
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1212
func (t *TxPublisher) handleFeeBumpTx(r *monitorRecord, currentHeight int32) {
4✔
1213
        defer t.wg.Done()
4✔
1214

4✔
1215
        log.Debugf("Attempting to fee bump tx=%v in record %v", r.tx.TxHash(),
4✔
1216
                r.requestID)
4✔
1217

4✔
1218
        oldTxid := r.tx.TxHash()
4✔
1219

4✔
1220
        // Get the current conf target for this record.
4✔
1221
        confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
4✔
1222

4✔
1223
        // Ask the fee function whether a bump is needed. We expect the fee
4✔
1224
        // function to increase its returned fee rate after calling this
4✔
1225
        // method.
4✔
1226
        increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
4✔
1227
        if err != nil {
5✔
1228
                // TODO(yy): send this error back to the sweeper so it can
1✔
1229
                // re-group the inputs?
1✔
1230
                log.Errorf("Failed to increase fee rate for tx %v at "+
1✔
1231
                        "height=%v: %v", oldTxid, t.currentHeight.Load(), err)
1✔
1232

1✔
1233
                return
1✔
1234
        }
1✔
1235

1236
        // If the fee rate was not increased, there's no need to bump the fee.
1237
        if !increased {
4✔
1238
                log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
1✔
1239
                        t.currentHeight.Load())
1✔
1240

1✔
1241
                return
1✔
1242
        }
1✔
1243

1244
        // The fee function now has a new fee rate, we will use it to bump the
1245
        // fee of the tx.
1246
        resultOpt := t.createAndPublishTx(r)
2✔
1247

2✔
1248
        // If there's a result, we will notify the caller about the result.
2✔
1249
        resultOpt.WhenSome(func(result BumpResult) {
4✔
1250
                // Notify the new result.
2✔
1251
                t.handleResult(&result)
2✔
1252
        })
2✔
1253
}
1254

1255
// handleUnknownSpent is called when the inputs are spent by a unknown tx. It
1256
// will notify the subscriber then remove the record from the maps and send a
1257
// TxUnknownSpend event to the subscriber.
1258
//
1259
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1260
func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) {
2✔
1261
        defer t.wg.Done()
2✔
1262

2✔
1263
        log.Debugf("Record %v has inputs spent by a tx unknown to the fee "+
2✔
1264
                "bumper, failing it now:\n%v", r.requestID,
2✔
1265
                inputTypeSummary(r.req.Inputs))
2✔
1266

2✔
1267
        // Create a result that will be sent to the resultChan which is listened
2✔
1268
        // by the caller.
2✔
1269
        result := t.createUnknownSpentBumpResult(r)
2✔
1270

2✔
1271
        // Notify the sweeper about this result in the end.
2✔
1272
        t.handleResult(result)
2✔
1273
}
2✔
1274

1275
// createUnknownSpentBumpResult creates and returns a BumpResult given the
1276
// monitored record has unknown spends.
1277
func (t *TxPublisher) createUnknownSpentBumpResult(
1278
        r *monitorRecord) *BumpResult {
2✔
1279

2✔
1280
        // Create a result that will be sent to the resultChan which is listened
2✔
1281
        // by the caller.
2✔
1282
        result := &BumpResult{
2✔
1283
                Event:       TxUnknownSpend,
2✔
1284
                Tx:          r.tx,
2✔
1285
                requestID:   r.requestID,
2✔
1286
                Err:         ErrUnknownSpent,
2✔
1287
                SpentInputs: r.spentInputs,
2✔
1288
        }
2✔
1289

2✔
1290
        // Calculate the next fee rate for the retry.
2✔
1291
        feeRate, err := t.calculateRetryFeeRate(r)
2✔
1292
        if err != nil {
2✔
UNCOV
1293
                // Overwrite the event and error so the sweeper will
×
UNCOV
1294
                // remove this input.
×
UNCOV
1295
                result.Event = TxFatal
×
UNCOV
1296
                result.Err = err
×
UNCOV
1297
        }
×
1298

1299
        // Attach the new fee rate to be used for the next sweeping attempt.
1300
        result.FeeRate = feeRate
2✔
1301

2✔
1302
        return result
2✔
1303
}
1304

1305
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
1306
// to the network. It will update the record with the new tx and fee rate if
1307
// successfully created, and return the result when published successfully.
1308
func (t *TxPublisher) createAndPublishTx(
1309
        r *monitorRecord) fn.Option[BumpResult] {
7✔
1310

7✔
1311
        // Fetch the old tx.
7✔
1312
        oldTx := r.tx
7✔
1313

7✔
1314
        // Create a new tx with the new fee rate.
7✔
1315
        //
7✔
1316
        // NOTE: The fee function is expected to have increased its returned
7✔
1317
        // fee rate after calling the SkipFeeBump method. So we can use it
7✔
1318
        // directly here.
7✔
1319
        sweepCtx, err := t.createAndCheckTx(r)
7✔
1320

7✔
1321
        // If there's an error creating the replacement tx, we need to abort the
7✔
1322
        // flow and handle it.
7✔
1323
        if err != nil {
10✔
1324
                return t.handleReplacementTxError(r, oldTx, err)
3✔
1325
        }
3✔
1326

1327
        // The tx has been created without any errors, we now register a new
1328
        // record by overwriting the same requestID.
1329
        record := t.updateRecord(r, sweepCtx)
4✔
1330

4✔
1331
        // Attempt to broadcast this new tx.
4✔
1332
        result, err := t.broadcast(record)
4✔
1333
        if err != nil {
4✔
1334
                log.Infof("Failed to broadcast replacement tx %v: %v",
×
1335
                        sweepCtx.tx.TxHash(), err)
×
1336

×
1337
                return fn.None[BumpResult]()
×
1338
        }
×
1339

1340
        // If the result error is fee related, we will return no error and let
1341
        // the fee bumper retry it at next block.
1342
        //
1343
        // NOTE: we may get this error if we've bypassed the mempool check,
1344
        // which means we are using neutrino backend.
1345
        if errors.Is(result.Err, chain.ErrInsufficientFee) ||
4✔
1346
                errors.Is(result.Err, lnwallet.ErrMempoolFee) {
4✔
UNCOV
1347

×
UNCOV
1348
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(),
×
UNCOV
1349
                        result.Err)
×
UNCOV
1350

×
UNCOV
1351
                return fn.None[BumpResult]()
×
UNCOV
1352
        }
×
1353

1354
        // A successful replacement tx is created, attach the old tx.
1355
        result.ReplacedTx = oldTx
4✔
1356

4✔
1357
        // If the new tx failed to be published, we will return the result so
4✔
1358
        // the caller can handle it.
4✔
1359
        if result.Event == TxFailed {
5✔
1360
                return fn.Some(*result)
1✔
1361
        }
1✔
1362

1363
        log.Debugf("Replaced tx=%v with new tx=%v", oldTx.TxHash(),
3✔
1364
                sweepCtx.tx.TxHash())
3✔
1365

3✔
1366
        // Otherwise, it's a successful RBF, set the event and return.
3✔
1367
        result.Event = TxReplaced
3✔
1368

3✔
1369
        return fn.Some(*result)
3✔
1370
}
1371

1372
// isUnknownSpent checks whether the inputs of the tx has already been spent by
1373
// a tx not known to us. When a tx is not confirmed, yet its inputs has been
1374
// spent, then it must be spent by a different tx other than the sweeping tx
1375
// here.
1376
func (t *TxPublisher) isUnknownSpent(r *monitorRecord,
1377
        spends map[wire.OutPoint]*wire.MsgTx) bool {
2✔
1378

2✔
1379
        txid := r.tx.TxHash()
2✔
1380

2✔
1381
        // Iterate all the spending txns and check if they match the sweeping
2✔
1382
        // tx.
2✔
1383
        for op, spendingTx := range spends {
4✔
1384
                spendingTxID := spendingTx.TxHash()
2✔
1385

2✔
1386
                // If the spending tx is the same as the sweeping tx then we are
2✔
1387
                // good.
2✔
1388
                if spendingTxID == txid {
3✔
1389
                        continue
1✔
1390
                }
1391

1392
                log.Warnf("Detected unknown spend of input=%v in tx=%v", op,
1✔
1393
                        spendingTx.TxHash())
1✔
1394

1✔
1395
                return true
1✔
1396
        }
1397

1398
        return false
1✔
1399
}
1400

1401
// getSpentInputs performs a non-blocking read on the spending subscriptions to
1402
// see whether any of the monitored inputs has been spent. A map of inputs with
1403
// their spending txns are returned if found.
1404
func (t *TxPublisher) getSpentInputs(
1405
        r *monitorRecord) map[wire.OutPoint]*wire.MsgTx {
6✔
1406

6✔
1407
        // Create a slice to record the inputs spent.
6✔
1408
        spentInputs := make(map[wire.OutPoint]*wire.MsgTx, len(r.req.Inputs))
6✔
1409

6✔
1410
        // Iterate all the inputs and check if they have been spent already.
6✔
1411
        for _, inp := range r.req.Inputs {
14✔
1412
                op := inp.OutPoint()
8✔
1413

8✔
1414
                // For wallet utxos, the height hint is not set - we don't need
8✔
1415
                // to monitor them for third party spend.
8✔
1416
                //
8✔
1417
                // TODO(yy): We need to properly lock wallet utxos before
8✔
1418
                // skipping this check as the same wallet utxo can be used by
8✔
1419
                // different sweeping txns.
8✔
1420
                heightHint := inp.HeightHint()
8✔
1421
                if heightHint == 0 {
9✔
1422
                        heightHint = uint32(t.currentHeight.Load())
1✔
1423
                        log.Debugf("Checking wallet input %v using heightHint "+
1✔
1424
                                "%v", op, heightHint)
1✔
1425
                }
1✔
1426

1427
                // Check whether the input has been spent or not.
1428
                utxo, err := t.cfg.ChainIO.GetUtxo(
8✔
1429
                        &op, inp.SignDesc().Output.PkScript, heightHint, t.quit,
8✔
1430
                )
8✔
1431
                if err != nil {
8✔
NEW
1432
                        log.Errorf("Failed to get utxo for input=%v: %v", op,
×
NEW
1433
                                err)
×
NEW
1434

×
NEW
1435
                        return spentInputs
×
NEW
1436
                }
×
1437

1438
                // If a non-nil utxo is returned it means this input is still
1439
                // unspent. Thus we can continue to the next input as there's no
1440
                // need to register spend notification for it.
1441
                if utxo != nil {
12✔
1442
                        continue
4✔
1443
                }
1444

1445
                // If the input has already been spent after the height hint, a
1446
                // spend event is sent back immediately.
1447
                spendEvent, err := t.cfg.Notifier.RegisterSpendNtfn(
4✔
1448
                        &op, inp.SignDesc().Output.PkScript, heightHint,
4✔
1449
                )
4✔
1450
                if err != nil {
4✔
1451
                        log.Criticalf("Failed to register spend ntfn for "+
×
1452
                                "input=%v: %v", op, err)
×
1453

×
NEW
1454
                        return spentInputs
×
1455
                }
×
1456

1457
                // Remove the subscription when exit.
1458
                defer spendEvent.Cancel()
4✔
1459

4✔
1460
                // Do a blocking read to receive the spent event.
4✔
1461
                select {
4✔
1462
                case spend, ok := <-spendEvent.Spend:
4✔
1463
                        if !ok {
4✔
1464
                                log.Debugf("Spend ntfn for %v canceled", op)
×
1465

×
1466
                                continue
×
1467
                        }
1468

1469
                        spendingTx := spend.SpendingTx
4✔
1470

4✔
1471
                        log.Debugf("Detected spent of input=%v in tx=%v", op,
4✔
1472
                                spendingTx.TxHash())
4✔
1473

4✔
1474
                        spentInputs[op] = spendingTx
4✔
1475

1476
                // The above spent event should be returned immediately, yet we
1477
                // still perform a timeout check here in case it blocks forever.
1478
                //
1479
                // TODO(yy): The proper way to fix this is to redesign the area
1480
                // so we use the async flow for checking whether a given input
1481
                // is spent or not. A better approach is to implement a new
1482
                // synchronous method to check for spending, which should be
1483
                // attempted when implementing SQL into btcwallet.
NEW
1484
                case <-time.After(spentNotificationTimeout):
×
NEW
1485
                        log.Errorf("Input is reported as spent by GetUtxo, "+
×
NEW
1486
                                "but spending notification is not returned "+
×
NEW
1487
                                "immediately: input=%v, heightHint=%v", op,
×
NEW
1488
                                heightHint)
×
NEW
1489

×
NEW
1490
                        // Return with the inputs we've found so far. This
×
NEW
1491
                        // prevents us from treating this known-spent input as
×
NEW
1492
                        // unspent in this cycle.
×
NEW
1493
                        return spentInputs
×
1494
                }
1495
        }
1496

1497
        return spentInputs
6✔
1498
}
1499

1500
// calcCurrentConfTarget calculates the current confirmation target based on
1501
// the deadline height. The conf target is capped at 0 if the deadline has
1502
// already been past.
1503
func calcCurrentConfTarget(currentHeight, deadline int32) uint32 {
14✔
1504
        var confTarget uint32
14✔
1505

14✔
1506
        // Calculate how many blocks left until the deadline.
14✔
1507
        deadlineDelta := deadline - currentHeight
14✔
1508

14✔
1509
        // If we are already past the deadline, we will set the conf target to
14✔
1510
        // be 1.
14✔
1511
        if deadlineDelta < 0 {
18✔
1512
                log.Warnf("Deadline is %d blocks behind current height %v",
4✔
1513
                        -deadlineDelta, currentHeight)
4✔
1514

4✔
1515
                confTarget = 0
4✔
1516
        } else {
14✔
1517
                confTarget = uint32(deadlineDelta)
10✔
1518
        }
10✔
1519

1520
        return confTarget
14✔
1521
}
1522

1523
// sweepTxCtx houses a sweep transaction with additional context.
1524
type sweepTxCtx struct {
1525
        tx *wire.MsgTx
1526

1527
        fee btcutil.Amount
1528

1529
        extraTxOut fn.Option[SweepOutput]
1530

1531
        // outpointToTxIndex maps the outpoint of the inputs to their index in
1532
        // the sweep transaction.
1533
        outpointToTxIndex map[wire.OutPoint]int
1534
}
1535

1536
// createSweepTx creates a sweeping tx based on the given inputs, change
1537
// address and fee rate.
1538
func (t *TxPublisher) createSweepTx(inputs []input.Input,
1539
        changePkScript lnwallet.AddrWithKey,
1540
        feeRate chainfee.SatPerKWeight) (*sweepTxCtx, error) {
23✔
1541

23✔
1542
        // Validate and calculate the fee and change amount.
23✔
1543
        txFee, changeOutputsOpt, locktimeOpt, err := prepareSweepTx(
23✔
1544
                inputs, changePkScript, feeRate, t.currentHeight.Load(),
23✔
1545
                t.cfg.AuxSweeper,
23✔
1546
        )
23✔
1547
        if err != nil {
23✔
UNCOV
1548
                return nil, err
×
UNCOV
1549
        }
×
1550

1551
        var (
23✔
1552
                // Create the sweep transaction that we will be building. We
23✔
1553
                // use version 2 as it is required for CSV.
23✔
1554
                sweepTx = wire.NewMsgTx(2)
23✔
1555

23✔
1556
                // We'll add the inputs as we go so we know the final ordering
23✔
1557
                // of inputs to sign.
23✔
1558
                idxs []input.Input
23✔
1559
        )
23✔
1560

23✔
1561
        // We start by adding all inputs that commit to an output. We do this
23✔
1562
        // since the input and output index must stay the same for the
23✔
1563
        // signatures to be valid.
23✔
1564
        outpointToTxIndex := make(map[wire.OutPoint]int)
23✔
1565
        for _, o := range inputs {
46✔
1566
                if o.RequiredTxOut() == nil {
46✔
1567
                        continue
23✔
1568
                }
1569

UNCOV
1570
                idxs = append(idxs, o)
×
UNCOV
1571
                sweepTx.AddTxIn(&wire.TxIn{
×
UNCOV
1572
                        PreviousOutPoint: o.OutPoint(),
×
UNCOV
1573
                        Sequence:         o.BlocksToMaturity(),
×
UNCOV
1574
                })
×
UNCOV
1575
                sweepTx.AddTxOut(o.RequiredTxOut())
×
UNCOV
1576

×
UNCOV
1577
                outpointToTxIndex[o.OutPoint()] = len(sweepTx.TxOut) - 1
×
1578
        }
1579

1580
        // Sum up the value contained in the remaining inputs, and add them to
1581
        // the sweep transaction.
1582
        for _, o := range inputs {
46✔
1583
                if o.RequiredTxOut() != nil {
23✔
UNCOV
1584
                        continue
×
1585
                }
1586

1587
                idxs = append(idxs, o)
23✔
1588
                sweepTx.AddTxIn(&wire.TxIn{
23✔
1589
                        PreviousOutPoint: o.OutPoint(),
23✔
1590
                        Sequence:         o.BlocksToMaturity(),
23✔
1591
                })
23✔
1592
        }
1593

1594
        // If we have change outputs to add, then add it the sweep transaction
1595
        // here.
1596
        changeOutputsOpt.WhenSome(func(changeOuts []SweepOutput) {
46✔
1597
                for i := range changeOuts {
69✔
1598
                        sweepTx.AddTxOut(&changeOuts[i].TxOut)
46✔
1599
                }
46✔
1600
        })
1601

1602
        // We'll default to using the current block height as locktime, if none
1603
        // of the inputs commits to a different locktime.
1604
        sweepTx.LockTime = uint32(locktimeOpt.UnwrapOr(t.currentHeight.Load()))
23✔
1605

23✔
1606
        prevInputFetcher, err := input.MultiPrevOutFetcher(inputs)
23✔
1607
        if err != nil {
23✔
1608
                return nil, fmt.Errorf("error creating prev input fetcher "+
×
1609
                        "for hash cache: %v", err)
×
1610
        }
×
1611
        hashCache := txscript.NewTxSigHashes(sweepTx, prevInputFetcher)
23✔
1612

23✔
1613
        // With all the inputs in place, use each output's unique input script
23✔
1614
        // function to generate the final witness required for spending.
23✔
1615
        addInputScript := func(idx int, tso input.Input) error {
46✔
1616
                inputScript, err := tso.CraftInputScript(
23✔
1617
                        t.cfg.Signer, sweepTx, hashCache, prevInputFetcher, idx,
23✔
1618
                )
23✔
1619
                if err != nil {
23✔
1620
                        return err
×
1621
                }
×
1622

1623
                sweepTx.TxIn[idx].Witness = inputScript.Witness
23✔
1624

23✔
1625
                if len(inputScript.SigScript) == 0 {
46✔
1626
                        return nil
23✔
1627
                }
23✔
1628

1629
                sweepTx.TxIn[idx].SignatureScript = inputScript.SigScript
×
1630

×
1631
                return nil
×
1632
        }
1633

1634
        for idx, inp := range idxs {
46✔
1635
                if err := addInputScript(idx, inp); err != nil {
23✔
1636
                        return nil, err
×
1637
                }
×
1638
        }
1639

1640
        log.Debugf("Created sweep tx %v for inputs:\n%v", sweepTx.TxHash(),
23✔
1641
                inputTypeSummary(inputs))
23✔
1642

23✔
1643
        // Try to locate the extra change output, though there might be None.
23✔
1644
        extraTxOut := fn.MapOption(
23✔
1645
                func(sweepOuts []SweepOutput) fn.Option[SweepOutput] {
46✔
1646
                        for _, sweepOut := range sweepOuts {
69✔
1647
                                if !sweepOut.IsExtra {
92✔
1648
                                        continue
46✔
1649
                                }
1650

1651
                                // If we sweep outputs of a custom channel, the
1652
                                // custom leaves in those outputs will be merged
1653
                                // into a single output, even if we sweep
1654
                                // multiple outputs (e.g. to_remote and breached
1655
                                // to_local of a breached channel) at the same
1656
                                // time. So there will only ever be one extra
1657
                                // output.
1658
                                log.Debugf("Sweep produced extra_sweep_out=%v",
×
1659
                                        lnutils.SpewLogClosure(sweepOut))
×
1660

×
1661
                                return fn.Some(sweepOut)
×
1662
                        }
1663

1664
                        return fn.None[SweepOutput]()
23✔
1665
                },
1666
        )(changeOutputsOpt)
1667

1668
        return &sweepTxCtx{
23✔
1669
                tx:                sweepTx,
23✔
1670
                fee:               txFee,
23✔
1671
                extraTxOut:        fn.FlattenOption(extraTxOut),
23✔
1672
                outpointToTxIndex: outpointToTxIndex,
23✔
1673
        }, nil
23✔
1674
}
1675

1676
// prepareSweepTx returns the tx fee, a set of optional change outputs and an
1677
// optional locktime after a series of validations:
1678
// 1. check the locktime has been reached.
1679
// 2. check the locktimes are the same.
1680
// 3. check the inputs cover the outputs.
1681
//
1682
// NOTE: if the change amount is below dust, it will be added to the tx fee.
1683
func prepareSweepTx(inputs []input.Input, changePkScript lnwallet.AddrWithKey,
1684
        feeRate chainfee.SatPerKWeight, currentHeight int32,
1685
        auxSweeper fn.Option[AuxSweeper]) (
1686
        btcutil.Amount, fn.Option[[]SweepOutput], fn.Option[int32], error) {
23✔
1687

23✔
1688
        noChange := fn.None[[]SweepOutput]()
23✔
1689
        noLocktime := fn.None[int32]()
23✔
1690

23✔
1691
        // Given the set of inputs we have, if we have an aux sweeper, then
23✔
1692
        // we'll attempt to see if we have any other change outputs we'll need
23✔
1693
        // to add to the sweep transaction.
23✔
1694
        changePkScripts := [][]byte{changePkScript.DeliveryAddress}
23✔
1695

23✔
1696
        var extraChangeOut fn.Option[SweepOutput]
23✔
1697
        err := fn.MapOptionZ(
23✔
1698
                auxSweeper, func(aux AuxSweeper) error {
46✔
1699
                        extraOut := aux.DeriveSweepAddr(inputs, changePkScript)
23✔
1700
                        if err := extraOut.Err(); err != nil {
23✔
1701
                                return err
×
1702
                        }
×
1703

1704
                        extraChangeOut = extraOut.LeftToSome()
23✔
1705

23✔
1706
                        return nil
23✔
1707
                },
1708
        )
1709
        if err != nil {
23✔
1710
                return 0, noChange, noLocktime, err
×
1711
        }
×
1712

1713
        // Creating a weight estimator with nil outputs and zero max fee rate.
1714
        // We don't allow adding customized outputs in the sweeping tx, and the
1715
        // fee rate is already being managed before we get here.
1716
        inputs, estimator, err := getWeightEstimate(
23✔
1717
                inputs, nil, feeRate, 0, changePkScripts,
23✔
1718
        )
23✔
1719
        if err != nil {
23✔
1720
                return 0, noChange, noLocktime, err
×
1721
        }
×
1722

1723
        txFee := estimator.fee()
23✔
1724

23✔
1725
        var (
23✔
1726
                // Track whether any of the inputs require a certain locktime.
23✔
1727
                locktime = int32(-1)
23✔
1728

23✔
1729
                // We keep track of total input amount, and required output
23✔
1730
                // amount to use for calculating the change amount below.
23✔
1731
                totalInput     btcutil.Amount
23✔
1732
                requiredOutput btcutil.Amount
23✔
1733
        )
23✔
1734

23✔
1735
        // If we have an extra change output, then we'll add it as a required
23✔
1736
        // output amt.
23✔
1737
        extraChangeOut.WhenSome(func(o SweepOutput) {
46✔
1738
                requiredOutput += btcutil.Amount(o.Value)
23✔
1739
        })
23✔
1740

1741
        // Go through each input and check if the required lock times have
1742
        // reached and are the same.
1743
        for _, o := range inputs {
46✔
1744
                // If the input has a required output, we'll add it to the
23✔
1745
                // required output amount.
23✔
1746
                if o.RequiredTxOut() != nil {
23✔
UNCOV
1747
                        requiredOutput += btcutil.Amount(
×
UNCOV
1748
                                o.RequiredTxOut().Value,
×
UNCOV
1749
                        )
×
UNCOV
1750
                }
×
1751

1752
                // Update the total input amount.
1753
                totalInput += btcutil.Amount(o.SignDesc().Output.Value)
23✔
1754

23✔
1755
                lt, ok := o.RequiredLockTime()
23✔
1756

23✔
1757
                // Skip if the input doesn't require a lock time.
23✔
1758
                if !ok {
46✔
1759
                        continue
23✔
1760
                }
1761

1762
                // Check if the lock time has reached
UNCOV
1763
                if lt > uint32(currentHeight) {
×
1764
                        return 0, noChange, noLocktime,
×
1765
                                fmt.Errorf("%w: current height is %v, "+
×
1766
                                        "locktime is %v", ErrLocktimeImmature,
×
1767
                                        currentHeight, lt)
×
1768
                }
×
1769

1770
                // If another input commits to a different locktime, they
1771
                // cannot be combined in the same transaction.
UNCOV
1772
                if locktime != -1 && locktime != int32(lt) {
×
1773
                        return 0, noChange, noLocktime, ErrLocktimeConflict
×
1774
                }
×
1775

1776
                // Update the locktime for next iteration.
UNCOV
1777
                locktime = int32(lt)
×
1778
        }
1779

1780
        // Make sure total output amount is less than total input amount.
1781
        if requiredOutput+txFee > totalInput {
23✔
UNCOV
1782
                log.Errorf("Insufficient input to create sweep tx: "+
×
UNCOV
1783
                        "input_sum=%v, output_sum=%v", totalInput,
×
UNCOV
1784
                        requiredOutput+txFee)
×
UNCOV
1785

×
UNCOV
1786
                return 0, noChange, noLocktime, ErrNotEnoughInputs
×
UNCOV
1787
        }
×
1788

1789
        // The value remaining after the required output and fees is the
1790
        // change output.
1791
        changeAmt := totalInput - requiredOutput - txFee
23✔
1792
        changeOuts := make([]SweepOutput, 0, 2)
23✔
1793

23✔
1794
        extraChangeOut.WhenSome(func(o SweepOutput) {
46✔
1795
                changeOuts = append(changeOuts, o)
23✔
1796
        })
23✔
1797

1798
        // We'll calculate the dust limit for the given changePkScript since it
1799
        // is variable.
1800
        changeFloor := lnwallet.DustLimitForSize(
23✔
1801
                len(changePkScript.DeliveryAddress),
23✔
1802
        )
23✔
1803

23✔
1804
        switch {
23✔
1805
        // If the change amount is dust, we'll move it into the fees, and
1806
        // ignore it.
UNCOV
1807
        case changeAmt < changeFloor:
×
UNCOV
1808
                log.Infof("Change amt %v below dustlimit %v, not adding "+
×
UNCOV
1809
                        "change output", changeAmt, changeFloor)
×
UNCOV
1810

×
UNCOV
1811
                // If there's no required output, and the change output is a
×
UNCOV
1812
                // dust, it means we are creating a tx without any outputs. In
×
UNCOV
1813
                // this case we'll return an error. This could happen when
×
UNCOV
1814
                // creating a tx that has an anchor as the only input.
×
UNCOV
1815
                if requiredOutput == 0 {
×
UNCOV
1816
                        return 0, noChange, noLocktime, ErrTxNoOutput
×
UNCOV
1817
                }
×
1818

1819
                // The dust amount is added to the fee.
1820
                txFee += changeAmt
×
1821

1822
        // Otherwise, we'll actually recognize it as a change output.
1823
        default:
23✔
1824
                changeOuts = append(changeOuts, SweepOutput{
23✔
1825
                        TxOut: wire.TxOut{
23✔
1826
                                Value:    int64(changeAmt),
23✔
1827
                                PkScript: changePkScript.DeliveryAddress,
23✔
1828
                        },
23✔
1829
                        IsExtra:     false,
23✔
1830
                        InternalKey: changePkScript.InternalKey,
23✔
1831
                })
23✔
1832
        }
1833

1834
        // Optionally set the locktime.
1835
        locktimeOpt := fn.Some(locktime)
23✔
1836
        if locktime == -1 {
46✔
1837
                locktimeOpt = noLocktime
23✔
1838
        }
23✔
1839

1840
        var changeOutsOpt fn.Option[[]SweepOutput]
23✔
1841
        if len(changeOuts) > 0 {
46✔
1842
                changeOutsOpt = fn.Some(changeOuts)
23✔
1843
        }
23✔
1844

1845
        log.Debugf("Creating sweep tx for %v inputs (%s) using %v, "+
23✔
1846
                "tx_weight=%v, tx_fee=%v, locktime=%v, parents_count=%v, "+
23✔
1847
                "parents_fee=%v, parents_weight=%v, current_height=%v",
23✔
1848
                len(inputs), inputTypeSummary(inputs), feeRate,
23✔
1849
                estimator.weight(), txFee, locktimeOpt, len(estimator.parents),
23✔
1850
                estimator.parentsFee, estimator.parentsWeight, currentHeight)
23✔
1851

23✔
1852
        return txFee, changeOutsOpt, locktimeOpt, nil
23✔
1853
}
1854

1855
// handleReplacementTxError handles the error returned from creating the
1856
// replacement tx. It returns a BumpResult that should be notified to the
1857
// sweeper.
1858
func (t *TxPublisher) handleReplacementTxError(r *monitorRecord,
1859
        oldTx *wire.MsgTx, err error) fn.Option[BumpResult] {
3✔
1860

3✔
1861
        // If the error is fee related, we will return no error and let the fee
3✔
1862
        // bumper retry it at next block.
3✔
1863
        //
3✔
1864
        // NOTE: we can check the RBF error here and ask the fee function to
3✔
1865
        // recalculate the fee rate. However, this would defeat the purpose of
3✔
1866
        // using a deadline based fee function:
3✔
1867
        // - if the deadline is far away, there's no rush to RBF the tx.
3✔
1868
        // - if the deadline is close, we expect the fee function to give us a
3✔
1869
        //   higher fee rate. If the fee rate cannot satisfy the RBF rules, it
3✔
1870
        //   means the budget is not enough.
3✔
1871
        if errors.Is(err, chain.ErrInsufficientFee) ||
3✔
1872
                errors.Is(err, lnwallet.ErrMempoolFee) {
5✔
1873

2✔
1874
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
2✔
1875
                return fn.None[BumpResult]()
2✔
1876
        }
2✔
1877

1878
        // At least one of the inputs is missing, which means it has already
1879
        // been spent by another tx and confirmed. In this case we will handle
1880
        // it by returning a TxUnknownSpend bump result.
1881
        if errors.Is(err, ErrInputMissing) {
1✔
1882
                log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
×
1883
                bumpResult := t.handleMissingInputs(r)
×
1884

×
1885
                return fn.Some(*bumpResult)
×
1886
        }
×
1887

1888
        // Return a failed event to retry the sweep.
1889
        event := TxFailed
1✔
1890

1✔
1891
        // Calculate the next fee rate for the retry.
1✔
1892
        feeRate, ferr := t.calculateRetryFeeRate(r)
1✔
1893
        if ferr != nil {
1✔
1894
                // If there's an error with the fee calculation, we need to
×
1895
                // abort the sweep.
×
1896
                event = TxFatal
×
1897
        }
×
1898

1899
        // If the error is not fee related, we will return a `TxFailed` event so
1900
        // this input can be retried.
1901
        result := fn.Some(BumpResult{
1✔
1902
                Event:     event,
1✔
1903
                Tx:        oldTx,
1✔
1904
                Err:       err,
1✔
1905
                requestID: r.requestID,
1✔
1906
                FeeRate:   feeRate,
1✔
1907
        })
1✔
1908

1✔
1909
        // If the tx doesn't not have enough budget, or if the inputs amounts
1✔
1910
        // are not sufficient to cover the budget, we will return a result so
1✔
1911
        // the sweeper can handle it by re-clustering the utxos.
1✔
1912
        if errors.Is(err, ErrNotEnoughBudget) ||
1✔
1913
                errors.Is(err, ErrNotEnoughInputs) {
2✔
1914

1✔
1915
                log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
1✔
1916
                return result
1✔
1917
        }
1✔
1918

1919
        // Otherwise, an unexpected error occurred, we will log an error and let
1920
        // the sweeper retry the whole process.
1921
        log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
×
1922

×
1923
        return result
×
1924
}
1925

1926
// calculateRetryFeeRate calculates a new fee rate to be used as the starting
1927
// fee rate for the next sweep attempt if the inputs are to be retried. When the
1928
// fee function is nil it will be created here, and an error is returned if the
1929
// fee func cannot be initialized.
1930
func (t *TxPublisher) calculateRetryFeeRate(
1931
        r *monitorRecord) (chainfee.SatPerKWeight, error) {
3✔
1932

3✔
1933
        // Get the fee function, which will be used to decided the next fee rate
3✔
1934
        // to use if the sweeper decides to retry sweeping this input.
3✔
1935
        feeFunc := r.feeFunction
3✔
1936

3✔
1937
        // When the record is failed before the initial broadcast is attempted,
3✔
1938
        // it will have a nil fee func. In this case, we'll create the fee func
3✔
1939
        // here.
3✔
1940
        //
3✔
1941
        // NOTE: Since the current record is failed and will be deleted, we
3✔
1942
        // don't need to update the record on this fee function. We only need
3✔
1943
        // the fee rate data so the sweeper can pick up where we left off.
3✔
1944
        if feeFunc == nil {
4✔
1945
                f, err := t.initializeFeeFunction(r.req)
1✔
1946

1✔
1947
                // TODO(yy): The only error we would receive here is when the
1✔
1948
                // pkScript is not recognized by the weightEstimator. What we
1✔
1949
                // should do instead is to check the pkScript immediately after
1✔
1950
                // receiving a sweep request so we don't need to check it again,
1✔
1951
                // which will also save us from error checking from several
1✔
1952
                // callsites.
1✔
1953
                if err != nil {
1✔
UNCOV
1954
                        log.Errorf("Failed to create fee func for record %v: "+
×
UNCOV
1955
                                "%v", r.requestID, err)
×
UNCOV
1956

×
UNCOV
1957
                        return 0, err
×
UNCOV
1958
                }
×
1959

1960
                feeFunc = f
1✔
1961
        }
1962

1963
        // Since we failed to sweep the inputs, either the sweeping tx has been
1964
        // replaced by another party's tx, or the current output values cannot
1965
        // cover the budget, we missed this block window to increase its fee
1966
        // rate. To make sure the fee rate stays in the initial line, we now ask
1967
        // the fee function to give us the next fee rate as if the sweeping tx
1968
        // were RBFed. This new fee rate will be used as the starting fee rate
1969
        // if the upper system decides to continue sweeping the rest of the
1970
        // inputs.
1971
        _, err := feeFunc.Increment()
3✔
1972
        if err != nil {
4✔
1973
                // The fee function has reached its max position - nothing we
1✔
1974
                // can do here other than letting the user increase the budget.
1✔
1975
                log.Errorf("Failed to calculate the next fee rate for "+
1✔
1976
                        "Record(%v): %v", r.requestID, err)
1✔
1977
        }
1✔
1978

1979
        return feeFunc.FeeRate(), nil
3✔
1980
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc