• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12145425118

03 Dec 2024 05:56PM UTC coverage: 58.934% (-0.04%) from 58.972%
12145425118

push

github

web-flow
Merge pull request #9257 from starius/estimatefeerate-regtest2

chainreg: use feerate estimator in regtest and simnet

5 of 5 new or added lines in 1 file covered. (100.0%)

104 existing lines in 24 files now uncovered.

133434 of 226413 relevant lines covered (58.93%)

19484.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.2
/sweep/fee_bumper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/rpcclient"
12
        "github.com/btcsuite/btcd/txscript"
13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/btcsuite/btcwallet/chain"
15
        "github.com/lightningnetwork/lnd/chainntnfs"
16
        "github.com/lightningnetwork/lnd/fn"
17
        "github.com/lightningnetwork/lnd/input"
18
        "github.com/lightningnetwork/lnd/labels"
19
        "github.com/lightningnetwork/lnd/lntypes"
20
        "github.com/lightningnetwork/lnd/lnutils"
21
        "github.com/lightningnetwork/lnd/lnwallet"
22
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
23
        "github.com/lightningnetwork/lnd/tlv"
24
)
25

26
var (
27
        // ErrInvalidBumpResult is returned when the bump result is invalid.
28
        ErrInvalidBumpResult = errors.New("invalid bump result")
29

30
        // ErrNotEnoughBudget is returned when the fee bumper decides the
31
        // current budget cannot cover the fee.
32
        ErrNotEnoughBudget = errors.New("not enough budget")
33

34
        // ErrLocktimeImmature is returned when sweeping an input whose
35
        // locktime is not reached.
36
        ErrLocktimeImmature = errors.New("immature input")
37

38
        // ErrTxNoOutput is returned when an output cannot be created during tx
39
        // preparation, usually due to the output being dust.
40
        ErrTxNoOutput = errors.New("tx has no output")
41

42
        // ErrThirdPartySpent is returned when a third party has spent the
43
        // input in the sweeping tx.
44
        ErrThirdPartySpent = errors.New("third party spent the output")
45
)
46

47
var (
48
        // dummyChangePkScript is a dummy tapscript change script that's used
49
        // when we don't need a real address, just something that can be used
50
        // for fee estimation.
51
        dummyChangePkScript = []byte{
52
                0x51, 0x20,
53
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
54
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
55
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
56
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
57
        }
58
)
59

60
// Bumper defines an interface that can be used by other subsystems for fee
61
// bumping.
62
type Bumper interface {
63
        // Broadcast is used to publish the tx created from the given inputs
64
        // specified in the request. It handles the tx creation, broadcasts it,
65
        // and monitors its confirmation status for potential fee bumping. It
66
        // returns a chan that the caller can use to receive updates about the
67
        // broadcast result and potential RBF attempts.
68
        Broadcast(req *BumpRequest) (<-chan *BumpResult, error)
69
}
70

71
// BumpEvent represents the event of a fee bumping attempt.
72
type BumpEvent uint8
73

74
const (
75
        // TxPublished is sent when the broadcast attempt is finished.
76
        TxPublished BumpEvent = iota
77

78
        // TxFailed is sent when the broadcast attempt fails.
79
        TxFailed
80

81
        // TxReplaced is sent when the original tx is replaced by a new one.
82
        TxReplaced
83

84
        // TxConfirmed is sent when the tx is confirmed.
85
        TxConfirmed
86

87
        // sentinalEvent is used to check if an event is unknown.
88
        sentinalEvent
89
)
90

91
// String returns a human-readable string for the event.
92
func (e BumpEvent) String() string {
4✔
93
        switch e {
4✔
94
        case TxPublished:
4✔
95
                return "Published"
4✔
96
        case TxFailed:
4✔
97
                return "Failed"
4✔
98
        case TxReplaced:
4✔
99
                return "Replaced"
4✔
100
        case TxConfirmed:
4✔
101
                return "Confirmed"
4✔
102
        default:
×
103
                return "Unknown"
×
104
        }
105
}
106

107
// Unknown returns true if the event is unknown.
108
func (e BumpEvent) Unknown() bool {
12✔
109
        return e >= sentinalEvent
12✔
110
}
12✔
111

112
// BumpRequest is used by the caller to give the Bumper the necessary info to
113
// create and manage potential fee bumps for a set of inputs.
114
type BumpRequest struct {
115
        // Budget givens the total amount that can be used as fees by these
116
        // inputs.
117
        Budget btcutil.Amount
118

119
        // Inputs is the set of inputs to sweep.
120
        Inputs []input.Input
121

122
        // DeadlineHeight is the block height at which the tx should be
123
        // confirmed.
124
        DeadlineHeight int32
125

126
        // DeliveryAddress is the script to send the change output to.
127
        DeliveryAddress lnwallet.AddrWithKey
128

129
        // MaxFeeRate is the maximum fee rate that can be used for fee bumping.
130
        MaxFeeRate chainfee.SatPerKWeight
131

132
        // StartingFeeRate is an optional parameter that can be used to specify
133
        // the initial fee rate to use for the fee function.
134
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
135

136
        // ExtraTxOut tracks if this bump request has an optional set of extra
137
        // outputs to add to the transaction.
138
        ExtraTxOut fn.Option[SweepOutput]
139
}
140

141
// MaxFeeRateAllowed returns the maximum fee rate allowed for the given
142
// request. It calculates the feerate using the supplied budget and the weight,
143
// compares it with the specified MaxFeeRate, and returns the smaller of the
144
// two.
145
func (r *BumpRequest) MaxFeeRateAllowed() (chainfee.SatPerKWeight, error) {
12✔
146
        // We'll want to know if we have any blobs, as we need to factor this
12✔
147
        // into the max fee rate for this bump request.
12✔
148
        hasBlobs := fn.Any(func(i input.Input) bool {
23✔
149
                return fn.MapOptionZ(
11✔
150
                        i.ResolutionBlob(), func(b tlv.Blob) bool {
15✔
151
                                return len(b) > 0
4✔
152
                        },
4✔
153
                )
154
        }, r.Inputs)
155

156
        sweepAddrs := [][]byte{
12✔
157
                r.DeliveryAddress.DeliveryAddress,
12✔
158
        }
12✔
159

12✔
160
        // If we have blobs, then we'll add an extra sweep addr for the size
12✔
161
        // estimate below. We know that these blobs will also always be based on
12✔
162
        // p2tr addrs.
12✔
163
        if hasBlobs {
12✔
164
                // We need to pass in a real address, so we'll use a dummy
×
165
                // tapscript change script that's used elsewhere for tests.
×
166
                sweepAddrs = append(sweepAddrs, dummyChangePkScript)
×
167
        }
×
168

169
        // Get the size of the sweep tx, which will be used to calculate the
170
        // budget fee rate.
171
        size, err := calcSweepTxWeight(
12✔
172
                r.Inputs, sweepAddrs,
12✔
173
        )
12✔
174
        if err != nil {
13✔
175
                return 0, err
1✔
176
        }
1✔
177

178
        // Use the budget and MaxFeeRate to decide the max allowed fee rate.
179
        // This is needed as, when the input has a large value and the user
180
        // sets the budget to be proportional to the input value, the fee rate
181
        // can be very high and we need to make sure it doesn't exceed the max
182
        // fee rate.
183
        maxFeeRateAllowed := chainfee.NewSatPerKWeight(r.Budget, size)
11✔
184
        if maxFeeRateAllowed > r.MaxFeeRate {
16✔
185
                log.Debugf("Budget feerate %v exceeds MaxFeeRate %v, use "+
5✔
186
                        "MaxFeeRate instead, txWeight=%v", maxFeeRateAllowed,
5✔
187
                        r.MaxFeeRate, size)
5✔
188

5✔
189
                return r.MaxFeeRate, nil
5✔
190
        }
5✔
191

192
        log.Debugf("Budget feerate %v below MaxFeeRate %v, use budget feerate "+
10✔
193
                "instead, txWeight=%v", maxFeeRateAllowed, r.MaxFeeRate, size)
10✔
194

10✔
195
        return maxFeeRateAllowed, nil
10✔
196
}
197

198
// calcSweepTxWeight calculates the weight of the sweep tx. It assumes a
199
// sweeping tx always has a single output(change).
200
func calcSweepTxWeight(inputs []input.Input,
201
        outputPkScript [][]byte) (lntypes.WeightUnit, error) {
15✔
202

15✔
203
        // Use a const fee rate as we only use the weight estimator to
15✔
204
        // calculate the size.
15✔
205
        const feeRate = 1
15✔
206

15✔
207
        // Initialize the tx weight estimator with,
15✔
208
        // - nil outputs as we only have one single change output.
15✔
209
        // - const fee rate as we don't care about the fees here.
15✔
210
        // - 0 maxfeerate as we don't care about fees here.
15✔
211
        //
15✔
212
        // TODO(yy): we should refactor the weight estimator to not require a
15✔
213
        // fee rate and max fee rate and make it a pure tx weight calculator.
15✔
214
        _, estimator, err := getWeightEstimate(
15✔
215
                inputs, nil, feeRate, 0, outputPkScript,
15✔
216
        )
15✔
217
        if err != nil {
17✔
218
                return 0, err
2✔
219
        }
2✔
220

221
        return estimator.weight(), nil
13✔
222
}
223

224
// BumpResult is used by the Bumper to send updates about the tx being
225
// broadcast.
226
type BumpResult struct {
227
        // Event is the type of event that the result is for.
228
        Event BumpEvent
229

230
        // Tx is the tx being broadcast.
231
        Tx *wire.MsgTx
232

233
        // ReplacedTx is the old, replaced tx if a fee bump is attempted.
234
        ReplacedTx *wire.MsgTx
235

236
        // FeeRate is the fee rate used for the new tx.
237
        FeeRate chainfee.SatPerKWeight
238

239
        // Fee is the fee paid by the new tx.
240
        Fee btcutil.Amount
241

242
        // Err is the error that occurred during the broadcast.
243
        Err error
244

245
        // requestID is the ID of the request that created this record.
246
        requestID uint64
247
}
248

249
// Validate validates the BumpResult so it's safe to use.
250
func (b *BumpResult) Validate() error {
13✔
251
        // Every result must have a tx.
13✔
252
        if b.Tx == nil {
14✔
253
                return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult)
1✔
254
        }
1✔
255

256
        // Every result must have a known event.
257
        if b.Event.Unknown() {
13✔
258
                return fmt.Errorf("%w: unknown event", ErrInvalidBumpResult)
1✔
259
        }
1✔
260

261
        // If it's a replacing event, it must have a replaced tx.
262
        if b.Event == TxReplaced && b.ReplacedTx == nil {
12✔
263
                return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult)
1✔
264
        }
1✔
265

266
        // If it's a failed event, it must have an error.
267
        if b.Event == TxFailed && b.Err == nil {
11✔
268
                return fmt.Errorf("%w: nil error", ErrInvalidBumpResult)
1✔
269
        }
1✔
270

271
        // If it's a confirmed event, it must have a fee rate and fee.
272
        if b.Event == TxConfirmed && (b.FeeRate == 0 || b.Fee == 0) {
10✔
273
                return fmt.Errorf("%w: missing fee rate or fee",
1✔
274
                        ErrInvalidBumpResult)
1✔
275
        }
1✔
276

277
        return nil
8✔
278
}
279

280
// TxPublisherConfig is the config used to create a new TxPublisher.
281
type TxPublisherConfig struct {
282
        // Signer is used to create the tx signature.
283
        Signer input.Signer
284

285
        // Wallet is used primarily to publish the tx.
286
        Wallet Wallet
287

288
        // Estimator is used to estimate the fee rate for the new tx based on
289
        // its deadline conf target.
290
        Estimator chainfee.Estimator
291

292
        // Notifier is used to monitor the confirmation status of the tx.
293
        Notifier chainntnfs.ChainNotifier
294

295
        // AuxSweeper is an optional interface that can be used to modify the
296
        // way sweep transaction are generated.
297
        AuxSweeper fn.Option[AuxSweeper]
298
}
299

300
// TxPublisher is an implementation of the Bumper interface. It utilizes the
301
// `testmempoolaccept` RPC to bump the fee of txns it created based on
302
// different fee function selected or configed by the caller. Its purpose is to
303
// take a list of inputs specified, and create a tx that spends them to a
304
// specified output. It will then monitor the confirmation status of the tx,
305
// and if it's not confirmed within a certain time frame, it will attempt to
306
// bump the fee of the tx by creating a new tx that spends the same inputs to
307
// the same output, but with a higher fee rate. It will continue to do this
308
// until the tx is confirmed or the fee rate reaches the maximum fee rate
309
// specified by the caller.
310
type TxPublisher struct {
311
        started atomic.Bool
312
        stopped atomic.Bool
313

314
        wg sync.WaitGroup
315

316
        // cfg specifies the configuration of the TxPublisher.
317
        cfg *TxPublisherConfig
318

319
        // currentHeight is the current block height.
320
        currentHeight atomic.Int32
321

322
        // records is a map keyed by the requestCounter and the value is the tx
323
        // being monitored.
324
        records lnutils.SyncMap[uint64, *monitorRecord]
325

326
        // requestCounter is a monotonically increasing counter used to keep
327
        // track of how many requests have been made.
328
        requestCounter atomic.Uint64
329

330
        // subscriberChans is a map keyed by the requestCounter, each item is
331
        // the chan that the publisher sends the fee bump result to.
332
        subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]
333

334
        // quit is used to signal the publisher to stop.
335
        quit chan struct{}
336
}
337

338
// Compile-time constraint to ensure TxPublisher implements Bumper.
339
var _ Bumper = (*TxPublisher)(nil)
340

341
// NewTxPublisher creates a new TxPublisher.
342
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
18✔
343
        return &TxPublisher{
18✔
344
                cfg:             &cfg,
18✔
345
                records:         lnutils.SyncMap[uint64, *monitorRecord]{},
18✔
346
                subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
18✔
347
                quit:            make(chan struct{}),
18✔
348
        }
18✔
349
}
18✔
350

351
// isNeutrinoBackend checks if the wallet backend is neutrino.
352
func (t *TxPublisher) isNeutrinoBackend() bool {
5✔
353
        return t.cfg.Wallet.BackEnd() == "neutrino"
5✔
354
}
5✔
355

356
// Broadcast is used to publish the tx created from the given inputs. It will,
357
// 1. init a fee function based on the given strategy.
358
// 2. create an RBF-compliant tx and monitor it for confirmation.
359
// 3. notify the initial broadcast result back to the caller.
360
// The initial broadcast is guaranteed to be RBF-compliant unless the budget
361
// specified cannot cover the fee.
362
//
363
// NOTE: part of the Bumper interface.
364
func (t *TxPublisher) Broadcast(req *BumpRequest) (<-chan *BumpResult, error) {
7✔
365
        log.Tracef("Received broadcast request: %s", lnutils.SpewLogClosure(
7✔
366
                req))
7✔
367

7✔
368
        // Attempt an initial broadcast which is guaranteed to comply with the
7✔
369
        // RBF rules.
7✔
370
        result, err := t.initialBroadcast(req)
7✔
371
        if err != nil {
12✔
372
                log.Errorf("Initial broadcast failed: %v", err)
5✔
373

5✔
374
                return nil, err
5✔
375
        }
5✔
376

377
        // Create a chan to send the result to the caller.
378
        subscriber := make(chan *BumpResult, 1)
6✔
379
        t.subscriberChans.Store(result.requestID, subscriber)
6✔
380

6✔
381
        // Send the initial broadcast result to the caller.
6✔
382
        t.handleResult(result)
6✔
383

6✔
384
        return subscriber, nil
6✔
385
}
386

387
// initialBroadcast initializes a fee function, creates an RBF-compliant tx and
388
// broadcasts it.
389
func (t *TxPublisher) initialBroadcast(req *BumpRequest) (*BumpResult, error) {
7✔
390
        // Create a fee bumping algorithm to be used for future RBF.
7✔
391
        feeAlgo, err := t.initializeFeeFunction(req)
7✔
392
        if err != nil {
11✔
393
                return nil, fmt.Errorf("init fee function: %w", err)
4✔
394
        }
4✔
395

396
        // Create the initial tx to be broadcasted. This tx is guaranteed to
397
        // comply with the RBF restrictions.
398
        requestID, err := t.createRBFCompliantTx(req, feeAlgo)
7✔
399
        if err != nil {
12✔
400
                return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
5✔
401
        }
5✔
402

403
        // Broadcast the tx and return the monitored record.
404
        result, err := t.broadcast(requestID)
6✔
405
        if err != nil {
6✔
406
                return nil, fmt.Errorf("broadcast sweep tx: %w", err)
×
407
        }
×
408

409
        return result, nil
6✔
410
}
411

412
// initializeFeeFunction initializes a fee function to be used for this request
413
// for future fee bumping.
414
func (t *TxPublisher) initializeFeeFunction(
415
        req *BumpRequest) (FeeFunction, error) {
9✔
416

9✔
417
        // Get the max allowed feerate.
9✔
418
        maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
9✔
419
        if err != nil {
9✔
420
                return nil, err
×
421
        }
×
422

423
        // Get the initial conf target.
424
        confTarget := calcCurrentConfTarget(
9✔
425
                t.currentHeight.Load(), req.DeadlineHeight,
9✔
426
        )
9✔
427

9✔
428
        log.Debugf("Initializing fee function with conf target=%v, budget=%v, "+
9✔
429
                "maxFeeRateAllowed=%v", confTarget, req.Budget,
9✔
430
                maxFeeRateAllowed)
9✔
431

9✔
432
        // Initialize the fee function and return it.
9✔
433
        //
9✔
434
        // TODO(yy): return based on differet req.Strategy?
9✔
435
        return NewLinearFeeFunction(
9✔
436
                maxFeeRateAllowed, confTarget, t.cfg.Estimator,
9✔
437
                req.StartingFeeRate,
9✔
438
        )
9✔
439
}
440

441
// createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
442
// so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee
443
// and redo the process until the tx is valid, or return an error when non-RBF
444
// related errors occur or the budget has been used up.
445
func (t *TxPublisher) createRBFCompliantTx(req *BumpRequest,
446
        f FeeFunction) (uint64, error) {
13✔
447

13✔
448
        for {
29✔
449
                // Create a new tx with the given fee rate and check its
16✔
450
                // mempool acceptance.
16✔
451
                sweepCtx, err := t.createAndCheckTx(req, f)
16✔
452

16✔
453
                switch {
16✔
454
                case err == nil:
10✔
455
                        // The tx is valid, return the request ID.
10✔
456
                        requestID := t.storeRecord(
10✔
457
                                sweepCtx.tx, req, f, sweepCtx.fee,
10✔
458
                                sweepCtx.outpointToTxIndex,
10✔
459
                        )
10✔
460

10✔
461
                        log.Infof("Created tx %v for %v inputs: feerate=%v, "+
10✔
462
                                "fee=%v, inputs=%v", sweepCtx.tx.TxHash(),
10✔
463
                                len(req.Inputs), f.FeeRate(), sweepCtx.fee,
10✔
464
                                inputTypeSummary(req.Inputs))
10✔
465

10✔
466
                        return requestID, nil
10✔
467

468
                // If the error indicates the fees paid is not enough, we will
469
                // ask the fee function to increase the fee rate and retry.
470
                case errors.Is(err, lnwallet.ErrMempoolFee):
2✔
471
                        // We should at least start with a feerate above the
2✔
472
                        // mempool min feerate, so if we get this error, it
2✔
473
                        // means something is wrong earlier in the pipeline.
2✔
474
                        log.Errorf("Current fee=%v, feerate=%v, %v",
2✔
475
                                sweepCtx.fee, f.FeeRate(), err)
2✔
476

2✔
477
                        fallthrough
2✔
478

479
                // We are not paying enough fees so we increase it.
480
                case errors.Is(err, chain.ErrInsufficientFee):
7✔
481
                        increased := false
7✔
482

7✔
483
                        // Keep calling the fee function until the fee rate is
7✔
484
                        // increased or maxed out.
7✔
485
                        for !increased {
15✔
486
                                log.Debugf("Increasing fee for next round, "+
8✔
487
                                        "current fee=%v, feerate=%v",
8✔
488
                                        sweepCtx.fee, f.FeeRate())
8✔
489

8✔
490
                                // If the fee function tells us that we have
8✔
491
                                // used up the budget, we will return an error
8✔
492
                                // indicating this tx cannot be made. The
8✔
493
                                // sweeper should handle this error and try to
8✔
494
                                // cluster these inputs differetly.
8✔
495
                                increased, err = f.Increment()
8✔
496
                                if err != nil {
12✔
497
                                        return 0, err
4✔
498
                                }
4✔
499
                        }
500

501
                // TODO(yy): suppose there's only one bad input, we can do a
502
                // binary search to find out which input is causing this error
503
                // by recreating a tx using half of the inputs and check its
504
                // mempool acceptance.
505
                default:
6✔
506
                        log.Debugf("Failed to create RBF-compliant tx: %v", err)
6✔
507
                        return 0, err
6✔
508
                }
509
        }
510
}
511

512
// storeRecord stores the given record in the records map.
513
func (t *TxPublisher) storeRecord(tx *wire.MsgTx, req *BumpRequest,
514
        f FeeFunction, fee btcutil.Amount,
515
        outpointToTxIndex map[wire.OutPoint]int) uint64 {
18✔
516

18✔
517
        // Increase the request counter.
18✔
518
        //
18✔
519
        // NOTE: this is the only place where we increase the
18✔
520
        // counter.
18✔
521
        requestID := t.requestCounter.Add(1)
18✔
522

18✔
523
        // Register the record.
18✔
524
        t.records.Store(requestID, &monitorRecord{
18✔
525
                tx:                tx,
18✔
526
                req:               req,
18✔
527
                feeFunction:       f,
18✔
528
                fee:               fee,
18✔
529
                outpointToTxIndex: outpointToTxIndex,
18✔
530
        })
18✔
531

18✔
532
        return requestID
18✔
533
}
18✔
534

535
// createAndCheckTx creates a tx based on the given inputs, change output
536
// script, and the fee rate. In addition, it validates the tx's mempool
537
// acceptance before returning a tx that can be published directly, along with
538
// its fee.
539
func (t *TxPublisher) createAndCheckTx(req *BumpRequest,
540
        f FeeFunction) (*sweepTxCtx, error) {
26✔
541

26✔
542
        // Create the sweep tx with max fee rate of 0 as the fee function
26✔
543
        // guarantees the fee rate used here won't exceed the max fee rate.
26✔
544
        sweepCtx, err := t.createSweepTx(
26✔
545
                req.Inputs, req.DeliveryAddress, f.FeeRate(),
26✔
546
        )
26✔
547
        if err != nil {
30✔
548
                return sweepCtx, fmt.Errorf("create sweep tx: %w", err)
4✔
549
        }
4✔
550

551
        // Sanity check the budget still covers the fee.
552
        if sweepCtx.fee > req.Budget {
28✔
553
                return sweepCtx, fmt.Errorf("%w: budget=%v, fee=%v",
2✔
554
                        ErrNotEnoughBudget, req.Budget, sweepCtx.fee)
2✔
555
        }
2✔
556

557
        // If we had an extra txOut, then we'll update the result to include
558
        // it.
559
        req.ExtraTxOut = sweepCtx.extraTxOut
24✔
560

24✔
561
        // Validate the tx's mempool acceptance.
24✔
562
        err = t.cfg.Wallet.CheckMempoolAcceptance(sweepCtx.tx)
24✔
563

24✔
564
        // Exit early if the tx is valid.
24✔
565
        if err == nil {
38✔
566
                return sweepCtx, nil
14✔
567
        }
14✔
568

569
        // Print an error log if the chain backend doesn't support the mempool
570
        // acceptance test RPC.
571
        if errors.Is(err, rpcclient.ErrBackendVersion) {
13✔
572
                log.Errorf("TestMempoolAccept not supported by backend, " +
×
573
                        "consider upgrading it to a newer version")
×
574
                return sweepCtx, nil
×
575
        }
×
576

577
        // We are running on a backend that doesn't implement the RPC
578
        // testmempoolaccept, eg, neutrino, so we'll skip the check.
579
        if errors.Is(err, chain.ErrUnimplemented) {
14✔
580
                log.Debug("Skipped testmempoolaccept due to not implemented")
1✔
581
                return sweepCtx, nil
1✔
582
        }
1✔
583

584
        return sweepCtx, fmt.Errorf("tx=%v failed mempool check: %w",
12✔
585
                sweepCtx.tx.TxHash(), err)
12✔
586
}
587

588
// broadcast takes a monitored tx and publishes it to the network. Prior to the
589
// broadcast, it will subscribe the tx's confirmation notification and attach
590
// the event channel to the record. Any broadcast-related errors will not be
591
// returned here, instead, they will be put inside the `BumpResult` and
592
// returned to the caller.
593
func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) {
13✔
594
        // Get the record being monitored.
13✔
595
        record, ok := t.records.Load(requestID)
13✔
596
        if !ok {
14✔
597
                return nil, fmt.Errorf("tx record %v not found", requestID)
1✔
598
        }
1✔
599

600
        txid := record.tx.TxHash()
12✔
601

12✔
602
        tx := record.tx
12✔
603
        log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
12✔
604
                txid, len(tx.TxIn), t.currentHeight.Load())
12✔
605

12✔
606
        // Before we go to broadcast, we'll notify the aux sweeper, if it's
12✔
607
        // present of this new broadcast attempt.
12✔
608
        err := fn.MapOptionZ(t.cfg.AuxSweeper, func(aux AuxSweeper) error {
20✔
609
                return aux.NotifyBroadcast(
8✔
610
                        record.req, tx, record.fee, record.outpointToTxIndex,
8✔
611
                )
8✔
612
        })
8✔
613
        if err != nil {
12✔
614
                return nil, fmt.Errorf("unable to notify aux sweeper: %w", err)
×
615
        }
×
616

617
        // Set the event, and change it to TxFailed if the wallet fails to
618
        // publish it.
619
        event := TxPublished
12✔
620

12✔
621
        // Publish the sweeping tx with customized label. If the publish fails,
12✔
622
        // this error will be saved in the `BumpResult` and it will be removed
12✔
623
        // from being monitored.
12✔
624
        err = t.cfg.Wallet.PublishTransaction(
12✔
625
                tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
12✔
626
        )
12✔
627
        if err != nil {
16✔
628
                // NOTE: we decide to attach this error to the result instead
4✔
629
                // of returning it here because by the time the tx reaches
4✔
630
                // here, it should have passed the mempool acceptance check. If
4✔
631
                // it still fails to be broadcast, it's likely a non-RBF
4✔
632
                // related error happened. So we send this error back to the
4✔
633
                // caller so that it can handle it properly.
4✔
634
                //
4✔
635
                // TODO(yy): find out which input is causing the failure.
4✔
636
                log.Errorf("Failed to publish tx %v: %v", txid, err)
4✔
637
                event = TxFailed
4✔
638
        }
4✔
639

640
        result := &BumpResult{
12✔
641
                Event:     event,
12✔
642
                Tx:        record.tx,
12✔
643
                Fee:       record.fee,
12✔
644
                FeeRate:   record.feeFunction.FeeRate(),
12✔
645
                Err:       err,
12✔
646
                requestID: requestID,
12✔
647
        }
12✔
648

12✔
649
        return result, nil
12✔
650
}
651

652
// notifyResult sends the result to the resultChan specified by the requestID.
653
// This channel is expected to be read by the caller.
654
func (t *TxPublisher) notifyResult(result *BumpResult) {
13✔
655
        id := result.requestID
13✔
656
        subscriber, ok := t.subscriberChans.Load(id)
13✔
657
        if !ok {
14✔
658
                log.Errorf("Result chan for id=%v not found", id)
1✔
659
                return
1✔
660
        }
1✔
661

662
        log.Debugf("Sending result for requestID=%v, tx=%v", id,
13✔
663
                result.Tx.TxHash())
13✔
664

13✔
665
        select {
13✔
666
        // Send the result to the subscriber.
667
        //
668
        // TODO(yy): Add timeout in case it's blocking?
669
        case subscriber <- result:
12✔
670
        case <-t.quit:
2✔
671
                log.Debug("Fee bumper stopped")
2✔
672
        }
673
}
674

675
// removeResult removes the tracking of the result if the result contains a
676
// non-nil error, or the tx is confirmed, the record will be removed from the
677
// maps.
678
func (t *TxPublisher) removeResult(result *BumpResult) {
13✔
679
        id := result.requestID
13✔
680

13✔
681
        // Remove the record from the maps if there's an error. This means this
13✔
682
        // tx has failed its broadcast and cannot be retried. There are two
13✔
683
        // cases,
13✔
684
        // - when the budget cannot cover the fee.
13✔
685
        // - when a non-RBF related error occurs.
13✔
686
        switch result.Event {
13✔
687
        case TxFailed:
6✔
688
                log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
6✔
689
                        id, result.Tx.TxHash(), result.Err)
6✔
690

691
        case TxConfirmed:
7✔
692
                // Remove the record is the tx is confirmed.
7✔
693
                log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
7✔
694
                        result.Tx.TxHash())
7✔
695

696
        // Do nothing if it's neither failed or confirmed.
697
        default:
8✔
698
                log.Tracef("Skipping record removal for id=%v, event=%v", id,
8✔
699
                        result.Event)
8✔
700

8✔
701
                return
8✔
702
        }
703

704
        t.records.Delete(id)
9✔
705
        t.subscriberChans.Delete(id)
9✔
706
}
707

708
// handleResult handles the result of a tx broadcast. It will notify the
709
// subscriber and remove the record if the tx is confirmed or failed to be
710
// broadcast.
711
func (t *TxPublisher) handleResult(result *BumpResult) {
10✔
712
        // Notify the subscriber.
10✔
713
        t.notifyResult(result)
10✔
714

10✔
715
        // Remove the record if it's failed or confirmed.
10✔
716
        t.removeResult(result)
10✔
717
}
10✔
718

719
// monitorRecord is used to keep track of the tx being monitored by the
720
// publisher internally.
721
type monitorRecord struct {
722
        // tx is the tx being monitored.
723
        tx *wire.MsgTx
724

725
        // req is the original request.
726
        req *BumpRequest
727

728
        // feeFunction is the fee bumping algorithm used by the publisher.
729
        feeFunction FeeFunction
730

731
        // fee is the fee paid by the tx.
732
        fee btcutil.Amount
733

734
        // outpointToTxIndex is a map of outpoint to tx index.
735
        outpointToTxIndex map[wire.OutPoint]int
736
}
737

738
// Start starts the publisher by subscribing to block epoch updates and kicking
739
// off the monitor loop.
740
func (t *TxPublisher) Start() error {
4✔
741
        log.Info("TxPublisher starting...")
4✔
742

4✔
743
        if t.started.Swap(true) {
4✔
744
                return fmt.Errorf("TxPublisher started more than once")
×
745
        }
×
746

747
        blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
4✔
748
        if err != nil {
4✔
749
                return fmt.Errorf("register block epoch ntfn: %w", err)
×
750
        }
×
751

752
        t.wg.Add(1)
4✔
753
        go t.monitor(blockEvent)
4✔
754

4✔
755
        log.Debugf("TxPublisher started")
4✔
756

4✔
757
        return nil
4✔
758
}
759

760
// Stop stops the publisher and waits for the monitor loop to exit.
761
func (t *TxPublisher) Stop() error {
4✔
762
        log.Info("TxPublisher stopping...")
4✔
763

4✔
764
        if t.stopped.Swap(true) {
4✔
765
                return fmt.Errorf("TxPublisher stopped more than once")
×
766
        }
×
767

768
        close(t.quit)
4✔
769
        t.wg.Wait()
4✔
770

4✔
771
        log.Debug("TxPublisher stopped")
4✔
772

4✔
773
        return nil
4✔
774
}
775

776
// monitor is the main loop driven by new blocks. Whevenr a new block arrives,
777
// it will examine all the txns being monitored, and check if any of them needs
778
// to be bumped. If so, it will attempt to bump the fee of the tx.
779
//
780
// NOTE: Must be run as a goroutine.
781
func (t *TxPublisher) monitor(blockEvent *chainntnfs.BlockEpochEvent) {
4✔
782
        defer blockEvent.Cancel()
4✔
783
        defer t.wg.Done()
4✔
784

4✔
785
        for {
8✔
786
                select {
4✔
787
                case epoch, ok := <-blockEvent.Epochs:
4✔
788
                        if !ok {
4✔
789
                                // We should stop the publisher before stopping
×
790
                                // the chain service. Otherwise it indicates an
×
791
                                // error.
×
792
                                log.Error("Block epoch channel closed, exit " +
×
793
                                        "monitor")
×
794

×
795
                                return
×
796
                        }
×
797

798
                        log.Debugf("TxPublisher received new block: %v",
4✔
799
                                epoch.Height)
4✔
800

4✔
801
                        // Update the best known height for the publisher.
4✔
802
                        t.currentHeight.Store(epoch.Height)
4✔
803

4✔
804
                        // Check all monitored txns to see if any of them needs
4✔
805
                        // to be bumped.
4✔
806
                        t.processRecords()
4✔
807

808
                case <-t.quit:
4✔
809
                        log.Debug("Fee bumper stopped, exit monitor")
4✔
810
                        return
4✔
811
                }
812
        }
813
}
814

815
// processRecords checks all the txns being monitored, and checks if any of
816
// them needs to be bumped. If so, it will attempt to bump the fee of the tx.
817
func (t *TxPublisher) processRecords() {
5✔
818
        // confirmedRecords stores a map of the records which have been
5✔
819
        // confirmed.
5✔
820
        confirmedRecords := make(map[uint64]*monitorRecord)
5✔
821

5✔
822
        // feeBumpRecords stores a map of the records which need to be bumped.
5✔
823
        feeBumpRecords := make(map[uint64]*monitorRecord)
5✔
824

5✔
825
        // failedRecords stores a map of the records which has inputs being
5✔
826
        // spent by a third party.
5✔
827
        //
5✔
828
        // NOTE: this is only used for neutrino backend.
5✔
829
        failedRecords := make(map[uint64]*monitorRecord)
5✔
830

5✔
831
        // visitor is a helper closure that visits each record and divides them
5✔
832
        // into two groups.
5✔
833
        visitor := func(requestID uint64, r *monitorRecord) error {
11✔
834
                log.Tracef("Checking monitor recordID=%v for tx=%v", requestID,
6✔
835
                        r.tx.TxHash())
6✔
836

6✔
837
                // If the tx is already confirmed, we can stop monitoring it.
6✔
838
                if t.isConfirmed(r.tx.TxHash()) {
11✔
839
                        confirmedRecords[requestID] = r
5✔
840

5✔
841
                        // Move to the next record.
5✔
842
                        return nil
5✔
843
                }
5✔
844

845
                // Check whether the inputs has been spent by a third party.
846
                //
847
                // NOTE: this check is only done for neutrino backend.
848
                if t.isThirdPartySpent(r.tx.TxHash(), r.req.Inputs) {
6✔
849
                        failedRecords[requestID] = r
1✔
850

1✔
851
                        // Move to the next record.
1✔
852
                        return nil
1✔
853
                }
1✔
854

855
                feeBumpRecords[requestID] = r
5✔
856

5✔
857
                // Return nil to move to the next record.
5✔
858
                return nil
5✔
859
        }
860

861
        // Iterate through all the records and divide them into two groups.
862
        t.records.ForEach(visitor)
5✔
863

5✔
864
        // For records that are confirmed, we'll notify the caller about this
5✔
865
        // result.
5✔
866
        for requestID, r := range confirmedRecords {
10✔
867
                rec := r
5✔
868

5✔
869
                log.Debugf("Tx=%v is confirmed", r.tx.TxHash())
5✔
870
                t.wg.Add(1)
5✔
871
                go t.handleTxConfirmed(rec, requestID)
5✔
872
        }
5✔
873

874
        // Get the current height to be used in the following goroutines.
875
        currentHeight := t.currentHeight.Load()
5✔
876

5✔
877
        // For records that are not confirmed, we perform a fee bump if needed.
5✔
878
        for requestID, r := range feeBumpRecords {
10✔
879
                rec := r
5✔
880

5✔
881
                log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash())
5✔
882
                t.wg.Add(1)
5✔
883
                go t.handleFeeBumpTx(requestID, rec, currentHeight)
5✔
884
        }
5✔
885

886
        // For records that are failed, we'll notify the caller about this
887
        // result.
888
        for requestID, r := range failedRecords {
6✔
889
                rec := r
1✔
890

1✔
891
                log.Debugf("Tx=%v has inputs been spent by a third party, "+
1✔
892
                        "failing it now", r.tx.TxHash())
1✔
893
                t.wg.Add(1)
1✔
894
                go t.handleThirdPartySpent(rec, requestID)
1✔
895
        }
1✔
896
}
897

898
// handleTxConfirmed is called when a monitored tx is confirmed. It will
899
// notify the subscriber then remove the record from the maps .
900
//
901
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
902
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord, requestID uint64) {
6✔
903
        defer t.wg.Done()
6✔
904

6✔
905
        // Create a result that will be sent to the resultChan which is
6✔
906
        // listened by the caller.
6✔
907
        result := &BumpResult{
6✔
908
                Event:     TxConfirmed,
6✔
909
                Tx:        r.tx,
6✔
910
                requestID: requestID,
6✔
911
                Fee:       r.fee,
6✔
912
                FeeRate:   r.feeFunction.FeeRate(),
6✔
913
        }
6✔
914

6✔
915
        // Notify that this tx is confirmed and remove the record from the map.
6✔
916
        t.handleResult(result)
6✔
917
}
6✔
918

919
// handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
920
// attempt to bump the fee of the tx.
921
//
922
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
923
func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord,
924
        currentHeight int32) {
8✔
925

8✔
926
        defer t.wg.Done()
8✔
927

8✔
928
        oldTxid := r.tx.TxHash()
8✔
929

8✔
930
        // Get the current conf target for this record.
8✔
931
        confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
8✔
932

8✔
933
        // Ask the fee function whether a bump is needed. We expect the fee
8✔
934
        // function to increase its returned fee rate after calling this
8✔
935
        // method.
8✔
936
        increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
8✔
937
        if err != nil {
13✔
938
                // TODO(yy): send this error back to the sweeper so it can
5✔
939
                // re-group the inputs?
5✔
940
                log.Errorf("Failed to increase fee rate for tx %v at "+
5✔
941
                        "height=%v: %v", oldTxid, t.currentHeight.Load(), err)
5✔
942

5✔
943
                return
5✔
944
        }
5✔
945

946
        // If the fee rate was not increased, there's no need to bump the fee.
947
        if !increased {
12✔
948
                log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
5✔
949
                        t.currentHeight.Load())
5✔
950

5✔
951
                return
5✔
952
        }
5✔
953

954
        // The fee function now has a new fee rate, we will use it to bump the
955
        // fee of the tx.
956
        resultOpt := t.createAndPublishTx(requestID, r)
6✔
957

6✔
958
        // If there's a result, we will notify the caller about the result.
6✔
959
        resultOpt.WhenSome(func(result BumpResult) {
12✔
960
                // Notify the new result.
6✔
961
                t.handleResult(&result)
6✔
962
        })
6✔
963
}
964

965
// handleThirdPartySpent is called when the inputs in an unconfirmed tx is
966
// spent. It will notify the subscriber then remove the record from the maps
967
// and send a TxFailed event to the subscriber.
968
//
969
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
970
func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord,
971
        requestID uint64) {
1✔
972

1✔
973
        defer t.wg.Done()
1✔
974

1✔
975
        // Create a result that will be sent to the resultChan which is
1✔
976
        // listened by the caller.
1✔
977
        //
1✔
978
        // TODO(yy): create a new state `TxThirdPartySpent` to notify the
1✔
979
        // sweeper to remove the input, hence moving the monitoring of inputs
1✔
980
        // spent inside the fee bumper.
1✔
981
        result := &BumpResult{
1✔
982
                Event:     TxFailed,
1✔
983
                Tx:        r.tx,
1✔
984
                requestID: requestID,
1✔
985
                Err:       ErrThirdPartySpent,
1✔
986
        }
1✔
987

1✔
988
        // Notify that this tx is confirmed and remove the record from the map.
1✔
989
        t.handleResult(result)
1✔
990
}
1✔
991

992
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
993
// to the network. It will update the record with the new tx and fee rate if
994
// successfully created, and return the result when published successfully.
995
func (t *TxPublisher) createAndPublishTx(requestID uint64,
996
        r *monitorRecord) fn.Option[BumpResult] {
11✔
997

11✔
998
        // Fetch the old tx.
11✔
999
        oldTx := r.tx
11✔
1000

11✔
1001
        // Create a new tx with the new fee rate.
11✔
1002
        //
11✔
1003
        // NOTE: The fee function is expected to have increased its returned
11✔
1004
        // fee rate after calling the SkipFeeBump method. So we can use it
11✔
1005
        // directly here.
11✔
1006
        sweepCtx, err := t.createAndCheckTx(r.req, r.feeFunction)
11✔
1007

11✔
1008
        // If the error is fee related, we will return no error and let the fee
11✔
1009
        // bumper retry it at next block.
11✔
1010
        //
11✔
1011
        // NOTE: we can check the RBF error here and ask the fee function to
11✔
1012
        // recalculate the fee rate. However, this would defeat the purpose of
11✔
1013
        // using a deadline based fee function:
11✔
1014
        // - if the deadline is far away, there's no rush to RBF the tx.
11✔
1015
        // - if the deadline is close, we expect the fee function to give us a
11✔
1016
        //   higher fee rate. If the fee rate cannot satisfy the RBF rules, it
11✔
1017
        //   means the budget is not enough.
11✔
1018
        if errors.Is(err, chain.ErrInsufficientFee) ||
11✔
1019
                errors.Is(err, lnwallet.ErrMempoolFee) {
16✔
1020

5✔
1021
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
5✔
1022
                return fn.None[BumpResult]()
5✔
1023
        }
5✔
1024

1025
        // If the error is not fee related, we will return a `TxFailed` event
1026
        // so this input can be retried.
1027
        if err != nil {
13✔
1028
                // If the tx doesn't not have enought budget, we will return a
4✔
1029
                // result so the sweeper can handle it by re-clustering the
4✔
1030
                // utxos.
4✔
1031
                if errors.Is(err, ErrNotEnoughBudget) {
5✔
1032
                        log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(),
1✔
1033
                                err)
1✔
1034
                } else {
4✔
1035
                        // Otherwise, an unexpected error occurred, we will
3✔
1036
                        // fail the tx and let the sweeper retry the whole
3✔
1037
                        // process.
3✔
1038
                        log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(),
3✔
1039
                                err)
3✔
1040
                }
3✔
1041

1042
                return fn.Some(BumpResult{
4✔
1043
                        Event:     TxFailed,
4✔
1044
                        Tx:        oldTx,
4✔
1045
                        Err:       err,
4✔
1046
                        requestID: requestID,
4✔
1047
                })
4✔
1048
        }
1049

1050
        // The tx has been created without any errors, we now register a new
1051
        // record by overwriting the same requestID.
1052
        t.records.Store(requestID, &monitorRecord{
8✔
1053
                tx:                sweepCtx.tx,
8✔
1054
                req:               r.req,
8✔
1055
                feeFunction:       r.feeFunction,
8✔
1056
                fee:               sweepCtx.fee,
8✔
1057
                outpointToTxIndex: sweepCtx.outpointToTxIndex,
8✔
1058
        })
8✔
1059

8✔
1060
        // Attempt to broadcast this new tx.
8✔
1061
        result, err := t.broadcast(requestID)
8✔
1062
        if err != nil {
8✔
1063
                log.Infof("Failed to broadcast replacement tx %v: %v",
×
1064
                        sweepCtx.tx.TxHash(), err)
×
1065

×
1066
                return fn.None[BumpResult]()
×
1067
        }
×
1068

1069
        // If the result error is fee related, we will return no error and let
1070
        // the fee bumper retry it at next block.
1071
        //
1072
        // NOTE: we may get this error if we've bypassed the mempool check,
1073
        // which means we are suing neutrino backend.
1074
        if errors.Is(result.Err, chain.ErrInsufficientFee) ||
8✔
1075
                errors.Is(result.Err, lnwallet.ErrMempoolFee) {
9✔
1076

1✔
1077
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
1✔
1078
                return fn.None[BumpResult]()
1✔
1079
        }
1✔
1080

1081
        // A successful replacement tx is created, attach the old tx.
1082
        result.ReplacedTx = oldTx
8✔
1083

8✔
1084
        // If the new tx failed to be published, we will return the result so
8✔
1085
        // the caller can handle it.
8✔
1086
        if result.Event == TxFailed {
9✔
1087
                return fn.Some(*result)
1✔
1088
        }
1✔
1089

1090
        log.Infof("Replaced tx=%v with new tx=%v", oldTx.TxHash(),
7✔
1091
                sweepCtx.tx.TxHash())
7✔
1092

7✔
1093
        // Otherwise, it's a successful RBF, set the event and return.
7✔
1094
        result.Event = TxReplaced
7✔
1095

7✔
1096
        return fn.Some(*result)
7✔
1097
}
1098

1099
// isConfirmed checks the btcwallet to see whether the tx is confirmed.
1100
func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool {
6✔
1101
        details, err := t.cfg.Wallet.GetTransactionDetails(&txid)
6✔
1102
        if err != nil {
10✔
1103
                log.Warnf("Failed to get tx details for %v: %v", txid, err)
4✔
1104
                return false
4✔
1105
        }
4✔
1106

1107
        return details.NumConfirmations > 0
6✔
1108
}
1109

1110
// isThirdPartySpent checks whether the inputs of the tx has already been spent
1111
// by a third party. When a tx is not confirmed, yet its inputs has been spent,
1112
// then it must be spent by a different tx other than the sweeping tx here.
1113
//
1114
// NOTE: this check is only performed for neutrino backend as it has no
1115
// reliable way to tell a tx has been replaced.
1116
func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash,
1117
        inputs []input.Input) bool {
5✔
1118

5✔
1119
        // Skip this check for if this is not neutrino backend.
5✔
1120
        if !t.isNeutrinoBackend() {
9✔
1121
                return false
4✔
1122
        }
4✔
1123

1124
        // Iterate all the inputs and check if they have been spent already.
1125
        for _, inp := range inputs {
2✔
1126
                op := inp.OutPoint()
1✔
1127

1✔
1128
                // For wallet utxos, the height hint is not set - we don't need
1✔
1129
                // to monitor them for third party spend.
1✔
1130
                heightHint := inp.HeightHint()
1✔
1131
                if heightHint == 0 {
2✔
1132
                        log.Debugf("Skipped third party check for wallet "+
1✔
1133
                                "input %v", op)
1✔
1134

1✔
1135
                        continue
1✔
1136
                }
1137

1138
                // If the input has already been spent after the height hint, a
1139
                // spend event is sent back immediately.
1140
                spendEvent, err := t.cfg.Notifier.RegisterSpendNtfn(
1✔
1141
                        &op, inp.SignDesc().Output.PkScript, heightHint,
1✔
1142
                )
1✔
1143
                if err != nil {
1✔
1144
                        log.Criticalf("Failed to register spend ntfn for "+
×
1145
                                "input=%v: %v", op, err)
×
1146
                        return false
×
1147
                }
×
1148

1149
                // Remove the subscription when exit.
1150
                defer spendEvent.Cancel()
1✔
1151

1✔
1152
                // Do a non-blocking read to see if the output has been spent.
1✔
1153
                select {
1✔
1154
                case spend, ok := <-spendEvent.Spend:
1✔
1155
                        if !ok {
1✔
1156
                                log.Debugf("Spend ntfn for %v canceled", op)
×
1157
                                return false
×
1158
                        }
×
1159

1160
                        spendingTxID := spend.SpendingTx.TxHash()
1✔
1161

1✔
1162
                        // If the spending tx is the same as the sweeping tx
1✔
1163
                        // then we are good.
1✔
1164
                        if spendingTxID == txid {
1✔
UNCOV
1165
                                continue
×
1166
                        }
1167

1168
                        log.Warnf("Detected third party spent of output=%v "+
1✔
1169
                                "in tx=%v", op, spend.SpendingTx.TxHash())
1✔
1170

1✔
1171
                        return true
1✔
1172

1173
                // Move to the next input.
1174
                default:
1✔
1175
                }
1176
        }
1177

1178
        return false
1✔
1179
}
1180

1181
// calcCurrentConfTarget calculates the current confirmation target based on
1182
// the deadline height. The conf target is capped at 0 if the deadline has
1183
// already been past.
1184
func calcCurrentConfTarget(currentHeight, deadline int32) uint32 {
15✔
1185
        var confTarget uint32
15✔
1186

15✔
1187
        // Calculate how many blocks left until the deadline.
15✔
1188
        deadlineDelta := deadline - currentHeight
15✔
1189

15✔
1190
        // If we are already past the deadline, we will set the conf target to
15✔
1191
        // be 1.
15✔
1192
        if deadlineDelta < 0 {
23✔
1193
                log.Warnf("Deadline is %d blocks behind current height %v",
8✔
1194
                        -deadlineDelta, currentHeight)
8✔
1195

8✔
1196
                confTarget = 0
8✔
1197
        } else {
19✔
1198
                confTarget = uint32(deadlineDelta)
11✔
1199
        }
11✔
1200

1201
        return confTarget
15✔
1202
}
1203

1204
// sweepTxCtx houses a sweep transaction with additional context.
1205
type sweepTxCtx struct {
1206
        tx *wire.MsgTx
1207

1208
        fee btcutil.Amount
1209

1210
        extraTxOut fn.Option[SweepOutput]
1211

1212
        // outpointToTxIndex maps the outpoint of the inputs to their index in
1213
        // the sweep transaction.
1214
        outpointToTxIndex map[wire.OutPoint]int
1215
}
1216

1217
// createSweepTx creates a sweeping tx based on the given inputs, change
1218
// address and fee rate.
1219
func (t *TxPublisher) createSweepTx(inputs []input.Input,
1220
        changePkScript lnwallet.AddrWithKey,
1221
        feeRate chainfee.SatPerKWeight) (*sweepTxCtx, error) {
26✔
1222

26✔
1223
        // Validate and calculate the fee and change amount.
26✔
1224
        txFee, changeOutputsOpt, locktimeOpt, err := prepareSweepTx(
26✔
1225
                inputs, changePkScript, feeRate, t.currentHeight.Load(),
26✔
1226
                t.cfg.AuxSweeper,
26✔
1227
        )
26✔
1228
        if err != nil {
30✔
1229
                return nil, err
4✔
1230
        }
4✔
1231

1232
        var (
26✔
1233
                // Create the sweep transaction that we will be building. We
26✔
1234
                // use version 2 as it is required for CSV.
26✔
1235
                sweepTx = wire.NewMsgTx(2)
26✔
1236

26✔
1237
                // We'll add the inputs as we go so we know the final ordering
26✔
1238
                // of inputs to sign.
26✔
1239
                idxs []input.Input
26✔
1240
        )
26✔
1241

26✔
1242
        // We start by adding all inputs that commit to an output. We do this
26✔
1243
        // since the input and output index must stay the same for the
26✔
1244
        // signatures to be valid.
26✔
1245
        outpointToTxIndex := make(map[wire.OutPoint]int)
26✔
1246
        for _, o := range inputs {
52✔
1247
                if o.RequiredTxOut() == nil {
52✔
1248
                        continue
26✔
1249
                }
1250

1251
                idxs = append(idxs, o)
4✔
1252
                sweepTx.AddTxIn(&wire.TxIn{
4✔
1253
                        PreviousOutPoint: o.OutPoint(),
4✔
1254
                        Sequence:         o.BlocksToMaturity(),
4✔
1255
                })
4✔
1256
                sweepTx.AddTxOut(o.RequiredTxOut())
4✔
1257

4✔
1258
                outpointToTxIndex[o.OutPoint()] = len(sweepTx.TxOut) - 1
4✔
1259
        }
1260

1261
        // Sum up the value contained in the remaining inputs, and add them to
1262
        // the sweep transaction.
1263
        for _, o := range inputs {
52✔
1264
                if o.RequiredTxOut() != nil {
30✔
1265
                        continue
4✔
1266
                }
1267

1268
                idxs = append(idxs, o)
26✔
1269
                sweepTx.AddTxIn(&wire.TxIn{
26✔
1270
                        PreviousOutPoint: o.OutPoint(),
26✔
1271
                        Sequence:         o.BlocksToMaturity(),
26✔
1272
                })
26✔
1273
        }
1274

1275
        // If we have change outputs to add, then add it the sweep transaction
1276
        // here.
1277
        changeOutputsOpt.WhenSome(func(changeOuts []SweepOutput) {
52✔
1278
                for i := range changeOuts {
74✔
1279
                        sweepTx.AddTxOut(&changeOuts[i].TxOut)
48✔
1280
                }
48✔
1281
        })
1282

1283
        // We'll default to using the current block height as locktime, if none
1284
        // of the inputs commits to a different locktime.
1285
        sweepTx.LockTime = uint32(locktimeOpt.UnwrapOr(t.currentHeight.Load()))
26✔
1286

26✔
1287
        prevInputFetcher, err := input.MultiPrevOutFetcher(inputs)
26✔
1288
        if err != nil {
26✔
1289
                return nil, fmt.Errorf("error creating prev input fetcher "+
×
1290
                        "for hash cache: %v", err)
×
1291
        }
×
1292
        hashCache := txscript.NewTxSigHashes(sweepTx, prevInputFetcher)
26✔
1293

26✔
1294
        // With all the inputs in place, use each output's unique input script
26✔
1295
        // function to generate the final witness required for spending.
26✔
1296
        addInputScript := func(idx int, tso input.Input) error {
52✔
1297
                inputScript, err := tso.CraftInputScript(
26✔
1298
                        t.cfg.Signer, sweepTx, hashCache, prevInputFetcher, idx,
26✔
1299
                )
26✔
1300
                if err != nil {
26✔
1301
                        return err
×
1302
                }
×
1303

1304
                sweepTx.TxIn[idx].Witness = inputScript.Witness
26✔
1305

26✔
1306
                if len(inputScript.SigScript) == 0 {
52✔
1307
                        return nil
26✔
1308
                }
26✔
1309

1310
                sweepTx.TxIn[idx].SignatureScript = inputScript.SigScript
×
1311

×
1312
                return nil
×
1313
        }
1314

1315
        for idx, inp := range idxs {
52✔
1316
                if err := addInputScript(idx, inp); err != nil {
26✔
1317
                        return nil, err
×
1318
                }
×
1319
        }
1320

1321
        log.Debugf("Created sweep tx %v for inputs:\n%v", sweepTx.TxHash(),
26✔
1322
                inputTypeSummary(inputs))
26✔
1323

26✔
1324
        // Try to locate the extra change output, though there might be None.
26✔
1325
        extraTxOut := fn.MapOption(
26✔
1326
                func(sweepOuts []SweepOutput) fn.Option[SweepOutput] {
52✔
1327
                        for _, sweepOut := range sweepOuts {
74✔
1328
                                if !sweepOut.IsExtra {
96✔
1329
                                        continue
48✔
1330
                                }
1331

1332
                                // If we sweep outputs of a custom channel, the
1333
                                // custom leaves in those outputs will be merged
1334
                                // into a single output, even if we sweep
1335
                                // multiple outputs (e.g. to_remote and breached
1336
                                // to_local of a breached channel) at the same
1337
                                // time. So there will only ever be one extra
1338
                                // output.
1339
                                log.Debugf("Sweep produced extra_sweep_out=%v",
×
1340
                                        lnutils.SpewLogClosure(sweepOut))
×
1341

×
1342
                                return fn.Some(sweepOut)
×
1343
                        }
1344

1345
                        return fn.None[SweepOutput]()
26✔
1346
                },
1347
        )(changeOutputsOpt)
1348

1349
        return &sweepTxCtx{
26✔
1350
                tx:                sweepTx,
26✔
1351
                fee:               txFee,
26✔
1352
                extraTxOut:        fn.FlattenOption(extraTxOut),
26✔
1353
                outpointToTxIndex: outpointToTxIndex,
26✔
1354
        }, nil
26✔
1355
}
1356

1357
// prepareSweepTx returns the tx fee, a set of optional change outputs and an
1358
// optional locktime after a series of validations:
1359
// 1. check the locktime has been reached.
1360
// 2. check the locktimes are the same.
1361
// 3. check the inputs cover the outputs.
1362
//
1363
// NOTE: if the change amount is below dust, it will be added to the tx fee.
1364
func prepareSweepTx(inputs []input.Input, changePkScript lnwallet.AddrWithKey,
1365
        feeRate chainfee.SatPerKWeight, currentHeight int32,
1366
        auxSweeper fn.Option[AuxSweeper]) (
1367
        btcutil.Amount, fn.Option[[]SweepOutput], fn.Option[int32], error) {
26✔
1368

26✔
1369
        noChange := fn.None[[]SweepOutput]()
26✔
1370
        noLocktime := fn.None[int32]()
26✔
1371

26✔
1372
        // Given the set of inputs we have, if we have an aux sweeper, then
26✔
1373
        // we'll attempt to see if we have any other change outputs we'll need
26✔
1374
        // to add to the sweep transaction.
26✔
1375
        changePkScripts := [][]byte{changePkScript.DeliveryAddress}
26✔
1376

26✔
1377
        var extraChangeOut fn.Option[SweepOutput]
26✔
1378
        err := fn.MapOptionZ(
26✔
1379
                auxSweeper, func(aux AuxSweeper) error {
48✔
1380
                        extraOut := aux.DeriveSweepAddr(inputs, changePkScript)
22✔
1381
                        if err := extraOut.Err(); err != nil {
22✔
1382
                                return err
×
1383
                        }
×
1384

1385
                        extraChangeOut = extraOut.LeftToOption()
22✔
1386

22✔
1387
                        return nil
22✔
1388
                },
1389
        )
1390
        if err != nil {
26✔
1391
                return 0, noChange, noLocktime, err
×
1392
        }
×
1393

1394
        // Creating a weight estimator with nil outputs and zero max fee rate.
1395
        // We don't allow adding customized outputs in the sweeping tx, and the
1396
        // fee rate is already being managed before we get here.
1397
        inputs, estimator, err := getWeightEstimate(
26✔
1398
                inputs, nil, feeRate, 0, changePkScripts,
26✔
1399
        )
26✔
1400
        if err != nil {
26✔
1401
                return 0, noChange, noLocktime, err
×
1402
        }
×
1403

1404
        txFee := estimator.fee()
26✔
1405

26✔
1406
        var (
26✔
1407
                // Track whether any of the inputs require a certain locktime.
26✔
1408
                locktime = int32(-1)
26✔
1409

26✔
1410
                // We keep track of total input amount, and required output
26✔
1411
                // amount to use for calculating the change amount below.
26✔
1412
                totalInput     btcutil.Amount
26✔
1413
                requiredOutput btcutil.Amount
26✔
1414
        )
26✔
1415

26✔
1416
        // If we have an extra change output, then we'll add it as a required
26✔
1417
        // output amt.
26✔
1418
        extraChangeOut.WhenSome(func(o SweepOutput) {
48✔
1419
                requiredOutput += btcutil.Amount(o.Value)
22✔
1420
        })
22✔
1421

1422
        // Go through each input and check if the required lock times have
1423
        // reached and are the same.
1424
        for _, o := range inputs {
52✔
1425
                // If the input has a required output, we'll add it to the
26✔
1426
                // required output amount.
26✔
1427
                if o.RequiredTxOut() != nil {
30✔
1428
                        requiredOutput += btcutil.Amount(
4✔
1429
                                o.RequiredTxOut().Value,
4✔
1430
                        )
4✔
1431
                }
4✔
1432

1433
                // Update the total input amount.
1434
                totalInput += btcutil.Amount(o.SignDesc().Output.Value)
26✔
1435

26✔
1436
                lt, ok := o.RequiredLockTime()
26✔
1437

26✔
1438
                // Skip if the input doesn't require a lock time.
26✔
1439
                if !ok {
52✔
1440
                        continue
26✔
1441
                }
1442

1443
                // Check if the lock time has reached
1444
                if lt > uint32(currentHeight) {
4✔
1445
                        return 0, noChange, noLocktime, ErrLocktimeImmature
×
1446
                }
×
1447

1448
                // If another input commits to a different locktime, they
1449
                // cannot be combined in the same transaction.
1450
                if locktime != -1 && locktime != int32(lt) {
4✔
1451
                        return 0, noChange, noLocktime, ErrLocktimeConflict
×
1452
                }
×
1453

1454
                // Update the locktime for next iteration.
1455
                locktime = int32(lt)
4✔
1456
        }
1457

1458
        // Make sure total output amount is less than total input amount.
1459
        if requiredOutput+txFee > totalInput {
26✔
1460
                return 0, noChange, noLocktime, fmt.Errorf("insufficient "+
×
1461
                        "input to create sweep tx: input_sum=%v, "+
×
1462
                        "output_sum=%v", totalInput, requiredOutput+txFee)
×
1463
        }
×
1464

1465
        // The value remaining after the required output and fees is the
1466
        // change output.
1467
        changeAmt := totalInput - requiredOutput - txFee
26✔
1468
        changeOuts := make([]SweepOutput, 0, 2)
26✔
1469

26✔
1470
        extraChangeOut.WhenSome(func(o SweepOutput) {
48✔
1471
                changeOuts = append(changeOuts, o)
22✔
1472
        })
22✔
1473

1474
        // We'll calculate the dust limit for the given changePkScript since it
1475
        // is variable.
1476
        changeFloor := lnwallet.DustLimitForSize(
26✔
1477
                len(changePkScript.DeliveryAddress),
26✔
1478
        )
26✔
1479

26✔
1480
        switch {
26✔
1481
        // If the change amount is dust, we'll move it into the fees, and
1482
        // ignore it.
1483
        case changeAmt < changeFloor:
4✔
1484
                log.Infof("Change amt %v below dustlimit %v, not adding "+
4✔
1485
                        "change output", changeAmt, changeFloor)
4✔
1486

4✔
1487
                // If there's no required output, and the change output is a
4✔
1488
                // dust, it means we are creating a tx without any outputs. In
4✔
1489
                // this case we'll return an error. This could happen when
4✔
1490
                // creating a tx that has an anchor as the only input.
4✔
1491
                if requiredOutput == 0 {
8✔
1492
                        return 0, noChange, noLocktime, ErrTxNoOutput
4✔
1493
                }
4✔
1494

1495
                // The dust amount is added to the fee.
1496
                txFee += changeAmt
×
1497

1498
        // Otherwise, we'll actually recognize it as a change output.
1499
        default:
26✔
1500
                changeOuts = append(changeOuts, SweepOutput{
26✔
1501
                        TxOut: wire.TxOut{
26✔
1502
                                Value:    int64(changeAmt),
26✔
1503
                                PkScript: changePkScript.DeliveryAddress,
26✔
1504
                        },
26✔
1505
                        IsExtra:     false,
26✔
1506
                        InternalKey: changePkScript.InternalKey,
26✔
1507
                })
26✔
1508
        }
1509

1510
        // Optionally set the locktime.
1511
        locktimeOpt := fn.Some(locktime)
26✔
1512
        if locktime == -1 {
52✔
1513
                locktimeOpt = noLocktime
26✔
1514
        }
26✔
1515

1516
        var changeOutsOpt fn.Option[[]SweepOutput]
26✔
1517
        if len(changeOuts) > 0 {
52✔
1518
                changeOutsOpt = fn.Some(changeOuts)
26✔
1519
        }
26✔
1520

1521
        log.Debugf("Creating sweep tx for %v inputs (%s) using %v, "+
26✔
1522
                "tx_weight=%v, tx_fee=%v, locktime=%v, parents_count=%v, "+
26✔
1523
                "parents_fee=%v, parents_weight=%v, current_height=%v",
26✔
1524
                len(inputs), inputTypeSummary(inputs), feeRate,
26✔
1525
                estimator.weight(), txFee, locktimeOpt, len(estimator.parents),
26✔
1526
                estimator.parentsFee, estimator.parentsWeight, currentHeight)
26✔
1527

26✔
1528
        return txFee, changeOutsOpt, locktimeOpt, nil
26✔
1529
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc