• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16183445888

10 Jul 2025 12:49AM UTC coverage: 57.64% (+0.03%) from 57.611%
16183445888

Pull #10060

github

web-flow
Merge f652051f3 into 0e830da9d
Pull Request #10060: sweep: fix expected spending events being missed

31 of 32 new or added lines in 2 files covered. (96.88%)

36 existing lines in 6 files now uncovered.

98567 of 171005 relevant lines covered (57.64%)

1.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.11
/sweep/fee_bumper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8
        "time"
9

10
        "github.com/btcsuite/btcd/btcutil"
11
        "github.com/btcsuite/btcd/chaincfg/chainhash"
12
        "github.com/btcsuite/btcd/rpcclient"
13
        "github.com/btcsuite/btcd/txscript"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/btcsuite/btcwallet/chain"
16
        "github.com/lightningnetwork/lnd/chainio"
17
        "github.com/lightningnetwork/lnd/chainntnfs"
18
        "github.com/lightningnetwork/lnd/fn/v2"
19
        "github.com/lightningnetwork/lnd/input"
20
        "github.com/lightningnetwork/lnd/labels"
21
        "github.com/lightningnetwork/lnd/lntypes"
22
        "github.com/lightningnetwork/lnd/lnutils"
23
        "github.com/lightningnetwork/lnd/lnwallet"
24
        "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
25
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
26
        "github.com/lightningnetwork/lnd/tlv"
27
)
28

29
// spentNotificationTimeout defines the time to wait for a spending event from
// `RegisterSpendNtfn` when an immediate response is expected.
const spentNotificationTimeout = 1 * time.Second

// Sentinel errors reported by the fee bumper. Compare against them with
// errors.Is since they are usually returned wrapped.
var (
	// ErrInvalidBumpResult is returned when the bump result is invalid.
	ErrInvalidBumpResult = errors.New("invalid bump result")

	// ErrNotEnoughBudget is returned when the fee bumper decides the
	// current budget cannot cover the fee.
	ErrNotEnoughBudget = errors.New("not enough budget")

	// ErrLocktimeImmature is returned when sweeping an input whose
	// locktime is not reached.
	ErrLocktimeImmature = errors.New("immature input")

	// ErrTxNoOutput is returned when an output cannot be created during tx
	// preparation, usually due to the output being dust.
	ErrTxNoOutput = errors.New("tx has no output")

	// ErrUnknownSpent is returned when an unknown tx has spent an input in
	// the sweeping tx.
	ErrUnknownSpent = errors.New("unknown spend of input")

	// ErrInputMissing is returned when a given input no longer exists,
	// e.g., spending from an orphan tx.
	ErrInputMissing = errors.New("input no longer exists")
)

var (
	// dummyChangePkScript is a dummy tapscript change script that's used
	// when we don't need a real address, just something that can be used
	// for fee estimation. It is 34 bytes long: the two-byte prefix
	// 0x51 0x20 followed by 32 zero bytes.
	dummyChangePkScript = []byte{
		0x51, 0x20,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
	}
)
70

71
// Bumper defines an interface that can be used by other subsystems for fee
// bumping.
type Bumper interface {
	// Broadcast is used to publish the tx created from the given inputs
	// specified in the request. It handles the tx creation, broadcasts it,
	// and monitors its confirmation status for potential fee bumping. It
	// returns a receive-only chan that the caller can use to receive
	// updates about the broadcast result and potential RBF attempts.
	Broadcast(req *BumpRequest) <-chan *BumpResult
}
81

82
// BumpEvent identifies the kind of update reported for a fee bumping attempt.
type BumpEvent uint8

const (
	// TxPublished is sent when the broadcast attempt is finished.
	TxPublished BumpEvent = iota

	// TxFailed is sent when the tx has encountered a fee-related error
	// during its creation or broadcast, or an internal error from the fee
	// bumper. In either case the inputs in this tx should be retried with
	// either a different grouping strategy or an increased budget.
	//
	// TODO(yy): Remove the above usage once we remove sweeping non-CPFP
	// anchors.
	TxFailed

	// TxReplaced is sent when the original tx is replaced by a new one.
	TxReplaced

	// TxConfirmed is sent when the tx is confirmed.
	TxConfirmed

	// TxUnknownSpend is sent when at least one of the inputs is spent but
	// not by the current sweeping tx. This can happen when,
	// - a remote party has replaced our sweeping tx by spending the
	//   input(s), e.g., via the direct preimage spend on our outgoing HTLC.
	// - a third party has replaced our sweeping tx, e.g., the anchor output
	//   after 16 blocks.
	// - a previous sweeping tx has confirmed but the fee bumper is not
	//   aware of it, e.g., a restart happens right after the sweeping tx is
	//   broadcast and confirmed.
	TxUnknownSpend

	// TxFatal is sent when the inputs in this tx cannot be retried. Txns
	// will end up in this state if they have encountered a non-fee related
	// error, which means they cannot be retried with increased budget.
	TxFatal

	// sentinelEvent marks the end of the known events; any value at or
	// above it is treated as unknown.
	sentinelEvent
)

// String returns a human-readable string for the event. Values without a
// name, i.e. sentinelEvent and above, are rendered as "Unknown".
func (e BumpEvent) String() string {
	// Names are indexed by the event's numeric value.
	eventNames := [...]string{
		TxPublished:    "Published",
		TxFailed:       "Failed",
		TxReplaced:     "Replaced",
		TxConfirmed:    "Confirmed",
		TxUnknownSpend: "UnknownSpend",
		TxFatal:        "Fatal",
	}

	if int(e) >= len(eventNames) {
		return "Unknown"
	}

	return eventNames[e]
}

// Unknown reports whether the event lies outside the set of known events.
func (e BumpEvent) Unknown() bool {
	known := e < sentinelEvent

	return !known
}
3✔
148

149
// BumpRequest is used by the caller to give the Bumper the necessary info to
// create and manage potential fee bumps for a set of inputs.
type BumpRequest struct {
	// Budget gives the total amount that can be used as fees by these
	// inputs.
	Budget btcutil.Amount

	// Inputs is the set of inputs to sweep.
	Inputs []input.Input

	// DeadlineHeight is the block height at which the tx should be
	// confirmed.
	DeadlineHeight int32

	// DeliveryAddress is the script to send the change output to.
	DeliveryAddress lnwallet.AddrWithKey

	// MaxFeeRate is the maximum fee rate that can be used for fee bumping.
	MaxFeeRate chainfee.SatPerKWeight

	// StartingFeeRate is an optional parameter that can be used to specify
	// the initial fee rate to use for the fee function.
	StartingFeeRate fn.Option[chainfee.SatPerKWeight]

	// ExtraTxOut tracks if this bump request has an optional set of extra
	// outputs to add to the transaction.
	ExtraTxOut fn.Option[SweepOutput]

	// Immediate is used to specify that the tx should be broadcast
	// immediately.
	Immediate bool
}
181

182
// MaxFeeRateAllowed returns the maximum fee rate allowed for the given
183
// request. It calculates the feerate using the supplied budget and the weight,
184
// compares it with the specified MaxFeeRate, and returns the smaller of the
185
// two.
186
func (r *BumpRequest) MaxFeeRateAllowed() (chainfee.SatPerKWeight, error) {
3✔
187
        // We'll want to know if we have any blobs, as we need to factor this
3✔
188
        // into the max fee rate for this bump request.
3✔
189
        hasBlobs := fn.Any(r.Inputs, func(i input.Input) bool {
6✔
190
                return fn.MapOptionZ(
3✔
191
                        i.ResolutionBlob(), func(b tlv.Blob) bool {
6✔
192
                                return len(b) > 0
3✔
193
                        },
3✔
194
                )
195
        })
196

197
        sweepAddrs := [][]byte{
3✔
198
                r.DeliveryAddress.DeliveryAddress,
3✔
199
        }
3✔
200

3✔
201
        // If we have blobs, then we'll add an extra sweep addr for the size
3✔
202
        // estimate below. We know that these blobs will also always be based on
3✔
203
        // p2tr addrs.
3✔
204
        if hasBlobs {
3✔
205
                // We need to pass in a real address, so we'll use a dummy
×
206
                // tapscript change script that's used elsewhere for tests.
×
207
                sweepAddrs = append(sweepAddrs, dummyChangePkScript)
×
208
        }
×
209

210
        // Get the size of the sweep tx, which will be used to calculate the
211
        // budget fee rate.
212
        size, err := calcSweepTxWeight(
3✔
213
                r.Inputs, sweepAddrs,
3✔
214
        )
3✔
215
        if err != nil {
3✔
216
                return 0, err
×
217
        }
×
218

219
        // Use the budget and MaxFeeRate to decide the max allowed fee rate.
220
        // This is needed as, when the input has a large value and the user
221
        // sets the budget to be proportional to the input value, the fee rate
222
        // can be very high and we need to make sure it doesn't exceed the max
223
        // fee rate.
224
        maxFeeRateAllowed := chainfee.NewSatPerKWeight(r.Budget, size)
3✔
225
        if maxFeeRateAllowed > r.MaxFeeRate {
6✔
226
                log.Debugf("Budget feerate %v exceeds MaxFeeRate %v, use "+
3✔
227
                        "MaxFeeRate instead, txWeight=%v", maxFeeRateAllowed,
3✔
228
                        r.MaxFeeRate, size)
3✔
229

3✔
230
                return r.MaxFeeRate, nil
3✔
231
        }
3✔
232

233
        log.Debugf("Budget feerate %v below MaxFeeRate %v, use budget feerate "+
3✔
234
                "instead, txWeight=%v", maxFeeRateAllowed, r.MaxFeeRate, size)
3✔
235

3✔
236
        return maxFeeRateAllowed, nil
3✔
237
}
238

239
// calcSweepTxWeight calculates the weight of the sweep tx. It assumes a
240
// sweeping tx always has a single output(change).
241
func calcSweepTxWeight(inputs []input.Input,
242
        outputPkScript [][]byte) (lntypes.WeightUnit, error) {
3✔
243

3✔
244
        // Use a const fee rate as we only use the weight estimator to
3✔
245
        // calculate the size.
3✔
246
        const feeRate = 1
3✔
247

3✔
248
        // Initialize the tx weight estimator with,
3✔
249
        // - nil outputs as we only have one single change output.
3✔
250
        // - const fee rate as we don't care about the fees here.
3✔
251
        // - 0 maxfeerate as we don't care about fees here.
3✔
252
        //
3✔
253
        // TODO(yy): we should refactor the weight estimator to not require a
3✔
254
        // fee rate and max fee rate and make it a pure tx weight calculator.
3✔
255
        _, estimator, err := getWeightEstimate(
3✔
256
                inputs, nil, feeRate, 0, outputPkScript,
3✔
257
        )
3✔
258
        if err != nil {
3✔
259
                return 0, err
×
260
        }
×
261

262
        return estimator.weight(), nil
3✔
263
}
264

265
// BumpResult is used by the Bumper to send updates about the tx being
// broadcast.
type BumpResult struct {
	// Event is the type of event that the result is for.
	Event BumpEvent

	// Tx is the tx being broadcast.
	Tx *wire.MsgTx

	// ReplacedTx is the old, replaced tx if a fee bump is attempted.
	ReplacedTx *wire.MsgTx

	// FeeRate is the fee rate used for the new tx.
	FeeRate chainfee.SatPerKWeight

	// Fee is the fee paid by the new tx.
	Fee btcutil.Amount

	// Err is the error that occurred during the broadcast.
	Err error

	// SpentInputs are the inputs spent by another tx which caused the
	// current tx to be failed. The map is keyed by the spent outpoint.
	SpentInputs map[wire.OutPoint]*wire.MsgTx

	// requestID is the ID of the request that created this record.
	requestID uint64
}
293

294
// String returns a human-readable string for the result.
295
func (b *BumpResult) String() string {
3✔
296
        desc := fmt.Sprintf("Event=%v", b.Event)
3✔
297
        if b.Tx != nil {
6✔
298
                desc += fmt.Sprintf(", Tx=%v", b.Tx.TxHash())
3✔
299
        }
3✔
300

301
        return fmt.Sprintf("[%s]", desc)
3✔
302
}
303

304
// Validate validates the BumpResult so it's safe to use.
305
func (b *BumpResult) Validate() error {
3✔
306
        isFailureEvent := b.Event == TxFailed || b.Event == TxFatal ||
3✔
307
                b.Event == TxUnknownSpend
3✔
308

3✔
309
        // Every result must have a tx except the fatal or failed case.
3✔
310
        if b.Tx == nil && !isFailureEvent {
3✔
311
                return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult)
×
312
        }
×
313

314
        // Every result must have a known event.
315
        if b.Event.Unknown() {
3✔
316
                return fmt.Errorf("%w: unknown event", ErrInvalidBumpResult)
×
317
        }
×
318

319
        // If it's a replacing event, it must have a replaced tx.
320
        if b.Event == TxReplaced && b.ReplacedTx == nil {
3✔
321
                return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult)
×
322
        }
×
323

324
        // If it's a failed or fatal event, it must have an error.
325
        if isFailureEvent && b.Err == nil {
3✔
326
                return fmt.Errorf("%w: nil error", ErrInvalidBumpResult)
×
327
        }
×
328

329
        // If it's a confirmed event, it must have a fee rate and fee.
330
        if b.Event == TxConfirmed && (b.FeeRate == 0 || b.Fee == 0) {
3✔
331
                return fmt.Errorf("%w: missing fee rate or fee",
×
332
                        ErrInvalidBumpResult)
×
333
        }
×
334

335
        return nil
3✔
336
}
337

338
// TxPublisherConfig is the config used to create a new TxPublisher.
type TxPublisherConfig struct {
	// Signer is used to create the tx signature.
	Signer input.Signer

	// Wallet is used primarily to publish the tx.
	Wallet Wallet

	// Estimator is used to estimate the fee rate for the new tx based on
	// its deadline conf target.
	Estimator chainfee.Estimator

	// Notifier is used to monitor the confirmation status of the tx.
	Notifier chainntnfs.ChainNotifier

	// ChainIO represents an abstraction over a source that can query the
	// blockchain.
	ChainIO lnwallet.BlockChainIO

	// AuxSweeper is an optional interface that can be used to modify the
	// way sweep transactions are generated.
	AuxSweeper fn.Option[AuxSweeper]
}
361

362
// TxPublisher is an implementation of the Bumper interface. It utilizes the
// `testmempoolaccept` RPC to bump the fee of txns it created based on the
// fee function selected or configured by the caller. Its purpose is to take a
// list of inputs specified, and create a tx that spends them to a specified
// output. It will then monitor the confirmation status of the tx, and if it's
// not confirmed within a certain time frame, it will attempt to bump the fee
// of the tx by creating a new tx that spends the same inputs to the same
// output, but with a higher fee rate. It will continue to do this until the
// tx is confirmed or the fee rate reaches the maximum fee rate specified by
// the caller.
type TxPublisher struct {
	started atomic.Bool
	stopped atomic.Bool

	// Embed the blockbeat consumer struct to get access to the method
	// `NotifyBlockProcessed` and the `BlockbeatChan`.
	chainio.BeatConsumer

	wg sync.WaitGroup

	// cfg specifies the configuration of the TxPublisher.
	cfg *TxPublisherConfig

	// currentHeight is the current block height.
	currentHeight atomic.Int32

	// records is a map keyed by the requestCounter and the value is the tx
	// being monitored.
	records lnutils.SyncMap[uint64, *monitorRecord]

	// requestCounter is a monotonically increasing counter used to keep
	// track of how many requests have been made.
	requestCounter atomic.Uint64

	// subscriberChans is a map keyed by the requestCounter, each item is
	// the chan that the publisher sends the fee bump result to.
	subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]

	// quit is used to signal the publisher to stop.
	quit chan struct{}
}

// Compile-time constraint to ensure TxPublisher implements Bumper.
var _ Bumper = (*TxPublisher)(nil)

// Compile-time check for the chainio.Consumer interface.
var _ chainio.Consumer = (*TxPublisher)(nil)
409

410
// NewTxPublisher creates a new TxPublisher.
411
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
3✔
412
        tp := &TxPublisher{
3✔
413
                cfg:             &cfg,
3✔
414
                records:         lnutils.SyncMap[uint64, *monitorRecord]{},
3✔
415
                subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
3✔
416
                quit:            make(chan struct{}),
3✔
417
        }
3✔
418

3✔
419
        // Mount the block consumer.
3✔
420
        tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())
3✔
421

3✔
422
        return tp
3✔
423
}
3✔
424

425
// isNeutrinoBackend checks if the wallet backend is neutrino.
426
func (t *TxPublisher) isNeutrinoBackend() bool {
×
427
        return t.cfg.Wallet.BackEnd() == "neutrino"
×
428
}
×
429

430
// Broadcast is used to publish the tx created from the given inputs. It will
431
// register the broadcast request and return a chan to the caller to subscribe
432
// the broadcast result. The initial broadcast is guaranteed to be
433
// RBF-compliant unless the budget specified cannot cover the fee.
434
//
435
// NOTE: part of the Bumper interface.
436
func (t *TxPublisher) Broadcast(req *BumpRequest) <-chan *BumpResult {
3✔
437
        log.Tracef("Received broadcast request: %s",
3✔
438
                lnutils.SpewLogClosure(req))
3✔
439

3✔
440
        // Store the request.
3✔
441
        record := t.storeInitialRecord(req)
3✔
442

3✔
443
        // Create a chan to send the result to the caller.
3✔
444
        subscriber := make(chan *BumpResult, 1)
3✔
445
        t.subscriberChans.Store(record.requestID, subscriber)
3✔
446

3✔
447
        // Publish the tx immediately if specified.
3✔
448
        if req.Immediate {
6✔
449
                t.handleInitialBroadcast(record)
3✔
450
        }
3✔
451

452
        return subscriber
3✔
453
}
454

455
// storeInitialRecord initializes a monitor record and saves it in the map.
456
func (t *TxPublisher) storeInitialRecord(req *BumpRequest) *monitorRecord {
3✔
457
        // Increase the request counter.
3✔
458
        //
3✔
459
        // NOTE: this is the only place where we increase the counter.
3✔
460
        requestID := t.requestCounter.Add(1)
3✔
461

3✔
462
        // Register the record.
3✔
463
        record := &monitorRecord{
3✔
464
                requestID: requestID,
3✔
465
                req:       req,
3✔
466
        }
3✔
467
        t.records.Store(requestID, record)
3✔
468

3✔
469
        return record
3✔
470
}
3✔
471

472
// updateRecord updates the given record's tx and fee, and saves it in the
473
// records map.
474
func (t *TxPublisher) updateRecord(r *monitorRecord,
475
        sweepCtx *sweepTxCtx) *monitorRecord {
3✔
476

3✔
477
        r.tx = sweepCtx.tx
3✔
478
        r.fee = sweepCtx.fee
3✔
479
        r.outpointToTxIndex = sweepCtx.outpointToTxIndex
3✔
480

3✔
481
        // Register the record.
3✔
482
        t.records.Store(r.requestID, r)
3✔
483

3✔
484
        return r
3✔
485
}
3✔
486

487
// NOTE: part of the `chainio.Consumer` interface.
488
func (t *TxPublisher) Name() string {
3✔
489
        return "TxPublisher"
3✔
490
}
3✔
491

492
// initializeTx initializes a fee function and creates an RBF-compliant tx. If
493
// succeeded, the initial tx is stored in the records map.
494
func (t *TxPublisher) initializeTx(r *monitorRecord) (*monitorRecord, error) {
3✔
495
        // Create a fee bumping algorithm to be used for future RBF.
3✔
496
        feeAlgo, err := t.initializeFeeFunction(r.req)
3✔
497
        if err != nil {
6✔
498
                return nil, fmt.Errorf("init fee function: %w", err)
3✔
499
        }
3✔
500

501
        // Attach the newly created fee function.
502
        //
503
        // TODO(yy): current we'd initialize a monitorRecord before creating the
504
        // fee function, while we could instead create the fee function first
505
        // then save it to the record. To make this happen we need to change the
506
        // conf target calculation below since we would be initializing the fee
507
        // function one block before.
508
        r.feeFunction = feeAlgo
3✔
509

3✔
510
        // Create the initial tx to be broadcasted. This tx is guaranteed to
3✔
511
        // comply with the RBF restrictions.
3✔
512
        record, err := t.createRBFCompliantTx(r)
3✔
513
        if err != nil {
6✔
514
                return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
3✔
515
        }
3✔
516

517
        return record, nil
3✔
518
}
519

520
// initializeFeeFunction initializes a fee function to be used for this request
521
// for future fee bumping.
522
func (t *TxPublisher) initializeFeeFunction(
523
        req *BumpRequest) (FeeFunction, error) {
3✔
524

3✔
525
        // Get the max allowed feerate.
3✔
526
        maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
3✔
527
        if err != nil {
3✔
528
                return nil, err
×
529
        }
×
530

531
        // Get the initial conf target.
532
        confTarget := calcCurrentConfTarget(
3✔
533
                t.currentHeight.Load(), req.DeadlineHeight,
3✔
534
        )
3✔
535

3✔
536
        log.Debugf("Initializing fee function with conf target=%v, budget=%v, "+
3✔
537
                "maxFeeRateAllowed=%v", confTarget, req.Budget,
3✔
538
                maxFeeRateAllowed)
3✔
539

3✔
540
        // Initialize the fee function and return it.
3✔
541
        //
3✔
542
        // TODO(yy): return based on differet req.Strategy?
3✔
543
        return NewLinearFeeFunction(
3✔
544
                maxFeeRateAllowed, confTarget, t.cfg.Estimator,
3✔
545
                req.StartingFeeRate,
3✔
546
        )
3✔
547
}
548

549
// createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
// so by creating a tx, validating it via createAndCheckTx, and bumping its fee
// and redoing the process until the tx is valid. It returns an error when a
// non-fee related error occurs or when the fee function refuses to increase
// the fee rate any further, e.g., because the budget has been used up.
//
// On success the record is updated with the created tx and returned.
func (t *TxPublisher) createRBFCompliantTx(
	r *monitorRecord) (*monitorRecord, error) {

	f := r.feeFunction

	// Each iteration builds and checks a tx at the fee function's current
	// fee rate. The loop only repeats after the fee rate was increased.
	for {
		// Create a new tx with the given fee rate and check its
		// mempool acceptance.
		sweepCtx, err := t.createAndCheckTx(r)

		switch {
		case err == nil:
			// The tx is valid, store it.
			record := t.updateRecord(r, sweepCtx)

			log.Infof("Created initial sweep tx=%v for %v inputs: "+
				"feerate=%v, fee=%v, inputs:\n%v",
				sweepCtx.tx.TxHash(), len(r.req.Inputs),
				f.FeeRate(), sweepCtx.fee,
				inputTypeSummary(r.req.Inputs))

			return record, nil

		// If the error indicates the fees paid is not enough, we will
		// ask the fee function to increase the fee rate and retry.
		case errors.Is(err, lnwallet.ErrMempoolFee):
			// We should at least start with a feerate above the
			// mempool min feerate, so if we get this error, it
			// means something is wrong earlier in the pipeline.
			log.Errorf("Current fee=%v, feerate=%v, %v",
				sweepCtx.fee, f.FeeRate(), err)

			// Handled the same way as an insufficient fee below.
			fallthrough

		// We are not paying enough fees so we increase it.
		case errors.Is(err, chain.ErrInsufficientFee):
			increased := false

			// Keep calling the fee function until the fee rate is
			// increased or maxed out.
			for !increased {
				log.Debugf("Increasing fee for next round, "+
					"current fee=%v, feerate=%v",
					sweepCtx.fee, f.FeeRate())

				// If the fee function tells us that we have
				// used up the budget, we will return an error
				// indicating this tx cannot be made. The
				// sweeper should handle this error and try to
				// cluster these inputs differently.
				increased, err = f.Increment()
				if err != nil {
					return nil, err
				}
			}

		// Any other error is not fee related, so retrying at a higher
		// fee rate won't help and the error is returned as is.
		//
		// TODO(yy): suppose there's only one bad input, we can do a
		// binary search to find out which input is causing this error
		// by recreating a tx using half of the inputs and check its
		// mempool acceptance.
		default:
			log.Debugf("Failed to create RBF-compliant tx: %v", err)
			return nil, err
		}
	}
}
619

620
// createAndCheckTx creates a tx based on the given inputs, change output
621
// script, and the fee rate. In addition, it validates the tx's mempool
622
// acceptance before returning a tx that can be published directly, along with
623
// its fee.
624
func (t *TxPublisher) createAndCheckTx(r *monitorRecord) (*sweepTxCtx, error) {
3✔
625
        req := r.req
3✔
626
        f := r.feeFunction
3✔
627

3✔
628
        // Create the sweep tx with max fee rate of 0 as the fee function
3✔
629
        // guarantees the fee rate used here won't exceed the max fee rate.
3✔
630
        sweepCtx, err := t.createSweepTx(
3✔
631
                req.Inputs, req.DeliveryAddress, f.FeeRate(),
3✔
632
        )
3✔
633
        if err != nil {
6✔
634
                return sweepCtx, fmt.Errorf("create sweep tx: %w", err)
3✔
635
        }
3✔
636

637
        // Sanity check the budget still covers the fee.
638
        if sweepCtx.fee > req.Budget {
3✔
639
                return sweepCtx, fmt.Errorf("%w: budget=%v, fee=%v",
×
640
                        ErrNotEnoughBudget, req.Budget, sweepCtx.fee)
×
641
        }
×
642

643
        // If we had an extra txOut, then we'll update the result to include
644
        // it.
645
        req.ExtraTxOut = sweepCtx.extraTxOut
3✔
646

3✔
647
        // Validate the tx's mempool acceptance.
3✔
648
        err = t.cfg.Wallet.CheckMempoolAcceptance(sweepCtx.tx)
3✔
649

3✔
650
        // Exit early if the tx is valid.
3✔
651
        if err == nil {
5✔
652
                return sweepCtx, nil
2✔
653
        }
2✔
654

655
        // Print an error log if the chain backend doesn't support the mempool
656
        // acceptance test RPC.
657
        if errors.Is(err, rpcclient.ErrBackendVersion) {
3✔
658
                log.Errorf("TestMempoolAccept not supported by backend, " +
×
659
                        "consider upgrading it to a newer version")
×
660
                return sweepCtx, nil
×
661
        }
×
662

663
        // We are running on a backend that doesn't implement the RPC
664
        // testmempoolaccept, eg, neutrino, so we'll skip the check.
665
        if errors.Is(err, chain.ErrUnimplemented) {
4✔
666
                log.Debug("Skipped testmempoolaccept due to not implemented")
1✔
667
                return sweepCtx, nil
1✔
668
        }
1✔
669

670
        // If the inputs are spent by another tx, we will exit with the latest
671
        // sweepCtx and an error.
672
        if errors.Is(err, chain.ErrMissingInputs) {
4✔
673
                log.Debugf("Tx %v missing inputs, it's likely the input has "+
2✔
674
                        "been spent by others", sweepCtx.tx.TxHash())
2✔
675

2✔
676
                // Make sure to update the record with the latest attempt.
2✔
677
                t.updateRecord(r, sweepCtx)
2✔
678

2✔
679
                return sweepCtx, ErrInputMissing
2✔
680
        }
2✔
681

682
        return sweepCtx, fmt.Errorf("tx=%v failed mempool check: %w",
2✔
683
                sweepCtx.tx.TxHash(), err)
2✔
684
}
685

686
// handleMissingInputs handles the case when the chain backend reports back a
687
// missing inputs error, which could happen when one of the input has been spent
688
// in another tx, or the input is referencing an orphan. When the input is
689
// spent, it will be handled via the TxUnknownSpend flow by creating a
690
// TxUnknownSpend bump result, otherwise, a TxFatal bump result is returned.
691
func (t *TxPublisher) handleMissingInputs(r *monitorRecord) *BumpResult {
2✔
692
        // Get the spending txns.
2✔
693
        spends := t.getSpentInputs(r)
2✔
694

2✔
695
        // Attach the spending txns.
2✔
696
        r.spentInputs = spends
2✔
697

2✔
698
        // If there are no spending txns found and the input is missing, the
2✔
699
        // input is referencing an orphan tx that's no longer valid, e.g., the
2✔
700
        // spending the anchor output from the remote commitment after the local
2✔
701
        // commitment has confirmed. In this case we will mark it as fatal and
2✔
702
        // exit.
2✔
703
        if len(spends) == 0 {
4✔
704
                log.Warnf("Failing record=%v: found orphan inputs: %v\n",
2✔
705
                        r.requestID, inputTypeSummary(r.req.Inputs))
2✔
706

2✔
707
                // Create a result that will be sent to the resultChan which is
2✔
708
                // listened by the caller.
2✔
709
                result := &BumpResult{
2✔
710
                        Event:     TxFatal,
2✔
711
                        Tx:        r.tx,
2✔
712
                        requestID: r.requestID,
2✔
713
                        Err:       ErrInputMissing,
2✔
714
                }
2✔
715

2✔
716
                return result
2✔
717
        }
2✔
718

719
        // Check that the spending tx matches the sweeping tx - given that the
720
        // current sweeping tx has been failed due to missing inputs, the
721
        // spending tx must be a different tx, thus it should NOT be matched. We
722
        // perform a sanity check here to catch the unexpected state.
UNCOV
723
        if !t.isUnknownSpent(r, spends) {
×
724
                log.Errorf("Sweeping tx %v has missing inputs, yet the "+
×
725
                        "spending tx is the sweeping tx itself: %v",
×
726
                        r.tx.TxHash(), r.spentInputs)
×
727
        }
×
728

UNCOV
729
        return t.createUnknownSpentBumpResult(r)
×
730
}
731

732
// broadcast takes a monitored tx and publishes it to the network. Prior to the
733
// broadcast, it will subscribe the tx's confirmation notification and attach
734
// the event channel to the record. Any broadcast-related errors will not be
735
// returned here, instead, they will be put inside the `BumpResult` and
736
// returned to the caller.
737
func (t *TxPublisher) broadcast(record *monitorRecord) (*BumpResult, error) {
3✔
738
        txid := record.tx.TxHash()
3✔
739

3✔
740
        tx := record.tx
3✔
741
        log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
3✔
742
                txid, len(tx.TxIn), t.currentHeight.Load())
3✔
743

3✔
744
        // Before we go to broadcast, we'll notify the aux sweeper, if it's
3✔
745
        // present of this new broadcast attempt.
3✔
746
        err := fn.MapOptionZ(t.cfg.AuxSweeper, func(aux AuxSweeper) error {
3✔
747
                return aux.NotifyBroadcast(
×
748
                        record.req, tx, record.fee, record.outpointToTxIndex,
×
749
                )
×
750
        })
×
751
        if err != nil {
3✔
752
                return nil, fmt.Errorf("unable to notify aux sweeper: %w", err)
×
753
        }
×
754

755
        // Set the event, and change it to TxFailed if the wallet fails to
756
        // publish it.
757
        event := TxPublished
3✔
758

3✔
759
        // Publish the sweeping tx with customized label. If the publish fails,
3✔
760
        // this error will be saved in the `BumpResult` and it will be removed
3✔
761
        // from being monitored.
3✔
762
        err = t.cfg.Wallet.PublishTransaction(
3✔
763
                tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
3✔
764
        )
3✔
765
        if err != nil {
5✔
766
                // NOTE: we decide to attach this error to the result instead
2✔
767
                // of returning it here because by the time the tx reaches
2✔
768
                // here, it should have passed the mempool acceptance check. If
2✔
769
                // it still fails to be broadcast, it's likely a non-RBF
2✔
770
                // related error happened. So we send this error back to the
2✔
771
                // caller so that it can handle it properly.
2✔
772
                //
2✔
773
                // TODO(yy): find out which input is causing the failure.
2✔
774
                log.Errorf("Failed to publish tx %v: %v", txid, err)
2✔
775
                event = TxFailed
2✔
776
        }
2✔
777

778
        result := &BumpResult{
3✔
779
                Event:     event,
3✔
780
                Tx:        record.tx,
3✔
781
                Fee:       record.fee,
3✔
782
                FeeRate:   record.feeFunction.FeeRate(),
3✔
783
                Err:       err,
3✔
784
                requestID: record.requestID,
3✔
785
        }
3✔
786

3✔
787
        return result, nil
3✔
788
}
789

790
// notifyResult sends the result to the resultChan specified by the requestID.
791
// This channel is expected to be read by the caller.
792
func (t *TxPublisher) notifyResult(result *BumpResult) {
3✔
793
        id := result.requestID
3✔
794
        subscriber, ok := t.subscriberChans.Load(id)
3✔
795
        if !ok {
4✔
796
                log.Errorf("Result chan for id=%v not found", id)
1✔
797
                return
1✔
798
        }
1✔
799

800
        log.Debugf("Sending result %v for requestID=%v", result, id)
3✔
801

3✔
802
        select {
3✔
803
        // Send the result to the subscriber.
804
        //
805
        // TODO(yy): Add timeout in case it's blocking?
806
        case subscriber <- result:
3✔
807
        case <-t.quit:
×
808
                log.Debug("Fee bumper stopped")
×
809
        }
810
}
811

812
// removeResult removes the tracking of the result if the result contains a
813
// non-nil error, or the tx is confirmed, the record will be removed from the
814
// maps.
815
func (t *TxPublisher) removeResult(result *BumpResult) {
3✔
816
        id := result.requestID
3✔
817

3✔
818
        var txid chainhash.Hash
3✔
819
        if result.Tx != nil {
6✔
820
                txid = result.Tx.TxHash()
3✔
821
        }
3✔
822

823
        // Remove the record from the maps if there's an error or the tx is
824
        // confirmed. When there's an error, it means this tx has failed its
825
        // broadcast and cannot be retried. There are two cases it may fail,
826
        // - when the budget cannot cover the increased fee calculated by the
827
        //   fee function, hence the budget is used up.
828
        // - when a non-fee related error returned from PublishTransaction.
829
        switch result.Event {
3✔
830
        case TxFailed:
3✔
831
                log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
3✔
832
                        id, txid, result.Err)
3✔
833

834
        case TxConfirmed:
3✔
835
                // Remove the record if the tx is confirmed.
3✔
836
                log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
3✔
837
                        txid)
3✔
838

839
        case TxFatal:
2✔
840
                // Remove the record if there's an error.
2✔
841
                log.Debugf("Removing monitor record=%v due to fatal err: %v",
2✔
842
                        id, result.Err)
2✔
843

844
        case TxUnknownSpend:
3✔
845
                // Remove the record if there's an unknown spend.
3✔
846
                log.Debugf("Removing monitor record=%v due unknown spent: "+
3✔
847
                        "%v", id, result.Err)
3✔
848

849
        // Do nothing if it's neither failed or confirmed.
850
        default:
3✔
851
                log.Tracef("Skipping record removal for id=%v, event=%v", id,
3✔
852
                        result.Event)
3✔
853

3✔
854
                return
3✔
855
        }
856

857
        t.records.Delete(id)
3✔
858
        t.subscriberChans.Delete(id)
3✔
859
}
860

861
// handleResult handles the result of a tx broadcast. It will notify the
862
// subscriber and remove the record if the tx is confirmed or failed to be
863
// broadcast.
864
func (t *TxPublisher) handleResult(result *BumpResult) {
3✔
865
        // Notify the subscriber.
3✔
866
        t.notifyResult(result)
3✔
867

3✔
868
        // Remove the record if it's failed or confirmed.
3✔
869
        t.removeResult(result)
3✔
870
}
3✔
871

872
// monitorRecord is used to keep track of the tx being monitored by the
873
// publisher internally.
874
type monitorRecord struct {
875
        // requestID is the ID of the request that created this record.
876
        requestID uint64
877

878
        // tx is the tx being monitored.
879
        tx *wire.MsgTx
880

881
        // req is the original request.
882
        req *BumpRequest
883

884
        // feeFunction is the fee bumping algorithm used by the publisher.
885
        feeFunction FeeFunction
886

887
        // fee is the fee paid by the tx.
888
        fee btcutil.Amount
889

890
        // outpointToTxIndex is a map of outpoint to tx index.
891
        outpointToTxIndex map[wire.OutPoint]int
892

893
        // spentInputs are the inputs spent by another tx which caused the
894
        // current tx failed.
895
        spentInputs map[wire.OutPoint]*wire.MsgTx
896
}
897

898
// Start starts the publisher by subscribing to block epoch updates and kicking
899
// off the monitor loop.
900
func (t *TxPublisher) Start(beat chainio.Blockbeat) error {
3✔
901
        log.Info("TxPublisher starting...")
3✔
902

3✔
903
        if t.started.Swap(true) {
3✔
904
                return fmt.Errorf("TxPublisher started more than once")
×
905
        }
×
906

907
        // Set the current height.
908
        t.currentHeight.Store(beat.Height())
3✔
909

3✔
910
        t.wg.Add(1)
3✔
911
        go t.monitor()
3✔
912

3✔
913
        log.Debugf("TxPublisher started")
3✔
914

3✔
915
        return nil
3✔
916
}
917

918
// Stop stops the publisher and waits for the monitor loop to exit.
919
func (t *TxPublisher) Stop() error {
3✔
920
        log.Info("TxPublisher stopping...")
3✔
921

3✔
922
        if t.stopped.Swap(true) {
3✔
923
                return fmt.Errorf("TxPublisher stopped more than once")
×
924
        }
×
925

926
        close(t.quit)
3✔
927
        t.wg.Wait()
3✔
928

3✔
929
        log.Debug("TxPublisher stopped")
3✔
930

3✔
931
        return nil
3✔
932
}
933

934
// monitor is the main loop driven by new blocks. Whevenr a new block arrives,
935
// it will examine all the txns being monitored, and check if any of them needs
936
// to be bumped. If so, it will attempt to bump the fee of the tx.
937
//
938
// NOTE: Must be run as a goroutine.
939
func (t *TxPublisher) monitor() {
3✔
940
        defer t.wg.Done()
3✔
941

3✔
942
        for {
6✔
943
                select {
3✔
944
                case beat := <-t.BlockbeatChan:
3✔
945
                        height := beat.Height()
3✔
946
                        log.Debugf("TxPublisher received new block: %v", height)
3✔
947

3✔
948
                        // Update the best known height for the publisher.
3✔
949
                        t.currentHeight.Store(height)
3✔
950

3✔
951
                        // Check all monitored txns to see if any of them needs
3✔
952
                        // to be bumped.
3✔
953
                        t.processRecords()
3✔
954

3✔
955
                        // Notify we've processed the block.
3✔
956
                        t.NotifyBlockProcessed(beat, nil)
3✔
957

958
                case <-t.quit:
3✔
959
                        log.Debug("Fee bumper stopped, exit monitor")
3✔
960
                        return
3✔
961
                }
962
        }
963
}
964

965
// processRecords checks all the txns being monitored, and checks if any of
966
// them needs to be bumped. If so, it will attempt to bump the fee of the tx.
967
func (t *TxPublisher) processRecords() {
3✔
968
        // confirmedRecords stores a map of the records which have been
3✔
969
        // confirmed.
3✔
970
        confirmedRecords := make(map[uint64]*monitorRecord)
3✔
971

3✔
972
        // feeBumpRecords stores a map of records which need to be bumped.
3✔
973
        feeBumpRecords := make(map[uint64]*monitorRecord)
3✔
974

3✔
975
        // failedRecords stores a map of records which has inputs being spent
3✔
976
        // by a third party.
3✔
977
        failedRecords := make(map[uint64]*monitorRecord)
3✔
978

3✔
979
        // initialRecords stores a map of records which are being created and
3✔
980
        // published for the first time.
3✔
981
        initialRecords := make(map[uint64]*monitorRecord)
3✔
982

3✔
983
        // visitor is a helper closure that visits each record and divides them
3✔
984
        // into two groups.
3✔
985
        visitor := func(requestID uint64, r *monitorRecord) error {
6✔
986
                log.Tracef("Checking monitor recordID=%v", requestID)
3✔
987

3✔
988
                // Check whether the inputs have already been spent.
3✔
989
                spends := t.getSpentInputs(r)
3✔
990

3✔
991
                // If the any of the inputs has been spent, the record will be
3✔
992
                // marked as failed or confirmed.
3✔
993
                if len(spends) != 0 {
6✔
994
                        // Attach the spending txns.
3✔
995
                        r.spentInputs = spends
3✔
996

3✔
997
                        // When tx is nil, it means we haven't tried the initial
3✔
998
                        // broadcast yet the input is already spent. This could
3✔
999
                        // happen when the node shuts down, a previous sweeping
3✔
1000
                        // tx confirmed, then the node comes back online and
3✔
1001
                        // reoffers the inputs. Another case is the remote node
3✔
1002
                        // spends the input quickly before we even attempt the
3✔
1003
                        // sweep. In either case we will fail the record and let
3✔
1004
                        // the sweeper handles it.
3✔
1005
                        if r.tx == nil {
5✔
1006
                                failedRecords[requestID] = r
2✔
1007
                                return nil
2✔
1008
                        }
2✔
1009

1010
                        // Check whether the inputs has been spent by a unknown
1011
                        // tx.
1012
                        if t.isUnknownSpent(r, spends) {
6✔
1013
                                failedRecords[requestID] = r
3✔
1014

3✔
1015
                                // Move to the next record.
3✔
1016
                                return nil
3✔
1017
                        }
3✔
1018

1019
                        // The tx is ours, we can move it to the confirmed queue
1020
                        // and stop monitoring it.
1021
                        confirmedRecords[requestID] = r
3✔
1022

3✔
1023
                        // Move to the next record.
3✔
1024
                        return nil
3✔
1025
                }
1026

1027
                // This is the first time we see this record, so we put it in
1028
                // the initial queue.
1029
                if r.tx == nil {
6✔
1030
                        initialRecords[requestID] = r
3✔
1031

3✔
1032
                        return nil
3✔
1033
                }
3✔
1034

1035
                // We can only get here when the inputs are not spent and a
1036
                // previous sweeping tx has been attempted. In this case we will
1037
                // perform an RBF on it in the current block.
1038
                feeBumpRecords[requestID] = r
3✔
1039

3✔
1040
                // Return nil to move to the next record.
3✔
1041
                return nil
3✔
1042
        }
1043

1044
        // Iterate through all the records and divide them into four groups.
1045
        t.records.ForEach(visitor)
3✔
1046

3✔
1047
        // Handle the initial broadcast.
3✔
1048
        for _, r := range initialRecords {
6✔
1049
                t.handleInitialBroadcast(r)
3✔
1050
        }
3✔
1051

1052
        // For records that are confirmed, we'll notify the caller about this
1053
        // result.
1054
        for _, r := range confirmedRecords {
6✔
1055
                t.wg.Add(1)
3✔
1056
                go t.handleTxConfirmed(r)
3✔
1057
        }
3✔
1058

1059
        // Get the current height to be used in the following goroutines.
1060
        currentHeight := t.currentHeight.Load()
3✔
1061

3✔
1062
        // For records that are not confirmed, we perform a fee bump if needed.
3✔
1063
        for _, r := range feeBumpRecords {
6✔
1064
                t.wg.Add(1)
3✔
1065
                go t.handleFeeBumpTx(r, currentHeight)
3✔
1066
        }
3✔
1067

1068
        // For records that are failed, we'll notify the caller about this
1069
        // result.
1070
        for _, r := range failedRecords {
6✔
1071
                t.wg.Add(1)
3✔
1072
                go t.handleUnknownSpent(r)
3✔
1073
        }
3✔
1074
}
1075

1076
// handleTxConfirmed is called when a monitored tx is confirmed. It will
1077
// notify the subscriber then remove the record from the maps .
1078
//
1079
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1080
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) {
3✔
1081
        defer t.wg.Done()
3✔
1082

3✔
1083
        log.Debugf("Record %v is spent in tx=%v", r.requestID, r.tx.TxHash())
3✔
1084

3✔
1085
        // Create a result that will be sent to the resultChan which is
3✔
1086
        // listened by the caller.
3✔
1087
        result := &BumpResult{
3✔
1088
                Event:     TxConfirmed,
3✔
1089
                Tx:        r.tx,
3✔
1090
                requestID: r.requestID,
3✔
1091
                Fee:       r.fee,
3✔
1092
                FeeRate:   r.feeFunction.FeeRate(),
3✔
1093
        }
3✔
1094

3✔
1095
        // Notify that this tx is confirmed and remove the record from the map.
3✔
1096
        t.handleResult(result)
3✔
1097
}
3✔
1098

1099
// handleInitialTxError takes the error from `initializeTx` and decides the
1100
// bump event. It will construct a BumpResult and handles it.
1101
func (t *TxPublisher) handleInitialTxError(r *monitorRecord, err error) {
3✔
1102
        // Create a bump result to be sent to the sweeper.
3✔
1103
        result := &BumpResult{
3✔
1104
                Err:       err,
3✔
1105
                requestID: r.requestID,
3✔
1106
        }
3✔
1107

3✔
1108
        // We now decide what type of event to send.
3✔
1109
        switch {
3✔
1110
        // When the error is due to a dust output, we'll send a TxFailed so
1111
        // these inputs can be retried with a different group in the next
1112
        // block.
1113
        case errors.Is(err, ErrTxNoOutput):
3✔
1114
                result.Event = TxFailed
3✔
1115

1116
        // When the error is due to zero fee rate delta, we'll send a TxFailed
1117
        // so these inputs can be retried in the next block.
1118
        case errors.Is(err, ErrZeroFeeRateDelta):
3✔
1119
                result.Event = TxFailed
3✔
1120

1121
        // When the error is due to budget being used up, we'll send a TxFailed
1122
        // so these inputs can be retried with a different group in the next
1123
        // block.
1124
        case errors.Is(err, ErrMaxPosition):
2✔
1125
                fallthrough
2✔
1126

1127
        // If the tx doesn't not have enough budget, or if the inputs amounts
1128
        // are not sufficient to cover the budget, we will return a TxFailed
1129
        // event so the sweeper can handle it by re-clustering the utxos.
1130
        case errors.Is(err, ErrNotEnoughInputs),
1131
                errors.Is(err, ErrNotEnoughBudget):
2✔
1132

2✔
1133
                result.Event = TxFailed
2✔
1134

2✔
1135
                // Calculate the starting fee rate to be used when retry
2✔
1136
                // sweeping these inputs.
2✔
1137
                feeRate, err := t.calculateRetryFeeRate(r)
2✔
1138
                if err != nil {
2✔
1139
                        result.Event = TxFatal
×
1140
                        result.Err = err
×
1141
                }
×
1142

1143
                // Attach the new fee rate.
1144
                result.FeeRate = feeRate
2✔
1145

1146
        // When there are missing inputs, we'll create a TxUnknownSpend bump
1147
        // result here so the rest of the inputs can be retried.
1148
        case errors.Is(err, ErrInputMissing):
2✔
1149
                result = t.handleMissingInputs(r)
2✔
1150

1151
        // Otherwise this is not a fee-related error and the tx cannot be
1152
        // retried. In that case we will fail ALL the inputs in this tx, which
1153
        // means they will be removed from the sweeper and never be tried
1154
        // again.
1155
        //
1156
        // TODO(yy): Find out which input is causing the failure and fail that
1157
        // one only.
1158
        default:
×
1159
                result.Event = TxFatal
×
1160
        }
1161

1162
        t.handleResult(result)
3✔
1163
}
1164

1165
// handleInitialBroadcast is called when a new request is received. It will
1166
// handle the initial tx creation and broadcast. In details,
1167
// 1. init a fee function based on the given strategy.
1168
// 2. create an RBF-compliant tx and monitor it for confirmation.
1169
// 3. notify the initial broadcast result back to the caller.
1170
func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) {
3✔
1171
        log.Debugf("Initial broadcast for requestID=%v", r.requestID)
3✔
1172

3✔
1173
        var (
3✔
1174
                result *BumpResult
3✔
1175
                err    error
3✔
1176
        )
3✔
1177

3✔
1178
        // Attempt an initial broadcast which is guaranteed to comply with the
3✔
1179
        // RBF rules.
3✔
1180
        //
3✔
1181
        // Create the initial tx to be broadcasted.
3✔
1182
        record, err := t.initializeTx(r)
3✔
1183
        if err != nil {
6✔
1184
                log.Errorf("Initial broadcast failed: %v", err)
3✔
1185

3✔
1186
                // We now handle the initialization error and exit.
3✔
1187
                t.handleInitialTxError(r, err)
3✔
1188

3✔
1189
                return
3✔
1190
        }
3✔
1191

1192
        // Successfully created the first tx, now broadcast it.
1193
        result, err = t.broadcast(record)
3✔
1194
        if err != nil {
3✔
1195
                // The broadcast failed, which can only happen if the tx record
×
1196
                // cannot be found or the aux sweeper returns an error. In
×
1197
                // either case, we will send back a TxFail event so these
×
1198
                // inputs can be retried.
×
1199
                result = &BumpResult{
×
1200
                        Event:     TxFailed,
×
1201
                        Err:       err,
×
1202
                        requestID: r.requestID,
×
1203
                }
×
1204
        }
×
1205

1206
        t.handleResult(result)
3✔
1207
}
1208

1209
// handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
1210
// attempt to bump the fee of the tx.
1211
//
1212
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1213
func (t *TxPublisher) handleFeeBumpTx(r *monitorRecord, currentHeight int32) {
3✔
1214
        defer t.wg.Done()
3✔
1215

3✔
1216
        log.Debugf("Attempting to fee bump tx=%v in record %v", r.tx.TxHash(),
3✔
1217
                r.requestID)
3✔
1218

3✔
1219
        oldTxid := r.tx.TxHash()
3✔
1220

3✔
1221
        // Get the current conf target for this record.
3✔
1222
        confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
3✔
1223

3✔
1224
        // Ask the fee function whether a bump is needed. We expect the fee
3✔
1225
        // function to increase its returned fee rate after calling this
3✔
1226
        // method.
3✔
1227
        increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
3✔
1228
        if err != nil {
6✔
1229
                // TODO(yy): send this error back to the sweeper so it can
3✔
1230
                // re-group the inputs?
3✔
1231
                log.Errorf("Failed to increase fee rate for tx %v at "+
3✔
1232
                        "height=%v: %v", oldTxid, t.currentHeight.Load(), err)
3✔
1233

3✔
1234
                return
3✔
1235
        }
3✔
1236

1237
        // If the fee rate was not increased, there's no need to bump the fee.
1238
        if !increased {
3✔
1239
                log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
×
1240
                        t.currentHeight.Load())
×
1241

×
1242
                return
×
1243
        }
×
1244

1245
        // The fee function now has a new fee rate, we will use it to bump the
1246
        // fee of the tx.
1247
        resultOpt := t.createAndPublishTx(r)
3✔
1248

3✔
1249
        // If there's a result, we will notify the caller about the result.
3✔
1250
        resultOpt.WhenSome(func(result BumpResult) {
6✔
1251
                // Notify the new result.
3✔
1252
                t.handleResult(&result)
3✔
1253
        })
3✔
1254
}
1255

1256
// handleUnknownSpent is called when the inputs are spent by a unknown tx. It
1257
// will notify the subscriber then remove the record from the maps and send a
1258
// TxUnknownSpend event to the subscriber.
1259
//
1260
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1261
func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) {
3✔
1262
        defer t.wg.Done()
3✔
1263

3✔
1264
        log.Debugf("Record %v has inputs spent by a tx unknown to the fee "+
3✔
1265
                "bumper, failing it now:\n%v", r.requestID,
3✔
1266
                inputTypeSummary(r.req.Inputs))
3✔
1267

3✔
1268
        // Create a result that will be sent to the resultChan which is listened
3✔
1269
        // by the caller.
3✔
1270
        result := t.createUnknownSpentBumpResult(r)
3✔
1271

3✔
1272
        // Notify the sweeper about this result in the end.
3✔
1273
        t.handleResult(result)
3✔
1274
}
3✔
1275

1276
// createUnknownSpentBumpResult creates and returns a BumpResult given the
1277
// monitored record has unknown spends.
1278
func (t *TxPublisher) createUnknownSpentBumpResult(
1279
        r *monitorRecord) *BumpResult {
3✔
1280

3✔
1281
        // Create a result that will be sent to the resultChan which is listened
3✔
1282
        // by the caller.
3✔
1283
        result := &BumpResult{
3✔
1284
                Event:       TxUnknownSpend,
3✔
1285
                Tx:          r.tx,
3✔
1286
                requestID:   r.requestID,
3✔
1287
                Err:         ErrUnknownSpent,
3✔
1288
                SpentInputs: r.spentInputs,
3✔
1289
        }
3✔
1290

3✔
1291
        // Calculate the next fee rate for the retry.
3✔
1292
        feeRate, err := t.calculateRetryFeeRate(r)
3✔
1293
        if err != nil {
3✔
UNCOV
1294
                // Overwrite the event and error so the sweeper will
×
UNCOV
1295
                // remove this input.
×
UNCOV
1296
                result.Event = TxFatal
×
UNCOV
1297
                result.Err = err
×
UNCOV
1298
        }
×
1299

1300
        // Attach the new fee rate to be used for the next sweeping attempt.
1301
        result.FeeRate = feeRate
3✔
1302

3✔
1303
        return result
3✔
1304
}
1305

1306
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
1307
// to the network. It will update the record with the new tx and fee rate if
1308
// successfully created, and return the result when published successfully.
1309
func (t *TxPublisher) createAndPublishTx(
1310
        r *monitorRecord) fn.Option[BumpResult] {
3✔
1311

3✔
1312
        // Fetch the old tx.
3✔
1313
        oldTx := r.tx
3✔
1314

3✔
1315
        // Create a new tx with the new fee rate.
3✔
1316
        //
3✔
1317
        // NOTE: The fee function is expected to have increased its returned
3✔
1318
        // fee rate after calling the SkipFeeBump method. So we can use it
3✔
1319
        // directly here.
3✔
1320
        sweepCtx, err := t.createAndCheckTx(r)
3✔
1321

3✔
1322
        // If there's an error creating the replacement tx, we need to abort the
3✔
1323
        // flow and handle it.
3✔
1324
        if err != nil {
6✔
1325
                return t.handleReplacementTxError(r, oldTx, err)
3✔
1326
        }
3✔
1327

1328
        // The tx has been created without any errors, we now register a new
1329
        // record by overwriting the same requestID.
1330
        record := t.updateRecord(r, sweepCtx)
3✔
1331

3✔
1332
        // Attempt to broadcast this new tx.
3✔
1333
        result, err := t.broadcast(record)
3✔
1334
        if err != nil {
3✔
1335
                log.Infof("Failed to broadcast replacement tx %v: %v",
×
1336
                        sweepCtx.tx.TxHash(), err)
×
1337

×
1338
                return fn.None[BumpResult]()
×
1339
        }
×
1340

1341
        // If the result error is fee related, we will return no error and let
1342
        // the fee bumper retry it at next block.
1343
        //
1344
        // NOTE: we may get this error if we've bypassed the mempool check,
1345
        // which means we are using neutrino backend.
1346
        if errors.Is(result.Err, chain.ErrInsufficientFee) ||
3✔
1347
                errors.Is(result.Err, lnwallet.ErrMempoolFee) {
5✔
1348

2✔
1349
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(),
2✔
1350
                        result.Err)
2✔
1351

2✔
1352
                return fn.None[BumpResult]()
2✔
1353
        }
2✔
1354

1355
        // A successful replacement tx is created, attach the old tx.
1356
        result.ReplacedTx = oldTx
3✔
1357

3✔
1358
        // If the new tx failed to be published, we will return the result so
3✔
1359
        // the caller can handle it.
3✔
1360
        if result.Event == TxFailed {
3✔
1361
                return fn.Some(*result)
×
1362
        }
×
1363

1364
        log.Debugf("Replaced tx=%v with new tx=%v", oldTx.TxHash(),
3✔
1365
                sweepCtx.tx.TxHash())
3✔
1366

3✔
1367
        // Otherwise, it's a successful RBF, set the event and return.
3✔
1368
        result.Event = TxReplaced
3✔
1369

3✔
1370
        return fn.Some(*result)
3✔
1371
}
1372

1373
// isUnknownSpent checks whether the inputs of the tx has already been spent by
1374
// a tx not known to us. When a tx is not confirmed, yet its inputs has been
1375
// spent, then it must be spent by a different tx other than the sweeping tx
1376
// here.
1377
func (t *TxPublisher) isUnknownSpent(r *monitorRecord,
1378
        spends map[wire.OutPoint]*wire.MsgTx) bool {
3✔
1379

3✔
1380
        txid := r.tx.TxHash()
3✔
1381

3✔
1382
        // Iterate all the spending txns and check if they match the sweeping
3✔
1383
        // tx.
3✔
1384
        for op, spendingTx := range spends {
6✔
1385
                spendingTxID := spendingTx.TxHash()
3✔
1386

3✔
1387
                // If the spending tx is the same as the sweeping tx then we are
3✔
1388
                // good.
3✔
1389
                if spendingTxID == txid {
6✔
1390
                        continue
3✔
1391
                }
1392

1393
                log.Warnf("Detected unknown spend of input=%v in tx=%v", op,
3✔
1394
                        spendingTx.TxHash())
3✔
1395

3✔
1396
                return true
3✔
1397
        }
1398

1399
        return false
3✔
1400
}
1401

1402
// getSpentInputs performs a non-blocking read on the spending subscriptions to
1403
// see whether any of the monitored inputs has been spent. A map of inputs with
1404
// their spending txns are returned if found.
1405
func (t *TxPublisher) getSpentInputs(
1406
        r *monitorRecord) map[wire.OutPoint]*wire.MsgTx {
3✔
1407

3✔
1408
        // Create a slice to record the inputs spent.
3✔
1409
        spentInputs := make(map[wire.OutPoint]*wire.MsgTx, len(r.req.Inputs))
3✔
1410

3✔
1411
        // Iterate all the inputs and check if they have been spent already.
3✔
1412
        for _, inp := range r.req.Inputs {
6✔
1413
                op := inp.OutPoint()
3✔
1414

3✔
1415
                // For wallet utxos, the height hint is not set - we don't need
3✔
1416
                // to monitor them for third party spend.
3✔
1417
                //
3✔
1418
                // TODO(yy): We need to properly lock wallet utxos before
3✔
1419
                // skipping this check as the same wallet utxo can be used by
3✔
1420
                // different sweeping txns.
3✔
1421
                heightHint := inp.HeightHint()
3✔
1422
                if heightHint == 0 {
6✔
1423
                        heightHint = uint32(t.currentHeight.Load())
3✔
1424
                        log.Debugf("Checking wallet input %v using heightHint "+
3✔
1425
                                "%v", op, heightHint)
3✔
1426
                }
3✔
1427

1428
                // Check whether the input has been spent or not.
1429
                utxo, err := t.cfg.ChainIO.GetUtxo(
3✔
1430
                        &op, inp.SignDesc().Output.PkScript, heightHint, t.quit,
3✔
1431
                )
3✔
1432
                if err != nil {
6✔
1433
                        // GetUtxo will return `ErrOutputSpent` when the input
3✔
1434
                        // has already been spent. In that case, the returned
3✔
1435
                        // `utxo` must be nil, which will move us to subscribe
3✔
1436
                        // its spending event below.
3✔
1437
                        if !errors.Is(err, btcwallet.ErrOutputSpent) {
4✔
1438
                                log.Errorf("Failed to get utxo for input=%v: "+
1✔
1439
                                        "%v", op, err)
1✔
1440

1✔
1441
                                // If this is an unexpected error, move to check
1✔
1442
                                // the next input.
1✔
1443
                                continue
1✔
1444
                        }
1445

1446
                        log.Tracef("GetUtxo for input=%v, err: %v", op, err)
3✔
1447
                }
1448

1449
                // If a non-nil utxo is returned it means this input is still
1450
                // unspent. Thus we can continue to the next input as there's no
1451
                // need to register spend notification for it.
1452
                if utxo != nil {
6✔
1453
                        log.Tracef("Input=%v not spent yet", op)
3✔
1454
                        continue
3✔
1455
                }
1456

1457
                log.Debugf("Input=%v already spent, fetching its spending "+
3✔
1458
                        "tx...", op)
3✔
1459

3✔
1460
                // If the input has already been spent after the height hint, a
3✔
1461
                // spend event is sent back immediately.
3✔
1462
                spendEvent, err := t.cfg.Notifier.RegisterSpendNtfn(
3✔
1463
                        &op, inp.SignDesc().Output.PkScript, heightHint,
3✔
1464
                )
3✔
1465
                if err != nil {
3✔
1466
                        log.Criticalf("Failed to register spend ntfn for "+
×
1467
                                "input=%v: %v", op, err)
×
1468

×
NEW
1469
                        return spentInputs
×
1470
                }
×
1471

1472
                // Remove the subscription when exit.
1473
                defer spendEvent.Cancel()
3✔
1474

3✔
1475
                // Do a blocking read to receive the spent event.
3✔
1476
                select {
3✔
1477
                case spend, ok := <-spendEvent.Spend:
3✔
1478
                        if !ok {
3✔
1479
                                log.Debugf("Spend ntfn for %v canceled", op)
×
1480

×
1481
                                continue
×
1482
                        }
1483

1484
                        spendingTx := spend.SpendingTx
3✔
1485

3✔
1486
                        log.Debugf("Detected spent of input=%v in tx=%v", op,
3✔
1487
                                spendingTx.TxHash())
3✔
1488

3✔
1489
                        spentInputs[op] = spendingTx
3✔
1490

1491
                // The above spent event should be returned immediately, yet we
1492
                // still perform a timeout check here in case it blocks forever.
1493
                //
1494
                // TODO(yy): The proper way to fix this is to redesign the area
1495
                // so we use the async flow for checking whether a given input
1496
                // is spent or not. A better approach is to implement a new
1497
                // synchronous method to check for spending, which should be
1498
                // attempted when implementing SQL into btcwallet.
1499
                case <-time.After(spentNotificationTimeout):
2✔
1500
                        log.Warnf("Input is reported as spent by GetUtxo, "+
2✔
1501
                                "but spending notification is not returned "+
2✔
1502
                                "immediately: input=%v, heightHint=%v", op,
2✔
1503
                                heightHint)
2✔
1504
                }
1505
        }
1506

1507
        return spentInputs
3✔
1508
}
1509

1510
// calcCurrentConfTarget calculates the current confirmation target based on
1511
// the deadline height. The conf target is capped at 0 if the deadline has
1512
// already been past.
1513
func calcCurrentConfTarget(currentHeight, deadline int32) uint32 {
3✔
1514
        var confTarget uint32
3✔
1515

3✔
1516
        // Calculate how many blocks left until the deadline.
3✔
1517
        deadlineDelta := deadline - currentHeight
3✔
1518

3✔
1519
        // If we are already past the deadline, we will set the conf target to
3✔
1520
        // be 1.
3✔
1521
        if deadlineDelta < 0 {
6✔
1522
                log.Warnf("Deadline is %d blocks behind current height %v",
3✔
1523
                        -deadlineDelta, currentHeight)
3✔
1524

3✔
1525
                confTarget = 0
3✔
1526
        } else {
6✔
1527
                confTarget = uint32(deadlineDelta)
3✔
1528
        }
3✔
1529

1530
        return confTarget
3✔
1531
}
1532

1533
// sweepTxCtx houses a sweep transaction with additional context.
1534
type sweepTxCtx struct {
1535
        tx *wire.MsgTx
1536

1537
        fee btcutil.Amount
1538

1539
        extraTxOut fn.Option[SweepOutput]
1540

1541
        // outpointToTxIndex maps the outpoint of the inputs to their index in
1542
        // the sweep transaction.
1543
        outpointToTxIndex map[wire.OutPoint]int
1544
}
1545

1546
// createSweepTx creates a sweeping tx based on the given inputs, change
1547
// address and fee rate.
1548
func (t *TxPublisher) createSweepTx(inputs []input.Input,
1549
        changePkScript lnwallet.AddrWithKey,
1550
        feeRate chainfee.SatPerKWeight) (*sweepTxCtx, error) {
3✔
1551

3✔
1552
        // Validate and calculate the fee and change amount.
3✔
1553
        txFee, changeOutputsOpt, locktimeOpt, err := prepareSweepTx(
3✔
1554
                inputs, changePkScript, feeRate, t.currentHeight.Load(),
3✔
1555
                t.cfg.AuxSweeper,
3✔
1556
        )
3✔
1557
        if err != nil {
6✔
1558
                return nil, err
3✔
1559
        }
3✔
1560

1561
        var (
3✔
1562
                // Create the sweep transaction that we will be building. We
3✔
1563
                // use version 2 as it is required for CSV.
3✔
1564
                sweepTx = wire.NewMsgTx(2)
3✔
1565

3✔
1566
                // We'll add the inputs as we go so we know the final ordering
3✔
1567
                // of inputs to sign.
3✔
1568
                idxs []input.Input
3✔
1569
        )
3✔
1570

3✔
1571
        // We start by adding all inputs that commit to an output. We do this
3✔
1572
        // since the input and output index must stay the same for the
3✔
1573
        // signatures to be valid.
3✔
1574
        outpointToTxIndex := make(map[wire.OutPoint]int)
3✔
1575
        for _, o := range inputs {
6✔
1576
                if o.RequiredTxOut() == nil {
6✔
1577
                        continue
3✔
1578
                }
1579

1580
                idxs = append(idxs, o)
3✔
1581
                sweepTx.AddTxIn(&wire.TxIn{
3✔
1582
                        PreviousOutPoint: o.OutPoint(),
3✔
1583
                        Sequence:         o.BlocksToMaturity(),
3✔
1584
                })
3✔
1585
                sweepTx.AddTxOut(o.RequiredTxOut())
3✔
1586

3✔
1587
                outpointToTxIndex[o.OutPoint()] = len(sweepTx.TxOut) - 1
3✔
1588
        }
1589

1590
        // Sum up the value contained in the remaining inputs, and add them to
1591
        // the sweep transaction.
1592
        for _, o := range inputs {
6✔
1593
                if o.RequiredTxOut() != nil {
6✔
1594
                        continue
3✔
1595
                }
1596

1597
                idxs = append(idxs, o)
3✔
1598
                sweepTx.AddTxIn(&wire.TxIn{
3✔
1599
                        PreviousOutPoint: o.OutPoint(),
3✔
1600
                        Sequence:         o.BlocksToMaturity(),
3✔
1601
                })
3✔
1602
        }
1603

1604
        // If we have change outputs to add, then add it the sweep transaction
1605
        // here.
1606
        changeOutputsOpt.WhenSome(func(changeOuts []SweepOutput) {
6✔
1607
                for i := range changeOuts {
6✔
1608
                        sweepTx.AddTxOut(&changeOuts[i].TxOut)
3✔
1609
                }
3✔
1610
        })
1611

1612
        // We'll default to using the current block height as locktime, if none
1613
        // of the inputs commits to a different locktime.
1614
        sweepTx.LockTime = uint32(locktimeOpt.UnwrapOr(t.currentHeight.Load()))
3✔
1615

3✔
1616
        prevInputFetcher, err := input.MultiPrevOutFetcher(inputs)
3✔
1617
        if err != nil {
3✔
1618
                return nil, fmt.Errorf("error creating prev input fetcher "+
×
1619
                        "for hash cache: %v", err)
×
1620
        }
×
1621
        hashCache := txscript.NewTxSigHashes(sweepTx, prevInputFetcher)
3✔
1622

3✔
1623
        // With all the inputs in place, use each output's unique input script
3✔
1624
        // function to generate the final witness required for spending.
3✔
1625
        addInputScript := func(idx int, tso input.Input) error {
6✔
1626
                inputScript, err := tso.CraftInputScript(
3✔
1627
                        t.cfg.Signer, sweepTx, hashCache, prevInputFetcher, idx,
3✔
1628
                )
3✔
1629
                if err != nil {
3✔
1630
                        return err
×
1631
                }
×
1632

1633
                sweepTx.TxIn[idx].Witness = inputScript.Witness
3✔
1634

3✔
1635
                if len(inputScript.SigScript) == 0 {
6✔
1636
                        return nil
3✔
1637
                }
3✔
1638

1639
                sweepTx.TxIn[idx].SignatureScript = inputScript.SigScript
×
1640

×
1641
                return nil
×
1642
        }
1643

1644
        for idx, inp := range idxs {
6✔
1645
                if err := addInputScript(idx, inp); err != nil {
3✔
1646
                        return nil, err
×
1647
                }
×
1648
        }
1649

1650
        log.Debugf("Created sweep tx %v for inputs:\n%v", sweepTx.TxHash(),
3✔
1651
                inputTypeSummary(inputs))
3✔
1652

3✔
1653
        // Try to locate the extra change output, though there might be None.
3✔
1654
        extraTxOut := fn.MapOption(
3✔
1655
                func(sweepOuts []SweepOutput) fn.Option[SweepOutput] {
6✔
1656
                        for _, sweepOut := range sweepOuts {
6✔
1657
                                if !sweepOut.IsExtra {
6✔
1658
                                        continue
3✔
1659
                                }
1660

1661
                                // If we sweep outputs of a custom channel, the
1662
                                // custom leaves in those outputs will be merged
1663
                                // into a single output, even if we sweep
1664
                                // multiple outputs (e.g. to_remote and breached
1665
                                // to_local of a breached channel) at the same
1666
                                // time. So there will only ever be one extra
1667
                                // output.
1668
                                log.Debugf("Sweep produced extra_sweep_out=%v",
×
1669
                                        lnutils.SpewLogClosure(sweepOut))
×
1670

×
1671
                                return fn.Some(sweepOut)
×
1672
                        }
1673

1674
                        return fn.None[SweepOutput]()
3✔
1675
                },
1676
        )(changeOutputsOpt)
1677

1678
        return &sweepTxCtx{
3✔
1679
                tx:                sweepTx,
3✔
1680
                fee:               txFee,
3✔
1681
                extraTxOut:        fn.FlattenOption(extraTxOut),
3✔
1682
                outpointToTxIndex: outpointToTxIndex,
3✔
1683
        }, nil
3✔
1684
}
1685

1686
// prepareSweepTx returns the tx fee, a set of optional change outputs and an
1687
// optional locktime after a series of validations:
1688
// 1. check the locktime has been reached.
1689
// 2. check the locktimes are the same.
1690
// 3. check the inputs cover the outputs.
1691
//
1692
// NOTE: if the change amount is below dust, it will be added to the tx fee.
1693
func prepareSweepTx(inputs []input.Input, changePkScript lnwallet.AddrWithKey,
1694
        feeRate chainfee.SatPerKWeight, currentHeight int32,
1695
        auxSweeper fn.Option[AuxSweeper]) (
1696
        btcutil.Amount, fn.Option[[]SweepOutput], fn.Option[int32], error) {
3✔
1697

3✔
1698
        noChange := fn.None[[]SweepOutput]()
3✔
1699
        noLocktime := fn.None[int32]()
3✔
1700

3✔
1701
        // Given the set of inputs we have, if we have an aux sweeper, then
3✔
1702
        // we'll attempt to see if we have any other change outputs we'll need
3✔
1703
        // to add to the sweep transaction.
3✔
1704
        changePkScripts := [][]byte{changePkScript.DeliveryAddress}
3✔
1705

3✔
1706
        var extraChangeOut fn.Option[SweepOutput]
3✔
1707
        err := fn.MapOptionZ(
3✔
1708
                auxSweeper, func(aux AuxSweeper) error {
3✔
1709
                        extraOut := aux.DeriveSweepAddr(inputs, changePkScript)
×
1710
                        if err := extraOut.Err(); err != nil {
×
1711
                                return err
×
1712
                        }
×
1713

1714
                        extraChangeOut = extraOut.LeftToSome()
×
1715

×
1716
                        return nil
×
1717
                },
1718
        )
1719
        if err != nil {
3✔
1720
                return 0, noChange, noLocktime, err
×
1721
        }
×
1722

1723
        // Creating a weight estimator with nil outputs and zero max fee rate.
1724
        // We don't allow adding customized outputs in the sweeping tx, and the
1725
        // fee rate is already being managed before we get here.
1726
        inputs, estimator, err := getWeightEstimate(
3✔
1727
                inputs, nil, feeRate, 0, changePkScripts,
3✔
1728
        )
3✔
1729
        if err != nil {
3✔
1730
                return 0, noChange, noLocktime, err
×
1731
        }
×
1732

1733
        txFee := estimator.fee()
3✔
1734

3✔
1735
        var (
3✔
1736
                // Track whether any of the inputs require a certain locktime.
3✔
1737
                locktime = int32(-1)
3✔
1738

3✔
1739
                // We keep track of total input amount, and required output
3✔
1740
                // amount to use for calculating the change amount below.
3✔
1741
                totalInput     btcutil.Amount
3✔
1742
                requiredOutput btcutil.Amount
3✔
1743
        )
3✔
1744

3✔
1745
        // If we have an extra change output, then we'll add it as a required
3✔
1746
        // output amt.
3✔
1747
        extraChangeOut.WhenSome(func(o SweepOutput) {
3✔
1748
                requiredOutput += btcutil.Amount(o.Value)
×
1749
        })
×
1750

1751
        // Go through each input and check if the required lock times have
1752
        // reached and are the same.
1753
        for _, o := range inputs {
6✔
1754
                // If the input has a required output, we'll add it to the
3✔
1755
                // required output amount.
3✔
1756
                if o.RequiredTxOut() != nil {
6✔
1757
                        requiredOutput += btcutil.Amount(
3✔
1758
                                o.RequiredTxOut().Value,
3✔
1759
                        )
3✔
1760
                }
3✔
1761

1762
                // Update the total input amount.
1763
                totalInput += btcutil.Amount(o.SignDesc().Output.Value)
3✔
1764

3✔
1765
                lt, ok := o.RequiredLockTime()
3✔
1766

3✔
1767
                // Skip if the input doesn't require a lock time.
3✔
1768
                if !ok {
6✔
1769
                        continue
3✔
1770
                }
1771

1772
                // Check if the lock time has reached
1773
                if lt > uint32(currentHeight) {
3✔
1774
                        return 0, noChange, noLocktime,
×
1775
                                fmt.Errorf("%w: current height is %v, "+
×
1776
                                        "locktime is %v", ErrLocktimeImmature,
×
1777
                                        currentHeight, lt)
×
1778
                }
×
1779

1780
                // If another input commits to a different locktime, they
1781
                // cannot be combined in the same transaction.
1782
                if locktime != -1 && locktime != int32(lt) {
3✔
1783
                        return 0, noChange, noLocktime, ErrLocktimeConflict
×
1784
                }
×
1785

1786
                // Update the locktime for next iteration.
1787
                locktime = int32(lt)
3✔
1788
        }
1789

1790
        // Make sure total output amount is less than total input amount.
1791
        if requiredOutput+txFee > totalInput {
6✔
1792
                log.Errorf("Insufficient input to create sweep tx: "+
3✔
1793
                        "input_sum=%v, output_sum=%v", totalInput,
3✔
1794
                        requiredOutput+txFee)
3✔
1795

3✔
1796
                return 0, noChange, noLocktime, ErrNotEnoughInputs
3✔
1797
        }
3✔
1798

1799
        // The value remaining after the required output and fees is the
1800
        // change output.
1801
        changeAmt := totalInput - requiredOutput - txFee
3✔
1802
        changeOuts := make([]SweepOutput, 0, 2)
3✔
1803

3✔
1804
        extraChangeOut.WhenSome(func(o SweepOutput) {
3✔
1805
                changeOuts = append(changeOuts, o)
×
1806
        })
×
1807

1808
        // We'll calculate the dust limit for the given changePkScript since it
1809
        // is variable.
1810
        changeFloor := lnwallet.DustLimitForSize(
3✔
1811
                len(changePkScript.DeliveryAddress),
3✔
1812
        )
3✔
1813

3✔
1814
        switch {
3✔
1815
        // If the change amount is dust, we'll move it into the fees, and
1816
        // ignore it.
1817
        case changeAmt < changeFloor:
3✔
1818
                log.Infof("Change amt %v below dustlimit %v, not adding "+
3✔
1819
                        "change output", changeAmt, changeFloor)
3✔
1820

3✔
1821
                // If there's no required output, and the change output is a
3✔
1822
                // dust, it means we are creating a tx without any outputs. In
3✔
1823
                // this case we'll return an error. This could happen when
3✔
1824
                // creating a tx that has an anchor as the only input.
3✔
1825
                if requiredOutput == 0 {
6✔
1826
                        return 0, noChange, noLocktime, ErrTxNoOutput
3✔
1827
                }
3✔
1828

1829
                // The dust amount is added to the fee.
1830
                txFee += changeAmt
×
1831

1832
        // Otherwise, we'll actually recognize it as a change output.
1833
        default:
3✔
1834
                changeOuts = append(changeOuts, SweepOutput{
3✔
1835
                        TxOut: wire.TxOut{
3✔
1836
                                Value:    int64(changeAmt),
3✔
1837
                                PkScript: changePkScript.DeliveryAddress,
3✔
1838
                        },
3✔
1839
                        IsExtra:     false,
3✔
1840
                        InternalKey: changePkScript.InternalKey,
3✔
1841
                })
3✔
1842
        }
1843

1844
        // Optionally set the locktime.
1845
        locktimeOpt := fn.Some(locktime)
3✔
1846
        if locktime == -1 {
6✔
1847
                locktimeOpt = noLocktime
3✔
1848
        }
3✔
1849

1850
        var changeOutsOpt fn.Option[[]SweepOutput]
3✔
1851
        if len(changeOuts) > 0 {
6✔
1852
                changeOutsOpt = fn.Some(changeOuts)
3✔
1853
        }
3✔
1854

1855
        log.Debugf("Creating sweep tx for %v inputs (%s) using %v, "+
3✔
1856
                "tx_weight=%v, tx_fee=%v, locktime=%v, parents_count=%v, "+
3✔
1857
                "parents_fee=%v, parents_weight=%v, current_height=%v",
3✔
1858
                len(inputs), inputTypeSummary(inputs), feeRate,
3✔
1859
                estimator.weight(), txFee, locktimeOpt, len(estimator.parents),
3✔
1860
                estimator.parentsFee, estimator.parentsWeight, currentHeight)
3✔
1861

3✔
1862
        return txFee, changeOutsOpt, locktimeOpt, nil
3✔
1863
}
1864

1865
// handleReplacementTxError handles the error returned from creating the
1866
// replacement tx. It returns a BumpResult that should be notified to the
1867
// sweeper.
1868
func (t *TxPublisher) handleReplacementTxError(r *monitorRecord,
1869
        oldTx *wire.MsgTx, err error) fn.Option[BumpResult] {
3✔
1870

3✔
1871
        // If the error is fee related, we will return no error and let the fee
3✔
1872
        // bumper retry it at next block.
3✔
1873
        //
3✔
1874
        // NOTE: we can check the RBF error here and ask the fee function to
3✔
1875
        // recalculate the fee rate. However, this would defeat the purpose of
3✔
1876
        // using a deadline based fee function:
3✔
1877
        // - if the deadline is far away, there's no rush to RBF the tx.
3✔
1878
        // - if the deadline is close, we expect the fee function to give us a
3✔
1879
        //   higher fee rate. If the fee rate cannot satisfy the RBF rules, it
3✔
1880
        //   means the budget is not enough.
3✔
1881
        if errors.Is(err, chain.ErrInsufficientFee) ||
3✔
1882
                errors.Is(err, lnwallet.ErrMempoolFee) {
5✔
1883

2✔
1884
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
2✔
1885
                return fn.None[BumpResult]()
2✔
1886
        }
2✔
1887

1888
        // At least one of the inputs is missing, which means it has already
1889
        // been spent by another tx and confirmed. In this case we will handle
1890
        // it by returning a TxUnknownSpend bump result.
1891
        if errors.Is(err, ErrInputMissing) {
3✔
1892
                log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
×
1893
                bumpResult := t.handleMissingInputs(r)
×
1894

×
1895
                return fn.Some(*bumpResult)
×
1896
        }
×
1897

1898
        // Return a failed event to retry the sweep.
1899
        event := TxFailed
3✔
1900

3✔
1901
        // Calculate the next fee rate for the retry.
3✔
1902
        feeRate, ferr := t.calculateRetryFeeRate(r)
3✔
1903
        if ferr != nil {
3✔
1904
                // If there's an error with the fee calculation, we need to
×
1905
                // abort the sweep.
×
1906
                event = TxFatal
×
1907
        }
×
1908

1909
        // If the error is not fee related, we will return a `TxFailed` event so
1910
        // this input can be retried.
1911
        result := fn.Some(BumpResult{
3✔
1912
                Event:     event,
3✔
1913
                Tx:        oldTx,
3✔
1914
                Err:       err,
3✔
1915
                requestID: r.requestID,
3✔
1916
                FeeRate:   feeRate,
3✔
1917
        })
3✔
1918

3✔
1919
        // If the tx doesn't not have enough budget, or if the inputs amounts
3✔
1920
        // are not sufficient to cover the budget, we will return a result so
3✔
1921
        // the sweeper can handle it by re-clustering the utxos.
3✔
1922
        if errors.Is(err, ErrNotEnoughBudget) ||
3✔
1923
                errors.Is(err, ErrNotEnoughInputs) {
6✔
1924

3✔
1925
                log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
3✔
1926
                return result
3✔
1927
        }
3✔
1928

1929
        // Otherwise, an unexpected error occurred, we will log an error and let
1930
        // the sweeper retry the whole process.
1931
        log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
×
1932

×
1933
        return result
×
1934
}
1935

1936
// calculateRetryFeeRate calculates a new fee rate to be used as the starting
1937
// fee rate for the next sweep attempt if the inputs are to be retried. When the
1938
// fee function is nil it will be created here, and an error is returned if the
1939
// fee func cannot be initialized.
1940
func (t *TxPublisher) calculateRetryFeeRate(
1941
        r *monitorRecord) (chainfee.SatPerKWeight, error) {
3✔
1942

3✔
1943
        // Get the fee function, which will be used to decided the next fee rate
3✔
1944
        // to use if the sweeper decides to retry sweeping this input.
3✔
1945
        feeFunc := r.feeFunction
3✔
1946

3✔
1947
        // When the record is failed before the initial broadcast is attempted,
3✔
1948
        // it will have a nil fee func. In this case, we'll create the fee func
3✔
1949
        // here.
3✔
1950
        //
3✔
1951
        // NOTE: Since the current record is failed and will be deleted, we
3✔
1952
        // don't need to update the record on this fee function. We only need
3✔
1953
        // the fee rate data so the sweeper can pick up where we left off.
3✔
1954
        if feeFunc == nil {
5✔
1955
                f, err := t.initializeFeeFunction(r.req)
2✔
1956

2✔
1957
                // TODO(yy): The only error we would receive here is when the
2✔
1958
                // pkScript is not recognized by the weightEstimator. What we
2✔
1959
                // should do instead is to check the pkScript immediately after
2✔
1960
                // receiving a sweep request so we don't need to check it again,
2✔
1961
                // which will also save us from error checking from several
2✔
1962
                // callsites.
2✔
1963
                if err != nil {
2✔
UNCOV
1964
                        log.Errorf("Failed to create fee func for record %v: "+
×
UNCOV
1965
                                "%v", r.requestID, err)
×
UNCOV
1966

×
UNCOV
1967
                        return 0, err
×
UNCOV
1968
                }
×
1969

1970
                feeFunc = f
2✔
1971
        }
1972

1973
        // Since we failed to sweep the inputs, either the sweeping tx has been
1974
        // replaced by another party's tx, or the current output values cannot
1975
        // cover the budget, we missed this block window to increase its fee
1976
        // rate. To make sure the fee rate stays in the initial line, we now ask
1977
        // the fee function to give us the next fee rate as if the sweeping tx
1978
        // were RBFed. This new fee rate will be used as the starting fee rate
1979
        // if the upper system decides to continue sweeping the rest of the
1980
        // inputs.
1981
        _, err := feeFunc.Increment()
3✔
1982
        if err != nil {
6✔
1983
                // The fee function has reached its max position - nothing we
3✔
1984
                // can do here other than letting the user increase the budget.
3✔
1985
                log.Errorf("Failed to calculate the next fee rate for "+
3✔
1986
                        "Record(%v): %v", r.requestID, err)
3✔
1987
        }
3✔
1988

1989
        return feeFunc.FeeRate(), nil
3✔
1990
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc