• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9915780197

13 Jul 2024 12:30AM UTC coverage: 49.268% (-9.1%) from 58.413%
9915780197

push

github

web-flow
Merge pull request #8653 from ProofOfKeags/fn-prim

DynComms [0/n]: `fn` package additions

92837 of 188433 relevant lines covered (49.27%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.77
/sweep/fee_bumper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/rpcclient"
12
        "github.com/btcsuite/btcd/txscript"
13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/btcsuite/btcwallet/chain"
15
        "github.com/davecgh/go-spew/spew"
16
        "github.com/lightningnetwork/lnd/chainntnfs"
17
        "github.com/lightningnetwork/lnd/fn"
18
        "github.com/lightningnetwork/lnd/input"
19
        "github.com/lightningnetwork/lnd/labels"
20
        "github.com/lightningnetwork/lnd/lntypes"
21
        "github.com/lightningnetwork/lnd/lnutils"
22
        "github.com/lightningnetwork/lnd/lnwallet"
23
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
24
)
25

26
var (
27
        // ErrInvalidBumpResult is returned when the bump result is invalid.
28
        ErrInvalidBumpResult = errors.New("invalid bump result")
29

30
        // ErrNotEnoughBudget is returned when the fee bumper decides the
31
        // current budget cannot cover the fee.
32
        ErrNotEnoughBudget = errors.New("not enough budget")
33

34
        // ErrLocktimeImmature is returned when sweeping an input whose
35
        // locktime has not been reached.
36
        ErrLocktimeImmature = errors.New("immature input")
37

38
        // ErrTxNoOutput is returned when an output cannot be created during tx
39
        // preparation, usually due to the output being dust.
40
        ErrTxNoOutput = errors.New("tx has no output")
41

42
        // ErrThirdPartySpent is returned when a third party has spent the
43
        // input in the sweeping tx.
44
        ErrThirdPartySpent = errors.New("third party spent the output")
45
)
46

47
// Bumper defines an interface that can be used by other subsystems for fee
48
// bumping. TxPublisher is the implementation defined in this file.
49
type Bumper interface {
50
        // Broadcast is used to publish the tx created from the given inputs
51
        // specified in the request. It handles the tx creation, broadcasts it,
52
        // and monitors its confirmation status for potential fee bumping. It
53
        // returns a chan that the caller can use to receive updates about the
54
        // broadcast result and potential RBF attempts.
55
        Broadcast(req *BumpRequest) (<-chan *BumpResult, error)
56
}
57

58
// BumpEvent represents the event of a fee bumping attempt.
59
type BumpEvent uint8
60

61
const (
62
        // TxPublished is sent when the broadcast attempt is finished.
63
        TxPublished BumpEvent = iota
64

65
        // TxFailed is sent when the broadcast attempt fails.
66
        TxFailed
67

68
        // TxReplaced is sent when the original tx is replaced by a new one.
69
        TxReplaced
70

71
        // TxConfirmed is sent when the tx is confirmed.
72
        TxConfirmed
73

74
        // sentinalEvent marks the end of the known events; any value at or
        // above it is reported as unknown.
75
        sentinalEvent
76
)
77

78
// String returns a human-readable string for the event, or "Unknown" for any
// value that is not one of the defined events.
79
func (e BumpEvent) String() string {
3✔
80
        switch e {
3✔
81
        case TxPublished:
3✔
82
                return "Published"
3✔
83
        case TxFailed:
3✔
84
                return "Failed"
3✔
85
        case TxReplaced:
3✔
86
                return "Replaced"
3✔
87
        case TxConfirmed:
3✔
88
                return "Confirmed"
3✔
89
        default:
×
90
                return "Unknown"
×
91
        }
92
}
93

94
// Unknown returns true if the event is not one of the defined events, i.e. its
// value is at or beyond sentinalEvent.
95
func (e BumpEvent) Unknown() bool {
3✔
96
        return e >= sentinalEvent
3✔
97
}
3✔
98

99
// BumpRequest is used by the caller to give the Bumper the necessary info to
100
// create and manage potential fee bumps for a set of inputs.
101
type BumpRequest struct {
102
        // Budget gives the total amount that can be used as fees by these
103
        // inputs.
104
        Budget btcutil.Amount
105

106
        // Inputs is the set of inputs to sweep.
107
        Inputs []input.Input
108

109
        // DeadlineHeight is the block height at which the tx should be
110
        // confirmed.
111
        DeadlineHeight int32
112

113
        // DeliveryAddress is the script to send the change output to.
114
        DeliveryAddress []byte
115

116
        // MaxFeeRate is the maximum fee rate that can be used for fee bumping.
117
        MaxFeeRate chainfee.SatPerKWeight
118

119
        // StartingFeeRate is an optional parameter that can be used to specify
120
        // the initial fee rate to use for the fee function.
121
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
122
}
123

124
// MaxFeeRateAllowed returns the maximum fee rate allowed for the given
125
// request. It calculates the feerate using the supplied budget and the weight,
126
// compares it with the specified MaxFeeRate, and returns the smaller of the
127
// two. An error is returned if the sweep tx weight cannot be calculated.
128
func (r *BumpRequest) MaxFeeRateAllowed() (chainfee.SatPerKWeight, error) {
3✔
129
        // Get the weight of the sweep tx, which will be used to calculate the
3✔
130
        // budget fee rate.
3✔
131
        size, err := calcSweepTxWeight(r.Inputs, r.DeliveryAddress)
3✔
132
        if err != nil {
3✔
133
                return 0, err
×
134
        }
×
135

136
        // Use the budget and MaxFeeRate to decide the max allowed fee rate.
137
        // This is needed as, when the input has a large value and the user
138
        // sets the budget to be proportional to the input value, the fee rate
139
        // can be very high and we need to make sure it doesn't exceed the max
140
        // fee rate.
141
        maxFeeRateAllowed := chainfee.NewSatPerKWeight(r.Budget, size)
3✔
142
        if maxFeeRateAllowed > r.MaxFeeRate {
6✔
143
                log.Debugf("Budget feerate %v exceeds MaxFeeRate %v, use "+
3✔
144
                        "MaxFeeRate instead, txWeight=%v", maxFeeRateAllowed,
3✔
145
                        r.MaxFeeRate, size)
3✔
146

3✔
147
                return r.MaxFeeRate, nil
3✔
148
        }
3✔
149

150
        log.Debugf("Budget feerate %v below MaxFeeRate %v, use budget feerate "+
3✔
151
                "instead, txWeight=%v", maxFeeRateAllowed, r.MaxFeeRate, size)
3✔
152

3✔
153
        return maxFeeRateAllowed, nil
3✔
154
}
155

156
// calcSweepTxWeight calculates the weight of the sweep tx. It assumes a
157
// sweeping tx always has a single output (the change output).
158
func calcSweepTxWeight(inputs []input.Input,
159
        outputPkScript []byte) (lntypes.WeightUnit, error) {
3✔
160

3✔
161
        // Use a const fee rate as we only use the weight estimator to
3✔
162
        // calculate the weight.
3✔
163
        const feeRate = 1
3✔
164

3✔
165
        // Initialize the tx weight estimator with,
3✔
166
        // - nil outputs as we only have one single change output.
3✔
167
        // - const fee rate as we don't care about the fees here.
3✔
168
        // - 0 maxfeerate as we don't care about fees here.
3✔
169
        //
3✔
170
        // TODO(yy): we should refactor the weight estimator to not require a
3✔
171
        // fee rate and max fee rate and make it a pure tx weight calculator.
3✔
172
        _, estimator, err := getWeightEstimate(
3✔
173
                inputs, nil, feeRate, 0, outputPkScript,
3✔
174
        )
3✔
175
        if err != nil {
3✔
176
                return 0, err
×
177
        }
×
178

179
        return estimator.weight(), nil
3✔
180
}
181

182
// BumpResult is used by the Bumper to send updates about the tx being
183
// broadcast.
184
type BumpResult struct {
185
        // Event is the type of event that the result is for.
186
        Event BumpEvent
187

188
        // Tx is the tx being broadcast. Validate requires it to be non-nil.
189
        Tx *wire.MsgTx
190

191
        // ReplacedTx is the old, replaced tx if a fee bump is attempted.
        // Validate requires it when Event is TxReplaced.
192
        ReplacedTx *wire.MsgTx
193

194
        // FeeRate is the fee rate used for the new tx.
195
        FeeRate chainfee.SatPerKWeight
196

197
        // Fee is the fee paid by the new tx.
198
        Fee btcutil.Amount
199

200
        // Err is the error that occurred during the broadcast. Validate
        // requires it when Event is TxFailed.
201
        Err error
202

203
        // requestID is the ID of the request that created this record.
204
        requestID uint64
205
}
206

207
// Validate validates the BumpResult so it's safe to use. It returns an error
// wrapping ErrInvalidBumpResult when the tx is nil, the event is unknown, a
// TxReplaced result has no ReplacedTx, a TxFailed result has no Err, or a
// TxConfirmed result has a zero FeeRate or Fee.
208
func (b *BumpResult) Validate() error {
3✔
209
        // Every result must have a tx.
3✔
210
        if b.Tx == nil {
3✔
211
                return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult)
×
212
        }
×
213

214
        // Every result must have a known event.
215
        if b.Event.Unknown() {
3✔
216
                return fmt.Errorf("%w: unknown event", ErrInvalidBumpResult)
×
217
        }
×
218

219
        // If it's a replacing event, it must have a replaced tx.
220
        if b.Event == TxReplaced && b.ReplacedTx == nil {
3✔
221
                return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult)
×
222
        }
×
223

224
        // If it's a failed event, it must have an error.
225
        if b.Event == TxFailed && b.Err == nil {
3✔
226
                return fmt.Errorf("%w: nil error", ErrInvalidBumpResult)
×
227
        }
×
228

229
        // If it's a confirmed event, it must have a fee rate and fee.
230
        if b.Event == TxConfirmed && (b.FeeRate == 0 || b.Fee == 0) {
3✔
231
                return fmt.Errorf("%w: missing fee rate or fee",
×
232
                        ErrInvalidBumpResult)
×
233
        }
×
234

235
        return nil
3✔
236
}
237

238
// TxPublisherConfig is the config used to create a new TxPublisher.
239
type TxPublisherConfig struct {
240
        // Signer is used to create the tx signature.
241
        Signer input.Signer
242

243
        // Wallet is used to publish the tx, to check its mempool acceptance
        // beforehand, and to identify the chain backend in use.
244
        Wallet Wallet
245

246
        // Estimator is used to estimate the fee rate for the new tx based on
247
        // its deadline conf target.
248
        Estimator chainfee.Estimator
249

250
        // Notifier is used to monitor the confirmation status of the tx.
251
        Notifier chainntnfs.ChainNotifier
252
}
253

254
// TxPublisher is an implementation of the Bumper interface. It utilizes the
255
// `testmempoolaccept` RPC to bump the fee of txns it created based on the
256
// fee function selected or configured by the caller. Its purpose is to
257
// take a list of inputs specified, and create a tx that spends them to a
258
// specified output. It will then monitor the confirmation status of the tx,
259
// and if it's not confirmed within a certain time frame, it will attempt to
260
// bump the fee of the tx by creating a new tx that spends the same inputs to
261
// the same output, but with a higher fee rate. It will continue to do this
262
// until the tx is confirmed or the fee rate reaches the maximum fee rate
263
// specified by the caller.
264
type TxPublisher struct {
        // wg tracks the monitor goroutine so that Stop can wait for it to
        // exit.
265
        wg sync.WaitGroup
266

267
        // cfg specifies the configuration of the TxPublisher.
268
        cfg *TxPublisherConfig
269

270
        // currentHeight is the current block height.
271
        currentHeight atomic.Int32
272

273
        // records is a map keyed by the requestCounter and the value is the tx
274
        // being monitored.
275
        records lnutils.SyncMap[uint64, *monitorRecord]
276

277
        // requestCounter is a monotonically increasing counter used to keep
278
        // track of how many requests have been made.
279
        requestCounter atomic.Uint64
280

281
        // subscriberChans is a map keyed by the requestCounter, each item is
282
        // the chan that the publisher sends the fee bump result to.
283
        subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]
284

285
        // quit is used to signal the publisher to stop.
286
        quit chan struct{}
287
}
288

289
// Compile-time constraint to ensure TxPublisher implements Bumper.
290
var _ Bumper = (*TxPublisher)(nil)
291

292
// NewTxPublisher creates a new TxPublisher that keeps its own copy of the
// given config. The monitor loop is not running until Start is called.
293
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
3✔
294
        return &TxPublisher{
3✔
295
                cfg:             &cfg,
3✔
296
                records:         lnutils.SyncMap[uint64, *monitorRecord]{},
3✔
297
                subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
3✔
298
                quit:            make(chan struct{}),
3✔
299
        }
3✔
300
}
3✔
301

302
// isNeutrinoBackend checks if the wallet backend is neutrino by comparing the
// backend name reported by the wallet.
303
func (t *TxPublisher) isNeutrinoBackend() bool {
3✔
304
        return t.cfg.Wallet.BackEnd() == "neutrino"
3✔
305
}
3✔
306

307
// Broadcast is used to publish the tx created from the given inputs. It will,
308
// 1. init a fee function based on the given strategy.
309
// 2. create an RBF-compliant tx and monitor it for confirmation.
310
// 3. notify the initial broadcast result back to the caller.
311
// The initial broadcast is guaranteed to be RBF-compliant unless the budget
312
// specified cannot cover the fee.
313
//
314
// NOTE: part of the Bumper interface.
315
func (t *TxPublisher) Broadcast(req *BumpRequest) (<-chan *BumpResult, error) {
3✔
        // Pass the closure itself rather than its result, so the request is
        // only dumped when trace logging is actually enabled.
        //
        // NOTE(review): relies on newLogClosure returning a fmt.Stringer -
        // confirm against its definition.
316
        log.Tracef("Received broadcast request: %s", newLogClosure(
3✔
317
                func() string {
6✔
318
                        return spew.Sdump(req)
3✔
319
                }))
3✔
320

321
        // Attempt an initial broadcast which is guaranteed to comply with the
322
        // RBF rules.
323
        result, err := t.initialBroadcast(req)
3✔
324
        if err != nil {
6✔
325
                log.Errorf("Initial broadcast failed: %v", err)
3✔
326

3✔
327
                return nil, err
3✔
328
        }
3✔
329

330
        // Create a 1-buffered chan so the initial result below won't block.
331
        subscriber := make(chan *BumpResult, 1)
3✔
332
        t.subscriberChans.Store(result.requestID, subscriber)
3✔
333

3✔
334
        // Send the initial broadcast result to the caller.
3✔
335
        t.handleResult(result)
3✔
336

3✔
337
        return subscriber, nil
3✔
338
}
339

340
// initialBroadcast initializes a fee function, creates an RBF-compliant tx and
341
// broadcasts it. Errors from each step are wrapped with the step's name.
342
func (t *TxPublisher) initialBroadcast(req *BumpRequest) (*BumpResult, error) {
3✔
343
        // Create a fee bumping algorithm to be used for future RBF.
3✔
344
        feeAlgo, err := t.initializeFeeFunction(req)
3✔
345
        if err != nil {
6✔
346
                return nil, fmt.Errorf("init fee function: %w", err)
3✔
347
        }
3✔
348

349
        // Create the initial tx to be broadcasted. This tx is guaranteed to
350
        // comply with the RBF restrictions.
351
        requestID, err := t.createRBFCompliantTx(req, feeAlgo)
3✔
352
        if err != nil {
6✔
353
                return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
3✔
354
        }
3✔
355

356
        // Broadcast the tx and return the resulting BumpResult.
357
        result, err := t.broadcast(requestID)
3✔
358
        if err != nil {
3✔
359
                return nil, fmt.Errorf("broadcast sweep tx: %w", err)
×
360
        }
×
361

362
        return result, nil
3✔
363
}
364

365
// initializeFeeFunction initializes a fee function to be used for this request
366
// for future fee bumping.
367
func (t *TxPublisher) initializeFeeFunction(
368
        req *BumpRequest) (FeeFunction, error) {
3✔
369

3✔
370
        // Get the max allowed feerate.
3✔
371
        maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
3✔
372
        if err != nil {
3✔
373
                return nil, err
×
374
        }
×
375

376
        // Get the initial conf target.
377
        confTarget := calcCurrentConfTarget(
3✔
378
                t.currentHeight.Load(), req.DeadlineHeight,
3✔
379
        )
3✔
380

3✔
381
        log.Debugf("Initializing fee function with conf target=%v, budget=%v, "+
3✔
382
                "maxFeeRateAllowed=%v", confTarget, req.Budget,
3✔
383
                maxFeeRateAllowed)
3✔
384

3✔
385
        // Initialize the fee function and return it.
3✔
386
        //
3✔
387
        // TODO(yy): return based on different req.Strategy?
3✔
388
        return NewLinearFeeFunction(
3✔
389
                maxFeeRateAllowed, confTarget, t.cfg.Estimator,
3✔
390
                req.StartingFeeRate,
3✔
391
        )
3✔
392
}
393

394
// createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
395
// so by creating a tx, validating it using `TestMempoolAccept`, and bumping
396
// its fee and retrying until the tx is valid. On success it returns the ID of
397
// the stored record; it returns an error when non-RBF related errors occur or
// the budget has been used up.
398
func (t *TxPublisher) createRBFCompliantTx(req *BumpRequest,
399
        f FeeFunction) (uint64, error) {
3✔
400

3✔
401
        for {
6✔
402
                // Create a new tx with the given fee rate and check its
3✔
403
                // mempool acceptance.
3✔
404
                tx, fee, err := t.createAndCheckTx(req, f)
3✔
405

3✔
406
                switch {
3✔
407
                case err == nil:
3✔
408
                        // The tx is valid, return the request ID.
3✔
409
                        requestID := t.storeRecord(tx, req, f, fee)
3✔
410

3✔
411
                        log.Infof("Created tx %v for %v inputs: feerate=%v, "+
3✔
412
                                "fee=%v, inputs=%v", tx.TxHash(),
3✔
413
                                len(req.Inputs), f.FeeRate(), fee,
3✔
414
                                inputTypeSummary(req.Inputs))
3✔
415

3✔
416
                        return requestID, nil
3✔
417

418
                // If the error indicates the fees paid is not enough, we will
419
                // ask the fee function to increase the fee rate and retry.
420
                case errors.Is(err, lnwallet.ErrMempoolFee):
×
421
                        // We should at least start with a feerate above the
×
422
                        // mempool min feerate, so if we get this error, it
×
423
                        // means something is wrong earlier in the pipeline.
×
424
                        log.Errorf("Current fee=%v, feerate=%v, %v", fee,
×
425
                                f.FeeRate(), err)
×
426

×
427
                        fallthrough
×
428

429
                // We are not paying enough fees so we increase it.
430
                case errors.Is(err, chain.ErrInsufficientFee):
3✔
431
                        increased := false
3✔
432

3✔
433
                        // Keep calling the fee function until the fee rate is
3✔
434
                        // increased or maxed out.
3✔
435
                        for !increased {
6✔
436
                                log.Debugf("Increasing fee for next round, "+
3✔
437
                                        "current fee=%v, feerate=%v", fee,
3✔
438
                                        f.FeeRate())
3✔
439

3✔
440
                                // If the fee function tells us that we have
3✔
441
                                // used up the budget, we will return an error
3✔
442
                                // indicating this tx cannot be made. The
3✔
443
                                // sweeper should handle this error and try to
3✔
444
                                // cluster these inputs differently.
3✔
445
                                increased, err = f.Increment()
3✔
446
                                if err != nil {
6✔
447
                                        return 0, err
3✔
448
                                }
3✔
449
                        }
450

451
                // TODO(yy): suppose there's only one bad input, we can do a
452
                // binary search to find out which input is causing this error
453
                // by recreating a tx using half of the inputs and check its
454
                // mempool acceptance.
455
                default:
3✔
456
                        log.Debugf("Failed to create RBF-compliant tx: %v", err)
3✔
457
                        return 0, err
3✔
458
                }
459
        }
460
}
461

462
// storeRecord creates a monitor record from the given tx, request, fee
// function and fee, stores it in the records map under a newly allocated
// request ID, and returns that ID.
463
func (t *TxPublisher) storeRecord(tx *wire.MsgTx, req *BumpRequest,
464
        f FeeFunction, fee btcutil.Amount) uint64 {
3✔
465

3✔
466
        // Increase the request counter.
3✔
467
        //
3✔
468
        // NOTE: this is the only place where we increase the
3✔
469
        // counter.
3✔
470
        requestID := t.requestCounter.Add(1)
3✔
471

3✔
472
        // Register the record.
3✔
473
        t.records.Store(requestID, &monitorRecord{
3✔
474
                tx:          tx,
3✔
475
                req:         req,
3✔
476
                feeFunction: f,
3✔
477
                fee:         fee,
3✔
478
        })
3✔
479

3✔
480
        return requestID
3✔
481
}
3✔
482

483
// createAndCheckTx creates a tx based on the given inputs, change output
484
// script, and the fee rate. In addition, it validates the tx's mempool
485
// acceptance before returning a tx that can be published directly, along with
486
// its fee. The check is skipped, and the tx returned as-is, when the backend
// does not support or implement the mempool acceptance test.
487
func (t *TxPublisher) createAndCheckTx(req *BumpRequest, f FeeFunction) (
488
        *wire.MsgTx, btcutil.Amount, error) {
3✔
489

3✔
490
        // Create the sweep tx at the fee function's current fee rate. The fee
3✔
491
        // function guarantees this rate won't exceed the max fee rate.
3✔
492
        tx, fee, err := t.createSweepTx(
3✔
493
                req.Inputs, req.DeliveryAddress, f.FeeRate(),
3✔
494
        )
3✔
495
        if err != nil {
6✔
496
                return nil, fee, fmt.Errorf("create sweep tx: %w", err)
3✔
497
        }
3✔
498

499
        // Sanity check the budget still covers the fee.
500
        if fee > req.Budget {
3✔
501
                return nil, fee, fmt.Errorf("%w: budget=%v, fee=%v",
×
502
                        ErrNotEnoughBudget, req.Budget, fee)
×
503
        }
×
504

505
        // Validate the tx's mempool acceptance.
506
        err = t.cfg.Wallet.CheckMempoolAcceptance(tx)
3✔
507

3✔
508
        // Exit early if the tx is valid.
3✔
509
        if err == nil {
6✔
510
                return tx, fee, nil
3✔
511
        }
3✔
512

513
        // Print an error log if the chain backend doesn't support the mempool
514
        // acceptance test RPC.
515
        if errors.Is(err, rpcclient.ErrBackendVersion) {
3✔
516
                log.Errorf("TestMempoolAccept not supported by backend, " +
×
517
                        "consider upgrading it to a newer version")
×
518
                return tx, fee, nil
×
519
        }
×
520

521
        // We are running on a backend that doesn't implement the RPC
522
        // testmempoolaccept, eg, neutrino, so we'll skip the check.
523
        if errors.Is(err, chain.ErrUnimplemented) {
3✔
524
                log.Debug("Skipped testmempoolaccept due to not implemented")
×
525
                return tx, fee, nil
×
526
        }
×
527

528
        return nil, fee, fmt.Errorf("tx=%v failed mempool check: %w",
3✔
529
                tx.TxHash(), err)
3✔
530
}
531

532
// broadcast looks up the monitored record for the given request ID and
533
// publishes its tx to the network. Any broadcast-related errors will not be
534
// returned here, instead, they will be put inside the `BumpResult` with the
535
// event set to TxFailed. The only error returned directly is when no record
536
// exists for the request ID.
537
func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) {
3✔
538
        // Get the record being monitored.
3✔
539
        record, ok := t.records.Load(requestID)
3✔
540
        if !ok {
3✔
541
                return nil, fmt.Errorf("tx record %v not found", requestID)
×
542
        }
×
543

544
        txid := record.tx.TxHash()
3✔
545

3✔
546
        tx := record.tx
3✔
547
        log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
3✔
548
                txid, len(tx.TxIn), t.currentHeight.Load())
3✔
549

3✔
550
        // Set the event, and change it to TxFailed if the wallet fails to
3✔
551
        // publish it.
3✔
552
        event := TxPublished
3✔
553

3✔
554
        // Publish the sweeping tx with customized label. If the publish fails,
3✔
555
        // this error will be saved in the `BumpResult` and it will be removed
3✔
556
        // from being monitored.
3✔
557
        err := t.cfg.Wallet.PublishTransaction(
3✔
558
                tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
3✔
559
        )
3✔
560
        if err != nil {
3✔
561
                // NOTE: we decide to attach this error to the result instead
×
562
                // of returning it here because by the time the tx reaches
×
563
                // here, it should have passed the mempool acceptance check. If
×
564
                // it still fails to be broadcast, it's likely a non-RBF
×
565
                // related error happened. So we send this error back to the
×
566
                // caller so that it can handle it properly.
×
567
                //
×
568
                // TODO(yy): find out which input is causing the failure.
×
569
                log.Errorf("Failed to publish tx %v: %v", txid, err)
×
570
                event = TxFailed
×
571
        }
×
572

573
        result := &BumpResult{
3✔
574
                Event:     event,
3✔
575
                Tx:        record.tx,
3✔
576
                Fee:       record.fee,
3✔
577
                FeeRate:   record.feeFunction.FeeRate(),
3✔
578
                Err:       err,
3✔
579
                requestID: requestID,
3✔
580
        }
3✔
581

3✔
582
        return result, nil
3✔
583
}
584

585
// notifyResult sends the result to the subscriber chan registered under the
586
// result's requestID. This channel is expected to be read by the caller. If
// no chan is registered, an error is logged and the result is dropped.
587
func (t *TxPublisher) notifyResult(result *BumpResult) {
3✔
588
        id := result.requestID
3✔
589
        subscriber, ok := t.subscriberChans.Load(id)
3✔
590
        if !ok {
3✔
591
                log.Errorf("Result chan for id=%v not found", id)
×
592
                return
×
593
        }
×
594

595
        log.Debugf("Sending result for requestID=%v, tx=%v", id,
3✔
596
                result.Tx.TxHash())
3✔
597

3✔
598
        select {
3✔
599
        // Send the result to the subscriber.
600
        //
601
        // TODO(yy): Add timeout in case it's blocking?
602
        case subscriber <- result:
3✔
603
        case <-t.quit:
×
604
                log.Debug("Fee bumper stopped")
×
605
        }
606
}
607

608
// removeResult stops tracking the request behind the given result when its
609
// event is TxFailed or TxConfirmed, by deleting it from the records and
610
// subscriber maps. Results with any other event are left untouched.
611
func (t *TxPublisher) removeResult(result *BumpResult) {
3✔
612
        id := result.requestID
3✔
613

3✔
614
        // Remove the record from the maps if there's an error. This means this
3✔
615
        // tx has failed its broadcast and cannot be retried. There are two
3✔
616
        // cases,
3✔
617
        // - when the budget cannot cover the fee.
3✔
618
        // - when a non-RBF related error occurs.
3✔
619
        switch result.Event {
3✔
620
        case TxFailed:
3✔
621
                log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
3✔
622
                        id, result.Tx.TxHash(), result.Err)
3✔
623

624
        case TxConfirmed:
3✔
625
                // Remove the record if the tx is confirmed.
3✔
626
                log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
3✔
627
                        result.Tx.TxHash())
3✔
628

629
        // Do nothing if it's neither failed nor confirmed.
630
        default:
3✔
631
                log.Tracef("Skipping record removal for id=%v, event=%v", id,
3✔
632
                        result.Event)
3✔
633

3✔
634
                return
3✔
635
        }
636

637
        t.records.Delete(id)
3✔
638
        t.subscriberChans.Delete(id)
3✔
639
}
640

641
// handleResult handles the result of a tx broadcast. It will notify the
642
// subscriber and remove the record if the tx is confirmed or failed to be
643
// broadcast. The notification happens first because notifyResult looks up the
// subscriber chan that removeResult deletes.
644
func (t *TxPublisher) handleResult(result *BumpResult) {
3✔
645
        // Notify the subscriber.
3✔
646
        t.notifyResult(result)
3✔
647

3✔
648
        // Remove the record if it's failed or confirmed.
3✔
649
        t.removeResult(result)
3✔
650
}
3✔
651

652
// monitorRecord is used to keep track of the tx being monitored by the
653
// publisher internally. Records are kept in the publisher's records map,
// keyed by request ID.
654
type monitorRecord struct {
655
        // tx is the tx being monitored.
656
        tx *wire.MsgTx
657

658
        // req is the original request.
659
        req *BumpRequest
660

661
        // feeFunction is the fee bumping algorithm used by the publisher.
662
        feeFunction FeeFunction
663

664
        // fee is the fee paid by the tx.
665
        fee btcutil.Amount
666
}
667

668
// Start starts the publisher by subscribing to block epoch updates and kicking
669
// off the monitor loop. It returns an error if the block epoch registration
// fails, in which case the monitor loop is not started.
//
// NOTE: the deferred "started" log runs on every return path, including the
// error path.
670
func (t *TxPublisher) Start() error {
3✔
671
        log.Info("TxPublisher starting...")
3✔
672
        defer log.Debugf("TxPublisher started")
3✔
673

3✔
674
        blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
3✔
675
        if err != nil {
3✔
676
                return fmt.Errorf("register block epoch ntfn: %w", err)
×
677
        }
×
678

679
        t.wg.Add(1)
3✔
680
        go t.monitor(blockEvent)
3✔
681

3✔
682
        return nil
3✔
683
}
684

685
// Stop stops the publisher and waits for the monitor loop to exit.
//
// NOTE: the quit chan is closed unconditionally, so calling Stop more than
// once will panic.
686
func (t *TxPublisher) Stop() {
3✔
687
        log.Info("TxPublisher stopping...")
3✔
688
        defer log.Debugf("TxPublisher stopped")
3✔
689

3✔
690
        close(t.quit)
3✔
691

3✔
692
        t.wg.Wait()
3✔
693
}
3✔
694

695
// monitor is the main loop driven by new blocks. Whenever a new block arrives,
696
// it will examine all the txns being monitored, and check if any of them needs
697
// to be bumped. If so, it will attempt to bump the fee of the tx. The loop
// exits when the quit chan is closed or the block epoch chan is closed.
698
//
699
// NOTE: Must be run as a goroutine.
700
func (t *TxPublisher) monitor(blockEvent *chainntnfs.BlockEpochEvent) {
3✔
701
        defer blockEvent.Cancel()
3✔
702
        defer t.wg.Done()
3✔
703

3✔
704
        for {
6✔
705
                select {
3✔
706
                case epoch, ok := <-blockEvent.Epochs:
3✔
707
                        if !ok {
3✔
708
                                // We should stop the publisher before stopping
×
709
                                // the chain service. Otherwise it indicates an
×
710
                                // error.
×
711
                                log.Error("Block epoch channel closed, exit " +
×
712
                                        "monitor")
×
713

×
714
                                return
×
715
                        }
×
716

717
                        log.Debugf("TxPublisher received new block: %v",
3✔
718
                                epoch.Height)
3✔
719

3✔
720
                        // Update the best known height for the publisher.
3✔
721
                        t.currentHeight.Store(epoch.Height)
3✔
722

3✔
723
                        // Check all monitored txns to see if any of them needs
3✔
724
                        // to be bumped.
3✔
725
                        t.processRecords()
3✔
726

727
                case <-t.quit:
3✔
728
                        log.Debug("Fee bumper stopped, exit monitor")
3✔
729
                        return
3✔
730
                }
731
        }
732
}
733

734
// processRecords checks all the txns being monitored, and checks if any of
735
// them needs to be bumped. If so, it will attempt to bump the fee of the tx.
736
func (t *TxPublisher) processRecords() {
3✔
737
        // confirmedRecords stores a map of the records which have been
3✔
738
        // confirmed.
3✔
739
        confirmedRecords := make(map[uint64]*monitorRecord)
3✔
740

3✔
741
        // feeBumpRecords stores a map of the records which need to be bumped.
3✔
742
        feeBumpRecords := make(map[uint64]*monitorRecord)
3✔
743

3✔
744
        // failedRecords stores a map of the records which has inputs being
3✔
745
        // spent by a third party.
3✔
746
        //
3✔
747
        // NOTE: this is only used for neutrino backend.
3✔
748
        failedRecords := make(map[uint64]*monitorRecord)
3✔
749

3✔
750
        // visitor is a helper closure that visits each record and divides them
3✔
751
        // into two groups.
3✔
752
        visitor := func(requestID uint64, r *monitorRecord) error {
6✔
753
                log.Tracef("Checking monitor recordID=%v for tx=%v", requestID,
3✔
754
                        r.tx.TxHash())
3✔
755

3✔
756
                // If the tx is already confirmed, we can stop monitoring it.
3✔
757
                if t.isConfirmed(r.tx.TxHash()) {
6✔
758
                        confirmedRecords[requestID] = r
3✔
759

3✔
760
                        // Move to the next record.
3✔
761
                        return nil
3✔
762
                }
3✔
763

764
                // Check whether the inputs has been spent by a third party.
765
                //
766
                // NOTE: this check is only done for neutrino backend.
767
                if t.isThirdPartySpent(r.tx.TxHash(), r.req.Inputs) {
3✔
768
                        failedRecords[requestID] = r
×
769

×
770
                        // Move to the next record.
×
771
                        return nil
×
772
                }
×
773

774
                feeBumpRecords[requestID] = r
3✔
775

3✔
776
                // Return nil to move to the next record.
3✔
777
                return nil
3✔
778
        }
779

780
        // Iterate through all the records and divide them into two groups.
781
        t.records.ForEach(visitor)
3✔
782

3✔
783
        // For records that are confirmed, we'll notify the caller about this
3✔
784
        // result.
3✔
785
        for requestID, r := range confirmedRecords {
6✔
786
                rec := r
3✔
787

3✔
788
                log.Debugf("Tx=%v is confirmed", r.tx.TxHash())
3✔
789
                t.wg.Add(1)
3✔
790
                go t.handleTxConfirmed(rec, requestID)
3✔
791
        }
3✔
792

793
        // Get the current height to be used in the following goroutines.
794
        currentHeight := t.currentHeight.Load()
3✔
795

3✔
796
        // For records that are not confirmed, we perform a fee bump if needed.
3✔
797
        for requestID, r := range feeBumpRecords {
6✔
798
                rec := r
3✔
799

3✔
800
                log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash())
3✔
801
                t.wg.Add(1)
3✔
802
                go t.handleFeeBumpTx(requestID, rec, currentHeight)
3✔
803
        }
3✔
804

805
        // For records that are failed, we'll notify the caller about this
806
        // result.
807
        for requestID, r := range failedRecords {
3✔
808
                rec := r
×
809

×
810
                log.Debugf("Tx=%v has inputs been spent by a third party, "+
×
811
                        "failing it now", r.tx.TxHash())
×
812
                t.wg.Add(1)
×
813
                go t.handleThirdPartySpent(rec, requestID)
×
814
        }
×
815
}
816

817
// handleTxConfirmed is called when a monitored tx is confirmed. It will
818
// notify the subscriber then remove the record from the maps .
819
//
820
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
821
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord, requestID uint64) {
3✔
822
        defer t.wg.Done()
3✔
823

3✔
824
        // Create a result that will be sent to the resultChan which is
3✔
825
        // listened by the caller.
3✔
826
        result := &BumpResult{
3✔
827
                Event:     TxConfirmed,
3✔
828
                Tx:        r.tx,
3✔
829
                requestID: requestID,
3✔
830
                Fee:       r.fee,
3✔
831
                FeeRate:   r.feeFunction.FeeRate(),
3✔
832
        }
3✔
833

3✔
834
        // Notify that this tx is confirmed and remove the record from the map.
3✔
835
        t.handleResult(result)
3✔
836
}
3✔
837

838
// handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
839
// attempt to bump the fee of the tx.
840
//
841
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
842
func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord,
843
        currentHeight int32) {
3✔
844

3✔
845
        defer t.wg.Done()
3✔
846

3✔
847
        oldTxid := r.tx.TxHash()
3✔
848

3✔
849
        // Get the current conf target for this record.
3✔
850
        confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
3✔
851

3✔
852
        // Ask the fee function whether a bump is needed. We expect the fee
3✔
853
        // function to increase its returned fee rate after calling this
3✔
854
        // method.
3✔
855
        increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
3✔
856
        if err != nil {
6✔
857
                // TODO(yy): send this error back to the sweeper so it can
3✔
858
                // re-group the inputs?
3✔
859
                log.Errorf("Failed to increase fee rate for tx %v at "+
3✔
860
                        "height=%v: %v", oldTxid, t.currentHeight.Load(), err)
3✔
861

3✔
862
                return
3✔
863
        }
3✔
864

865
        // If the fee rate was not increased, there's no need to bump the fee.
866
        if !increased {
6✔
867
                log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
3✔
868
                        t.currentHeight.Load())
3✔
869

3✔
870
                return
3✔
871
        }
3✔
872

873
        // The fee function now has a new fee rate, we will use it to bump the
874
        // fee of the tx.
875
        resultOpt := t.createAndPublishTx(requestID, r)
3✔
876

3✔
877
        // If there's a result, we will notify the caller about the result.
3✔
878
        resultOpt.WhenSome(func(result BumpResult) {
6✔
879
                // Notify the new result.
3✔
880
                t.handleResult(&result)
3✔
881
        })
3✔
882
}
883

884
// handleThirdPartySpent is called when the inputs in an unconfirmed tx is
885
// spent. It will notify the subscriber then remove the record from the maps
886
// and send a TxFailed event to the subscriber.
887
//
888
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
889
func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord,
890
        requestID uint64) {
×
891

×
892
        defer t.wg.Done()
×
893

×
894
        // Create a result that will be sent to the resultChan which is
×
895
        // listened by the caller.
×
896
        //
×
897
        // TODO(yy): create a new state `TxThirdPartySpent` to notify the
×
898
        // sweeper to remove the input, hence moving the monitoring of inputs
×
899
        // spent inside the fee bumper.
×
900
        result := &BumpResult{
×
901
                Event:     TxFailed,
×
902
                Tx:        r.tx,
×
903
                requestID: requestID,
×
904
                Err:       ErrThirdPartySpent,
×
905
        }
×
906

×
907
        // Notify that this tx is confirmed and remove the record from the map.
×
908
        t.handleResult(result)
×
909
}
×
910

911
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
912
// to the network. It will update the record with the new tx and fee rate if
913
// successfully created, and return the result when published successfully.
914
func (t *TxPublisher) createAndPublishTx(requestID uint64,
915
        r *monitorRecord) fn.Option[BumpResult] {
3✔
916

3✔
917
        // Fetch the old tx.
3✔
918
        oldTx := r.tx
3✔
919

3✔
920
        // Create a new tx with the new fee rate.
3✔
921
        //
3✔
922
        // NOTE: The fee function is expected to have increased its returned
3✔
923
        // fee rate after calling the SkipFeeBump method. So we can use it
3✔
924
        // directly here.
3✔
925
        tx, fee, err := t.createAndCheckTx(r.req, r.feeFunction)
3✔
926

3✔
927
        // If the error is fee related, we will return no error and let the fee
3✔
928
        // bumper retry it at next block.
3✔
929
        //
3✔
930
        // NOTE: we can check the RBF error here and ask the fee function to
3✔
931
        // recalculate the fee rate. However, this would defeat the purpose of
3✔
932
        // using a deadline based fee function:
3✔
933
        // - if the deadline is far away, there's no rush to RBF the tx.
3✔
934
        // - if the deadline is close, we expect the fee function to give us a
3✔
935
        //   higher fee rate. If the fee rate cannot satisfy the RBF rules, it
3✔
936
        //   means the budget is not enough.
3✔
937
        if errors.Is(err, chain.ErrInsufficientFee) ||
3✔
938
                errors.Is(err, lnwallet.ErrMempoolFee) {
6✔
939

3✔
940
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
3✔
941
                return fn.None[BumpResult]()
3✔
942
        }
3✔
943

944
        // If the error is not fee related, we will return a `TxFailed` event
945
        // so this input can be retried.
946
        if err != nil {
6✔
947
                // If the tx doesn't not have enought budget, we will return a
3✔
948
                // result so the sweeper can handle it by re-clustering the
3✔
949
                // utxos.
3✔
950
                if errors.Is(err, ErrNotEnoughBudget) {
3✔
951
                        log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(),
×
952
                                err)
×
953
                } else {
3✔
954
                        // Otherwise, an unexpected error occurred, we will
3✔
955
                        // fail the tx and let the sweeper retry the whole
3✔
956
                        // process.
3✔
957
                        log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(),
3✔
958
                                err)
3✔
959
                }
3✔
960

961
                return fn.Some(BumpResult{
3✔
962
                        Event:     TxFailed,
3✔
963
                        Tx:        oldTx,
3✔
964
                        Err:       err,
3✔
965
                        requestID: requestID,
3✔
966
                })
3✔
967
        }
968

969
        // The tx has been created without any errors, we now register a new
970
        // record by overwriting the same requestID.
971
        t.records.Store(requestID, &monitorRecord{
3✔
972
                tx:          tx,
3✔
973
                req:         r.req,
3✔
974
                feeFunction: r.feeFunction,
3✔
975
                fee:         fee,
3✔
976
        })
3✔
977

3✔
978
        // Attempt to broadcast this new tx.
3✔
979
        result, err := t.broadcast(requestID)
3✔
980
        if err != nil {
3✔
981
                log.Infof("Failed to broadcast replacement tx %v: %v",
×
982
                        tx.TxHash(), err)
×
983

×
984
                return fn.None[BumpResult]()
×
985
        }
×
986

987
        // If the result error is fee related, we will return no error and let
988
        // the fee bumper retry it at next block.
989
        //
990
        // NOTE: we may get this error if we've bypassed the mempool check,
991
        // which means we are suing neutrino backend.
992
        if errors.Is(result.Err, chain.ErrInsufficientFee) ||
3✔
993
                errors.Is(result.Err, lnwallet.ErrMempoolFee) {
3✔
994

×
995
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
×
996
                return fn.None[BumpResult]()
×
997
        }
×
998

999
        // A successful replacement tx is created, attach the old tx.
1000
        result.ReplacedTx = oldTx
3✔
1001

3✔
1002
        // If the new tx failed to be published, we will return the result so
3✔
1003
        // the caller can handle it.
3✔
1004
        if result.Event == TxFailed {
3✔
1005
                return fn.Some(*result)
×
1006
        }
×
1007

1008
        log.Infof("Replaced tx=%v with new tx=%v", oldTx.TxHash(), tx.TxHash())
3✔
1009

3✔
1010
        // Otherwise, it's a successful RBF, set the event and return.
3✔
1011
        result.Event = TxReplaced
3✔
1012

3✔
1013
        return fn.Some(*result)
3✔
1014
}
1015

1016
// isConfirmed checks the btcwallet to see whether the tx is confirmed.
1017
func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool {
3✔
1018
        details, err := t.cfg.Wallet.GetTransactionDetails(&txid)
3✔
1019
        if err != nil {
6✔
1020
                log.Warnf("Failed to get tx details for %v: %v", txid, err)
3✔
1021
                return false
3✔
1022
        }
3✔
1023

1024
        return details.NumConfirmations > 0
3✔
1025
}
1026

1027
// isThirdPartySpent checks whether the inputs of the tx has already been spent
1028
// by a third party. When a tx is not confirmed, yet its inputs has been spent,
1029
// then it must be spent by a different tx other than the sweeping tx here.
1030
//
1031
// NOTE: this check is only performed for neutrino backend as it has no
1032
// reliable way to tell a tx has been replaced.
1033
func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash,
1034
        inputs []input.Input) bool {
3✔
1035

3✔
1036
        // Skip this check for if this is not neutrino backend.
3✔
1037
        if !t.isNeutrinoBackend() {
6✔
1038
                return false
3✔
1039
        }
3✔
1040

1041
        // Iterate all the inputs and check if they have been spent already.
1042
        for _, inp := range inputs {
×
1043
                op := inp.OutPoint()
×
1044

×
1045
                // For wallet utxos, the height hint is not set - we don't need
×
1046
                // to monitor them for third party spend.
×
1047
                heightHint := inp.HeightHint()
×
1048
                if heightHint == 0 {
×
1049
                        log.Debugf("Skipped third party check for wallet "+
×
1050
                                "input %v", op)
×
1051

×
1052
                        continue
×
1053
                }
1054

1055
                // If the input has already been spent after the height hint, a
1056
                // spend event is sent back immediately.
1057
                spendEvent, err := t.cfg.Notifier.RegisterSpendNtfn(
×
1058
                        &op, inp.SignDesc().Output.PkScript, heightHint,
×
1059
                )
×
1060
                if err != nil {
×
1061
                        log.Criticalf("Failed to register spend ntfn for "+
×
1062
                                "input=%v: %v", op, err)
×
1063
                        return false
×
1064
                }
×
1065

1066
                // Remove the subscription when exit.
1067
                defer spendEvent.Cancel()
×
1068

×
1069
                // Do a non-blocking read to see if the output has been spent.
×
1070
                select {
×
1071
                case spend, ok := <-spendEvent.Spend:
×
1072
                        if !ok {
×
1073
                                log.Debugf("Spend ntfn for %v canceled", op)
×
1074
                                return false
×
1075
                        }
×
1076

1077
                        spendingTxID := spend.SpendingTx.TxHash()
×
1078

×
1079
                        // If the spending tx is the same as the sweeping tx
×
1080
                        // then we are good.
×
1081
                        if spendingTxID == txid {
×
1082
                                continue
×
1083
                        }
1084

1085
                        log.Warnf("Detected third party spent of output=%v "+
×
1086
                                "in tx=%v", op, spend.SpendingTx.TxHash())
×
1087

×
1088
                        return true
×
1089

1090
                // Move to the next input.
1091
                default:
×
1092
                }
1093
        }
1094

1095
        return false
×
1096
}
1097

1098
// calcCurrentConfTarget calculates the current confirmation target based on
1099
// the deadline height. The conf target is capped at 0 if the deadline has
1100
// already been past.
1101
func calcCurrentConfTarget(currentHeight, deadline int32) uint32 {
3✔
1102
        var confTarget uint32
3✔
1103

3✔
1104
        // Calculate how many blocks left until the deadline.
3✔
1105
        deadlineDelta := deadline - currentHeight
3✔
1106

3✔
1107
        // If we are already past the deadline, we will set the conf target to
3✔
1108
        // be 1.
3✔
1109
        if deadlineDelta < 0 {
6✔
1110
                log.Warnf("Deadline is %d blocks behind current height %v",
3✔
1111
                        -deadlineDelta, currentHeight)
3✔
1112

3✔
1113
                confTarget = 0
3✔
1114
        } else {
6✔
1115
                confTarget = uint32(deadlineDelta)
3✔
1116
        }
3✔
1117

1118
        return confTarget
3✔
1119
}
1120

1121
// createSweepTx creates a sweeping tx based on the given inputs, change
1122
// address and fee rate.
1123
func (t *TxPublisher) createSweepTx(inputs []input.Input, changePkScript []byte,
1124
        feeRate chainfee.SatPerKWeight) (*wire.MsgTx, btcutil.Amount, error) {
3✔
1125

3✔
1126
        // Validate and calculate the fee and change amount.
3✔
1127
        txFee, changeAmtOpt, locktimeOpt, err := prepareSweepTx(
3✔
1128
                inputs, changePkScript, feeRate, t.currentHeight.Load(),
3✔
1129
        )
3✔
1130
        if err != nil {
6✔
1131
                return nil, 0, err
3✔
1132
        }
3✔
1133

1134
        var (
3✔
1135
                // Create the sweep transaction that we will be building. We
3✔
1136
                // use version 2 as it is required for CSV.
3✔
1137
                sweepTx = wire.NewMsgTx(2)
3✔
1138

3✔
1139
                // We'll add the inputs as we go so we know the final ordering
3✔
1140
                // of inputs to sign.
3✔
1141
                idxs []input.Input
3✔
1142
        )
3✔
1143

3✔
1144
        // We start by adding all inputs that commit to an output. We do this
3✔
1145
        // since the input and output index must stay the same for the
3✔
1146
        // signatures to be valid.
3✔
1147
        for _, o := range inputs {
6✔
1148
                if o.RequiredTxOut() == nil {
6✔
1149
                        continue
3✔
1150
                }
1151

1152
                idxs = append(idxs, o)
3✔
1153
                sweepTx.AddTxIn(&wire.TxIn{
3✔
1154
                        PreviousOutPoint: o.OutPoint(),
3✔
1155
                        Sequence:         o.BlocksToMaturity(),
3✔
1156
                })
3✔
1157
                sweepTx.AddTxOut(o.RequiredTxOut())
3✔
1158
        }
1159

1160
        // Sum up the value contained in the remaining inputs, and add them to
1161
        // the sweep transaction.
1162
        for _, o := range inputs {
6✔
1163
                if o.RequiredTxOut() != nil {
6✔
1164
                        continue
3✔
1165
                }
1166

1167
                idxs = append(idxs, o)
3✔
1168
                sweepTx.AddTxIn(&wire.TxIn{
3✔
1169
                        PreviousOutPoint: o.OutPoint(),
3✔
1170
                        Sequence:         o.BlocksToMaturity(),
3✔
1171
                })
3✔
1172
        }
1173

1174
        // If there's a change amount, add it to the transaction.
1175
        changeAmtOpt.WhenSome(func(changeAmt btcutil.Amount) {
6✔
1176
                sweepTx.AddTxOut(&wire.TxOut{
3✔
1177
                        PkScript: changePkScript,
3✔
1178
                        Value:    int64(changeAmt),
3✔
1179
                })
3✔
1180
        })
3✔
1181

1182
        // We'll default to using the current block height as locktime, if none
1183
        // of the inputs commits to a different locktime.
1184
        sweepTx.LockTime = uint32(locktimeOpt.UnwrapOr(t.currentHeight.Load()))
3✔
1185

3✔
1186
        prevInputFetcher, err := input.MultiPrevOutFetcher(inputs)
3✔
1187
        if err != nil {
3✔
1188
                return nil, 0, fmt.Errorf("error creating prev input fetcher "+
×
1189
                        "for hash cache: %v", err)
×
1190
        }
×
1191
        hashCache := txscript.NewTxSigHashes(sweepTx, prevInputFetcher)
3✔
1192

3✔
1193
        // With all the inputs in place, use each output's unique input script
3✔
1194
        // function to generate the final witness required for spending.
3✔
1195
        addInputScript := func(idx int, tso input.Input) error {
6✔
1196
                inputScript, err := tso.CraftInputScript(
3✔
1197
                        t.cfg.Signer, sweepTx, hashCache, prevInputFetcher, idx,
3✔
1198
                )
3✔
1199
                if err != nil {
3✔
1200
                        return err
×
1201
                }
×
1202

1203
                sweepTx.TxIn[idx].Witness = inputScript.Witness
3✔
1204

3✔
1205
                if len(inputScript.SigScript) == 0 {
6✔
1206
                        return nil
3✔
1207
                }
3✔
1208

1209
                sweepTx.TxIn[idx].SignatureScript = inputScript.SigScript
×
1210

×
1211
                return nil
×
1212
        }
1213

1214
        for idx, inp := range idxs {
6✔
1215
                if err := addInputScript(idx, inp); err != nil {
3✔
1216
                        return nil, 0, err
×
1217
                }
×
1218
        }
1219

1220
        log.Debugf("Created sweep tx %v for inputs:\n%v", sweepTx.TxHash(),
3✔
1221
                inputTypeSummary(inputs))
3✔
1222

3✔
1223
        return sweepTx, txFee, nil
3✔
1224
}
1225

1226
// prepareSweepTx returns the tx fee, an optional change amount and an optional
1227
// locktime after a series of validations:
1228
// 1. check the locktime has been reached.
1229
// 2. check the locktimes are the same.
1230
// 3. check the inputs cover the outputs.
1231
//
1232
// NOTE: if the change amount is below dust, it will be added to the tx fee.
1233
func prepareSweepTx(inputs []input.Input, changePkScript []byte,
1234
        feeRate chainfee.SatPerKWeight, currentHeight int32) (
1235
        btcutil.Amount, fn.Option[btcutil.Amount], fn.Option[int32], error) {
3✔
1236

3✔
1237
        noChange := fn.None[btcutil.Amount]()
3✔
1238
        noLocktime := fn.None[int32]()
3✔
1239

3✔
1240
        // Creating a weight estimator with nil outputs and zero max fee rate.
3✔
1241
        // We don't allow adding customized outputs in the sweeping tx, and the
3✔
1242
        // fee rate is already being managed before we get here.
3✔
1243
        inputs, estimator, err := getWeightEstimate(
3✔
1244
                inputs, nil, feeRate, 0, changePkScript,
3✔
1245
        )
3✔
1246
        if err != nil {
3✔
1247
                return 0, noChange, noLocktime, err
×
1248
        }
×
1249

1250
        txFee := estimator.fee()
3✔
1251

3✔
1252
        var (
3✔
1253
                // Track whether any of the inputs require a certain locktime.
3✔
1254
                locktime = int32(-1)
3✔
1255

3✔
1256
                // We keep track of total input amount, and required output
3✔
1257
                // amount to use for calculating the change amount below.
3✔
1258
                totalInput     btcutil.Amount
3✔
1259
                requiredOutput btcutil.Amount
3✔
1260
        )
3✔
1261

3✔
1262
        // Go through each input and check if the required lock times have
3✔
1263
        // reached and are the same.
3✔
1264
        for _, o := range inputs {
6✔
1265
                // If the input has a required output, we'll add it to the
3✔
1266
                // required output amount.
3✔
1267
                if o.RequiredTxOut() != nil {
6✔
1268
                        requiredOutput += btcutil.Amount(
3✔
1269
                                o.RequiredTxOut().Value,
3✔
1270
                        )
3✔
1271
                }
3✔
1272

1273
                // Update the total input amount.
1274
                totalInput += btcutil.Amount(o.SignDesc().Output.Value)
3✔
1275

3✔
1276
                lt, ok := o.RequiredLockTime()
3✔
1277

3✔
1278
                // Skip if the input doesn't require a lock time.
3✔
1279
                if !ok {
6✔
1280
                        continue
3✔
1281
                }
1282

1283
                // Check if the lock time has reached
1284
                if lt > uint32(currentHeight) {
3✔
1285
                        return 0, noChange, noLocktime, ErrLocktimeImmature
×
1286
                }
×
1287

1288
                // If another input commits to a different locktime, they
1289
                // cannot be combined in the same transaction.
1290
                if locktime != -1 && locktime != int32(lt) {
3✔
1291
                        return 0, noChange, noLocktime, ErrLocktimeConflict
×
1292
                }
×
1293

1294
                // Update the locktime for next iteration.
1295
                locktime = int32(lt)
3✔
1296
        }
1297

1298
        // Make sure total output amount is less than total input amount.
1299
        if requiredOutput+txFee > totalInput {
3✔
1300
                return 0, noChange, noLocktime, fmt.Errorf("insufficient "+
×
1301
                        "input to create sweep tx: input_sum=%v, "+
×
1302
                        "output_sum=%v", totalInput, requiredOutput+txFee)
×
1303
        }
×
1304

1305
        // The value remaining after the required output and fees is the
1306
        // change output.
1307
        changeAmt := totalInput - requiredOutput - txFee
3✔
1308
        changeAmtOpt := fn.Some(changeAmt)
3✔
1309

3✔
1310
        // We'll calculate the dust limit for the given changePkScript since it
3✔
1311
        // is variable.
3✔
1312
        changeFloor := lnwallet.DustLimitForSize(len(changePkScript))
3✔
1313

3✔
1314
        // If the change amount is dust, we'll move it into the fees.
3✔
1315
        if changeAmt < changeFloor {
6✔
1316
                log.Infof("Change amt %v below dustlimit %v, not adding "+
3✔
1317
                        "change output", changeAmt, changeFloor)
3✔
1318

3✔
1319
                // If there's no required output, and the change output is a
3✔
1320
                // dust, it means we are creating a tx without any outputs. In
3✔
1321
                // this case we'll return an error. This could happen when
3✔
1322
                // creating a tx that has an anchor as the only input.
3✔
1323
                if requiredOutput == 0 {
6✔
1324
                        return 0, noChange, noLocktime, ErrTxNoOutput
3✔
1325
                }
3✔
1326

1327
                // The dust amount is added to the fee.
1328
                txFee += changeAmt
×
1329

×
1330
                // Set the change amount to none.
×
1331
                changeAmtOpt = fn.None[btcutil.Amount]()
×
1332
        }
1333

1334
        // Optionally set the locktime.
1335
        locktimeOpt := fn.Some(locktime)
3✔
1336
        if locktime == -1 {
6✔
1337
                locktimeOpt = noLocktime
3✔
1338
        }
3✔
1339

1340
        log.Debugf("Creating sweep tx for %v inputs (%s) using %v, "+
3✔
1341
                "tx_weight=%v, tx_fee=%v, locktime=%v, parents_count=%v, "+
3✔
1342
                "parents_fee=%v, parents_weight=%v, current_height=%v",
3✔
1343
                len(inputs), inputTypeSummary(inputs), feeRate,
3✔
1344
                estimator.weight(), txFee, locktimeOpt, len(estimator.parents),
3✔
1345
                estimator.parentsFee, estimator.parentsWeight, currentHeight)
3✔
1346

3✔
1347
        return txFee, changeAmtOpt, locktimeOpt, nil
3✔
1348
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc