• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12428593038

20 Dec 2024 09:02AM UTC coverage: 58.33% (-0.2%) from 58.576%
12428593038

Pull #9382

github

guggero
.golangci.yml: speed up linter by updating start commit

With this we allow the linter to only look at recent changes, since
everything between that old commit and this most recent one has been
linted correctly anyway.
Pull Request #9382: lint: deprecate old linters, use new ref commit

133769 of 229330 relevant lines covered (58.33%)

19284.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.18
/sweep/fee_bumper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/rpcclient"
12
        "github.com/btcsuite/btcd/txscript"
13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/btcsuite/btcwallet/chain"
15
        "github.com/lightningnetwork/lnd/chainntnfs"
16
        "github.com/lightningnetwork/lnd/fn/v2"
17
        "github.com/lightningnetwork/lnd/input"
18
        "github.com/lightningnetwork/lnd/labels"
19
        "github.com/lightningnetwork/lnd/lntypes"
20
        "github.com/lightningnetwork/lnd/lnutils"
21
        "github.com/lightningnetwork/lnd/lnwallet"
22
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
23
        "github.com/lightningnetwork/lnd/tlv"
24
)
25

26
var (
	// ErrInvalidBumpResult signals that a BumpResult did not pass
	// validation.
	ErrInvalidBumpResult = errors.New("invalid bump result")

	// ErrNotEnoughBudget signals that the fee bumper found the current
	// budget too small to pay for the fee.
	ErrNotEnoughBudget = errors.New("not enough budget")

	// ErrLocktimeImmature signals an attempt to sweep an input before its
	// locktime has been reached.
	ErrLocktimeImmature = errors.New("immature input")

	// ErrTxNoOutput signals that no output could be created while the tx
	// was being prepared, which usually means the output would be dust.
	ErrTxNoOutput = errors.New("tx has no output")

	// ErrThirdPartySpent signals that an input of the sweeping tx has been
	// spent by a third party.
	ErrThirdPartySpent = errors.New("third party spent the output")
)
46

47
var (
	// dummyChangePkScript is a placeholder tapscript change script. It is
	// used for fee estimation only, when no real address is needed. The
	// script is 34 bytes long: 0x51, 0x20, followed by 32 zero bytes.
	dummyChangePkScript = []byte{
		0: 0x51,
		1: 0x20,

		// Setting the last index fixes the length at 34; every
		// unlisted element in between is zero.
		33: 0x00,
	}
)
59

60
// Bumper defines an interface that can be used by other subsystems for fee
61
// bumping.
62
type Bumper interface {
63
        // Broadcast is used to publish the tx created from the given inputs
64
        // specified in the request. It handles the tx creation, broadcasts it,
65
        // and monitors its confirmation status for potential fee bumping. It
66
        // returns a chan that the caller can use to receive updates about the
67
        // broadcast result and potential RBF attempts.
68
        Broadcast(req *BumpRequest) (<-chan *BumpResult, error)
69
}
70

71
// BumpEvent identifies what happened in a fee bumping attempt.
type BumpEvent uint8

const (
	// TxPublished is sent when the broadcast attempt is finished.
	TxPublished BumpEvent = iota

	// TxFailed is sent when the broadcast attempt fails.
	TxFailed

	// TxReplaced is sent when the original tx is replaced by a new one.
	TxReplaced

	// TxConfirmed is sent when the tx is confirmed.
	TxConfirmed

	// sentinalEvent marks the end of the known events. Any value equal to
	// or greater than it is treated as unknown.
	sentinalEvent
)

// String returns a human-readable name for the event, or "Unknown" when the
// event has no name.
func (e BumpEvent) String() string {
	// The table is indexed by the event value, so every known event maps
	// directly to its name.
	names := [...]string{
		TxPublished: "Published",
		TxFailed:    "Failed",
		TxReplaced:  "Replaced",
		TxConfirmed: "Confirmed",
	}

	if int(e) < len(names) {
		return names[e]
	}

	return "Unknown"
}

// Unknown reports whether the event lies outside the set of known events.
func (e BumpEvent) Unknown() bool {
	return sentinalEvent <= e
}
9✔
111

112
// BumpRequest is used by the caller to give the Bumper the necessary info to
113
// create and manage potential fee bumps for a set of inputs.
114
type BumpRequest struct {
115
        // Budget givens the total amount that can be used as fees by these
116
        // inputs.
117
        Budget btcutil.Amount
118

119
        // Inputs is the set of inputs to sweep.
120
        Inputs []input.Input
121

122
        // DeadlineHeight is the block height at which the tx should be
123
        // confirmed.
124
        DeadlineHeight int32
125

126
        // DeliveryAddress is the script to send the change output to.
127
        DeliveryAddress lnwallet.AddrWithKey
128

129
        // MaxFeeRate is the maximum fee rate that can be used for fee bumping.
130
        MaxFeeRate chainfee.SatPerKWeight
131

132
        // StartingFeeRate is an optional parameter that can be used to specify
133
        // the initial fee rate to use for the fee function.
134
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
135

136
        // ExtraTxOut tracks if this bump request has an optional set of extra
137
        // outputs to add to the transaction.
138
        ExtraTxOut fn.Option[SweepOutput]
139
}
140

141
// MaxFeeRateAllowed returns the maximum fee rate allowed for the given
142
// request. It calculates the feerate using the supplied budget and the weight,
143
// compares it with the specified MaxFeeRate, and returns the smaller of the
144
// two.
145
func (r *BumpRequest) MaxFeeRateAllowed() (chainfee.SatPerKWeight, error) {
9✔
146
        // We'll want to know if we have any blobs, as we need to factor this
9✔
147
        // into the max fee rate for this bump request.
9✔
148
        hasBlobs := fn.Any(r.Inputs, func(i input.Input) bool {
17✔
149
                return fn.MapOptionZ(
8✔
150
                        i.ResolutionBlob(), func(b tlv.Blob) bool {
9✔
151
                                return len(b) > 0
1✔
152
                        },
1✔
153
                )
154
        })
155

156
        sweepAddrs := [][]byte{
9✔
157
                r.DeliveryAddress.DeliveryAddress,
9✔
158
        }
9✔
159

9✔
160
        // If we have blobs, then we'll add an extra sweep addr for the size
9✔
161
        // estimate below. We know that these blobs will also always be based on
9✔
162
        // p2tr addrs.
9✔
163
        if hasBlobs {
9✔
164
                // We need to pass in a real address, so we'll use a dummy
×
165
                // tapscript change script that's used elsewhere for tests.
×
166
                sweepAddrs = append(sweepAddrs, dummyChangePkScript)
×
167
        }
×
168

169
        // Get the size of the sweep tx, which will be used to calculate the
170
        // budget fee rate.
171
        size, err := calcSweepTxWeight(
9✔
172
                r.Inputs, sweepAddrs,
9✔
173
        )
9✔
174
        if err != nil {
10✔
175
                return 0, err
1✔
176
        }
1✔
177

178
        // Use the budget and MaxFeeRate to decide the max allowed fee rate.
179
        // This is needed as, when the input has a large value and the user
180
        // sets the budget to be proportional to the input value, the fee rate
181
        // can be very high and we need to make sure it doesn't exceed the max
182
        // fee rate.
183
        maxFeeRateAllowed := chainfee.NewSatPerKWeight(r.Budget, size)
8✔
184
        if maxFeeRateAllowed > r.MaxFeeRate {
10✔
185
                log.Debugf("Budget feerate %v exceeds MaxFeeRate %v, use "+
2✔
186
                        "MaxFeeRate instead, txWeight=%v", maxFeeRateAllowed,
2✔
187
                        r.MaxFeeRate, size)
2✔
188

2✔
189
                return r.MaxFeeRate, nil
2✔
190
        }
2✔
191

192
        log.Debugf("Budget feerate %v below MaxFeeRate %v, use budget feerate "+
7✔
193
                "instead, txWeight=%v", maxFeeRateAllowed, r.MaxFeeRate, size)
7✔
194

7✔
195
        return maxFeeRateAllowed, nil
7✔
196
}
197

198
// calcSweepTxWeight calculates the weight of the sweep tx. It assumes a
199
// sweeping tx always has a single output(change).
200
func calcSweepTxWeight(inputs []input.Input,
201
        outputPkScript [][]byte) (lntypes.WeightUnit, error) {
12✔
202

12✔
203
        // Use a const fee rate as we only use the weight estimator to
12✔
204
        // calculate the size.
12✔
205
        const feeRate = 1
12✔
206

12✔
207
        // Initialize the tx weight estimator with,
12✔
208
        // - nil outputs as we only have one single change output.
12✔
209
        // - const fee rate as we don't care about the fees here.
12✔
210
        // - 0 maxfeerate as we don't care about fees here.
12✔
211
        //
12✔
212
        // TODO(yy): we should refactor the weight estimator to not require a
12✔
213
        // fee rate and max fee rate and make it a pure tx weight calculator.
12✔
214
        _, estimator, err := getWeightEstimate(
12✔
215
                inputs, nil, feeRate, 0, outputPkScript,
12✔
216
        )
12✔
217
        if err != nil {
14✔
218
                return 0, err
2✔
219
        }
2✔
220

221
        return estimator.weight(), nil
10✔
222
}
223

224
// BumpResult is used by the Bumper to send updates about the tx being
225
// broadcast.
226
type BumpResult struct {
227
        // Event is the type of event that the result is for.
228
        Event BumpEvent
229

230
        // Tx is the tx being broadcast.
231
        Tx *wire.MsgTx
232

233
        // ReplacedTx is the old, replaced tx if a fee bump is attempted.
234
        ReplacedTx *wire.MsgTx
235

236
        // FeeRate is the fee rate used for the new tx.
237
        FeeRate chainfee.SatPerKWeight
238

239
        // Fee is the fee paid by the new tx.
240
        Fee btcutil.Amount
241

242
        // Err is the error that occurred during the broadcast.
243
        Err error
244

245
        // requestID is the ID of the request that created this record.
246
        requestID uint64
247
}
248

249
// Validate validates the BumpResult so it's safe to use.
250
func (b *BumpResult) Validate() error {
10✔
251
        // Every result must have a tx.
10✔
252
        if b.Tx == nil {
11✔
253
                return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult)
1✔
254
        }
1✔
255

256
        // Every result must have a known event.
257
        if b.Event.Unknown() {
10✔
258
                return fmt.Errorf("%w: unknown event", ErrInvalidBumpResult)
1✔
259
        }
1✔
260

261
        // If it's a replacing event, it must have a replaced tx.
262
        if b.Event == TxReplaced && b.ReplacedTx == nil {
9✔
263
                return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult)
1✔
264
        }
1✔
265

266
        // If it's a failed event, it must have an error.
267
        if b.Event == TxFailed && b.Err == nil {
8✔
268
                return fmt.Errorf("%w: nil error", ErrInvalidBumpResult)
1✔
269
        }
1✔
270

271
        // If it's a confirmed event, it must have a fee rate and fee.
272
        if b.Event == TxConfirmed && (b.FeeRate == 0 || b.Fee == 0) {
7✔
273
                return fmt.Errorf("%w: missing fee rate or fee",
1✔
274
                        ErrInvalidBumpResult)
1✔
275
        }
1✔
276

277
        return nil
5✔
278
}
279

280
// TxPublisherConfig is the config used to create a new TxPublisher.
281
type TxPublisherConfig struct {
282
        // Signer is used to create the tx signature.
283
        Signer input.Signer
284

285
        // Wallet is used primarily to publish the tx.
286
        Wallet Wallet
287

288
        // Estimator is used to estimate the fee rate for the new tx based on
289
        // its deadline conf target.
290
        Estimator chainfee.Estimator
291

292
        // Notifier is used to monitor the confirmation status of the tx.
293
        Notifier chainntnfs.ChainNotifier
294

295
        // AuxSweeper is an optional interface that can be used to modify the
296
        // way sweep transaction are generated.
297
        AuxSweeper fn.Option[AuxSweeper]
298
}
299

300
// TxPublisher is an implementation of the Bumper interface. It utilizes the
301
// `testmempoolaccept` RPC to bump the fee of txns it created based on
302
// different fee function selected or configed by the caller. Its purpose is to
303
// take a list of inputs specified, and create a tx that spends them to a
304
// specified output. It will then monitor the confirmation status of the tx,
305
// and if it's not confirmed within a certain time frame, it will attempt to
306
// bump the fee of the tx by creating a new tx that spends the same inputs to
307
// the same output, but with a higher fee rate. It will continue to do this
308
// until the tx is confirmed or the fee rate reaches the maximum fee rate
309
// specified by the caller.
310
type TxPublisher struct {
311
        started atomic.Bool
312
        stopped atomic.Bool
313

314
        wg sync.WaitGroup
315

316
        // cfg specifies the configuration of the TxPublisher.
317
        cfg *TxPublisherConfig
318

319
        // currentHeight is the current block height.
320
        currentHeight atomic.Int32
321

322
        // records is a map keyed by the requestCounter and the value is the tx
323
        // being monitored.
324
        records lnutils.SyncMap[uint64, *monitorRecord]
325

326
        // requestCounter is a monotonically increasing counter used to keep
327
        // track of how many requests have been made.
328
        requestCounter atomic.Uint64
329

330
        // subscriberChans is a map keyed by the requestCounter, each item is
331
        // the chan that the publisher sends the fee bump result to.
332
        subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]
333

334
        // quit is used to signal the publisher to stop.
335
        quit chan struct{}
336
}
337

338
// Compile-time constraint to ensure TxPublisher implements Bumper.
339
var _ Bumper = (*TxPublisher)(nil)
340

341
// NewTxPublisher creates a new TxPublisher.
342
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
15✔
343
        return &TxPublisher{
15✔
344
                cfg:             &cfg,
15✔
345
                records:         lnutils.SyncMap[uint64, *monitorRecord]{},
15✔
346
                subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
15✔
347
                quit:            make(chan struct{}),
15✔
348
        }
15✔
349
}
15✔
350

351
// isNeutrinoBackend checks if the wallet backend is neutrino.
352
func (t *TxPublisher) isNeutrinoBackend() bool {
2✔
353
        return t.cfg.Wallet.BackEnd() == "neutrino"
2✔
354
}
2✔
355

356
// Broadcast is used to publish the tx created from the given inputs. It will,
357
// 1. init a fee function based on the given strategy.
358
// 2. create an RBF-compliant tx and monitor it for confirmation.
359
// 3. notify the initial broadcast result back to the caller.
360
// The initial broadcast is guaranteed to be RBF-compliant unless the budget
361
// specified cannot cover the fee.
362
//
363
// NOTE: part of the Bumper interface.
364
func (t *TxPublisher) Broadcast(req *BumpRequest) (<-chan *BumpResult, error) {
4✔
365
        log.Tracef("Received broadcast request: %s", lnutils.SpewLogClosure(
4✔
366
                req))
4✔
367

4✔
368
        // Attempt an initial broadcast which is guaranteed to comply with the
4✔
369
        // RBF rules.
4✔
370
        result, err := t.initialBroadcast(req)
4✔
371
        if err != nil {
6✔
372
                log.Errorf("Initial broadcast failed: %v", err)
2✔
373

2✔
374
                return nil, err
2✔
375
        }
2✔
376

377
        // Create a chan to send the result to the caller.
378
        subscriber := make(chan *BumpResult, 1)
3✔
379
        t.subscriberChans.Store(result.requestID, subscriber)
3✔
380

3✔
381
        // Send the initial broadcast result to the caller.
3✔
382
        t.handleResult(result)
3✔
383

3✔
384
        return subscriber, nil
3✔
385
}
386

387
// initialBroadcast initializes a fee function, creates an RBF-compliant tx and
388
// broadcasts it.
389
func (t *TxPublisher) initialBroadcast(req *BumpRequest) (*BumpResult, error) {
4✔
390
        // Create a fee bumping algorithm to be used for future RBF.
4✔
391
        feeAlgo, err := t.initializeFeeFunction(req)
4✔
392
        if err != nil {
5✔
393
                return nil, fmt.Errorf("init fee function: %w", err)
1✔
394
        }
1✔
395

396
        // Create the initial tx to be broadcasted. This tx is guaranteed to
397
        // comply with the RBF restrictions.
398
        requestID, err := t.createRBFCompliantTx(req, feeAlgo)
4✔
399
        if err != nil {
6✔
400
                return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
2✔
401
        }
2✔
402

403
        // Broadcast the tx and return the monitored record.
404
        result, err := t.broadcast(requestID)
3✔
405
        if err != nil {
3✔
406
                return nil, fmt.Errorf("broadcast sweep tx: %w", err)
×
407
        }
×
408

409
        return result, nil
3✔
410
}
411

412
// initializeFeeFunction initializes a fee function to be used for this request
413
// for future fee bumping.
414
func (t *TxPublisher) initializeFeeFunction(
415
        req *BumpRequest) (FeeFunction, error) {
6✔
416

6✔
417
        // Get the max allowed feerate.
6✔
418
        maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
6✔
419
        if err != nil {
6✔
420
                return nil, err
×
421
        }
×
422

423
        // Get the initial conf target.
424
        confTarget := calcCurrentConfTarget(
6✔
425
                t.currentHeight.Load(), req.DeadlineHeight,
6✔
426
        )
6✔
427

6✔
428
        log.Debugf("Initializing fee function with conf target=%v, budget=%v, "+
6✔
429
                "maxFeeRateAllowed=%v", confTarget, req.Budget,
6✔
430
                maxFeeRateAllowed)
6✔
431

6✔
432
        // Initialize the fee function and return it.
6✔
433
        //
6✔
434
        // TODO(yy): return based on differet req.Strategy?
6✔
435
        return NewLinearFeeFunction(
6✔
436
                maxFeeRateAllowed, confTarget, t.cfg.Estimator,
6✔
437
                req.StartingFeeRate,
6✔
438
        )
6✔
439
}
440

441
// createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
442
// so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee
443
// and redo the process until the tx is valid, or return an error when non-RBF
444
// related errors occur or the budget has been used up.
445
func (t *TxPublisher) createRBFCompliantTx(req *BumpRequest,
446
        f FeeFunction) (uint64, error) {
10✔
447

10✔
448
        for {
23✔
449
                // Create a new tx with the given fee rate and check its
13✔
450
                // mempool acceptance.
13✔
451
                sweepCtx, err := t.createAndCheckTx(req, f)
13✔
452

13✔
453
                switch {
13✔
454
                case err == nil:
7✔
455
                        // The tx is valid, return the request ID.
7✔
456
                        requestID := t.storeRecord(
7✔
457
                                sweepCtx.tx, req, f, sweepCtx.fee,
7✔
458
                                sweepCtx.outpointToTxIndex,
7✔
459
                        )
7✔
460

7✔
461
                        log.Infof("Created tx %v for %v inputs: feerate=%v, "+
7✔
462
                                "fee=%v, inputs=%v", sweepCtx.tx.TxHash(),
7✔
463
                                len(req.Inputs), f.FeeRate(), sweepCtx.fee,
7✔
464
                                inputTypeSummary(req.Inputs))
7✔
465

7✔
466
                        return requestID, nil
7✔
467

468
                // If the error indicates the fees paid is not enough, we will
469
                // ask the fee function to increase the fee rate and retry.
470
                case errors.Is(err, lnwallet.ErrMempoolFee):
2✔
471
                        // We should at least start with a feerate above the
2✔
472
                        // mempool min feerate, so if we get this error, it
2✔
473
                        // means something is wrong earlier in the pipeline.
2✔
474
                        log.Errorf("Current fee=%v, feerate=%v, %v",
2✔
475
                                sweepCtx.fee, f.FeeRate(), err)
2✔
476

2✔
477
                        fallthrough
2✔
478

479
                // We are not paying enough fees so we increase it.
480
                case errors.Is(err, chain.ErrInsufficientFee):
5✔
481
                        increased := false
5✔
482

5✔
483
                        // Keep calling the fee function until the fee rate is
5✔
484
                        // increased or maxed out.
5✔
485
                        for !increased {
11✔
486
                                log.Debugf("Increasing fee for next round, "+
6✔
487
                                        "current fee=%v, feerate=%v",
6✔
488
                                        sweepCtx.fee, f.FeeRate())
6✔
489

6✔
490
                                // If the fee function tells us that we have
6✔
491
                                // used up the budget, we will return an error
6✔
492
                                // indicating this tx cannot be made. The
6✔
493
                                // sweeper should handle this error and try to
6✔
494
                                // cluster these inputs differetly.
6✔
495
                                increased, err = f.Increment()
6✔
496
                                if err != nil {
8✔
497
                                        return 0, err
2✔
498
                                }
2✔
499
                        }
500

501
                // TODO(yy): suppose there's only one bad input, we can do a
502
                // binary search to find out which input is causing this error
503
                // by recreating a tx using half of the inputs and check its
504
                // mempool acceptance.
505
                default:
3✔
506
                        log.Debugf("Failed to create RBF-compliant tx: %v", err)
3✔
507
                        return 0, err
3✔
508
                }
509
        }
510
}
511

512
// storeRecord stores the given record in the records map.
513
func (t *TxPublisher) storeRecord(tx *wire.MsgTx, req *BumpRequest,
514
        f FeeFunction, fee btcutil.Amount,
515
        outpointToTxIndex map[wire.OutPoint]int) uint64 {
15✔
516

15✔
517
        // Increase the request counter.
15✔
518
        //
15✔
519
        // NOTE: this is the only place where we increase the
15✔
520
        // counter.
15✔
521
        requestID := t.requestCounter.Add(1)
15✔
522

15✔
523
        // Register the record.
15✔
524
        t.records.Store(requestID, &monitorRecord{
15✔
525
                tx:                tx,
15✔
526
                req:               req,
15✔
527
                feeFunction:       f,
15✔
528
                fee:               fee,
15✔
529
                outpointToTxIndex: outpointToTxIndex,
15✔
530
        })
15✔
531

15✔
532
        return requestID
15✔
533
}
15✔
534

535
// createAndCheckTx creates a tx based on the given inputs, change output
536
// script, and the fee rate. In addition, it validates the tx's mempool
537
// acceptance before returning a tx that can be published directly, along with
538
// its fee.
539
func (t *TxPublisher) createAndCheckTx(req *BumpRequest,
540
        f FeeFunction) (*sweepTxCtx, error) {
23✔
541

23✔
542
        // Create the sweep tx with max fee rate of 0 as the fee function
23✔
543
        // guarantees the fee rate used here won't exceed the max fee rate.
23✔
544
        sweepCtx, err := t.createSweepTx(
23✔
545
                req.Inputs, req.DeliveryAddress, f.FeeRate(),
23✔
546
        )
23✔
547
        if err != nil {
24✔
548
                return sweepCtx, fmt.Errorf("create sweep tx: %w", err)
1✔
549
        }
1✔
550

551
        // Sanity check the budget still covers the fee.
552
        if sweepCtx.fee > req.Budget {
25✔
553
                return sweepCtx, fmt.Errorf("%w: budget=%v, fee=%v",
2✔
554
                        ErrNotEnoughBudget, req.Budget, sweepCtx.fee)
2✔
555
        }
2✔
556

557
        // If we had an extra txOut, then we'll update the result to include
558
        // it.
559
        req.ExtraTxOut = sweepCtx.extraTxOut
21✔
560

21✔
561
        // Validate the tx's mempool acceptance.
21✔
562
        err = t.cfg.Wallet.CheckMempoolAcceptance(sweepCtx.tx)
21✔
563

21✔
564
        // Exit early if the tx is valid.
21✔
565
        if err == nil {
33✔
566
                return sweepCtx, nil
12✔
567
        }
12✔
568

569
        // Print an error log if the chain backend doesn't support the mempool
570
        // acceptance test RPC.
571
        if errors.Is(err, rpcclient.ErrBackendVersion) {
10✔
572
                log.Errorf("TestMempoolAccept not supported by backend, " +
×
573
                        "consider upgrading it to a newer version")
×
574
                return sweepCtx, nil
×
575
        }
×
576

577
        // We are running on a backend that doesn't implement the RPC
578
        // testmempoolaccept, eg, neutrino, so we'll skip the check.
579
        if errors.Is(err, chain.ErrUnimplemented) {
10✔
580
                log.Debug("Skipped testmempoolaccept due to not implemented")
×
581
                return sweepCtx, nil
×
582
        }
×
583

584
        return sweepCtx, fmt.Errorf("tx=%v failed mempool check: %w",
10✔
585
                sweepCtx.tx.TxHash(), err)
10✔
586
}
587

588
// broadcast takes a monitored tx and publishes it to the network. Prior to the
589
// broadcast, it will subscribe the tx's confirmation notification and attach
590
// the event channel to the record. Any broadcast-related errors will not be
591
// returned here, instead, they will be put inside the `BumpResult` and
592
// returned to the caller.
593
func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) {
10✔
594
        // Get the record being monitored.
10✔
595
        record, ok := t.records.Load(requestID)
10✔
596
        if !ok {
11✔
597
                return nil, fmt.Errorf("tx record %v not found", requestID)
1✔
598
        }
1✔
599

600
        txid := record.tx.TxHash()
9✔
601

9✔
602
        tx := record.tx
9✔
603
        log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
9✔
604
                txid, len(tx.TxIn), t.currentHeight.Load())
9✔
605

9✔
606
        // Before we go to broadcast, we'll notify the aux sweeper, if it's
9✔
607
        // present of this new broadcast attempt.
9✔
608
        err := fn.MapOptionZ(t.cfg.AuxSweeper, func(aux AuxSweeper) error {
17✔
609
                return aux.NotifyBroadcast(
8✔
610
                        record.req, tx, record.fee, record.outpointToTxIndex,
8✔
611
                )
8✔
612
        })
8✔
613
        if err != nil {
9✔
614
                return nil, fmt.Errorf("unable to notify aux sweeper: %w", err)
×
615
        }
×
616

617
        // Set the event, and change it to TxFailed if the wallet fails to
618
        // publish it.
619
        event := TxPublished
9✔
620

9✔
621
        // Publish the sweeping tx with customized label. If the publish fails,
9✔
622
        // this error will be saved in the `BumpResult` and it will be removed
9✔
623
        // from being monitored.
9✔
624
        err = t.cfg.Wallet.PublishTransaction(
9✔
625
                tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
9✔
626
        )
9✔
627
        if err != nil {
12✔
628
                // NOTE: we decide to attach this error to the result instead
3✔
629
                // of returning it here because by the time the tx reaches
3✔
630
                // here, it should have passed the mempool acceptance check. If
3✔
631
                // it still fails to be broadcast, it's likely a non-RBF
3✔
632
                // related error happened. So we send this error back to the
3✔
633
                // caller so that it can handle it properly.
3✔
634
                //
3✔
635
                // TODO(yy): find out which input is causing the failure.
3✔
636
                log.Errorf("Failed to publish tx %v: %v", txid, err)
3✔
637
                event = TxFailed
3✔
638
        }
3✔
639

640
        result := &BumpResult{
9✔
641
                Event:     event,
9✔
642
                Tx:        record.tx,
9✔
643
                Fee:       record.fee,
9✔
644
                FeeRate:   record.feeFunction.FeeRate(),
9✔
645
                Err:       err,
9✔
646
                requestID: requestID,
9✔
647
        }
9✔
648

9✔
649
        return result, nil
9✔
650
}
651

652
// notifyResult sends the result to the resultChan specified by the requestID.
653
// This channel is expected to be read by the caller.
654
func (t *TxPublisher) notifyResult(result *BumpResult) {
10✔
655
        id := result.requestID
10✔
656
        subscriber, ok := t.subscriberChans.Load(id)
10✔
657
        if !ok {
10✔
658
                log.Errorf("Result chan for id=%v not found", id)
×
659
                return
×
660
        }
×
661

662
        log.Debugf("Sending result for requestID=%v, tx=%v", id,
10✔
663
                result.Tx.TxHash())
10✔
664

10✔
665
        select {
10✔
666
        // Send the result to the subscriber.
667
        //
668
        // TODO(yy): Add timeout in case it's blocking?
669
        case subscriber <- result:
9✔
670
        case <-t.quit:
1✔
671
                log.Debug("Fee bumper stopped")
1✔
672
        }
673
}
674

675
// removeResult removes the tracking of the result if the result contains a
676
// non-nil error, or the tx is confirmed, the record will be removed from the
677
// maps.
678
func (t *TxPublisher) removeResult(result *BumpResult) {
10✔
679
        id := result.requestID
10✔
680

10✔
681
        // Remove the record from the maps if there's an error. This means this
10✔
682
        // tx has failed its broadcast and cannot be retried. There are two
10✔
683
        // cases,
10✔
684
        // - when the budget cannot cover the fee.
10✔
685
        // - when a non-RBF related error occurs.
10✔
686
        switch result.Event {
10✔
687
        case TxFailed:
3✔
688
                log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
3✔
689
                        id, result.Tx.TxHash(), result.Err)
3✔
690

691
        case TxConfirmed:
4✔
692
                // Remove the record is the tx is confirmed.
4✔
693
                log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
4✔
694
                        result.Tx.TxHash())
4✔
695

696
        // Do nothing if it's neither failed or confirmed.
697
        default:
5✔
698
                log.Tracef("Skipping record removal for id=%v, event=%v", id,
5✔
699
                        result.Event)
5✔
700

5✔
701
                return
5✔
702
        }
703

704
        t.records.Delete(id)
6✔
705
        t.subscriberChans.Delete(id)
6✔
706
}
707

708
// handleResult processes the result of a tx broadcast in two steps: it first
// notifies the subscriber of the request, then drops the record if the result
// is terminal, i.e. the tx is confirmed or has failed.
func (t *TxPublisher) handleResult(result *BumpResult) {
        // Notify the subscriber. This must come before the removal below,
        // since removeResult deletes the subscriber channel entry for
        // terminal events (presumably the one notifyResult sends on).
        t.notifyResult(result)

        // Remove the record if it's failed or confirmed.
        t.removeResult(result)
}
7✔
718

719
// monitorRecord is used to keep track of the tx being monitored by the
// publisher internally. A record is stored under its request ID and is
// overwritten with a fresh one whenever a replacement tx is created for the
// same request.
type monitorRecord struct {
        // tx is the tx being monitored.
        tx *wire.MsgTx

        // req is the original request.
        req *BumpRequest

        // feeFunction is the fee bumping algorithm used by the publisher. It
        // is carried over unchanged when the record is replaced.
        feeFunction FeeFunction

        // fee is the fee paid by the tx.
        fee btcutil.Amount

        // outpointToTxIndex is a map of outpoint to tx index, taken from the
        // sweep tx context that produced tx.
        outpointToTxIndex map[wire.OutPoint]int
}
737

738
// Start starts the publisher by subscribing to block epoch updates and kicking
739
// off the monitor loop.
740
func (t *TxPublisher) Start() error {
1✔
741
        log.Info("TxPublisher starting...")
1✔
742

1✔
743
        if t.started.Swap(true) {
1✔
744
                return fmt.Errorf("TxPublisher started more than once")
×
745
        }
×
746

747
        blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
1✔
748
        if err != nil {
1✔
749
                return fmt.Errorf("register block epoch ntfn: %w", err)
×
750
        }
×
751

752
        t.wg.Add(1)
1✔
753
        go t.monitor(blockEvent)
1✔
754

1✔
755
        log.Debugf("TxPublisher started")
1✔
756

1✔
757
        return nil
1✔
758
}
759

760
// Stop stops the publisher and waits for the monitor loop to exit.
761
func (t *TxPublisher) Stop() error {
1✔
762
        log.Info("TxPublisher stopping...")
1✔
763

1✔
764
        if t.stopped.Swap(true) {
1✔
765
                return fmt.Errorf("TxPublisher stopped more than once")
×
766
        }
×
767

768
        close(t.quit)
1✔
769
        t.wg.Wait()
1✔
770

1✔
771
        log.Debug("TxPublisher stopped")
1✔
772

1✔
773
        return nil
1✔
774
}
775

776
// monitor is the main loop driven by new blocks. Whevenr a new block arrives,
777
// it will examine all the txns being monitored, and check if any of them needs
778
// to be bumped. If so, it will attempt to bump the fee of the tx.
779
//
780
// NOTE: Must be run as a goroutine.
781
func (t *TxPublisher) monitor(blockEvent *chainntnfs.BlockEpochEvent) {
1✔
782
        defer blockEvent.Cancel()
1✔
783
        defer t.wg.Done()
1✔
784

1✔
785
        for {
2✔
786
                select {
1✔
787
                case epoch, ok := <-blockEvent.Epochs:
1✔
788
                        if !ok {
1✔
789
                                // We should stop the publisher before stopping
×
790
                                // the chain service. Otherwise it indicates an
×
791
                                // error.
×
792
                                log.Error("Block epoch channel closed, exit " +
×
793
                                        "monitor")
×
794

×
795
                                return
×
796
                        }
×
797

798
                        log.Debugf("TxPublisher received new block: %v",
1✔
799
                                epoch.Height)
1✔
800

1✔
801
                        // Update the best known height for the publisher.
1✔
802
                        t.currentHeight.Store(epoch.Height)
1✔
803

1✔
804
                        // Check all monitored txns to see if any of them needs
1✔
805
                        // to be bumped.
1✔
806
                        t.processRecords()
1✔
807

808
                case <-t.quit:
1✔
809
                        log.Debug("Fee bumper stopped, exit monitor")
1✔
810
                        return
1✔
811
                }
812
        }
813
}
814

815
// processRecords checks all the txns being monitored, and checks if any of
816
// them needs to be bumped. If so, it will attempt to bump the fee of the tx.
817
func (t *TxPublisher) processRecords() {
2✔
818
        // confirmedRecords stores a map of the records which have been
2✔
819
        // confirmed.
2✔
820
        confirmedRecords := make(map[uint64]*monitorRecord)
2✔
821

2✔
822
        // feeBumpRecords stores a map of the records which need to be bumped.
2✔
823
        feeBumpRecords := make(map[uint64]*monitorRecord)
2✔
824

2✔
825
        // failedRecords stores a map of the records which has inputs being
2✔
826
        // spent by a third party.
2✔
827
        //
2✔
828
        // NOTE: this is only used for neutrino backend.
2✔
829
        failedRecords := make(map[uint64]*monitorRecord)
2✔
830

2✔
831
        // visitor is a helper closure that visits each record and divides them
2✔
832
        // into two groups.
2✔
833
        visitor := func(requestID uint64, r *monitorRecord) error {
5✔
834
                log.Tracef("Checking monitor recordID=%v for tx=%v", requestID,
3✔
835
                        r.tx.TxHash())
3✔
836

3✔
837
                // If the tx is already confirmed, we can stop monitoring it.
3✔
838
                if t.isConfirmed(r.tx.TxHash()) {
5✔
839
                        confirmedRecords[requestID] = r
2✔
840

2✔
841
                        // Move to the next record.
2✔
842
                        return nil
2✔
843
                }
2✔
844

845
                // Check whether the inputs has been spent by a third party.
846
                //
847
                // NOTE: this check is only done for neutrino backend.
848
                if t.isThirdPartySpent(r.tx.TxHash(), r.req.Inputs) {
2✔
849
                        failedRecords[requestID] = r
×
850

×
851
                        // Move to the next record.
×
852
                        return nil
×
853
                }
×
854

855
                feeBumpRecords[requestID] = r
2✔
856

2✔
857
                // Return nil to move to the next record.
2✔
858
                return nil
2✔
859
        }
860

861
        // Iterate through all the records and divide them into two groups.
862
        t.records.ForEach(visitor)
2✔
863

2✔
864
        // For records that are confirmed, we'll notify the caller about this
2✔
865
        // result.
2✔
866
        for requestID, r := range confirmedRecords {
4✔
867
                rec := r
2✔
868

2✔
869
                log.Debugf("Tx=%v is confirmed", r.tx.TxHash())
2✔
870
                t.wg.Add(1)
2✔
871
                go t.handleTxConfirmed(rec, requestID)
2✔
872
        }
2✔
873

874
        // Get the current height to be used in the following goroutines.
875
        currentHeight := t.currentHeight.Load()
2✔
876

2✔
877
        // For records that are not confirmed, we perform a fee bump if needed.
2✔
878
        for requestID, r := range feeBumpRecords {
4✔
879
                rec := r
2✔
880

2✔
881
                log.Debugf("Attempting to fee bump Tx=%v", r.tx.TxHash())
2✔
882
                t.wg.Add(1)
2✔
883
                go t.handleFeeBumpTx(requestID, rec, currentHeight)
2✔
884
        }
2✔
885

886
        // For records that are failed, we'll notify the caller about this
887
        // result.
888
        for requestID, r := range failedRecords {
2✔
889
                rec := r
×
890

×
891
                log.Debugf("Tx=%v has inputs been spent by a third party, "+
×
892
                        "failing it now", r.tx.TxHash())
×
893
                t.wg.Add(1)
×
894
                go t.handleThirdPartySpent(rec, requestID)
×
895
        }
×
896
}
897

898
// handleTxConfirmed is called when a monitored tx is confirmed. It will
899
// notify the subscriber then remove the record from the maps .
900
//
901
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
902
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord, requestID uint64) {
3✔
903
        defer t.wg.Done()
3✔
904

3✔
905
        // Create a result that will be sent to the resultChan which is
3✔
906
        // listened by the caller.
3✔
907
        result := &BumpResult{
3✔
908
                Event:     TxConfirmed,
3✔
909
                Tx:        r.tx,
3✔
910
                requestID: requestID,
3✔
911
                Fee:       r.fee,
3✔
912
                FeeRate:   r.feeFunction.FeeRate(),
3✔
913
        }
3✔
914

3✔
915
        // Notify that this tx is confirmed and remove the record from the map.
3✔
916
        t.handleResult(result)
3✔
917
}
3✔
918

919
// handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
920
// attempt to bump the fee of the tx.
921
//
922
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
923
func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord,
924
        currentHeight int32) {
5✔
925

5✔
926
        defer t.wg.Done()
5✔
927

5✔
928
        oldTxid := r.tx.TxHash()
5✔
929

5✔
930
        // Get the current conf target for this record.
5✔
931
        confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
5✔
932

5✔
933
        // Ask the fee function whether a bump is needed. We expect the fee
5✔
934
        // function to increase its returned fee rate after calling this
5✔
935
        // method.
5✔
936
        increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
5✔
937
        if err != nil {
7✔
938
                // TODO(yy): send this error back to the sweeper so it can
2✔
939
                // re-group the inputs?
2✔
940
                log.Errorf("Failed to increase fee rate for tx %v at "+
2✔
941
                        "height=%v: %v", oldTxid, t.currentHeight.Load(), err)
2✔
942

2✔
943
                return
2✔
944
        }
2✔
945

946
        // If the fee rate was not increased, there's no need to bump the fee.
947
        if !increased {
6✔
948
                log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
2✔
949
                        t.currentHeight.Load())
2✔
950

2✔
951
                return
2✔
952
        }
2✔
953

954
        // The fee function now has a new fee rate, we will use it to bump the
955
        // fee of the tx.
956
        resultOpt := t.createAndPublishTx(requestID, r)
3✔
957

3✔
958
        // If there's a result, we will notify the caller about the result.
3✔
959
        resultOpt.WhenSome(func(result BumpResult) {
6✔
960
                // Notify the new result.
3✔
961
                t.handleResult(&result)
3✔
962
        })
3✔
963
}
964

965
// handleThirdPartySpent is called when the inputs in an unconfirmed tx is
966
// spent. It will notify the subscriber then remove the record from the maps
967
// and send a TxFailed event to the subscriber.
968
//
969
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
970
func (t *TxPublisher) handleThirdPartySpent(r *monitorRecord,
971
        requestID uint64) {
×
972

×
973
        defer t.wg.Done()
×
974

×
975
        // Create a result that will be sent to the resultChan which is
×
976
        // listened by the caller.
×
977
        //
×
978
        // TODO(yy): create a new state `TxThirdPartySpent` to notify the
×
979
        // sweeper to remove the input, hence moving the monitoring of inputs
×
980
        // spent inside the fee bumper.
×
981
        result := &BumpResult{
×
982
                Event:     TxFailed,
×
983
                Tx:        r.tx,
×
984
                requestID: requestID,
×
985
                Err:       ErrThirdPartySpent,
×
986
        }
×
987

×
988
        // Notify that this tx is confirmed and remove the record from the map.
×
989
        t.handleResult(result)
×
990
}
×
991

992
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
993
// to the network. It will update the record with the new tx and fee rate if
994
// successfully created, and return the result when published successfully.
995
func (t *TxPublisher) createAndPublishTx(requestID uint64,
996
        r *monitorRecord) fn.Option[BumpResult] {
8✔
997

8✔
998
        // Fetch the old tx.
8✔
999
        oldTx := r.tx
8✔
1000

8✔
1001
        // Create a new tx with the new fee rate.
8✔
1002
        //
8✔
1003
        // NOTE: The fee function is expected to have increased its returned
8✔
1004
        // fee rate after calling the SkipFeeBump method. So we can use it
8✔
1005
        // directly here.
8✔
1006
        sweepCtx, err := t.createAndCheckTx(r.req, r.feeFunction)
8✔
1007

8✔
1008
        // If the error is fee related, we will return no error and let the fee
8✔
1009
        // bumper retry it at next block.
8✔
1010
        //
8✔
1011
        // NOTE: we can check the RBF error here and ask the fee function to
8✔
1012
        // recalculate the fee rate. However, this would defeat the purpose of
8✔
1013
        // using a deadline based fee function:
8✔
1014
        // - if the deadline is far away, there's no rush to RBF the tx.
8✔
1015
        // - if the deadline is close, we expect the fee function to give us a
8✔
1016
        //   higher fee rate. If the fee rate cannot satisfy the RBF rules, it
8✔
1017
        //   means the budget is not enough.
8✔
1018
        if errors.Is(err, chain.ErrInsufficientFee) ||
8✔
1019
                errors.Is(err, lnwallet.ErrMempoolFee) {
11✔
1020

3✔
1021
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
3✔
1022
                return fn.None[BumpResult]()
3✔
1023
        }
3✔
1024

1025
        // If the error is not fee related, we will return a `TxFailed` event
1026
        // so this input can be retried.
1027
        if err != nil {
8✔
1028
                // If the tx doesn't not have enought budget, we will return a
2✔
1029
                // result so the sweeper can handle it by re-clustering the
2✔
1030
                // utxos.
2✔
1031
                if errors.Is(err, ErrNotEnoughBudget) {
3✔
1032
                        log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(),
1✔
1033
                                err)
1✔
1034
                } else {
2✔
1035
                        // Otherwise, an unexpected error occurred, we will
1✔
1036
                        // fail the tx and let the sweeper retry the whole
1✔
1037
                        // process.
1✔
1038
                        log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(),
1✔
1039
                                err)
1✔
1040
                }
1✔
1041

1042
                return fn.Some(BumpResult{
2✔
1043
                        Event:     TxFailed,
2✔
1044
                        Tx:        oldTx,
2✔
1045
                        Err:       err,
2✔
1046
                        requestID: requestID,
2✔
1047
                })
2✔
1048
        }
1049

1050
        // The tx has been created without any errors, we now register a new
1051
        // record by overwriting the same requestID.
1052
        t.records.Store(requestID, &monitorRecord{
5✔
1053
                tx:                sweepCtx.tx,
5✔
1054
                req:               r.req,
5✔
1055
                feeFunction:       r.feeFunction,
5✔
1056
                fee:               sweepCtx.fee,
5✔
1057
                outpointToTxIndex: sweepCtx.outpointToTxIndex,
5✔
1058
        })
5✔
1059

5✔
1060
        // Attempt to broadcast this new tx.
5✔
1061
        result, err := t.broadcast(requestID)
5✔
1062
        if err != nil {
5✔
1063
                log.Infof("Failed to broadcast replacement tx %v: %v",
×
1064
                        sweepCtx.tx.TxHash(), err)
×
1065

×
1066
                return fn.None[BumpResult]()
×
1067
        }
×
1068

1069
        // If the result error is fee related, we will return no error and let
1070
        // the fee bumper retry it at next block.
1071
        //
1072
        // NOTE: we may get this error if we've bypassed the mempool check,
1073
        // which means we are suing neutrino backend.
1074
        if errors.Is(result.Err, chain.ErrInsufficientFee) ||
5✔
1075
                errors.Is(result.Err, lnwallet.ErrMempoolFee) {
5✔
1076

×
1077
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
×
1078
                return fn.None[BumpResult]()
×
1079
        }
×
1080

1081
        // A successful replacement tx is created, attach the old tx.
1082
        result.ReplacedTx = oldTx
5✔
1083

5✔
1084
        // If the new tx failed to be published, we will return the result so
5✔
1085
        // the caller can handle it.
5✔
1086
        if result.Event == TxFailed {
6✔
1087
                return fn.Some(*result)
1✔
1088
        }
1✔
1089

1090
        log.Infof("Replaced tx=%v with new tx=%v", oldTx.TxHash(),
4✔
1091
                sweepCtx.tx.TxHash())
4✔
1092

4✔
1093
        // Otherwise, it's a successful RBF, set the event and return.
4✔
1094
        result.Event = TxReplaced
4✔
1095

4✔
1096
        return fn.Some(*result)
4✔
1097
}
1098

1099
// isConfirmed checks the btcwallet to see whether the tx is confirmed.
1100
func (t *TxPublisher) isConfirmed(txid chainhash.Hash) bool {
3✔
1101
        details, err := t.cfg.Wallet.GetTransactionDetails(&txid)
3✔
1102
        if err != nil {
4✔
1103
                log.Warnf("Failed to get tx details for %v: %v", txid, err)
1✔
1104
                return false
1✔
1105
        }
1✔
1106

1107
        return details.NumConfirmations > 0
3✔
1108
}
1109

1110
// isThirdPartySpent checks whether the inputs of the tx has already been spent
1111
// by a third party. When a tx is not confirmed, yet its inputs has been spent,
1112
// then it must be spent by a different tx other than the sweeping tx here.
1113
//
1114
// NOTE: this check is only performed for neutrino backend as it has no
1115
// reliable way to tell a tx has been replaced.
1116
func (t *TxPublisher) isThirdPartySpent(txid chainhash.Hash,
1117
        inputs []input.Input) bool {
2✔
1118

2✔
1119
        // Skip this check for if this is not neutrino backend.
2✔
1120
        if !t.isNeutrinoBackend() {
4✔
1121
                return false
2✔
1122
        }
2✔
1123

1124
        // Iterate all the inputs and check if they have been spent already.
1125
        for _, inp := range inputs {
×
1126
                op := inp.OutPoint()
×
1127

×
1128
                // For wallet utxos, the height hint is not set - we don't need
×
1129
                // to monitor them for third party spend.
×
1130
                heightHint := inp.HeightHint()
×
1131
                if heightHint == 0 {
×
1132
                        log.Debugf("Skipped third party check for wallet "+
×
1133
                                "input %v", op)
×
1134

×
1135
                        continue
×
1136
                }
1137

1138
                // If the input has already been spent after the height hint, a
1139
                // spend event is sent back immediately.
1140
                spendEvent, err := t.cfg.Notifier.RegisterSpendNtfn(
×
1141
                        &op, inp.SignDesc().Output.PkScript, heightHint,
×
1142
                )
×
1143
                if err != nil {
×
1144
                        log.Criticalf("Failed to register spend ntfn for "+
×
1145
                                "input=%v: %v", op, err)
×
1146
                        return false
×
1147
                }
×
1148

1149
                // Remove the subscription when exit.
1150
                defer spendEvent.Cancel()
×
1151

×
1152
                // Do a non-blocking read to see if the output has been spent.
×
1153
                select {
×
1154
                case spend, ok := <-spendEvent.Spend:
×
1155
                        if !ok {
×
1156
                                log.Debugf("Spend ntfn for %v canceled", op)
×
1157
                                return false
×
1158
                        }
×
1159

1160
                        spendingTxID := spend.SpendingTx.TxHash()
×
1161

×
1162
                        // If the spending tx is the same as the sweeping tx
×
1163
                        // then we are good.
×
1164
                        if spendingTxID == txid {
×
1165
                                continue
×
1166
                        }
1167

1168
                        log.Warnf("Detected third party spent of output=%v "+
×
1169
                                "in tx=%v", op, spend.SpendingTx.TxHash())
×
1170

×
1171
                        return true
×
1172

1173
                // Move to the next input.
1174
                default:
×
1175
                }
1176
        }
1177

1178
        return false
×
1179
}
1180

1181
// calcCurrentConfTarget calculates the current confirmation target based on
1182
// the deadline height. The conf target is capped at 0 if the deadline has
1183
// already been past.
1184
func calcCurrentConfTarget(currentHeight, deadline int32) uint32 {
12✔
1185
        var confTarget uint32
12✔
1186

12✔
1187
        // Calculate how many blocks left until the deadline.
12✔
1188
        deadlineDelta := deadline - currentHeight
12✔
1189

12✔
1190
        // If we are already past the deadline, we will set the conf target to
12✔
1191
        // be 1.
12✔
1192
        if deadlineDelta < 0 {
17✔
1193
                log.Warnf("Deadline is %d blocks behind current height %v",
5✔
1194
                        -deadlineDelta, currentHeight)
5✔
1195

5✔
1196
                confTarget = 0
5✔
1197
        } else {
13✔
1198
                confTarget = uint32(deadlineDelta)
8✔
1199
        }
8✔
1200

1201
        return confTarget
12✔
1202
}
1203

1204
// sweepTxCtx houses a sweep transaction with additional context.
type sweepTxCtx struct {
        // tx is the sweep transaction.
        tx *wire.MsgTx

        // fee is the fee of the sweep transaction.
        fee btcutil.Amount

        // extraTxOut is the change output flagged as extra, if there is one.
        extraTxOut fn.Option[SweepOutput]

        // outpointToTxIndex maps the outpoint of the inputs to their index in
        // the sweep transaction.
        outpointToTxIndex map[wire.OutPoint]int
}
1216

1217
// createSweepTx creates a sweeping tx based on the given inputs, change
1218
// address and fee rate.
1219
func (t *TxPublisher) createSweepTx(inputs []input.Input,
1220
        changePkScript lnwallet.AddrWithKey,
1221
        feeRate chainfee.SatPerKWeight) (*sweepTxCtx, error) {
23✔
1222

23✔
1223
        // Validate and calculate the fee and change amount.
23✔
1224
        txFee, changeOutputsOpt, locktimeOpt, err := prepareSweepTx(
23✔
1225
                inputs, changePkScript, feeRate, t.currentHeight.Load(),
23✔
1226
                t.cfg.AuxSweeper,
23✔
1227
        )
23✔
1228
        if err != nil {
24✔
1229
                return nil, err
1✔
1230
        }
1✔
1231

1232
        var (
23✔
1233
                // Create the sweep transaction that we will be building. We
23✔
1234
                // use version 2 as it is required for CSV.
23✔
1235
                sweepTx = wire.NewMsgTx(2)
23✔
1236

23✔
1237
                // We'll add the inputs as we go so we know the final ordering
23✔
1238
                // of inputs to sign.
23✔
1239
                idxs []input.Input
23✔
1240
        )
23✔
1241

23✔
1242
        // We start by adding all inputs that commit to an output. We do this
23✔
1243
        // since the input and output index must stay the same for the
23✔
1244
        // signatures to be valid.
23✔
1245
        outpointToTxIndex := make(map[wire.OutPoint]int)
23✔
1246
        for _, o := range inputs {
46✔
1247
                if o.RequiredTxOut() == nil {
46✔
1248
                        continue
23✔
1249
                }
1250

1251
                idxs = append(idxs, o)
1✔
1252
                sweepTx.AddTxIn(&wire.TxIn{
1✔
1253
                        PreviousOutPoint: o.OutPoint(),
1✔
1254
                        Sequence:         o.BlocksToMaturity(),
1✔
1255
                })
1✔
1256
                sweepTx.AddTxOut(o.RequiredTxOut())
1✔
1257

1✔
1258
                outpointToTxIndex[o.OutPoint()] = len(sweepTx.TxOut) - 1
1✔
1259
        }
1260

1261
        // Sum up the value contained in the remaining inputs, and add them to
1262
        // the sweep transaction.
1263
        for _, o := range inputs {
46✔
1264
                if o.RequiredTxOut() != nil {
24✔
1265
                        continue
1✔
1266
                }
1267

1268
                idxs = append(idxs, o)
23✔
1269
                sweepTx.AddTxIn(&wire.TxIn{
23✔
1270
                        PreviousOutPoint: o.OutPoint(),
23✔
1271
                        Sequence:         o.BlocksToMaturity(),
23✔
1272
                })
23✔
1273
        }
1274

1275
        // If we have change outputs to add, then add it the sweep transaction
1276
        // here.
1277
        changeOutputsOpt.WhenSome(func(changeOuts []SweepOutput) {
46✔
1278
                for i := range changeOuts {
68✔
1279
                        sweepTx.AddTxOut(&changeOuts[i].TxOut)
45✔
1280
                }
45✔
1281
        })
1282

1283
        // We'll default to using the current block height as locktime, if none
1284
        // of the inputs commits to a different locktime.
1285
        sweepTx.LockTime = uint32(locktimeOpt.UnwrapOr(t.currentHeight.Load()))
23✔
1286

23✔
1287
        prevInputFetcher, err := input.MultiPrevOutFetcher(inputs)
23✔
1288
        if err != nil {
23✔
1289
                return nil, fmt.Errorf("error creating prev input fetcher "+
×
1290
                        "for hash cache: %v", err)
×
1291
        }
×
1292
        hashCache := txscript.NewTxSigHashes(sweepTx, prevInputFetcher)
23✔
1293

23✔
1294
        // With all the inputs in place, use each output's unique input script
23✔
1295
        // function to generate the final witness required for spending.
23✔
1296
        addInputScript := func(idx int, tso input.Input) error {
46✔
1297
                inputScript, err := tso.CraftInputScript(
23✔
1298
                        t.cfg.Signer, sweepTx, hashCache, prevInputFetcher, idx,
23✔
1299
                )
23✔
1300
                if err != nil {
23✔
1301
                        return err
×
1302
                }
×
1303

1304
                sweepTx.TxIn[idx].Witness = inputScript.Witness
23✔
1305

23✔
1306
                if len(inputScript.SigScript) == 0 {
46✔
1307
                        return nil
23✔
1308
                }
23✔
1309

1310
                sweepTx.TxIn[idx].SignatureScript = inputScript.SigScript
×
1311

×
1312
                return nil
×
1313
        }
1314

1315
        for idx, inp := range idxs {
46✔
1316
                if err := addInputScript(idx, inp); err != nil {
23✔
1317
                        return nil, err
×
1318
                }
×
1319
        }
1320

1321
        log.Debugf("Created sweep tx %v for inputs:\n%v", sweepTx.TxHash(),
23✔
1322
                inputTypeSummary(inputs))
23✔
1323

23✔
1324
        // Try to locate the extra change output, though there might be None.
23✔
1325
        extraTxOut := fn.MapOption(
23✔
1326
                func(sweepOuts []SweepOutput) fn.Option[SweepOutput] {
46✔
1327
                        for _, sweepOut := range sweepOuts {
68✔
1328
                                if !sweepOut.IsExtra {
90✔
1329
                                        continue
45✔
1330
                                }
1331

1332
                                // If we sweep outputs of a custom channel, the
1333
                                // custom leaves in those outputs will be merged
1334
                                // into a single output, even if we sweep
1335
                                // multiple outputs (e.g. to_remote and breached
1336
                                // to_local of a breached channel) at the same
1337
                                // time. So there will only ever be one extra
1338
                                // output.
1339
                                log.Debugf("Sweep produced extra_sweep_out=%v",
×
1340
                                        lnutils.SpewLogClosure(sweepOut))
×
1341

×
1342
                                return fn.Some(sweepOut)
×
1343
                        }
1344

1345
                        return fn.None[SweepOutput]()
23✔
1346
                },
1347
        )(changeOutputsOpt)
1348

1349
        return &sweepTxCtx{
23✔
1350
                tx:                sweepTx,
23✔
1351
                fee:               txFee,
23✔
1352
                extraTxOut:        fn.FlattenOption(extraTxOut),
23✔
1353
                outpointToTxIndex: outpointToTxIndex,
23✔
1354
        }, nil
23✔
1355
}
1356

1357
// prepareSweepTx returns the tx fee, a set of optional change outputs and an
1358
// optional locktime after a series of validations:
1359
// 1. check the locktime has been reached.
1360
// 2. check the locktimes are the same.
1361
// 3. check the inputs cover the outputs.
1362
//
1363
// NOTE: if the change amount is below dust, it will be added to the tx fee.
1364
func prepareSweepTx(inputs []input.Input, changePkScript lnwallet.AddrWithKey,
1365
        feeRate chainfee.SatPerKWeight, currentHeight int32,
1366
        auxSweeper fn.Option[AuxSweeper]) (
1367
        btcutil.Amount, fn.Option[[]SweepOutput], fn.Option[int32], error) {
23✔
1368

23✔
1369
        noChange := fn.None[[]SweepOutput]()
23✔
1370
        noLocktime := fn.None[int32]()
23✔
1371

23✔
1372
        // Given the set of inputs we have, if we have an aux sweeper, then
23✔
1373
        // we'll attempt to see if we have any other change outputs we'll need
23✔
1374
        // to add to the sweep transaction.
23✔
1375
        changePkScripts := [][]byte{changePkScript.DeliveryAddress}
23✔
1376

23✔
1377
        var extraChangeOut fn.Option[SweepOutput]
23✔
1378
        err := fn.MapOptionZ(
23✔
1379
                auxSweeper, func(aux AuxSweeper) error {
45✔
1380
                        extraOut := aux.DeriveSweepAddr(inputs, changePkScript)
22✔
1381
                        if err := extraOut.Err(); err != nil {
22✔
1382
                                return err
×
1383
                        }
×
1384

1385
                        extraChangeOut = extraOut.LeftToSome()
22✔
1386

22✔
1387
                        return nil
22✔
1388
                },
1389
        )
1390
        if err != nil {
23✔
1391
                return 0, noChange, noLocktime, err
×
1392
        }
×
1393

1394
        // Creating a weight estimator with nil outputs and zero max fee rate.
1395
        // We don't allow adding customized outputs in the sweeping tx, and the
1396
        // fee rate is already being managed before we get here.
1397
        inputs, estimator, err := getWeightEstimate(
23✔
1398
                inputs, nil, feeRate, 0, changePkScripts,
23✔
1399
        )
23✔
1400
        if err != nil {
23✔
1401
                return 0, noChange, noLocktime, err
×
1402
        }
×
1403

1404
        txFee := estimator.fee()
23✔
1405

23✔
1406
        var (
23✔
1407
                // Track whether any of the inputs require a certain locktime.
23✔
1408
                locktime = int32(-1)
23✔
1409

23✔
1410
                // We keep track of total input amount, and required output
23✔
1411
                // amount to use for calculating the change amount below.
23✔
1412
                totalInput     btcutil.Amount
23✔
1413
                requiredOutput btcutil.Amount
23✔
1414
        )
23✔
1415

23✔
1416
        // If we have an extra change output, then we'll add it as a required
23✔
1417
        // output amt.
23✔
1418
        extraChangeOut.WhenSome(func(o SweepOutput) {
45✔
1419
                requiredOutput += btcutil.Amount(o.Value)
22✔
1420
        })
22✔
1421

1422
        // Go through each input and check if the required lock times have
1423
        // reached and are the same.
1424
        for _, o := range inputs {
46✔
1425
                // If the input has a required output, we'll add it to the
23✔
1426
                // required output amount.
23✔
1427
                if o.RequiredTxOut() != nil {
24✔
1428
                        requiredOutput += btcutil.Amount(
1✔
1429
                                o.RequiredTxOut().Value,
1✔
1430
                        )
1✔
1431
                }
1✔
1432

1433
                // Update the total input amount.
1434
                totalInput += btcutil.Amount(o.SignDesc().Output.Value)
23✔
1435

23✔
1436
                lt, ok := o.RequiredLockTime()
23✔
1437

23✔
1438
                // Skip if the input doesn't require a lock time.
23✔
1439
                if !ok {
46✔
1440
                        continue
23✔
1441
                }
1442

1443
                // Check if the lock time has reached
1444
                if lt > uint32(currentHeight) {
1✔
1445
                        return 0, noChange, noLocktime, ErrLocktimeImmature
×
1446
                }
×
1447

1448
                // If another input commits to a different locktime, they
1449
                // cannot be combined in the same transaction.
1450
                if locktime != -1 && locktime != int32(lt) {
1✔
1451
                        return 0, noChange, noLocktime, ErrLocktimeConflict
×
1452
                }
×
1453

1454
                // Update the locktime for next iteration.
1455
                locktime = int32(lt)
1✔
1456
        }
1457

1458
        // Make sure total output amount is less than total input amount.
1459
        if requiredOutput+txFee > totalInput {
23✔
1460
                return 0, noChange, noLocktime, fmt.Errorf("insufficient "+
×
1461
                        "input to create sweep tx: input_sum=%v, "+
×
1462
                        "output_sum=%v", totalInput, requiredOutput+txFee)
×
1463
        }
×
1464

1465
        // The value remaining after the required output and fees is the
1466
        // change output.
1467
        changeAmt := totalInput - requiredOutput - txFee
23✔
1468
        changeOuts := make([]SweepOutput, 0, 2)
23✔
1469

23✔
1470
        extraChangeOut.WhenSome(func(o SweepOutput) {
45✔
1471
                changeOuts = append(changeOuts, o)
22✔
1472
        })
22✔
1473

1474
        // We'll calculate the dust limit for the given changePkScript since it
1475
        // is variable.
1476
        changeFloor := lnwallet.DustLimitForSize(
23✔
1477
                len(changePkScript.DeliveryAddress),
23✔
1478
        )
23✔
1479

23✔
1480
        switch {
23✔
1481
        // If the change amount is dust, we'll move it into the fees, and
1482
        // ignore it.
1483
        case changeAmt < changeFloor:
1✔
1484
                log.Infof("Change amt %v below dustlimit %v, not adding "+
1✔
1485
                        "change output", changeAmt, changeFloor)
1✔
1486

1✔
1487
                // If there's no required output, and the change output is a
1✔
1488
                // dust, it means we are creating a tx without any outputs. In
1✔
1489
                // this case we'll return an error. This could happen when
1✔
1490
                // creating a tx that has an anchor as the only input.
1✔
1491
                if requiredOutput == 0 {
2✔
1492
                        return 0, noChange, noLocktime, ErrTxNoOutput
1✔
1493
                }
1✔
1494

1495
                // The dust amount is added to the fee.
1496
                txFee += changeAmt
×
1497

1498
        // Otherwise, we'll actually recognize it as a change output.
1499
        default:
23✔
1500
                changeOuts = append(changeOuts, SweepOutput{
23✔
1501
                        TxOut: wire.TxOut{
23✔
1502
                                Value:    int64(changeAmt),
23✔
1503
                                PkScript: changePkScript.DeliveryAddress,
23✔
1504
                        },
23✔
1505
                        IsExtra:     false,
23✔
1506
                        InternalKey: changePkScript.InternalKey,
23✔
1507
                })
23✔
1508
        }
1509

1510
        // Optionally set the locktime.
1511
        locktimeOpt := fn.Some(locktime)
23✔
1512
        if locktime == -1 {
46✔
1513
                locktimeOpt = noLocktime
23✔
1514
        }
23✔
1515

1516
        var changeOutsOpt fn.Option[[]SweepOutput]
23✔
1517
        if len(changeOuts) > 0 {
46✔
1518
                changeOutsOpt = fn.Some(changeOuts)
23✔
1519
        }
23✔
1520

1521
        log.Debugf("Creating sweep tx for %v inputs (%s) using %v, "+
23✔
1522
                "tx_weight=%v, tx_fee=%v, locktime=%v, parents_count=%v, "+
23✔
1523
                "parents_fee=%v, parents_weight=%v, current_height=%v",
23✔
1524
                len(inputs), inputTypeSummary(inputs), feeRate,
23✔
1525
                estimator.weight(), txFee, locktimeOpt, len(estimator.parents),
23✔
1526
                estimator.parentsFee, estimator.parentsWeight, currentHeight)
23✔
1527

23✔
1528
        return txFee, changeOutsOpt, locktimeOpt, nil
23✔
1529
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc