• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.74
/sweep/fee_bumper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/rpcclient"
12
        "github.com/btcsuite/btcd/txscript"
13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/btcsuite/btcwallet/chain"
15
        "github.com/lightningnetwork/lnd/chainntnfs"
16
        "github.com/lightningnetwork/lnd/fn"
17
        "github.com/lightningnetwork/lnd/input"
18
        "github.com/lightningnetwork/lnd/labels"
19
        "github.com/lightningnetwork/lnd/lntypes"
20
        "github.com/lightningnetwork/lnd/lnutils"
21
        "github.com/lightningnetwork/lnd/lnwallet"
22
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
23
)
24

25
var (
26
        // ErrInvalidBumpResult is returned when the bump result is invalid.
27
        ErrInvalidBumpResult = errors.New("invalid bump result")
28

29
        // ErrNotEnoughBudget is returned when the fee bumper decides the
30
        // current budget cannot cover the fee.
31
        ErrNotEnoughBudget = errors.New("not enough budget")
32

33
        // ErrLocktimeImmature is returned when sweeping an input whose
34
        // locktime is not reached.
35
        ErrLocktimeImmature = errors.New("immature input")
36

37
        // ErrTxNoOutput is returned when an output cannot be created during tx
38
        // preparation, usually due to the output being dust.
39
        ErrTxNoOutput = errors.New("tx has no output")
40

41
        // ErrThirdPartySpent is returned when a third party has spent the
42
        // input in the sweeping tx.
43
        ErrThirdPartySpent = errors.New("third party spent the output")
44
)
45

46
// Bumper defines an interface that can be used by other subsystems for fee
47
// bumping.
48
type Bumper interface {
49
        // Broadcast is used to publish the tx created from the given inputs
50
        // specified in the request. It handles the tx creation, broadcasts it,
51
        // and monitors its confirmation status for potential fee bumping. It
52
        // returns a chan that the caller can use to receive updates about the
53
        // broadcast result and potential RBF attempts.
54
        Broadcast(req *BumpRequest) (<-chan *BumpResult, error)
55
}
56

57
// BumpEvent represents the event of a fee bumping attempt.
58
type BumpEvent uint8
59

60
const (
61
        // TxPublished is sent when the broadcast attempt is finished.
62
        TxPublished BumpEvent = iota
63

64
        // TxFailed is sent when the broadcast attempt fails.
65
        TxFailed
66

67
        // TxReplaced is sent when the original tx is replaced by a new one.
68
        TxReplaced
69

70
        // TxConfirmed is sent when the tx is confirmed.
71
        TxConfirmed
72

73
        // sentinalEvent is used to check if an event is unknown.
74
        sentinalEvent
75
)
76

77
// String returns a human-readable string for the event.
78
func (e BumpEvent) String() string {
79
        switch e {
80
        case TxPublished:
81
                return "Published"
82
        case TxFailed:
83
                return "Failed"
84
        case TxReplaced:
85
                return "Replaced"
86
        case TxConfirmed:
87
                return "Confirmed"
88
        default:
89
                return "Unknown"
90
        }
91
}
UNCOV
92

×
UNCOV
93
// Unknown returns true if the event is unknown.
×
UNCOV
94
func (e BumpEvent) Unknown() bool {
×
UNCOV
95
        return e >= sentinalEvent
×
UNCOV
96
}
×
UNCOV
97

×
UNCOV
98
// BumpRequest is used by the caller to give the Bumper the necessary info to
×
UNCOV
99
// create and manage potential fee bumps for a set of inputs.
×
UNCOV
100
type BumpRequest struct {
×
UNCOV
101
        // Budget givens the total amount that can be used as fees by these
×
UNCOV
102
        // inputs.
×
UNCOV
103
        Budget btcutil.Amount
×
104

105
        // Inputs is the set of inputs to sweep.
106
        Inputs []input.Input
107

108
        // DeadlineHeight is the block height at which the tx should be
8✔
109
        // confirmed.
8✔
110
        DeadlineHeight int32
8✔
111

112
        // DeliveryAddress is the script to send the change output to.
113
        DeliveryAddress []byte
114

115
        // MaxFeeRate is the maximum fee rate that can be used for fee bumping.
116
        MaxFeeRate chainfee.SatPerKWeight
117

118
        // StartingFeeRate is an optional parameter that can be used to specify
119
        // the initial fee rate to use for the fee function.
120
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
121
}
122

123
// MaxFeeRateAllowed returns the maximum fee rate allowed for the given
124
// request. It calculates the feerate using the supplied budget and the weight,
125
// compares it with the specified MaxFeeRate, and returns the smaller of the
126
// two.
127
func (r *BumpRequest) MaxFeeRateAllowed() (chainfee.SatPerKWeight, error) {
128
        // Get the size of the sweep tx, which will be used to calculate the
129
        // budget fee rate.
130
        size, err := calcSweepTxWeight(r.Inputs, r.DeliveryAddress)
131
        if err != nil {
132
                return 0, err
133
        }
134

135
        // Use the budget and MaxFeeRate to decide the max allowed fee rate.
136
        // This is needed as, when the input has a large value and the user
137
        // sets the budget to be proportional to the input value, the fee rate
138
        // can be very high and we need to make sure it doesn't exceed the max
139
        // fee rate.
140
        maxFeeRateAllowed := chainfee.NewSatPerKWeight(r.Budget, size)
141
        if maxFeeRateAllowed > r.MaxFeeRate {
142
                log.Debugf("Budget feerate %v exceeds MaxFeeRate %v, use "+
143
                        "MaxFeeRate instead, txWeight=%v", maxFeeRateAllowed,
144
                        r.MaxFeeRate, size)
145

8✔
146
                return r.MaxFeeRate, nil
8✔
147
        }
8✔
148

15✔
149
        log.Debugf("Budget feerate %v below MaxFeeRate %v, use budget feerate "+
7✔
150
                "instead, txWeight=%v", maxFeeRateAllowed, r.MaxFeeRate, size)
7✔
UNCOV
151

×
UNCOV
152
        return maxFeeRateAllowed, nil
×
153
}
154

155
// calcSweepTxWeight calculates the weight of the sweep tx. It assumes a
156
// sweeping tx always has a single output(change).
8✔
157
func calcSweepTxWeight(inputs []input.Input,
8✔
158
        outputPkScript []byte) (lntypes.WeightUnit, error) {
8✔
159

8✔
160
        // Use a const fee rate as we only use the weight estimator to
8✔
161
        // calculate the size.
8✔
162
        const feeRate = 1
8✔
163

8✔
UNCOV
164
        // Initialize the tx weight estimator with,
×
UNCOV
165
        // - nil outputs as we only have one single change output.
×
UNCOV
166
        // - const fee rate as we don't care about the fees here.
×
UNCOV
167
        // - 0 maxfeerate as we don't care about fees here.
×
168
        //
169
        // TODO(yy): we should refactor the weight estimator to not require a
170
        // fee rate and max fee rate and make it a pure tx weight calculator.
171
        _, estimator, err := getWeightEstimate(
8✔
172
                inputs, nil, feeRate, 0, outputPkScript,
8✔
173
        )
8✔
174
        if err != nil {
9✔
175
                return 0, err
1✔
176
        }
1✔
177

178
        return estimator.weight(), nil
179
}
180

181
// BumpResult is used by the Bumper to send updates about the tx being
182
// broadcast.
183
type BumpResult struct {
7✔
184
        // Event is the type of event that the result is for.
8✔
185
        Event BumpEvent
1✔
186

1✔
187
        // Tx is the tx being broadcast.
1✔
188
        Tx *wire.MsgTx
1✔
189

1✔
190
        // ReplacedTx is the old, replaced tx if a fee bump is attempted.
1✔
191
        ReplacedTx *wire.MsgTx
192

6✔
193
        // FeeRate is the fee rate used for the new tx.
6✔
194
        FeeRate chainfee.SatPerKWeight
6✔
195

6✔
196
        // Fee is the fee paid by the new tx.
197
        Fee btcutil.Amount
198

199
        // Err is the error that occurred during the broadcast.
200
        Err error
201

11✔
202
        // requestID is the ID of the request that created this record.
11✔
203
        requestID uint64
11✔
204
}
11✔
205

11✔
206
// Validate validates the BumpResult so it's safe to use.
11✔
207
func (b *BumpResult) Validate() error {
11✔
208
        // Every result must have a tx.
11✔
209
        if b.Tx == nil {
11✔
210
                return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult)
11✔
211
        }
11✔
212

11✔
213
        // Every result must have a known event.
11✔
214
        if b.Event.Unknown() {
11✔
215
                return fmt.Errorf("%w: unknown event", ErrInvalidBumpResult)
11✔
216
        }
11✔
217

13✔
218
        // If it's a replacing event, it must have a replaced tx.
2✔
219
        if b.Event == TxReplaced && b.ReplacedTx == nil {
2✔
220
                return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult)
221
        }
9✔
222

223
        // If it's a failed event, it must have an error.
224
        if b.Event == TxFailed && b.Err == nil {
225
                return fmt.Errorf("%w: nil error", ErrInvalidBumpResult)
226
        }
227

228
        // If it's a confirmed event, it must have a fee rate and fee.
229
        if b.Event == TxConfirmed && (b.FeeRate == 0 || b.Fee == 0) {
230
                return fmt.Errorf("%w: missing fee rate or fee",
231
                        ErrInvalidBumpResult)
232
        }
233

234
        return nil
235
}
236

237
// TxPublisherConfig is the config used to create a new TxPublisher.
238
type TxPublisherConfig struct {
239
        // Signer is used to create the tx signature.
240
        Signer input.Signer
241

242
        // Wallet is used primarily to publish the tx.
243
        Wallet Wallet
244

245
        // Estimator is used to estimate the fee rate for the new tx based on
246
        // its deadline conf target.
247
        Estimator chainfee.Estimator
248

249
        // Notifier is used to monitor the confirmation status of the tx.
250
        Notifier chainntnfs.ChainNotifier
9✔
251
}
9✔
252

10✔
253
// TxPublisher is an implementation of the Bumper interface. It utilizes the
1✔
254
// `testmempoolaccept` RPC to bump the fee of txns it created based on
1✔
255
// different fee function selected or configed by the caller. Its purpose is to
256
// take a list of inputs specified, and create a tx that spends them to a
257
// specified output. It will then monitor the confirmation status of the tx,
9✔
258
// and if it's not confirmed within a certain time frame, it will attempt to
1✔
259
// bump the fee of the tx by creating a new tx that spends the same inputs to
1✔
260
// the same output, but with a higher fee rate. It will continue to do this
261
// until the tx is confirmed or the fee rate reaches the maximum fee rate
262
// specified by the caller.
8✔
263
type TxPublisher struct {
1✔
264
        started atomic.Bool
1✔
265
        stopped atomic.Bool
266

267
        wg sync.WaitGroup
7✔
268

1✔
269
        // cfg specifies the configuration of the TxPublisher.
1✔
270
        cfg *TxPublisherConfig
271

272
        // currentHeight is the current block height.
6✔
273
        currentHeight atomic.Int32
1✔
274

1✔
275
        // records is a map keyed by the requestCounter and the value is the tx
1✔
276
        // being monitored.
277
        records lnutils.SyncMap[uint64, *monitorRecord]
4✔
278

279
        // requestCounter is a monotonically increasing counter used to keep
280
        // track of how many requests have been made.
281
        requestCounter atomic.Uint64
282

283
        // subscriberChans is a map keyed by the requestCounter, each item is
284
        // the chan that the publisher sends the fee bump result to.
285
        subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]
286

287
        // quit is used to signal the publisher to stop.
288
        quit chan struct{}
289
}
290

291
// Compile-time constraint to ensure TxPublisher implements Bumper.
292
var _ Bumper = (*TxPublisher)(nil)
293

294
// NewTxPublisher creates a new TxPublisher.
295
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
296
        return &TxPublisher{
297
                cfg:             &cfg,
298
                records:         lnutils.SyncMap[uint64, *monitorRecord]{},
299
                subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
300
                quit:            make(chan struct{}),
301
        }
302
}
303

304
// isNeutrinoBackend checks if the wallet backend is neutrino.
305
func (t *TxPublisher) isNeutrinoBackend() bool {
306
        return t.cfg.Wallet.BackEnd() == "neutrino"
307
}
308

309
// Broadcast is used to publish the tx created from the given inputs. It will,
310
// 1. init a fee function based on the given strategy.
311
// 2. create an RBF-compliant tx and monitor it for confirmation.
312
// 3. notify the initial broadcast result back to the caller.
313
// The initial broadcast is guaranteed to be RBF-compliant unless the budget
314
// specified cannot cover the fee.
315
//
316
// NOTE: part of the Bumper interface.
317
func (t *TxPublisher) Broadcast(req *BumpRequest) (<-chan *BumpResult, error) {
318
        log.Tracef("Received broadcast request: %s", lnutils.SpewLogClosure(
319
                req))
320

321
        // Attempt an initial broadcast which is guaranteed to comply with the
322
        // RBF rules.
323
        result, err := t.initialBroadcast(req)
324
        if err != nil {
325
                log.Errorf("Initial broadcast failed: %v", err)
326

327
                return nil, err
328
        }
329

330
        // Create a chan to send the result to the caller.
331
        subscriber := make(chan *BumpResult, 1)
332
        t.subscriberChans.Store(result.requestID, subscriber)
333

334
        // Send the initial broadcast result to the caller.
335
        t.handleResult(result)
336

337
        return subscriber, nil
338
}
339

340
// initialBroadcast initializes a fee function, creates an RBF-compliant tx and
341
// broadcasts it.
342
func (t *TxPublisher) initialBroadcast(req *BumpRequest) (*BumpResult, error) {
14✔
343
        // Create a fee bumping algorithm to be used for future RBF.
14✔
344
        feeAlgo, err := t.initializeFeeFunction(req)
14✔
345
        if err != nil {
14✔
346
                return nil, fmt.Errorf("init fee function: %w", err)
14✔
347
        }
14✔
348

14✔
349
        // Create the initial tx to be broadcasted. This tx is guaranteed to
14✔
350
        // comply with the RBF restrictions.
351
        requestID, err := t.createRBFCompliantTx(req, feeAlgo)
352
        if err != nil {
1✔
353
                return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
1✔
354
        }
1✔
355

356
        // Broadcast the tx and return the monitored record.
357
        result, err := t.broadcast(requestID)
358
        if err != nil {
359
                return nil, fmt.Errorf("broadcast sweep tx: %w", err)
360
        }
361

362
        return result, nil
363
}
364

3✔
365
// initializeFeeFunction initializes a fee function to be used for this request
3✔
366
// for future fee bumping.
3✔
367
func (t *TxPublisher) initializeFeeFunction(
3✔
368
        req *BumpRequest) (FeeFunction, error) {
3✔
369

3✔
370
        // Get the max allowed feerate.
3✔
371
        maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
4✔
372
        if err != nil {
1✔
373
                return nil, err
1✔
374
        }
1✔
375

1✔
376
        // Get the initial conf target.
377
        confTarget := calcCurrentConfTarget(
378
                t.currentHeight.Load(), req.DeadlineHeight,
2✔
379
        )
2✔
380

2✔
381
        log.Debugf("Initializing fee function with conf target=%v, budget=%v, "+
2✔
382
                "maxFeeRateAllowed=%v", confTarget, req.Budget,
2✔
383
                maxFeeRateAllowed)
2✔
384

2✔
385
        // Initialize the fee function and return it.
386
        //
387
        // TODO(yy): return based on differet req.Strategy?
388
        return NewLinearFeeFunction(
389
                maxFeeRateAllowed, confTarget, t.cfg.Estimator,
3✔
390
                req.StartingFeeRate,
3✔
391
        )
3✔
392
}
3✔
UNCOV
393

×
UNCOV
394
// createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
×
395
// so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee
396
// and redo the process until the tx is valid, or return an error when non-RBF
397
// related errors occur or the budget has been used up.
398
func (t *TxPublisher) createRBFCompliantTx(req *BumpRequest,
3✔
399
        f FeeFunction) (uint64, error) {
4✔
400

1✔
401
        for {
1✔
402
                // Create a new tx with the given fee rate and check its
403
                // mempool acceptance.
404
                tx, fee, err := t.createAndCheckTx(req, f)
2✔
405

2✔
UNCOV
406
                switch {
×
UNCOV
407
                case err == nil:
×
408
                        // The tx is valid, return the request ID.
409
                        requestID := t.storeRecord(tx, req, f, fee)
2✔
410

411
                        log.Infof("Created tx %v for %v inputs: feerate=%v, "+
412
                                "fee=%v, inputs=%v", tx.TxHash(),
413
                                len(req.Inputs), f.FeeRate(), fee,
414
                                inputTypeSummary(req.Inputs))
415

5✔
416
                        return requestID, nil
5✔
417

5✔
418
                // If the error indicates the fees paid is not enough, we will
5✔
419
                // ask the fee function to increase the fee rate and retry.
5✔
UNCOV
420
                case errors.Is(err, lnwallet.ErrMempoolFee):
×
UNCOV
421
                        // We should at least start with a feerate above the
×
422
                        // mempool min feerate, so if we get this error, it
423
                        // means something is wrong earlier in the pipeline.
424
                        log.Errorf("Current fee=%v, feerate=%v, %v", fee,
5✔
425
                                f.FeeRate(), err)
5✔
426

5✔
427
                        fallthrough
5✔
428

5✔
429
                // We are not paying enough fees so we increase it.
5✔
430
                case errors.Is(err, chain.ErrInsufficientFee):
5✔
431
                        increased := false
5✔
432

5✔
433
                        // Keep calling the fee function until the fee rate is
5✔
434
                        // increased or maxed out.
5✔
435
                        for !increased {
5✔
436
                                log.Debugf("Increasing fee for next round, "+
5✔
437
                                        "current fee=%v, feerate=%v", fee,
5✔
438
                                        f.FeeRate())
5✔
439

440
                                // If the fee function tells us that we have
441
                                // used up the budget, we will return an error
442
                                // indicating this tx cannot be made. The
443
                                // sweeper should handle this error and try to
444
                                // cluster these inputs differetly.
445
                                increased, err = f.Increment()
446
                                if err != nil {
9✔
447
                                        return 0, err
9✔
448
                                }
21✔
449
                        }
12✔
450

12✔
451
                // TODO(yy): suppose there's only one bad input, we can do a
12✔
452
                // binary search to find out which input is causing this error
12✔
453
                // by recreating a tx using half of the inputs and check its
12✔
454
                // mempool acceptance.
6✔
455
                default:
6✔
456
                        log.Debugf("Failed to create RBF-compliant tx: %v", err)
6✔
457
                        return 0, err
6✔
458
                }
6✔
459
        }
6✔
460
}
6✔
461

6✔
462
// storeRecord stores the given record in the records map.
6✔
463
func (t *TxPublisher) storeRecord(tx *wire.MsgTx, req *BumpRequest,
6✔
464
        f FeeFunction, fee btcutil.Amount) uint64 {
6✔
465

6✔
466
        // Increase the request counter.
467
        //
468
        // NOTE: this is the only place where we increase the
469
        // counter.
2✔
470
        requestID := t.requestCounter.Add(1)
2✔
471

2✔
472
        // Register the record.
2✔
473
        t.records.Store(requestID, &monitorRecord{
2✔
474
                tx:          tx,
2✔
475
                req:         req,
2✔
476
                feeFunction: f,
2✔
477
                fee:         fee,
478
        })
479

4✔
480
        return requestID
4✔
481
}
4✔
482

4✔
483
// createAndCheckTx creates a tx based on the given inputs, change output
4✔
484
// script, and the fee rate. In addition, it validates the tx's mempool
9✔
485
// acceptance before returning a tx that can be published directly, along with
5✔
486
// its fee.
5✔
487
func (t *TxPublisher) createAndCheckTx(req *BumpRequest, f FeeFunction) (
5✔
488
        *wire.MsgTx, btcutil.Amount, error) {
5✔
489

5✔
490
        // Create the sweep tx with max fee rate of 0 as the fee function
5✔
491
        // guarantees the fee rate used here won't exceed the max fee rate.
5✔
492
        tx, fee, err := t.createSweepTx(
5✔
493
                req.Inputs, req.DeliveryAddress, f.FeeRate(),
5✔
494
        )
5✔
495
        if err != nil {
6✔
496
                return nil, fee, fmt.Errorf("create sweep tx: %w", err)
1✔
497
        }
1✔
498

499
        // Sanity check the budget still covers the fee.
500
        if fee > req.Budget {
501
                return nil, fee, fmt.Errorf("%w: budget=%v, fee=%v",
502
                        ErrNotEnoughBudget, req.Budget, fee)
503
        }
504

2✔
505
        // Validate the tx's mempool acceptance.
2✔
506
        err = t.cfg.Wallet.CheckMempoolAcceptance(tx)
2✔
507

508
        // Exit early if the tx is valid.
509
        if err == nil {
510
                return tx, fee, nil
511
        }
512

513
        // Print an error log if the chain backend doesn't support the mempool
14✔
514
        // acceptance test RPC.
14✔
515
        if errors.Is(err, rpcclient.ErrBackendVersion) {
14✔
516
                log.Errorf("TestMempoolAccept not supported by backend, " +
14✔
517
                        "consider upgrading it to a newer version")
14✔
518
                return tx, fee, nil
14✔
519
        }
14✔
520

14✔
521
        // We are running on a backend that doesn't implement the RPC
14✔
522
        // testmempoolaccept, eg, neutrino, so we'll skip the check.
14✔
523
        if errors.Is(err, chain.ErrUnimplemented) {
14✔
524
                log.Debug("Skipped testmempoolaccept due to not implemented")
14✔
525
                return tx, fee, nil
14✔
526
        }
14✔
527

14✔
528
        return nil, fee, fmt.Errorf("tx=%v failed mempool check: %w",
14✔
529
                tx.TxHash(), err)
14✔
530
}
14✔
531

532
// broadcast takes a monitored tx and publishes it to the network. Prior to the
533
// broadcast, it will subscribe the tx's confirmation notification and attach
534
// the event channel to the record. Any broadcast-related errors will not be
535
// returned here, instead, they will be put inside the `BumpResult` and
536
// returned to the caller.
537
func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) {
22✔
538
        // Get the record being monitored.
22✔
539
        record, ok := t.records.Load(requestID)
22✔
540
        if !ok {
22✔
541
                return nil, fmt.Errorf("tx record %v not found", requestID)
22✔
542
        }
22✔
543

22✔
544
        txid := record.tx.TxHash()
22✔
UNCOV
545

×
UNCOV
546
        tx := record.tx
×
547
        log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
548
                txid, len(tx.TxIn), t.currentHeight.Load())
549

24✔
550
        // Set the event, and change it to TxFailed if the wallet fails to
2✔
551
        // publish it.
2✔
552
        event := TxPublished
2✔
553

554
        // Publish the sweeping tx with customized label. If the publish fails,
555
        // this error will be saved in the `BumpResult` and it will be removed
556
        // from being monitored.
20✔
557
        err := t.cfg.Wallet.PublishTransaction(
20✔
558
                tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
20✔
559
        )
20✔
560
        if err != nil {
20✔
561
                // NOTE: we decide to attach this error to the result instead
20✔
562
                // of returning it here because by the time the tx reaches
31✔
563
                // here, it should have passed the mempool acceptance check. If
11✔
564
                // it still fails to be broadcast, it's likely a non-RBF
11✔
565
                // related error happened. So we send this error back to the
566
                // caller so that it can handle it properly.
567
                //
568
                // TODO(yy): find out which input is causing the failure.
9✔
UNCOV
569
                log.Errorf("Failed to publish tx %v: %v", txid, err)
×
UNCOV
570
                event = TxFailed
×
UNCOV
571
        }
×
UNCOV
572

×
573
        result := &BumpResult{
574
                Event:     event,
575
                Tx:        record.tx,
576
                Fee:       record.fee,
9✔
UNCOV
577
                FeeRate:   record.feeFunction.FeeRate(),
×
UNCOV
578
                Err:       err,
×
UNCOV
579
                requestID: requestID,
×
580
        }
581

9✔
582
        return result, nil
9✔
583
}
584

585
// notifyResult sends the result to the resultChan specified by the requestID.
586
// This channel is expected to be read by the caller.
587
func (t *TxPublisher) notifyResult(result *BumpResult) {
588
        id := result.requestID
589
        subscriber, ok := t.subscriberChans.Load(id)
590
        if !ok {
9✔
591
                log.Errorf("Result chan for id=%v not found", id)
9✔
592
                return
9✔
593
        }
10✔
594

1✔
595
        log.Debugf("Sending result for requestID=%v, tx=%v", id,
1✔
596
                result.Tx.TxHash())
597

8✔
598
        select {
8✔
599
        // Send the result to the subscriber.
8✔
600
        //
8✔
601
        // TODO(yy): Add timeout in case it's blocking?
8✔
602
        case subscriber <- result:
8✔
603
        case <-t.quit:
8✔
604
                log.Debug("Fee bumper stopped")
8✔
605
        }
16✔
606
}
8✔
607

8✔
608
// removeResult removes the tracking of the result if the result contains a
8✔
UNCOV
609
// non-nil error, or the tx is confirmed, the record will be removed from the
×
UNCOV
610
// maps.
×
611
func (t *TxPublisher) removeResult(result *BumpResult) {
612
        id := result.requestID
613

614
        // Remove the record from the maps if there's an error. This means this
8✔
615
        // tx has failed its broadcast and cannot be retried. There are two
8✔
616
        // cases,
8✔
617
        // - when the budget cannot cover the fee.
8✔
618
        // - when a non-RBF related error occurs.
8✔
619
        switch result.Event {
8✔
620
        case TxFailed:
8✔
621
                log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
8✔
622
                        id, result.Tx.TxHash(), result.Err)
11✔
623

3✔
624
        case TxConfirmed:
3✔
625
                // Remove the record is the tx is confirmed.
3✔
626
                log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
3✔
627
                        result.Tx.TxHash())
3✔
628

3✔
629
        // Do nothing if it's neither failed or confirmed.
3✔
630
        default:
3✔
631
                log.Tracef("Skipping record removal for id=%v, event=%v", id,
3✔
632
                        result.Event)
3✔
633

3✔
634
                return
635
        }
8✔
636

8✔
637
        t.records.Delete(id)
8✔
638
        t.subscriberChans.Delete(id)
8✔
639
}
8✔
640

8✔
641
// handleResult handles the result of a tx broadcast. It will notify the
8✔
642
// subscriber and remove the record if the tx is confirmed or failed to be
8✔
643
// broadcast.
8✔
644
func (t *TxPublisher) handleResult(result *BumpResult) {
8✔
645
        // Notify the subscriber.
646
        t.notifyResult(result)
647

648
        // Remove the record if it's failed or confirmed.
649
        t.removeResult(result)
9✔
650
}
9✔
651

9✔
652
// monitorRecord is used to keep track of the tx being monitored by the
9✔
UNCOV
653
// publisher internally.
×
UNCOV
654
type monitorRecord struct {
×
UNCOV
655
        // tx is the tx being monitored.
×
656
        tx *wire.MsgTx
657

9✔
658
        // req is the original request.
9✔
659
        req *BumpRequest
9✔
660

9✔
661
        // feeFunction is the fee bumping algorithm used by the publisher.
662
        feeFunction FeeFunction
663

664
        // fee is the fee paid by the tx.
8✔
665
        fee btcutil.Amount
1✔
666
}
1✔
667

668
// Start starts the publisher by subscribing to block epoch updates and kicking
669
// off the monitor loop.
670
func (t *TxPublisher) Start() error {
671
        log.Info("TxPublisher starting...")
672

673
        if t.started.Swap(true) {
9✔
674
                return fmt.Errorf("TxPublisher started more than once")
9✔
675
        }
9✔
676

9✔
677
        blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
9✔
678
        if err != nil {
9✔
679
                return fmt.Errorf("register block epoch ntfn: %w", err)
9✔
680
        }
9✔
681

9✔
682
        t.wg.Add(1)
2✔
683
        go t.monitor(blockEvent)
2✔
684

2✔
685
        log.Debugf("TxPublisher started")
686

3✔
687
        return nil
3✔
688
}
3✔
689

3✔
690
// Stop stops the publisher and waits for the monitor loop to exit.
691
func (t *TxPublisher) Stop() error {
692
        log.Info("TxPublisher stopping...")
4✔
693

4✔
694
        if t.stopped.Swap(true) {
4✔
695
                return fmt.Errorf("TxPublisher stopped more than once")
4✔
696
        }
4✔
697

698
        close(t.quit)
699
        t.wg.Wait()
5✔
700

5✔
701
        log.Debug("TxPublisher stopped")
702

703
        return nil
704
}
705

706
// monitor is the main loop driven by new blocks. Whevenr a new block arrives,
6✔
707
// it will examine all the txns being monitored, and check if any of them needs
6✔
708
// to be bumped. If so, it will attempt to bump the fee of the tx.
6✔
709
//
6✔
710
// NOTE: Must be run as a goroutine.
6✔
711
func (t *TxPublisher) monitor(blockEvent *chainntnfs.BlockEpochEvent) {
6✔
712
        defer blockEvent.Cancel()
6✔
713
        defer t.wg.Done()
714

715
        for {
716
                select {
717
                case epoch, ok := <-blockEvent.Epochs:
718
                        if !ok {
719
                                // We should stop the publisher before stopping
720
                                // the chain service. Otherwise it indicates an
721
                                // error.
722
                                log.Error("Block epoch channel closed, exit " +
723
                                        "monitor")
724

725
                                return
726
                        }
727

728
                        log.Debugf("TxPublisher received new block: %v",
729
                                epoch.Height)
730

731
                        // Update the best known height for the publisher.
UNCOV
732
                        t.currentHeight.Store(epoch.Height)
×
UNCOV
733

×
UNCOV
734
                        // Check all monitored txns to see if any of them needs
×
UNCOV
735
                        // to be bumped.
×
UNCOV
736
                        t.processRecords()
×
UNCOV
737

×
738
                case <-t.quit:
UNCOV
739
                        log.Debug("Fee bumper stopped, exit monitor")
×
UNCOV
740
                        return
×
UNCOV
741
                }
×
UNCOV
742
        }
×
743
}
UNCOV
744

×
UNCOV
745
// processRecords checks all the txns being monitored, and checks if any of
×
UNCOV
746
// them needs to be bumped. If so, it will attempt to bump the fee of the tx.
×
UNCOV
747
func (t *TxPublisher) processRecords() {
×
UNCOV
748
        // confirmedRecords stores a map of the records which have been
×
UNCOV
749
        // confirmed.
×
750
        confirmedRecords := make(map[uint64]*monitorRecord)
751

752
        // feeBumpRecords stores a map of the records which need to be bumped.
UNCOV
753
        feeBumpRecords := make(map[uint64]*monitorRecord)
×
UNCOV
754

×
UNCOV
755
        // failedRecords stores a map of the records which has inputs being
×
UNCOV
756
        // spent by a third party.
×
UNCOV
757
        //
×
UNCOV
758
        // NOTE: this is only used for neutrino backend.
×
759
        failedRecords := make(map[uint64]*monitorRecord)
UNCOV
760

×
UNCOV
761
        // visitor is a helper closure that visits each record and divides them
×
UNCOV
762
        // into two groups.
×
UNCOV
763
        visitor := func(requestID uint64, r *monitorRecord) error {
×
UNCOV
764
                log.Tracef("Checking monitor recordID=%v for tx=%v", requestID,
×
UNCOV
765
                        r.tx.TxHash())
×
766

767
                // If the tx is already confirmed, we can stop monitoring it.
768
                if t.isConfirmed(r.tx.TxHash()) {
769
                        confirmedRecords[requestID] = r
770

771
                        // Move to the next record.
772
                        return nil
UNCOV
773
                }
×
UNCOV
774

×
UNCOV
775
                // Check whether the inputs has been spent by a third party.
×
UNCOV
776
                //
×
UNCOV
777
                // NOTE: this check is only done for neutrino backend.
×
UNCOV
778
                if t.isThirdPartySpent(r.tx.TxHash(), r.req.Inputs) {
×
UNCOV
779
                        failedRecords[requestID] = r
×
UNCOV
780

×
UNCOV
781
                        // Move to the next record.
×
UNCOV
782
                        return nil
×
UNCOV
783
                }
×
UNCOV
784

×
UNCOV
785
                feeBumpRecords[requestID] = r
×
UNCOV
786

×
UNCOV
787
                // Return nil to move to the next record.
×
UNCOV
788
                return nil
×
789
        }
UNCOV
790

×
UNCOV
791
        // Iterate through all the records and divide them into two groups.
×
UNCOV
792
        t.records.ForEach(visitor)
×
UNCOV
793

×
UNCOV
794
        // For records that are confirmed, we'll notify the caller about this
×
UNCOV
795
        // result.
×
UNCOV
796
        for requestID, r := range confirmedRecords {
×
UNCOV
797
                rec := r
×
UNCOV
798

×
799
                log.Debugf("Tx=%v is confirmed", r.tx.TxHash())
UNCOV
800
                t.wg.Add(1)
×
UNCOV
801
                go t.handleTxConfirmed(rec, requestID)
×
UNCOV
802
        }
×
803

804
        // Get the current height to be used in the following goroutines.
805
        currentHeight := t.currentHeight.Load()
806

807
        // For records that are not confirmed, we perform a fee bump if needed.
808
        for requestID, r := range feeBumpRecords {
809
                rec := r
1✔
810

1✔
811
                log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash())
1✔
812
                t.wg.Add(1)
1✔
813
                go t.handleFeeBumpTx(requestID, rec, currentHeight)
1✔
814
        }
1✔
815

1✔
816
        // For records that are failed, we'll notify the caller about this
1✔
817
        // result.
1✔
818
        for requestID, r := range failedRecords {
1✔
819
                rec := r
1✔
820

1✔
821
                log.Debugf("Tx=%v has inputs been spent by a third party, "+
1✔
822
                        "failing it now", r.tx.TxHash())
1✔
823
                t.wg.Add(1)
1✔
824
                go t.handleThirdPartySpent(rec, requestID)
1✔
825
        }
3✔
826
}
2✔
827

2✔
828
// handleTxConfirmed is called when a monitored tx is confirmed. It will
2✔
829
// notify the subscriber then remove the record from the maps .
2✔
830
//
3✔
831
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1✔
832
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord, requestID uint64) {
1✔
833
        defer t.wg.Done()
1✔
834

1✔
835
        // Create a result that will be sent to the resultChan which is
1✔
836
        // listened by the caller.
837
        result := &BumpResult{
838
                Event:     TxConfirmed,
839
                Tx:        r.tx,
840
                requestID: requestID,
1✔
UNCOV
841
                Fee:       r.fee,
×
UNCOV
842
                FeeRate:   r.feeFunction.FeeRate(),
×
UNCOV
843
        }
×
UNCOV
844

×
UNCOV
845
        // Notify that this tx is confirmed and remove the record from the map.
×
846
        t.handleResult(result)
847
}
1✔
848

1✔
849
// handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
1✔
850
// attempt to bump the fee of the tx.
1✔
851
//
852
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
853
func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord,
854
        currentHeight int32) {
1✔
855

1✔
856
        defer t.wg.Done()
1✔
857

1✔
858
        oldTxid := r.tx.TxHash()
2✔
859

1✔
860
        // Get the current conf target for this record.
1✔
861
        confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
1✔
862

1✔
863
        // Ask the fee function whether a bump is needed. We expect the fee
1✔
864
        // function to increase its returned fee rate after calling this
1✔
865
        // method.
866
        increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
867
        if err != nil {
1✔
868
                // TODO(yy): send this error back to the sweeper so it can
1✔
869
                // re-group the inputs?
1✔
870
                log.Errorf("Failed to increase fee rate for tx %v at "+
2✔
871
                        "height=%v: %v", oldTxid, t.currentHeight.Load(), err)
1✔
872

1✔
873
                return
1✔
874
        }
1✔
875

1✔
876
        // If the fee rate was not increased, there's no need to bump the fee.
1✔
877
        if !increased {
878
                log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
879
                        t.currentHeight.Load())
880

1✔
UNCOV
881
                return
×
UNCOV
882
        }
×
UNCOV
883

×
UNCOV
884
        // The fee function now has a new fee rate, we will use it to bump the
×
UNCOV
885
        // fee of the tx.
×
UNCOV
886
        resultOpt := t.createAndPublishTx(requestID, r)
×
UNCOV
887

×
888
        // If there's a result, we will notify the caller about the result.
889
        resultOpt.WhenSome(func(result BumpResult) {
890
                // Notify the new result.
891
                t.handleResult(&result)
892
        })
893
}
894

2✔
895
// handleThirdPartySpent is called when the inputs in an unconfirmed tx is
2✔
896
// spent. It will notify the subscriber then remove the record from the maps
2✔
897
// and send a TxFailed event to the subscriber.
2✔
898
//
2✔
899
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
2✔
900
func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord,
2✔
901
        requestID uint64) {
2✔
902

2✔
903
        defer t.wg.Done()
2✔
904

2✔
905
        // Create a result that will be sent to the resultChan which is
2✔
906
        // listened by the caller.
2✔
907
        //
2✔
908
        // TODO(yy): create a new state `TxThirdPartySpent` to notify the
2✔
909
        // sweeper to remove the input, hence moving the monitoring of inputs
2✔
910
        // spent inside the fee bumper.
911
        result := &BumpResult{
912
                Event:     TxFailed,
913
                Tx:        r.tx,
914
                requestID: requestID,
915
                Err:       ErrThirdPartySpent,
916
        }
4✔
917

4✔
918
        // Notify that this tx is confirmed and remove the record from the map.
4✔
919
        t.handleResult(result)
4✔
920
}
4✔
921

4✔
922
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
4✔
923
// to the network. It will update the record with the new tx and fee rate if
4✔
924
// successfully created, and return the result when published successfully.
4✔
925
func (t *TxPublisher) createAndPublishTx(requestID uint64,
4✔
926
        r *monitorRecord) fn.Option[BumpResult] {
4✔
927

4✔
928
        // Fetch the old tx.
4✔
929
        oldTx := r.tx
5✔
930

1✔
931
        // Create a new tx with the new fee rate.
1✔
932
        //
1✔
933
        // NOTE: The fee function is expected to have increased its returned
1✔
934
        // fee rate after calling the SkipFeeBump method. So we can use it
1✔
935
        // directly here.
1✔
936
        tx, fee, err := t.createAndCheckTx(r.req, r.feeFunction)
1✔
937

938
        // If the error is fee related, we will return no error and let the fee
939
        // bumper retry it at next block.
4✔
940
        //
1✔
941
        // NOTE: we can check the RBF error here and ask the fee function to
1✔
942
        // recalculate the fee rate. However, this would defeat the purpose of
1✔
943
        // using a deadline based fee function:
1✔
944
        // - if the deadline is far away, there's no rush to RBF the tx.
1✔
945
        // - if the deadline is close, we expect the fee function to give us a
946
        //   higher fee rate. If the fee rate cannot satisfy the RBF rules, it
947
        //   means the budget is not enough.
948
        if errors.Is(err, chain.ErrInsufficientFee) ||
2✔
949
                errors.Is(err, lnwallet.ErrMempoolFee) {
2✔
950

2✔
951
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
4✔
952
                return fn.None[BumpResult]()
2✔
953
        }
2✔
954

2✔
955
        // If the error is not fee related, we will return a `TxFailed` event
956
        // so this input can be retried.
957
        if err != nil {
958
                // If the tx doesn't not have enought budget, we will return a
959
                // result so the sweeper can handle it by re-clustering the
960
                // utxos.
961
                if errors.Is(err, ErrNotEnoughBudget) {
962
                        log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(),
UNCOV
963
                                err)
×
UNCOV
964
                } else {
×
UNCOV
965
                        // Otherwise, an unexpected error occurred, we will
×
UNCOV
966
                        // fail the tx and let the sweeper retry the whole
×
UNCOV
967
                        // process.
×
UNCOV
968
                        log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(),
×
UNCOV
969
                                err)
×
UNCOV
970
                }
×
UNCOV
971

×
UNCOV
972
                return fn.Some(BumpResult{
×
UNCOV
973
                        Event:     TxFailed,
×
UNCOV
974
                        Tx:        oldTx,
×
UNCOV
975
                        Err:       err,
×
UNCOV
976
                        requestID: requestID,
×
UNCOV
977
                })
×
UNCOV
978
        }
×
UNCOV
979

×
UNCOV
980
        // The tx has been created without any errors, we now register a new
×
UNCOV
981
        // record by overwriting the same requestID.
×
UNCOV
982
        t.records.Store(requestID, &monitorRecord{
×
983
                tx:          tx,
984
                req:         r.req,
985
                feeFunction: r.feeFunction,
986
                fee:         fee,
987
        })
988

7✔
989
        // Attempt to broadcast this new tx.
7✔
990
        result, err := t.broadcast(requestID)
7✔
991
        if err != nil {
7✔
992
                log.Infof("Failed to broadcast replacement tx %v: %v",
7✔
993
                        tx.TxHash(), err)
7✔
994

7✔
995
                return fn.None[BumpResult]()
7✔
996
        }
7✔
997

7✔
998
        // If the result error is fee related, we will return no error and let
7✔
999
        // the fee bumper retry it at next block.
7✔
1000
        //
7✔
1001
        // NOTE: we may get this error if we've bypassed the mempool check,
7✔
1002
        // which means we are suing neutrino backend.
7✔
1003
        if errors.Is(result.Err, chain.ErrInsufficientFee) ||
7✔
1004
                errors.Is(result.Err, lnwallet.ErrMempoolFee) {
7✔
1005

7✔
1006
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
7✔
1007
                return fn.None[BumpResult]()
7✔
1008
        }
7✔
1009

7✔
1010
        // A successful replacement tx is created, attach the old tx.
7✔
1011
        result.ReplacedTx = oldTx
9✔
1012

2✔
1013
        // If the new tx failed to be published, we will return the result so
2✔
1014
        // the caller can handle it.
2✔
1015
        if result.Event == TxFailed {
2✔
1016
                return fn.Some(*result)
1017
        }
1018

1019
        log.Infof("Replaced tx=%v with new tx=%v", oldTx.TxHash(), tx.TxHash())
6✔
1020

1✔
1021
        // Otherwise, it's a successful RBF, set the event and return.
1✔
1022
        result.Event = TxReplaced
1✔
1023

2✔
1024
        return fn.Some(*result)
1✔
1025
}
1✔
1026

1✔
UNCOV
1027
// isConfirmed checks the btcwallet to see whether the tx is confirmed.
×
UNCOV
1028
func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool {
×
UNCOV
1029
        details, err := t.cfg.Wallet.GetTransactionDetails(&txid)
×
UNCOV
1030
        if err != nil {
×
UNCOV
1031
                log.Warnf("Failed to get tx details for %v: %v", txid, err)
×
UNCOV
1032
                return false
×
1033
        }
1034

1✔
1035
        return details.NumConfirmations > 0
1✔
1036
}
1✔
1037

1✔
1038
// isThirdPartySpent checks whether the inputs of the tx has already been spent
1✔
1039
// by a third party. When a tx is not confirmed, yet its inputs has been spent,
1✔
1040
// then it must be spent by a different tx other than the sweeping tx here.
1041
//
1042
// NOTE: this check is only performed for neutrino backend as it has no
1043
// reliable way to tell a tx has been replaced.
1044
func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash,
4✔
1045
        inputs []input.Input) bool {
4✔
1046

4✔
1047
        // Skip this check for if this is not neutrino backend.
4✔
1048
        if !t.isNeutrinoBackend() {
4✔
1049
                return false
4✔
1050
        }
4✔
1051

4✔
1052
        // Iterate all the inputs and check if they have been spent already.
4✔
1053
        for _, inp := range inputs {
4✔
UNCOV
1054
                op := inp.OutPoint()
×
UNCOV
1055

×
UNCOV
1056
                // For wallet utxos, the height hint is not set - we don't need
×
UNCOV
1057
                // to monitor them for third party spend.
×
UNCOV
1058
                heightHint := inp.HeightHint()
×
1059
                if heightHint == 0 {
1060
                        log.Debugf("Skipped third party check for wallet "+
1061
                                "input %v", op)
1062

1063
                        continue
1064
                }
1065

4✔
1066
                // If the input has already been spent after the height hint, a
4✔
UNCOV
1067
                // spend event is sent back immediately.
×
UNCOV
1068
                spendEvent, err := t.cfg.Notifier.RegisterSpendNtfn(
×
UNCOV
1069
                        &op, inp.SignDesc().Output.PkScript, heightHint,
×
UNCOV
1070
                )
×
1071
                if err != nil {
1072
                        log.Criticalf("Failed to register spend ntfn for "+
1073
                                "input=%v: %v", op, err)
4✔
1074
                        return false
4✔
1075
                }
4✔
1076

4✔
1077
                // Remove the subscription when exit.
5✔
1078
                defer spendEvent.Cancel()
1✔
1079

1✔
1080
                // Do a non-blocking read to see if the output has been spent.
1081
                select {
3✔
1082
                case spend, ok := <-spendEvent.Spend:
3✔
1083
                        if !ok {
3✔
1084
                                log.Debugf("Spend ntfn for %v canceled", op)
3✔
1085
                                return false
3✔
1086
                        }
3✔
1087

3✔
1088
                        spendingTxID := spend.SpendingTx.TxHash()
1089

1090
                        // If the spending tx is the same as the sweeping tx
1091
                        // then we are good.
2✔
1092
                        if spendingTxID == txid {
2✔
1093
                                continue
2✔
UNCOV
1094
                        }
×
UNCOV
1095

×
UNCOV
1096
                        log.Warnf("Detected third party spent of output=%v "+
×
1097
                                "in tx=%v", op, spend.SpendingTx.TxHash())
1098

2✔
1099
                        return true
1100

1101
                // Move to the next input.
1102
                default:
1103
                }
1104
        }
1105

1106
        return false
1107
}
1108

1✔
1109
// calcCurrentConfTarget calculates the current confirmation target based on
1✔
1110
// the deadline height. The conf target is capped at 0 if the deadline has
1✔
1111
// already been past.
2✔
1112
func calcCurrentConfTarget(currentHeight, deadline int32) uint32 {
1✔
1113
        var confTarget uint32
1✔
1114

1115
        // Calculate how many blocks left until the deadline.
UNCOV
1116
        deadlineDelta := deadline - currentHeight
×
UNCOV
1117

×
UNCOV
1118
        // If we are already past the deadline, we will set the conf target to
×
UNCOV
1119
        // be 1.
×
UNCOV
1120
        if deadlineDelta < 0 {
×
UNCOV
1121
                log.Warnf("Deadline is %d blocks behind current height %v",
×
UNCOV
1122
                        -deadlineDelta, currentHeight)
×
UNCOV
1123

×
UNCOV
1124
                confTarget = 0
×
UNCOV
1125
        } else {
×
UNCOV
1126
                confTarget = uint32(deadlineDelta)
×
1127
        }
1128

1129
        return confTarget
1130
}
UNCOV
1131

×
UNCOV
1132
// createSweepTx creates a sweeping tx based on the given inputs, change
×
UNCOV
1133
// address and fee rate.
×
UNCOV
1134
func (t *TxPublisher) createSweepTx(inputs []input.Input, changePkScript []byte,
×
UNCOV
1135
        feeRate chainfee.SatPerKWeight) (*wire.MsgTx, btcutil.Amount, error) {
×
UNCOV
1136

×
UNCOV
1137
        // Validate and calculate the fee and change amount.
×
UNCOV
1138
        txFee, changeAmtOpt, locktimeOpt, err := prepareSweepTx(
×
1139
                inputs, changePkScript, feeRate, t.currentHeight.Load(),
1140
        )
UNCOV
1141
        if err != nil {
×
UNCOV
1142
                return nil, 0, err
×
UNCOV
1143
        }
×
UNCOV
1144

×
UNCOV
1145
        var (
×
UNCOV
1146
                // Create the sweep transaction that we will be building. We
×
UNCOV
1147
                // use version 2 as it is required for CSV.
×
UNCOV
1148
                sweepTx = wire.NewMsgTx(2)
×
UNCOV
1149

×
1150
                // We'll add the inputs as we go so we know the final ordering
UNCOV
1151
                // of inputs to sign.
×
UNCOV
1152
                idxs []input.Input
×
UNCOV
1153
        )
×
UNCOV
1154

×
UNCOV
1155
        // We start by adding all inputs that commit to an output. We do this
×
UNCOV
1156
        // since the input and output index must stay the same for the
×
1157
        // signatures to be valid.
1158
        for _, o := range inputs {
UNCOV
1159
                if o.RequiredTxOut() == nil {
×
UNCOV
1160
                        continue
×
UNCOV
1161
                }
×
UNCOV
1162

×
1163
                idxs = append(idxs, o)
1164
                sweepTx.AddTxIn(&wire.TxIn{
UNCOV
1165
                        PreviousOutPoint: o.OutPoint(),
×
1166
                        Sequence:         o.BlocksToMaturity(),
1167
                })
1168
                sweepTx.AddTxOut(o.RequiredTxOut())
UNCOV
1169
        }
×
1170

1171
        // Sum up the value contained in the remaining inputs, and add them to
1172
        // the sweep transaction.
1173
        for _, o := range inputs {
1174
                if o.RequiredTxOut() != nil {
1175
                        continue
11✔
1176
                }
11✔
1177

11✔
1178
                idxs = append(idxs, o)
11✔
1179
                sweepTx.AddTxIn(&wire.TxIn{
11✔
1180
                        PreviousOutPoint: o.OutPoint(),
11✔
1181
                        Sequence:         o.BlocksToMaturity(),
11✔
1182
                })
11✔
1183
        }
15✔
1184

4✔
1185
        // If there's a change amount, add it to the transaction.
4✔
1186
        changeAmtOpt.WhenSome(func(changeAmt btcutil.Amount) {
4✔
1187
                sweepTx.AddTxOut(&wire.TxOut{
4✔
1188
                        PkScript: changePkScript,
11✔
1189
                        Value:    int64(changeAmt),
7✔
1190
                })
7✔
1191
        })
1192

11✔
1193
        // We'll default to using the current block height as locktime, if none
1194
        // of the inputs commits to a different locktime.
1195
        sweepTx.LockTime = uint32(locktimeOpt.UnwrapOr(t.currentHeight.Load()))
1196

1197
        prevInputFetcher, err := input.MultiPrevOutFetcher(inputs)
1198
        if err != nil {
1199
                return nil, 0, fmt.Errorf("error creating prev input fetcher "+
1200
                        "for hash cache: %v", err)
1201
        }
1202
        hashCache := txscript.NewTxSigHashes(sweepTx, prevInputFetcher)
1203

1204
        // With all the inputs in place, use each output's unique input script
1205
        // function to generate the final witness required for spending.
1206
        addInputScript := func(idx int, tso input.Input) error {
1207
                inputScript, err := tso.CraftInputScript(
1208
                        t.cfg.Signer, sweepTx, hashCache, prevInputFetcher, idx,
22✔
1209
                )
22✔
1210
                if err != nil {
22✔
1211
                        return err
22✔
1212
                }
22✔
1213

22✔
1214
                sweepTx.TxIn[idx].Witness = inputScript.Witness
22✔
1215

22✔
UNCOV
1216
                if len(inputScript.SigScript) == 0 {
×
UNCOV
1217
                        return nil
×
1218
                }
1219

22✔
1220
                sweepTx.TxIn[idx].SignatureScript = inputScript.SigScript
22✔
1221

22✔
1222
                return nil
22✔
1223
        }
22✔
1224

22✔
1225
        for idx, inp := range idxs {
22✔
1226
                if err := addInputScript(idx, inp); err != nil {
22✔
1227
                        return nil, 0, err
22✔
1228
                }
22✔
1229
        }
22✔
1230

22✔
1231
        log.Debugf("Created sweep tx %v for inputs:\n%v", sweepTx.TxHash(),
22✔
1232
                inputTypeSummary(inputs))
44✔
1233

44✔
1234
        return sweepTx, txFee, nil
22✔
1235
}
1236

UNCOV
1237
// prepareSweepTx returns the tx fee, an optional change amount and an optional
×
UNCOV
1238
// locktime after a series of validations:
×
UNCOV
1239
// 1. check the locktime has been reached.
×
UNCOV
1240
// 2. check the locktimes are the same.
×
UNCOV
1241
// 3. check the inputs cover the outputs.
×
UNCOV
1242
//
×
1243
// NOTE: if the change amount is below dust, it will be added to the tx fee.
1244
func prepareSweepTx(inputs []input.Input, changePkScript []byte,
1245
        feeRate chainfee.SatPerKWeight, currentHeight int32) (
1246
        btcutil.Amount, fn.Option[btcutil.Amount], fn.Option[int32], error) {
1247

44✔
1248
        noChange := fn.None[btcutil.Amount]()
22✔
UNCOV
1249
        noLocktime := fn.None[int32]()
×
1250

1251
        // Creating a weight estimator with nil outputs and zero max fee rate.
1252
        // We don't allow adding customized outputs in the sweeping tx, and the
22✔
1253
        // fee rate is already being managed before we get here.
22✔
1254
        inputs, estimator, err := getWeightEstimate(
22✔
1255
                inputs, nil, feeRate, 0, changePkScript,
22✔
1256
        )
22✔
1257
        if err != nil {
1258
                return 0, noChange, noLocktime, err
1259
        }
1260

1261
        txFee := estimator.fee()
44✔
1262

66✔
1263
        var (
44✔
1264
                // Track whether any of the inputs require a certain locktime.
44✔
1265
                locktime = int32(-1)
1266

1267
                // We keep track of total input amount, and required output
1268
                // amount to use for calculating the change amount below.
1269
                totalInput     btcutil.Amount
22✔
1270
                requiredOutput btcutil.Amount
22✔
1271
        )
22✔
1272

22✔
UNCOV
1273
        // Go through each input and check if the required lock times have
×
UNCOV
1274
        // reached and are the same.
×
UNCOV
1275
        for _, o := range inputs {
×
1276
                // If the input has a required output, we'll add it to the
22✔
1277
                // required output amount.
22✔
1278
                if o.RequiredTxOut() != nil {
22✔
1279
                        requiredOutput += btcutil.Amount(
22✔
1280
                                o.RequiredTxOut().Value,
44✔
1281
                        )
22✔
1282
                }
22✔
1283

22✔
1284
                // Update the total input amount.
22✔
UNCOV
1285
                totalInput += btcutil.Amount(o.SignDesc().Output.Value)
×
UNCOV
1286

×
1287
                lt, ok := o.RequiredLockTime()
1288

22✔
1289
                // Skip if the input doesn't require a lock time.
22✔
1290
                if !ok {
44✔
1291
                        continue
22✔
1292
                }
22✔
1293

UNCOV
1294
                // Check if the lock time has reached
×
UNCOV
1295
                if lt > uint32(currentHeight) {
×
1296
                        return 0, noChange, noLocktime, ErrLocktimeImmature
×
1297
                }
1298

1299
                // If another input commits to a different locktime, they
44✔
1300
                // cannot be combined in the same transaction.
22✔
UNCOV
1301
                if locktime != -1 && locktime != int32(lt) {
×
1302
                        return 0, noChange, noLocktime, ErrLocktimeConflict
×
1303
                }
1304

1305
                // Update the locktime for next iteration.
22✔
1306
                locktime = int32(lt)
22✔
1307
        }
22✔
1308

22✔
1309
        // Make sure total output amount is less than total input amount.
22✔
1310
        if requiredOutput+txFee > totalInput {
44✔
1311
                return 0, noChange, noLocktime, fmt.Errorf("insufficient "+
66✔
1312
                        "input to create sweep tx: input_sum=%v, "+
88✔
1313
                        "output_sum=%v", totalInput, requiredOutput+txFee)
44✔
1314
        }
1315

1316
        // The value remaining after the required output and fees is the
1317
        // change output.
1318
        changeAmt := totalInput - requiredOutput - txFee
1319
        changeAmtOpt := fn.Some(changeAmt)
1320

1321
        // We'll calculate the dust limit for the given changePkScript since it
1322
        // is variable.
UNCOV
1323
        changeFloor := lnwallet.DustLimitForSize(len(changePkScript))
×
UNCOV
1324

×
UNCOV
1325
        // If the change amount is dust, we'll move it into the fees.
×
UNCOV
1326
        if changeAmt < changeFloor {
×
1327
                log.Infof("Change amt %v below dustlimit %v, not adding "+
1328
                        "change output", changeAmt, changeFloor)
1329

22✔
1330
                // If there's no required output, and the change output is a
1331
                // dust, it means we are creating a tx without any outputs. In
1332
                // this case we'll return an error. This could happen when
1333
                // creating a tx that has an anchor as the only input.
22✔
1334
                if requiredOutput == 0 {
22✔
1335
                        return 0, noChange, noLocktime, ErrTxNoOutput
22✔
1336
                }
22✔
1337

22✔
1338
                // The dust amount is added to the fee.
1339
                txFee += changeAmt
1340

1341
                // Set the change amount to none.
1342
                changeAmtOpt = fn.None[btcutil.Amount]()
1343
        }
1344

1345
        // Optionally set the locktime.
1346
        locktimeOpt := fn.Some(locktime)
1347
        if locktime == -1 {
1348
                locktimeOpt = noLocktime
1349
        }
1350

22✔
1351
        log.Debugf("Creating sweep tx for %v inputs (%s) using %v, "+
22✔
1352
                "tx_weight=%v, tx_fee=%v, locktime=%v, parents_count=%v, "+
22✔
1353
                "parents_fee=%v, parents_weight=%v, current_height=%v",
22✔
1354
                len(inputs), inputTypeSummary(inputs), feeRate,
22✔
1355
                estimator.weight(), txFee, locktimeOpt, len(estimator.parents),
22✔
1356
                estimator.parentsFee, estimator.parentsWeight, currentHeight)
22✔
1357

22✔
1358
        return txFee, changeAmtOpt, locktimeOpt, nil
22✔
1359
}
22✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc