• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 10204896993

01 Aug 2024 07:57PM UTC coverage: 58.591% (-0.08%) from 58.674%
10204896993

push

github

web-flow
Merge pull request #8962 from ProofOfKeags/refactor/quiescence-micro-spinoffs

[NANO]: Refactor/quiescence micro spinoffs

3 of 4 new or added lines in 2 files covered. (75.0%)

242 existing lines in 26 files now uncovered.

125214 of 213710 relevant lines covered (58.59%)

28092.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.13
/sweep/fee_bumper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/rpcclient"
12
        "github.com/btcsuite/btcd/txscript"
13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/btcsuite/btcwallet/chain"
15
        "github.com/lightningnetwork/lnd/chainntnfs"
16
        "github.com/lightningnetwork/lnd/fn"
17
        "github.com/lightningnetwork/lnd/input"
18
        "github.com/lightningnetwork/lnd/labels"
19
        "github.com/lightningnetwork/lnd/lntypes"
20
        "github.com/lightningnetwork/lnd/lnutils"
21
        "github.com/lightningnetwork/lnd/lnwallet"
22
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
23
)
24

25
var (
26
        // ErrInvalidBumpResult is returned when the bump result is invalid.
27
        ErrInvalidBumpResult = errors.New("invalid bump result")
28

29
        // ErrNotEnoughBudget is returned when the fee bumper decides the
30
        // current budget cannot cover the fee.
31
        ErrNotEnoughBudget = errors.New("not enough budget")
32

33
        // ErrLocktimeImmature is returned when sweeping an input whose
34
        // locktime is not reached.
35
        ErrLocktimeImmature = errors.New("immature input")
36

37
        // ErrTxNoOutput is returned when an output cannot be created during tx
38
        // preparation, usually due to the output being dust.
39
        ErrTxNoOutput = errors.New("tx has no output")
40

41
        // ErrThirdPartySpent is returned when a third party has spent the
42
        // input in the sweeping tx.
43
        ErrThirdPartySpent = errors.New("third party spent the output")
44
)
45

46
// Bumper defines an interface that can be used by other subsystems for fee
47
// bumping.
48
type Bumper interface {
49
        // Broadcast is used to publish the tx created from the given inputs
50
        // specified in the request. It handles the tx creation, broadcasts it,
51
        // and monitors its confirmation status for potential fee bumping. It
52
        // returns a chan that the caller can use to receive updates about the
53
        // broadcast result and potential RBF attempts.
54
        Broadcast(req *BumpRequest) (<-chan *BumpResult, error)
55
}
56

57
// BumpEvent represents the event of a fee bumping attempt.
58
type BumpEvent uint8
59

60
const (
61
        // TxPublished is sent when the broadcast attempt is finished.
62
        TxPublished BumpEvent = iota
63

64
        // TxFailed is sent when the broadcast attempt fails.
65
        TxFailed
66

67
        // TxReplaced is sent when the original tx is replaced by a new one.
68
        TxReplaced
69

70
        // TxConfirmed is sent when the tx is confirmed.
71
        TxConfirmed
72

73
        // sentinalEvent is used to check if an event is unknown.
74
        sentinalEvent
75
)
76

77
// String returns a human-readable string for the event.
78
func (e BumpEvent) String() string {
3✔
79
        switch e {
3✔
80
        case TxPublished:
3✔
81
                return "Published"
3✔
82
        case TxFailed:
3✔
83
                return "Failed"
3✔
84
        case TxReplaced:
3✔
85
                return "Replaced"
3✔
86
        case TxConfirmed:
3✔
87
                return "Confirmed"
3✔
88
        default:
×
89
                return "Unknown"
×
90
        }
91
}
92

93
// Unknown returns true if the event is unknown.
94
func (e BumpEvent) Unknown() bool {
11✔
95
        return e >= sentinalEvent
11✔
96
}
11✔
97

98
// BumpRequest is used by the caller to give the Bumper the necessary info to
99
// create and manage potential fee bumps for a set of inputs.
100
type BumpRequest struct {
101
        // Budget gives the total amount that can be used as fees by these
102
        // inputs.
103
        Budget btcutil.Amount
104

105
        // Inputs is the set of inputs to sweep.
106
        Inputs []input.Input
107

108
        // DeadlineHeight is the block height at which the tx should be
109
        // confirmed.
110
        DeadlineHeight int32
111

112
        // DeliveryAddress is the script to send the change output to.
113
        DeliveryAddress []byte
114

115
        // MaxFeeRate is the maximum fee rate that can be used for fee bumping.
116
        MaxFeeRate chainfee.SatPerKWeight
117

118
        // StartingFeeRate is an optional parameter that can be used to specify
119
        // the initial fee rate to use for the fee function.
120
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
121
}
122

123
// MaxFeeRateAllowed returns the maximum fee rate allowed for the given
124
// request. It calculates the feerate using the supplied budget and the weight,
125
// compares it with the specified MaxFeeRate, and returns the smaller of the
126
// two.
127
func (r *BumpRequest) MaxFeeRateAllowed() (chainfee.SatPerKWeight, error) {
11✔
128
        // Get the size of the sweep tx, which will be used to calculate the
11✔
129
        // budget fee rate.
11✔
130
        size, err := calcSweepTxWeight(r.Inputs, r.DeliveryAddress)
11✔
131
        if err != nil {
12✔
132
                return 0, err
1✔
133
        }
1✔
134

135
        // Use the budget and MaxFeeRate to decide the max allowed fee rate.
136
        // This is needed as, when the input has a large value and the user
137
        // sets the budget to be proportional to the input value, the fee rate
138
        // can be very high and we need to make sure it doesn't exceed the max
139
        // fee rate.
140
        maxFeeRateAllowed := chainfee.NewSatPerKWeight(r.Budget, size)
10✔
141
        if maxFeeRateAllowed > r.MaxFeeRate {
14✔
142
                log.Debugf("Budget feerate %v exceeds MaxFeeRate %v, use "+
4✔
143
                        "MaxFeeRate instead, txWeight=%v", maxFeeRateAllowed,
4✔
144
                        r.MaxFeeRate, size)
4✔
145

4✔
146
                return r.MaxFeeRate, nil
4✔
147
        }
4✔
148

149
        log.Debugf("Budget feerate %v below MaxFeeRate %v, use budget feerate "+
9✔
150
                "instead, txWeight=%v", maxFeeRateAllowed, r.MaxFeeRate, size)
9✔
151

9✔
152
        return maxFeeRateAllowed, nil
9✔
153
}
154

155
// calcSweepTxWeight calculates the weight of the sweep tx. It assumes a
156
// sweeping tx always has a single output(change).
157
func calcSweepTxWeight(inputs []input.Input,
158
        outputPkScript []byte) (lntypes.WeightUnit, error) {
14✔
159

14✔
160
        // Use a const fee rate as we only use the weight estimator to
14✔
161
        // calculate the size.
14✔
162
        const feeRate = 1
14✔
163

14✔
164
        // Initialize the tx weight estimator with,
14✔
165
        // - nil outputs as we only have one single change output.
14✔
166
        // - const fee rate as we don't care about the fees here.
14✔
167
        // - 0 maxfeerate as we don't care about fees here.
14✔
168
        //
14✔
169
        // TODO(yy): we should refactor the weight estimator to not require a
14✔
170
        // fee rate and max fee rate and make it a pure tx weight calculator.
14✔
171
        _, estimator, err := getWeightEstimate(
14✔
172
                inputs, nil, feeRate, 0, outputPkScript,
14✔
173
        )
14✔
174
        if err != nil {
16✔
175
                return 0, err
2✔
176
        }
2✔
177

178
        return estimator.weight(), nil
12✔
179
}
180

181
// BumpResult is used by the Bumper to send updates about the tx being
182
// broadcast.
183
type BumpResult struct {
184
        // Event is the type of event that the result is for.
185
        Event BumpEvent
186

187
        // Tx is the tx being broadcast.
188
        Tx *wire.MsgTx
189

190
        // ReplacedTx is the old, replaced tx if a fee bump is attempted.
191
        ReplacedTx *wire.MsgTx
192

193
        // FeeRate is the fee rate used for the new tx.
194
        FeeRate chainfee.SatPerKWeight
195

196
        // Fee is the fee paid by the new tx.
197
        Fee btcutil.Amount
198

199
        // Err is the error that occurred during the broadcast.
200
        Err error
201

202
        // requestID is the ID of the request that created this record.
203
        requestID uint64
204
}
205

206
// Validate validates the BumpResult so it's safe to use.
207
func (b *BumpResult) Validate() error {
12✔
208
        // Every result must have a tx.
12✔
209
        if b.Tx == nil {
13✔
210
                return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult)
1✔
211
        }
1✔
212

213
        // Every result must have a known event.
214
        if b.Event.Unknown() {
12✔
215
                return fmt.Errorf("%w: unknown event", ErrInvalidBumpResult)
1✔
216
        }
1✔
217

218
        // If it's a replacing event, it must have a replaced tx.
219
        if b.Event == TxReplaced && b.ReplacedTx == nil {
11✔
220
                return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult)
1✔
221
        }
1✔
222

223
        // If it's a failed event, it must have an error.
224
        if b.Event == TxFailed && b.Err == nil {
10✔
225
                return fmt.Errorf("%w: nil error", ErrInvalidBumpResult)
1✔
226
        }
1✔
227

228
        // If it's a confirmed event, it must have a fee rate and fee.
229
        if b.Event == TxConfirmed && (b.FeeRate == 0 || b.Fee == 0) {
9✔
230
                return fmt.Errorf("%w: missing fee rate or fee",
1✔
231
                        ErrInvalidBumpResult)
1✔
232
        }
1✔
233

234
        return nil
7✔
235
}
236

237
// TxPublisherConfig is the config used to create a new TxPublisher.
238
type TxPublisherConfig struct {
239
        // Signer is used to create the tx signature.
240
        Signer input.Signer
241

242
        // Wallet is used primarily to publish the tx.
243
        Wallet Wallet
244

245
        // Estimator is used to estimate the fee rate for the new tx based on
246
        // its deadline conf target.
247
        Estimator chainfee.Estimator
248

249
        // Notifier is used to monitor the confirmation status of the tx.
250
        Notifier chainntnfs.ChainNotifier
251
}
252

253
// TxPublisher is an implementation of the Bumper interface. It utilizes the
254
// `testmempoolaccept` RPC to bump the fee of txns it created based on
255
// different fee functions selected or configured by the caller. Its purpose is to
256
// take a list of inputs specified, and create a tx that spends them to a
257
// specified output. It will then monitor the confirmation status of the tx,
258
// and if it's not confirmed within a certain time frame, it will attempt to
259
// bump the fee of the tx by creating a new tx that spends the same inputs to
260
// the same output, but with a higher fee rate. It will continue to do this
261
// until the tx is confirmed or the fee rate reaches the maximum fee rate
262
// specified by the caller.
263
type TxPublisher struct {
264
        wg sync.WaitGroup
265

266
        // cfg specifies the configuration of the TxPublisher.
267
        cfg *TxPublisherConfig
268

269
        // currentHeight is the current block height.
270
        currentHeight atomic.Int32
271

272
        // records is a map keyed by the requestCounter and the value is the tx
273
        // being monitored.
274
        records lnutils.SyncMap[uint64, *monitorRecord]
275

276
        // requestCounter is a monotonically increasing counter used to keep
277
        // track of how many requests have been made.
278
        requestCounter atomic.Uint64
279

280
        // subscriberChans is a map keyed by the requestCounter, each item is
281
        // the chan that the publisher sends the fee bump result to.
282
        subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]
283

284
        // quit is used to signal the publisher to stop.
285
        quit chan struct{}
286
}
287

288
// Compile-time constraint to ensure TxPublisher implements Bumper.
289
var _ Bumper = (*TxPublisher)(nil)
290

291
// NewTxPublisher creates a new TxPublisher.
292
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
17✔
293
        return &TxPublisher{
17✔
294
                cfg:             &cfg,
17✔
295
                records:         lnutils.SyncMap[uint64, *monitorRecord]{},
17✔
296
                subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
17✔
297
                quit:            make(chan struct{}),
17✔
298
        }
17✔
299
}
17✔
300

301
// isNeutrinoBackend checks if the wallet backend is neutrino.
302
func (t *TxPublisher) isNeutrinoBackend() bool {
4✔
303
        return t.cfg.Wallet.BackEnd() == "neutrino"
4✔
304
}
4✔
305

306
// Broadcast is used to publish the tx created from the given inputs. It will,
307
// 1. init a fee function based on the given strategy.
308
// 2. create an RBF-compliant tx and monitor it for confirmation.
309
// 3. notify the initial broadcast result back to the caller.
310
// The initial broadcast is guaranteed to be RBF-compliant unless the budget
311
// specified cannot cover the fee.
312
//
313
// NOTE: part of the Bumper interface.
314
func (t *TxPublisher) Broadcast(req *BumpRequest) (<-chan *BumpResult, error) {
6✔
315
        log.Tracef("Received broadcast request: %s", lnutils.SpewLogClosure(
6✔
316
                req))
6✔
317

6✔
318
        // Attempt an initial broadcast which is guaranteed to comply with the
6✔
319
        // RBF rules.
6✔
320
        result, err := t.initialBroadcast(req)
6✔
321
        if err != nil {
10✔
322
                log.Errorf("Initial broadcast failed: %v", err)
4✔
323

4✔
324
                return nil, err
4✔
325
        }
4✔
326

327
        // Create a chan to send the result to the caller.
328
        subscriber := make(chan *BumpResult, 1)
5✔
329
        t.subscriberChans.Store(result.requestID, subscriber)
5✔
330

5✔
331
        // Send the initial broadcast result to the caller.
5✔
332
        t.handleResult(result)
5✔
333

5✔
334
        return subscriber, nil
5✔
335
}
336

337
// initialBroadcast initializes a fee function, creates an RBF-compliant tx and
338
// broadcasts it.
339
func (t *TxPublisher) initialBroadcast(req *BumpRequest) (*BumpResult, error) {
6✔
340
        // Create a fee bumping algorithm to be used for future RBF.
6✔
341
        feeAlgo, err := t.initializeFeeFunction(req)
6✔
342
        if err != nil {
9✔
343
                return nil, fmt.Errorf("init fee function: %w", err)
3✔
344
        }
3✔
345

346
        // Create the initial tx to be broadcasted. This tx is guaranteed to
347
        // comply with the RBF restrictions.
348
        requestID, err := t.createRBFCompliantTx(req, feeAlgo)
6✔
349
        if err != nil {
10✔
350
                return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
4✔
351
        }
4✔
352

353
        // Broadcast the tx and return the monitored record.
354
        result, err := t.broadcast(requestID)
5✔
355
        if err != nil {
5✔
356
                return nil, fmt.Errorf("broadcast sweep tx: %w", err)
×
357
        }
×
358

359
        return result, nil
5✔
360
}
361

362
// initializeFeeFunction initializes a fee function to be used for this request
363
// for future fee bumping.
364
func (t *TxPublisher) initializeFeeFunction(
365
        req *BumpRequest) (FeeFunction, error) {
8✔
366

8✔
367
        // Get the max allowed feerate.
8✔
368
        maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
8✔
369
        if err != nil {
8✔
370
                return nil, err
×
371
        }
×
372

373
        // Get the initial conf target.
374
        confTarget := calcCurrentConfTarget(
8✔
375
                t.currentHeight.Load(), req.DeadlineHeight,
8✔
376
        )
8✔
377

8✔
378
        log.Debugf("Initializing fee function with conf target=%v, budget=%v, "+
8✔
379
                "maxFeeRateAllowed=%v", confTarget, req.Budget,
8✔
380
                maxFeeRateAllowed)
8✔
381

8✔
382
        // Initialize the fee function and return it.
8✔
383
        //
8✔
384
        // TODO(yy): return based on different req.Strategy?
8✔
385
        return NewLinearFeeFunction(
8✔
386
                maxFeeRateAllowed, confTarget, t.cfg.Estimator,
8✔
387
                req.StartingFeeRate,
8✔
388
        )
8✔
389
}
390

391
// createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
392
// so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee
393
// and redo the process until the tx is valid, or return an error when non-RBF
394
// related errors occur or the budget has been used up.
395
func (t *TxPublisher) createRBFCompliantTx(req *BumpRequest,
396
        f FeeFunction) (uint64, error) {
12✔
397

12✔
398
        for {
27✔
399
                // Create a new tx with the given fee rate and check its
15✔
400
                // mempool acceptance.
15✔
401
                tx, fee, err := t.createAndCheckTx(req, f)
15✔
402

15✔
403
                switch {
15✔
404
                case err == nil:
9✔
405
                        // The tx is valid, return the request ID.
9✔
406
                        requestID := t.storeRecord(tx, req, f, fee)
9✔
407

9✔
408
                        log.Infof("Created tx %v for %v inputs: feerate=%v, "+
9✔
409
                                "fee=%v, inputs=%v", tx.TxHash(),
9✔
410
                                len(req.Inputs), f.FeeRate(), fee,
9✔
411
                                inputTypeSummary(req.Inputs))
9✔
412

9✔
413
                        return requestID, nil
9✔
414

415
                // If the error indicates the fees paid is not enough, we will
416
                // ask the fee function to increase the fee rate and retry.
417
                case errors.Is(err, lnwallet.ErrMempoolFee):
2✔
418
                        // We should at least start with a feerate above the
2✔
419
                        // mempool min feerate, so if we get this error, it
2✔
420
                        // means something is wrong earlier in the pipeline.
2✔
421
                        log.Errorf("Current fee=%v, feerate=%v, %v", fee,
2✔
422
                                f.FeeRate(), err)
2✔
423

2✔
424
                        fallthrough
2✔
425

426
                // We are not paying enough fees so we increase it.
427
                case errors.Is(err, chain.ErrInsufficientFee):
6✔
428
                        increased := false
6✔
429

6✔
430
                        // Keep calling the fee function until the fee rate is
6✔
431
                        // increased or maxed out.
6✔
432
                        for !increased {
13✔
433
                                log.Debugf("Increasing fee for next round, "+
7✔
434
                                        "current fee=%v, feerate=%v", fee,
7✔
435
                                        f.FeeRate())
7✔
436

7✔
437
                                // If the fee function tells us that we have
7✔
438
                                // used up the budget, we will return an error
7✔
439
                                // indicating this tx cannot be made. The
7✔
440
                                // sweeper should handle this error and try to
7✔
441
                                // cluster these inputs differently.
7✔
442
                                increased, err = f.Increment()
7✔
443
                                if err != nil {
10✔
444
                                        return 0, err
3✔
445
                                }
3✔
446
                        }
447

448
                // TODO(yy): suppose there's only one bad input, we can do a
449
                // binary search to find out which input is causing this error
450
                // by recreating a tx using half of the inputs and check its
451
                // mempool acceptance.
452
                default:
5✔
453
                        log.Debugf("Failed to create RBF-compliant tx: %v", err)
5✔
454
                        return 0, err
5✔
455
                }
456
        }
457
}
458

459
// storeRecord stores the given record in the records map.
460
func (t *TxPublisher) storeRecord(tx *wire.MsgTx, req *BumpRequest,
461
        f FeeFunction, fee btcutil.Amount) uint64 {
17✔
462

17✔
463
        // Increase the request counter.
17✔
464
        //
17✔
465
        // NOTE: this is the only place where we increase the
17✔
466
        // counter.
17✔
467
        requestID := t.requestCounter.Add(1)
17✔
468

17✔
469
        // Register the record.
17✔
470
        t.records.Store(requestID, &monitorRecord{
17✔
471
                tx:          tx,
17✔
472
                req:         req,
17✔
473
                feeFunction: f,
17✔
474
                fee:         fee,
17✔
475
        })
17✔
476

17✔
477
        return requestID
17✔
478
}
17✔
479

480
// createAndCheckTx creates a tx based on the given inputs, change output
481
// script, and the fee rate. In addition, it validates the tx's mempool
482
// acceptance before returning a tx that can be published directly, along with
483
// its fee.
484
func (t *TxPublisher) createAndCheckTx(req *BumpRequest, f FeeFunction) (
485
        *wire.MsgTx, btcutil.Amount, error) {
25✔
486

25✔
487
        // Create the sweep tx with max fee rate of 0 as the fee function
25✔
488
        // guarantees the fee rate used here won't exceed the max fee rate.
25✔
489
        tx, fee, err := t.createSweepTx(
25✔
490
                req.Inputs, req.DeliveryAddress, f.FeeRate(),
25✔
491
        )
25✔
492
        if err != nil {
28✔
493
                return nil, fee, fmt.Errorf("create sweep tx: %w", err)
3✔
494
        }
3✔
495

496
        // Sanity check the budget still covers the fee.
497
        if fee > req.Budget {
27✔
498
                return nil, fee, fmt.Errorf("%w: budget=%v, fee=%v",
2✔
499
                        ErrNotEnoughBudget, req.Budget, fee)
2✔
500
        }
2✔
501

502
        // Validate the tx's mempool acceptance.
503
        err = t.cfg.Wallet.CheckMempoolAcceptance(tx)
23✔
504

23✔
505
        // Exit early if the tx is valid.
23✔
506
        if err == nil {
36✔
507
                return tx, fee, nil
13✔
508
        }
13✔
509

510
        // Print an error log if the chain backend doesn't support the mempool
511
        // acceptance test RPC.
512
        if errors.Is(err, rpcclient.ErrBackendVersion) {
12✔
513
                log.Errorf("TestMempoolAccept not supported by backend, " +
×
514
                        "consider upgrading it to a newer version")
×
515
                return tx, fee, nil
×
516
        }
×
517

518
        // We are running on a backend that doesn't implement the RPC
519
        // testmempoolaccept, eg, neutrino, so we'll skip the check.
520
        if errors.Is(err, chain.ErrUnimplemented) {
13✔
521
                log.Debug("Skipped testmempoolaccept due to not implemented")
1✔
522
                return tx, fee, nil
1✔
523
        }
1✔
524

525
        return nil, fee, fmt.Errorf("tx=%v failed mempool check: %w",
11✔
526
                tx.TxHash(), err)
11✔
527
}
528

529
// broadcast takes a monitored tx and publishes it to the network. Prior to the
530
// broadcast, it will subscribe the tx's confirmation notification and attach
531
// the event channel to the record. Any broadcast-related errors will not be
532
// returned here, instead, they will be put inside the `BumpResult` and
533
// returned to the caller.
534
func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) {
12✔
535
        // Get the record being monitored.
12✔
536
        record, ok := t.records.Load(requestID)
12✔
537
        if !ok {
13✔
538
                return nil, fmt.Errorf("tx record %v not found", requestID)
1✔
539
        }
1✔
540

541
        txid := record.tx.TxHash()
11✔
542

11✔
543
        tx := record.tx
11✔
544
        log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
11✔
545
                txid, len(tx.TxIn), t.currentHeight.Load())
11✔
546

11✔
547
        // Set the event, and change it to TxFailed if the wallet fails to
11✔
548
        // publish it.
11✔
549
        event := TxPublished
11✔
550

11✔
551
        // Publish the sweeping tx with customized label. If the publish fails,
11✔
552
        // this error will be saved in the `BumpResult` and it will be removed
11✔
553
        // from being monitored.
11✔
554
        err := t.cfg.Wallet.PublishTransaction(
11✔
555
                tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
11✔
556
        )
11✔
557
        if err != nil {
15✔
558
                // NOTE: we decide to attach this error to the result instead
4✔
559
                // of returning it here because by the time the tx reaches
4✔
560
                // here, it should have passed the mempool acceptance check. If
4✔
561
                // it still fails to be broadcast, it's likely a non-RBF
4✔
562
                // related error happened. So we send this error back to the
4✔
563
                // caller so that it can handle it properly.
4✔
564
                //
4✔
565
                // TODO(yy): find out which input is causing the failure.
4✔
566
                log.Errorf("Failed to publish tx %v: %v", txid, err)
4✔
567
                event = TxFailed
4✔
568
        }
4✔
569

570
        result := &BumpResult{
11✔
571
                Event:     event,
11✔
572
                Tx:        record.tx,
11✔
573
                Fee:       record.fee,
11✔
574
                FeeRate:   record.feeFunction.FeeRate(),
11✔
575
                Err:       err,
11✔
576
                requestID: requestID,
11✔
577
        }
11✔
578

11✔
579
        return result, nil
11✔
580
}
581

582
// notifyResult sends the result to the resultChan specified by the requestID.
583
// This channel is expected to be read by the caller.
584
func (t *TxPublisher) notifyResult(result *BumpResult) {
12✔
585
        id := result.requestID
12✔
586
        subscriber, ok := t.subscriberChans.Load(id)
12✔
587
        if !ok {
13✔
588
                log.Errorf("Result chan for id=%v not found", id)
1✔
589
                return
1✔
590
        }
1✔
591

592
        log.Debugf("Sending result for requestID=%v, tx=%v", id,
12✔
593
                result.Tx.TxHash())
12✔
594

12✔
595
        select {
12✔
596
        // Send the result to the subscriber.
597
        //
598
        // TODO(yy): Add timeout in case it's blocking?
599
        case subscriber <- result:
10✔
600
        case <-t.quit:
3✔
601
                log.Debug("Fee bumper stopped")
3✔
602
        }
603
}
604

605
// removeResult stops tracking the result when its event is TxFailed or
606
// TxConfirmed by deleting the record and its subscriber chan from the
607
// maps. Results with any other event are left in place.
608
func (t *TxPublisher) removeResult(result *BumpResult) {
12✔
609
        id := result.requestID
12✔
610

12✔
611
        // Remove the record from the maps if there's an error. This means this
12✔
612
        // tx has failed its broadcast and cannot be retried. There are two
12✔
613
        // cases,
12✔
614
        // - when the budget cannot cover the fee.
12✔
615
        // - when a non-RBF related error occurs.
12✔
616
        switch result.Event {
12✔
617
        case TxFailed:
5✔
618
                log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
5✔
619
                        id, result.Tx.TxHash(), result.Err)
5✔
620

621
        case TxConfirmed:
6✔
622
                // Remove the record if the tx is confirmed.
6✔
623
                log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
6✔
624
                        result.Tx.TxHash())
6✔
625

626
        // Do nothing if it's neither failed nor confirmed.
627
        default:
7✔
628
                log.Tracef("Skipping record removal for id=%v, event=%v", id,
7✔
629
                        result.Event)
7✔
630

7✔
631
                return
7✔
632
        }
633

634
        t.records.Delete(id)
8✔
635
        t.subscriberChans.Delete(id)
8✔
636
}
637

638
// handleResult handles the result of a tx broadcast. It will notify the
639
// subscriber and remove the record if the tx is confirmed or failed to be
640
// broadcast.
641
func (t *TxPublisher) handleResult(result *BumpResult) {
9✔
642
        // Notify the subscriber.
9✔
643
        t.notifyResult(result)
9✔
644

9✔
645
        // Remove the record if it's failed or confirmed.
9✔
646
        t.removeResult(result)
9✔
647
}
9✔
648

649
// monitorRecord is used to keep track of the tx being monitored by the
650
// publisher internally.
651
type monitorRecord struct {
652
        // tx is the tx being monitored.
653
        tx *wire.MsgTx
654

655
        // req is the original request.
656
        req *BumpRequest
657

658
        // feeFunction is the fee bumping algorithm used by the publisher.
659
        feeFunction FeeFunction
660

661
        // fee is the fee paid by the tx.
662
        fee btcutil.Amount
663
}
664

665
// Start starts the publisher by subscribing to block epoch updates and kicking
666
// off the monitor loop.
667
func (t *TxPublisher) Start() error {
3✔
668
        log.Info("TxPublisher starting...")
3✔
669
        defer log.Debugf("TxPublisher started")
3✔
670

3✔
671
        blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
3✔
672
        if err != nil {
3✔
673
                return fmt.Errorf("register block epoch ntfn: %w", err)
×
674
        }
×
675

676
        t.wg.Add(1)
3✔
677
        go t.monitor(blockEvent)
3✔
678

3✔
679
        return nil
3✔
680
}
681

682
// Stop stops the publisher and waits for the monitor loop to exit.
683
func (t *TxPublisher) Stop() {
3✔
684
        log.Info("TxPublisher stopping...")
3✔
685
        defer log.Debugf("TxPublisher stopped")
3✔
686

3✔
687
        close(t.quit)
3✔
688

3✔
689
        t.wg.Wait()
3✔
690
}
3✔
691

692
// monitor is the main loop driven by new blocks. Whenever a new block arrives,
693
// it will examine all the txns being monitored, and check if any of them needs
694
// to be bumped. If so, it will attempt to bump the fee of the tx.
695
//
696
// NOTE: Must be run as a goroutine.
697
func (t *TxPublisher) monitor(blockEvent *chainntnfs.BlockEpochEvent) {
3✔
698
        defer blockEvent.Cancel()
3✔
699
        defer t.wg.Done()
3✔
700

3✔
701
        for {
6✔
702
                select {
3✔
703
                case epoch, ok := <-blockEvent.Epochs:
3✔
704
                        if !ok {
3✔
705
                                // We should stop the publisher before stopping
×
706
                                // the chain service. Otherwise it indicates an
×
707
                                // error.
×
708
                                log.Error("Block epoch channel closed, exit " +
×
709
                                        "monitor")
×
710

×
711
                                return
×
712
                        }
×
713

714
                        log.Debugf("TxPublisher received new block: %v",
3✔
715
                                epoch.Height)
3✔
716

3✔
717
                        // Update the best known height for the publisher.
3✔
718
                        t.currentHeight.Store(epoch.Height)
3✔
719

3✔
720
                        // Check all monitored txns to see if any of them needs
3✔
721
                        // to be bumped.
3✔
722
                        t.processRecords()
3✔
723

724
                case <-t.quit:
3✔
725
                        log.Debug("Fee bumper stopped, exit monitor")
3✔
726
                        return
3✔
727
                }
728
        }
729
}
730

731
// processRecords checks all the txns being monitored, and checks if any of
732
// them needs to be bumped. If so, it will attempt to bump the fee of the tx.
733
func (t *TxPublisher) processRecords() {
4✔
734
        // confirmedRecords stores a map of the records which have been
4✔
735
        // confirmed.
4✔
736
        confirmedRecords := make(map[uint64]*monitorRecord)
4✔
737

4✔
738
        // feeBumpRecords stores a map of the records which need to be bumped.
4✔
739
        feeBumpRecords := make(map[uint64]*monitorRecord)
4✔
740

4✔
741
        // failedRecords stores a map of the records which has inputs being
4✔
742
        // spent by a third party.
4✔
743
        //
4✔
744
        // NOTE: this is only used for neutrino backend.
4✔
745
        failedRecords := make(map[uint64]*monitorRecord)
4✔
746

4✔
747
        // visitor is a helper closure that visits each record and divides them
4✔
748
        // into two groups.
4✔
749
        visitor := func(requestID uint64, r *monitorRecord) error {
9✔
750
                log.Tracef("Checking monitor recordID=%v for tx=%v", requestID,
5✔
751
                        r.tx.TxHash())
5✔
752

5✔
753
                // If the tx is already confirmed, we can stop monitoring it.
5✔
754
                if t.isConfirmed(r.tx.TxHash()) {
9✔
755
                        confirmedRecords[requestID] = r
4✔
756

4✔
757
                        // Move to the next record.
4✔
758
                        return nil
4✔
759
                }
4✔
760

761
                // Check whether the inputs has been spent by a third party.
762
                //
763
                // NOTE: this check is only done for neutrino backend.
764
                if t.isThirdPartySpent(r.tx.TxHash(), r.req.Inputs) {
5✔
765
                        failedRecords[requestID] = r
1✔
766

1✔
767
                        // Move to the next record.
1✔
768
                        return nil
1✔
769
                }
1✔
770

771
                feeBumpRecords[requestID] = r
4✔
772

4✔
773
                // Return nil to move to the next record.
4✔
774
                return nil
4✔
775
        }
776

777
        // Iterate through all the records and divide them into two groups.
778
        t.records.ForEach(visitor)
4✔
779

4✔
780
        // For records that are confirmed, we'll notify the caller about this
4✔
781
        // result.
4✔
782
        for requestID, r := range confirmedRecords {
8✔
783
                rec := r
4✔
784

4✔
785
                log.Debugf("Tx=%v is confirmed", r.tx.TxHash())
4✔
786
                t.wg.Add(1)
4✔
787
                go t.handleTxConfirmed(rec, requestID)
4✔
788
        }
4✔
789

790
        // Get the current height to be used in the following goroutines.
791
        currentHeight := t.currentHeight.Load()
4✔
792

4✔
793
        // For records that are not confirmed, we perform a fee bump if needed.
4✔
794
        for requestID, r := range feeBumpRecords {
8✔
795
                rec := r
4✔
796

4✔
797
                log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash())
4✔
798
                t.wg.Add(1)
4✔
799
                go t.handleFeeBumpTx(requestID, rec, currentHeight)
4✔
800
        }
4✔
801

802
        // For records that are failed, we'll notify the caller about this
803
        // result.
804
        for requestID, r := range failedRecords {
5✔
805
                rec := r
1✔
806

1✔
807
                log.Debugf("Tx=%v has inputs been spent by a third party, "+
1✔
808
                        "failing it now", r.tx.TxHash())
1✔
809
                t.wg.Add(1)
1✔
810
                go t.handleThirdPartySpent(rec, requestID)
1✔
811
        }
1✔
812
}
813

814
// handleTxConfirmed is called when a monitored tx is confirmed. It will
815
// notify the subscriber then remove the record from the maps .
816
//
817
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
818
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord, requestID uint64) {
5✔
819
        defer t.wg.Done()
5✔
820

5✔
821
        // Create a result that will be sent to the resultChan which is
5✔
822
        // listened by the caller.
5✔
823
        result := &BumpResult{
5✔
824
                Event:     TxConfirmed,
5✔
825
                Tx:        r.tx,
5✔
826
                requestID: requestID,
5✔
827
                Fee:       r.fee,
5✔
828
                FeeRate:   r.feeFunction.FeeRate(),
5✔
829
        }
5✔
830

5✔
831
        // Notify that this tx is confirmed and remove the record from the map.
5✔
832
        t.handleResult(result)
5✔
833
}
5✔
834

835
// handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
836
// attempt to bump the fee of the tx.
837
//
838
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
839
func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord,
840
        currentHeight int32) {
7✔
841

7✔
842
        defer t.wg.Done()
7✔
843

7✔
844
        oldTxid := r.tx.TxHash()
7✔
845

7✔
846
        // Get the current conf target for this record.
7✔
847
        confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
7✔
848

7✔
849
        // Ask the fee function whether a bump is needed. We expect the fee
7✔
850
        // function to increase its returned fee rate after calling this
7✔
851
        // method.
7✔
852
        increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
7✔
853
        if err != nil {
11✔
854
                // TODO(yy): send this error back to the sweeper so it can
4✔
855
                // re-group the inputs?
4✔
856
                log.Errorf("Failed to increase fee rate for tx %v at "+
4✔
857
                        "height=%v: %v", oldTxid, t.currentHeight.Load(), err)
4✔
858

4✔
859
                return
4✔
860
        }
4✔
861

862
        // If the fee rate was not increased, there's no need to bump the fee.
863
        if !increased {
10✔
864
                log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
4✔
865
                        t.currentHeight.Load())
4✔
866

4✔
867
                return
4✔
868
        }
4✔
869

870
        // The fee function now has a new fee rate, we will use it to bump the
871
        // fee of the tx.
872
        resultOpt := t.createAndPublishTx(requestID, r)
5✔
873

5✔
874
        // If there's a result, we will notify the caller about the result.
5✔
875
        resultOpt.WhenSome(func(result BumpResult) {
10✔
876
                // Notify the new result.
5✔
877
                t.handleResult(&result)
5✔
878
        })
5✔
879
}
880

881
// handleThirdPartySpent is called when the inputs in an unconfirmed tx is
882
// spent. It will notify the subscriber then remove the record from the maps
883
// and send a TxFailed event to the subscriber.
884
//
885
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
886
func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord,
887
        requestID uint64) {
1✔
888

1✔
889
        defer t.wg.Done()
1✔
890

1✔
891
        // Create a result that will be sent to the resultChan which is
1✔
892
        // listened by the caller.
1✔
893
        //
1✔
894
        // TODO(yy): create a new state `TxThirdPartySpent` to notify the
1✔
895
        // sweeper to remove the input, hence moving the monitoring of inputs
1✔
896
        // spent inside the fee bumper.
1✔
897
        result := &BumpResult{
1✔
898
                Event:     TxFailed,
1✔
899
                Tx:        r.tx,
1✔
900
                requestID: requestID,
1✔
901
                Err:       ErrThirdPartySpent,
1✔
902
        }
1✔
903

1✔
904
        // Notify that this tx is confirmed and remove the record from the map.
1✔
905
        t.handleResult(result)
1✔
906
}
1✔
907

908
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
909
// to the network. It will update the record with the new tx and fee rate if
910
// successfully created, and return the result when published successfully.
911
func (t *TxPublisher) createAndPublishTx(requestID uint64,
912
        r *monitorRecord) fn.Option[BumpResult] {
10✔
913

10✔
914
        // Fetch the old tx.
10✔
915
        oldTx := r.tx
10✔
916

10✔
917
        // Create a new tx with the new fee rate.
10✔
918
        //
10✔
919
        // NOTE: The fee function is expected to have increased its returned
10✔
920
        // fee rate after calling the SkipFeeBump method. So we can use it
10✔
921
        // directly here.
10✔
922
        tx, fee, err := t.createAndCheckTx(r.req, r.feeFunction)
10✔
923

10✔
924
        // If the error is fee related, we will return no error and let the fee
10✔
925
        // bumper retry it at next block.
10✔
926
        //
10✔
927
        // NOTE: we can check the RBF error here and ask the fee function to
10✔
928
        // recalculate the fee rate. However, this would defeat the purpose of
10✔
929
        // using a deadline based fee function:
10✔
930
        // - if the deadline is far away, there's no rush to RBF the tx.
10✔
931
        // - if the deadline is close, we expect the fee function to give us a
10✔
932
        //   higher fee rate. If the fee rate cannot satisfy the RBF rules, it
10✔
933
        //   means the budget is not enough.
10✔
934
        if errors.Is(err, chain.ErrInsufficientFee) ||
10✔
935
                errors.Is(err, lnwallet.ErrMempoolFee) {
14✔
936

4✔
937
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
4✔
938
                return fn.None[BumpResult]()
4✔
939
        }
4✔
940

941
        // If the error is not fee related, we will return a `TxFailed` event
942
        // so this input can be retried.
943
        if err != nil {
11✔
944
                // If the tx doesn't not have enought budget, we will return a
3✔
945
                // result so the sweeper can handle it by re-clustering the
3✔
946
                // utxos.
3✔
947
                if errors.Is(err, ErrNotEnoughBudget) {
4✔
948
                        log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(),
1✔
949
                                err)
1✔
950
                } else {
3✔
951
                        // Otherwise, an unexpected error occurred, we will
2✔
952
                        // fail the tx and let the sweeper retry the whole
2✔
953
                        // process.
2✔
954
                        log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(),
2✔
955
                                err)
2✔
956
                }
2✔
957

958
                return fn.Some(BumpResult{
3✔
959
                        Event:     TxFailed,
3✔
960
                        Tx:        oldTx,
3✔
961
                        Err:       err,
3✔
962
                        requestID: requestID,
3✔
963
                })
3✔
964
        }
965

966
        // The tx has been created without any errors, we now register a new
967
        // record by overwriting the same requestID.
968
        t.records.Store(requestID, &monitorRecord{
7✔
969
                tx:          tx,
7✔
970
                req:         r.req,
7✔
971
                feeFunction: r.feeFunction,
7✔
972
                fee:         fee,
7✔
973
        })
7✔
974

7✔
975
        // Attempt to broadcast this new tx.
7✔
976
        result, err := t.broadcast(requestID)
7✔
977
        if err != nil {
7✔
978
                log.Infof("Failed to broadcast replacement tx %v: %v",
×
979
                        tx.TxHash(), err)
×
980

×
981
                return fn.None[BumpResult]()
×
982
        }
×
983

984
        // If the result error is fee related, we will return no error and let
985
        // the fee bumper retry it at next block.
986
        //
987
        // NOTE: we may get this error if we've bypassed the mempool check,
988
        // which means we are suing neutrino backend.
989
        if errors.Is(result.Err, chain.ErrInsufficientFee) ||
7✔
990
                errors.Is(result.Err, lnwallet.ErrMempoolFee) {
8✔
991

1✔
992
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
1✔
993
                return fn.None[BumpResult]()
1✔
994
        }
1✔
995

996
        // A successful replacement tx is created, attach the old tx.
997
        result.ReplacedTx = oldTx
7✔
998

7✔
999
        // If the new tx failed to be published, we will return the result so
7✔
1000
        // the caller can handle it.
7✔
1001
        if result.Event == TxFailed {
8✔
1002
                return fn.Some(*result)
1✔
1003
        }
1✔
1004

1005
        log.Infof("Replaced tx=%v with new tx=%v", oldTx.TxHash(), tx.TxHash())
6✔
1006

6✔
1007
        // Otherwise, it's a successful RBF, set the event and return.
6✔
1008
        result.Event = TxReplaced
6✔
1009

6✔
1010
        return fn.Some(*result)
6✔
1011
}
1012

1013
// isConfirmed checks the btcwallet to see whether the tx is confirmed.
1014
func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool {
5✔
1015
        details, err := t.cfg.Wallet.GetTransactionDetails(&txid)
5✔
1016
        if err != nil {
8✔
1017
                log.Warnf("Failed to get tx details for %v: %v", txid, err)
3✔
1018
                return false
3✔
1019
        }
3✔
1020

1021
        return details.NumConfirmations > 0
5✔
1022
}
1023

1024
// isThirdPartySpent checks whether the inputs of the tx has already been spent
1025
// by a third party. When a tx is not confirmed, yet its inputs has been spent,
1026
// then it must be spent by a different tx other than the sweeping tx here.
1027
//
1028
// NOTE: this check is only performed for neutrino backend as it has no
1029
// reliable way to tell a tx has been replaced.
1030
func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash,
1031
        inputs []input.Input) bool {
4✔
1032

4✔
1033
        // Skip this check for if this is not neutrino backend.
4✔
1034
        if !t.isNeutrinoBackend() {
7✔
1035
                return false
3✔
1036
        }
3✔
1037

1038
        // Iterate all the inputs and check if they have been spent already.
1039
        for _, inp := range inputs {
2✔
1040
                op := inp.OutPoint()
1✔
1041

1✔
1042
                // For wallet utxos, the height hint is not set - we don't need
1✔
1043
                // to monitor them for third party spend.
1✔
1044
                heightHint := inp.HeightHint()
1✔
1045
                if heightHint == 0 {
2✔
1046
                        log.Debugf("Skipped third party check for wallet "+
1✔
1047
                                "input %v", op)
1✔
1048

1✔
1049
                        continue
1✔
1050
                }
1051

1052
                // If the input has already been spent after the height hint, a
1053
                // spend event is sent back immediately.
1054
                spendEvent, err := t.cfg.Notifier.RegisterSpendNtfn(
1✔
1055
                        &op, inp.SignDesc().Output.PkScript, heightHint,
1✔
1056
                )
1✔
1057
                if err != nil {
1✔
1058
                        log.Criticalf("Failed to register spend ntfn for "+
×
1059
                                "input=%v: %v", op, err)
×
1060
                        return false
×
1061
                }
×
1062

1063
                // Remove the subscription when exit.
1064
                defer spendEvent.Cancel()
1✔
1065

1✔
1066
                // Do a non-blocking read to see if the output has been spent.
1✔
1067
                select {
1✔
1068
                case spend, ok := <-spendEvent.Spend:
1✔
1069
                        if !ok {
1✔
1070
                                log.Debugf("Spend ntfn for %v canceled", op)
×
1071
                                return false
×
1072
                        }
×
1073

1074
                        spendingTxID := spend.SpendingTx.TxHash()
1✔
1075

1✔
1076
                        // If the spending tx is the same as the sweeping tx
1✔
1077
                        // then we are good.
1✔
1078
                        if spendingTxID == txid {
1✔
UNCOV
1079
                                continue
×
1080
                        }
1081

1082
                        log.Warnf("Detected third party spent of output=%v "+
1✔
1083
                                "in tx=%v", op, spend.SpendingTx.TxHash())
1✔
1084

1✔
1085
                        return true
1✔
1086

1087
                // Move to the next input.
1088
                default:
1✔
1089
                }
1090
        }
1091

1092
        return false
1✔
1093
}
1094

1095
// calcCurrentConfTarget calculates the current confirmation target based on
1096
// the deadline height. The conf target is capped at 0 if the deadline has
1097
// already been past.
1098
func calcCurrentConfTarget(currentHeight, deadline int32) uint32 {
14✔
1099
        var confTarget uint32
14✔
1100

14✔
1101
        // Calculate how many blocks left until the deadline.
14✔
1102
        deadlineDelta := deadline - currentHeight
14✔
1103

14✔
1104
        // If we are already past the deadline, we will set the conf target to
14✔
1105
        // be 1.
14✔
1106
        if deadlineDelta < 0 {
21✔
1107
                log.Warnf("Deadline is %d blocks behind current height %v",
7✔
1108
                        -deadlineDelta, currentHeight)
7✔
1109

7✔
1110
                confTarget = 0
7✔
1111
        } else {
17✔
1112
                confTarget = uint32(deadlineDelta)
10✔
1113
        }
10✔
1114

1115
        return confTarget
14✔
1116
}
1117

1118
// createSweepTx creates a sweeping tx based on the given inputs, change
1119
// address and fee rate.
1120
func (t *TxPublisher) createSweepTx(inputs []input.Input, changePkScript []byte,
1121
        feeRate chainfee.SatPerKWeight) (*wire.MsgTx, btcutil.Amount, error) {
25✔
1122

25✔
1123
        // Validate and calculate the fee and change amount.
25✔
1124
        txFee, changeAmtOpt, locktimeOpt, err := prepareSweepTx(
25✔
1125
                inputs, changePkScript, feeRate, t.currentHeight.Load(),
25✔
1126
        )
25✔
1127
        if err != nil {
28✔
1128
                return nil, 0, err
3✔
1129
        }
3✔
1130

1131
        var (
25✔
1132
                // Create the sweep transaction that we will be building. We
25✔
1133
                // use version 2 as it is required for CSV.
25✔
1134
                sweepTx = wire.NewMsgTx(2)
25✔
1135

25✔
1136
                // We'll add the inputs as we go so we know the final ordering
25✔
1137
                // of inputs to sign.
25✔
1138
                idxs []input.Input
25✔
1139
        )
25✔
1140

25✔
1141
        // We start by adding all inputs that commit to an output. We do this
25✔
1142
        // since the input and output index must stay the same for the
25✔
1143
        // signatures to be valid.
25✔
1144
        for _, o := range inputs {
50✔
1145
                if o.RequiredTxOut() == nil {
50✔
1146
                        continue
25✔
1147
                }
1148

1149
                idxs = append(idxs, o)
3✔
1150
                sweepTx.AddTxIn(&wire.TxIn{
3✔
1151
                        PreviousOutPoint: o.OutPoint(),
3✔
1152
                        Sequence:         o.BlocksToMaturity(),
3✔
1153
                })
3✔
1154
                sweepTx.AddTxOut(o.RequiredTxOut())
3✔
1155
        }
1156

1157
        // Sum up the value contained in the remaining inputs, and add them to
1158
        // the sweep transaction.
1159
        for _, o := range inputs {
50✔
1160
                if o.RequiredTxOut() != nil {
28✔
1161
                        continue
3✔
1162
                }
1163

1164
                idxs = append(idxs, o)
25✔
1165
                sweepTx.AddTxIn(&wire.TxIn{
25✔
1166
                        PreviousOutPoint: o.OutPoint(),
25✔
1167
                        Sequence:         o.BlocksToMaturity(),
25✔
1168
                })
25✔
1169
        }
1170

1171
        // If there's a change amount, add it to the transaction.
1172
        changeAmtOpt.WhenSome(func(changeAmt btcutil.Amount) {
50✔
1173
                sweepTx.AddTxOut(&wire.TxOut{
25✔
1174
                        PkScript: changePkScript,
25✔
1175
                        Value:    int64(changeAmt),
25✔
1176
                })
25✔
1177
        })
25✔
1178

1179
        // We'll default to using the current block height as locktime, if none
1180
        // of the inputs commits to a different locktime.
1181
        sweepTx.LockTime = uint32(locktimeOpt.UnwrapOr(t.currentHeight.Load()))
25✔
1182

25✔
1183
        prevInputFetcher, err := input.MultiPrevOutFetcher(inputs)
25✔
1184
        if err != nil {
25✔
1185
                return nil, 0, fmt.Errorf("error creating prev input fetcher "+
×
1186
                        "for hash cache: %v", err)
×
1187
        }
×
1188
        hashCache := txscript.NewTxSigHashes(sweepTx, prevInputFetcher)
25✔
1189

25✔
1190
        // With all the inputs in place, use each output's unique input script
25✔
1191
        // function to generate the final witness required for spending.
25✔
1192
        addInputScript := func(idx int, tso input.Input) error {
50✔
1193
                inputScript, err := tso.CraftInputScript(
25✔
1194
                        t.cfg.Signer, sweepTx, hashCache, prevInputFetcher, idx,
25✔
1195
                )
25✔
1196
                if err != nil {
25✔
1197
                        return err
×
1198
                }
×
1199

1200
                sweepTx.TxIn[idx].Witness = inputScript.Witness
25✔
1201

25✔
1202
                if len(inputScript.SigScript) == 0 {
50✔
1203
                        return nil
25✔
1204
                }
25✔
1205

1206
                sweepTx.TxIn[idx].SignatureScript = inputScript.SigScript
×
1207

×
1208
                return nil
×
1209
        }
1210

1211
        for idx, inp := range idxs {
50✔
1212
                if err := addInputScript(idx, inp); err != nil {
25✔
1213
                        return nil, 0, err
×
1214
                }
×
1215
        }
1216

1217
        log.Debugf("Created sweep tx %v for inputs:\n%v", sweepTx.TxHash(),
25✔
1218
                inputTypeSummary(inputs))
25✔
1219

25✔
1220
        return sweepTx, txFee, nil
25✔
1221
}
1222

1223
// prepareSweepTx returns the tx fee, an optional change amount and an optional
1224
// locktime after a series of validations:
1225
// 1. check the locktime has been reached.
1226
// 2. check the locktimes are the same.
1227
// 3. check the inputs cover the outputs.
1228
//
1229
// NOTE: if the change amount is below dust, it will be added to the tx fee.
1230
func prepareSweepTx(inputs []input.Input, changePkScript []byte,
1231
        feeRate chainfee.SatPerKWeight, currentHeight int32) (
1232
        btcutil.Amount, fn.Option[btcutil.Amount], fn.Option[int32], error) {
25✔
1233

25✔
1234
        noChange := fn.None[btcutil.Amount]()
25✔
1235
        noLocktime := fn.None[int32]()
25✔
1236

25✔
1237
        // Creating a weight estimator with nil outputs and zero max fee rate.
25✔
1238
        // We don't allow adding customized outputs in the sweeping tx, and the
25✔
1239
        // fee rate is already being managed before we get here.
25✔
1240
        inputs, estimator, err := getWeightEstimate(
25✔
1241
                inputs, nil, feeRate, 0, changePkScript,
25✔
1242
        )
25✔
1243
        if err != nil {
25✔
1244
                return 0, noChange, noLocktime, err
×
1245
        }
×
1246

1247
        txFee := estimator.fee()
25✔
1248

25✔
1249
        var (
25✔
1250
                // Track whether any of the inputs require a certain locktime.
25✔
1251
                locktime = int32(-1)
25✔
1252

25✔
1253
                // We keep track of total input amount, and required output
25✔
1254
                // amount to use for calculating the change amount below.
25✔
1255
                totalInput     btcutil.Amount
25✔
1256
                requiredOutput btcutil.Amount
25✔
1257
        )
25✔
1258

25✔
1259
        // Go through each input and check if the required lock times have
25✔
1260
        // reached and are the same.
25✔
1261
        for _, o := range inputs {
50✔
1262
                // If the input has a required output, we'll add it to the
25✔
1263
                // required output amount.
25✔
1264
                if o.RequiredTxOut() != nil {
28✔
1265
                        requiredOutput += btcutil.Amount(
3✔
1266
                                o.RequiredTxOut().Value,
3✔
1267
                        )
3✔
1268
                }
3✔
1269

1270
                // Update the total input amount.
1271
                totalInput += btcutil.Amount(o.SignDesc().Output.Value)
25✔
1272

25✔
1273
                lt, ok := o.RequiredLockTime()
25✔
1274

25✔
1275
                // Skip if the input doesn't require a lock time.
25✔
1276
                if !ok {
50✔
1277
                        continue
25✔
1278
                }
1279

1280
                // Check if the lock time has reached
1281
                if lt > uint32(currentHeight) {
3✔
1282
                        return 0, noChange, noLocktime, ErrLocktimeImmature
×
1283
                }
×
1284

1285
                // If another input commits to a different locktime, they
1286
                // cannot be combined in the same transaction.
1287
                if locktime != -1 && locktime != int32(lt) {
3✔
1288
                        return 0, noChange, noLocktime, ErrLocktimeConflict
×
1289
                }
×
1290

1291
                // Update the locktime for next iteration.
1292
                locktime = int32(lt)
3✔
1293
        }
1294

1295
        // Make sure total output amount is less than total input amount.
1296
        if requiredOutput+txFee > totalInput {
25✔
1297
                return 0, noChange, noLocktime, fmt.Errorf("insufficient "+
×
1298
                        "input to create sweep tx: input_sum=%v, "+
×
1299
                        "output_sum=%v", totalInput, requiredOutput+txFee)
×
1300
        }
×
1301

1302
        // The value remaining after the required output and fees is the
1303
        // change output.
1304
        changeAmt := totalInput - requiredOutput - txFee
25✔
1305
        changeAmtOpt := fn.Some(changeAmt)
25✔
1306

25✔
1307
        // We'll calculate the dust limit for the given changePkScript since it
25✔
1308
        // is variable.
25✔
1309
        changeFloor := lnwallet.DustLimitForSize(len(changePkScript))
25✔
1310

25✔
1311
        // If the change amount is dust, we'll move it into the fees.
25✔
1312
        if changeAmt < changeFloor {
28✔
1313
                log.Infof("Change amt %v below dustlimit %v, not adding "+
3✔
1314
                        "change output", changeAmt, changeFloor)
3✔
1315

3✔
1316
                // If there's no required output, and the change output is a
3✔
1317
                // dust, it means we are creating a tx without any outputs. In
3✔
1318
                // this case we'll return an error. This could happen when
3✔
1319
                // creating a tx that has an anchor as the only input.
3✔
1320
                if requiredOutput == 0 {
6✔
1321
                        return 0, noChange, noLocktime, ErrTxNoOutput
3✔
1322
                }
3✔
1323

1324
                // The dust amount is added to the fee.
1325
                txFee += changeAmt
×
1326

×
1327
                // Set the change amount to none.
×
1328
                changeAmtOpt = fn.None[btcutil.Amount]()
×
1329
        }
1330

1331
        // Optionally set the locktime.
1332
        locktimeOpt := fn.Some(locktime)
25✔
1333
        if locktime == -1 {
50✔
1334
                locktimeOpt = noLocktime
25✔
1335
        }
25✔
1336

1337
        log.Debugf("Creating sweep tx for %v inputs (%s) using %v, "+
25✔
1338
                "tx_weight=%v, tx_fee=%v, locktime=%v, parents_count=%v, "+
25✔
1339
                "parents_fee=%v, parents_weight=%v, current_height=%v",
25✔
1340
                len(inputs), inputTypeSummary(inputs), feeRate,
25✔
1341
                estimator.weight(), txFee, locktimeOpt, len(estimator.parents),
25✔
1342
                estimator.parentsFee, estimator.parentsWeight, currentHeight)
25✔
1343

25✔
1344
        return txFee, changeAmtOpt, locktimeOpt, nil
25✔
1345
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc