• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13157844420

05 Feb 2025 12:55PM UTC coverage: 58.821% (+1.1%) from 57.712%
13157844420

Pull #9448

github

yyforyongyu
docs: add release notes
Pull Request #9448: sweep: properly handle failed sweeping txns

300 of 342 new or added lines in 3 files covered. (87.72%)

35 existing lines in 7 files now uncovered.

136339 of 231787 relevant lines covered (58.82%)

19230.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.41
/sweep/fee_bumper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/rpcclient"
12
        "github.com/btcsuite/btcd/txscript"
13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/btcsuite/btcwallet/chain"
15
        "github.com/lightningnetwork/lnd/chainio"
16
        "github.com/lightningnetwork/lnd/chainntnfs"
17
        "github.com/lightningnetwork/lnd/fn/v2"
18
        "github.com/lightningnetwork/lnd/input"
19
        "github.com/lightningnetwork/lnd/labels"
20
        "github.com/lightningnetwork/lnd/lntypes"
21
        "github.com/lightningnetwork/lnd/lnutils"
22
        "github.com/lightningnetwork/lnd/lnwallet"
23
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
24
        "github.com/lightningnetwork/lnd/tlv"
25
)
26

27
var (
28
        // ErrInvalidBumpResult is returned when the bump result is invalid.
29
        ErrInvalidBumpResult = errors.New("invalid bump result")
30

31
        // ErrNotEnoughBudget is returned when the fee bumper decides the
32
        // current budget cannot cover the fee.
33
        ErrNotEnoughBudget = errors.New("not enough budget")
34

35
        // ErrLocktimeImmature is returned when sweeping an input whose
36
        // locktime is not reached.
37
        ErrLocktimeImmature = errors.New("immature input")
38

39
        // ErrTxNoOutput is returned when an output cannot be created during tx
40
        // preparation, usually due to the output being dust.
41
        ErrTxNoOutput = errors.New("tx has no output")
42

43
        // ErrUnknownSpent is returned when an unknown tx has spent the input
44
        // in the sweeping tx.
45
        ErrUnknownSpent = errors.New("unknown spent of input")
46

47
        // ErrInputMissing is returned when a given input no longer exists,
48
        // e.g., spending from an orphan tx.
49
        ErrInputMissing = errors.New("input no longer exists")
50
)
51

52
var (
53
        // dummyChangePkScript is a dummy tapscript change script that's used
54
        // when we don't need a real address, just something that can be used
55
        // for fee estimation.
56
        dummyChangePkScript = []byte{
57
                0x51, 0x20,
58
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
59
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
60
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
61
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
62
        }
63
)
64

65
// Bumper defines an interface that can be used by other subsystems for fee
66
// bumping.
67
type Bumper interface {
68
        // Broadcast is used to publish the tx created from the given inputs
69
        // specified in the request. It handles the tx creation, broadcasts it,
70
        // and monitors its confirmation status for potential fee bumping. It
71
        // returns a chan that the caller can use to receive updates about the
72
        // broadcast result and potential RBF attempts.
73
        Broadcast(req *BumpRequest) <-chan *BumpResult
74
}
75

76
// BumpEvent represents the event of a fee bumping attempt.
77
type BumpEvent uint8
78

79
const (
80
        // TxPublished is sent when the broadcast attempt is finished.
81
        TxPublished BumpEvent = iota
82

83
        // TxFailed is sent when the tx has encountered a fee-related error
84
        // during its creation or broadcast, or an internal error from the fee
85
        // bumper. In either case the inputs in this tx should be retried with
86
        // either a different grouping strategy or an increased budget.
87
        //
88
        // TODO(yy): Remove the above usage once we remove sweeping non-CPFP
89
        // anchors.
90
        TxFailed
91

92
        // TxReplaced is sent when the original tx is replaced by a new one.
93
        TxReplaced
94

95
        // TxConfirmed is sent when the tx is confirmed.
96
        TxConfirmed
97

98
        // TxUnknownSpend is sent when at least one of the inputs is spent but
99
        // not by the current sweeping tx, this can happen when,
100
        // - a remote party has replaced our sweeping tx by spending the
101
        //   input(s), e.g., via the direct preimage spend on our outgoing HTLC.
102
        // - a third party has replaced our sweeping tx, e.g., the anchor output
103
        //   after 16 blocks.
104
        // - A previous sweeping tx has confirmed but the fee bumper is not
105
        //   aware of it, e.g., a restart happens right after the sweeping tx is
106
        //   broadcast and confirmed.
107
        TxUnknownSpend
108

109
        // TxFatal is sent when the inputs in this tx cannot be retried. Txns
110
        // will end up in this state if they have encountered a non-fee related
111
        // error, which means they cannot be retried with increased budget.
112
        TxFatal
113

114
        // sentinalEvent is used to check if an event is unknown.
115
        sentinalEvent
116
)
117

118
// String returns a human-readable string for the event.
119
func (e BumpEvent) String() string {
3✔
120
        switch e {
3✔
121
        case TxPublished:
3✔
122
                return "Published"
3✔
123
        case TxFailed:
3✔
124
                return "Failed"
3✔
125
        case TxReplaced:
3✔
126
                return "Replaced"
3✔
127
        case TxConfirmed:
3✔
128
                return "Confirmed"
3✔
129
        case TxFatal:
2✔
130
                return "Fatal"
2✔
131
        case TxUnknownSpend:
3✔
132
                return "UnknownSpend"
3✔
133
        default:
×
134
                return "Unknown"
×
135
        }
136
}
137

138
// Unknown returns true if the event is unknown.
139
func (e BumpEvent) Unknown() bool {
13✔
140
        return e >= sentinalEvent
13✔
141
}
13✔
142

143
// BumpRequest is used by the caller to give the Bumper the necessary info to
144
// create and manage potential fee bumps for a set of inputs.
145
type BumpRequest struct {
146
        // Budget gives the total amount that can be used as fees by these
147
        // inputs.
148
        Budget btcutil.Amount
149

150
        // Inputs is the set of inputs to sweep.
151
        Inputs []input.Input
152

153
        // DeadlineHeight is the block height at which the tx should be
154
        // confirmed.
155
        DeadlineHeight int32
156

157
        // DeliveryAddress is the script to send the change output to.
158
        DeliveryAddress lnwallet.AddrWithKey
159

160
        // MaxFeeRate is the maximum fee rate that can be used for fee bumping.
161
        MaxFeeRate chainfee.SatPerKWeight
162

163
        // StartingFeeRate is an optional parameter that can be used to specify
164
        // the initial fee rate to use for the fee function.
165
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
166

167
        // ExtraTxOut tracks if this bump request has an optional set of extra
168
        // outputs to add to the transaction.
169
        ExtraTxOut fn.Option[SweepOutput]
170

171
        // Immediate is used to specify that the tx should be broadcast
172
        // immediately.
173
        Immediate bool
174
}
175

176
// MaxFeeRateAllowed returns the maximum fee rate allowed for the given
177
// request. It calculates the feerate using the supplied budget and the weight,
178
// compares it with the specified MaxFeeRate, and returns the smaller of the
179
// two.
180
func (r *BumpRequest) MaxFeeRateAllowed() (chainfee.SatPerKWeight, error) {
14✔
181
        // We'll want to know if we have any blobs, as we need to factor this
14✔
182
        // into the max fee rate for this bump request.
14✔
183
        hasBlobs := fn.Any(r.Inputs, func(i input.Input) bool {
27✔
184
                return fn.MapOptionZ(
13✔
185
                        i.ResolutionBlob(), func(b tlv.Blob) bool {
16✔
186
                                return len(b) > 0
3✔
187
                        },
3✔
188
                )
189
        })
190

191
        sweepAddrs := [][]byte{
14✔
192
                r.DeliveryAddress.DeliveryAddress,
14✔
193
        }
14✔
194

14✔
195
        // If we have blobs, then we'll add an extra sweep addr for the size
14✔
196
        // estimate below. We know that these blobs will also always be based on
14✔
197
        // p2tr addrs.
14✔
198
        if hasBlobs {
14✔
199
                // We need to pass in a real address, so we'll use a dummy
×
200
                // tapscript change script that's used elsewhere for tests.
×
201
                sweepAddrs = append(sweepAddrs, dummyChangePkScript)
×
202
        }
×
203

204
        // Get the size of the sweep tx, which will be used to calculate the
205
        // budget fee rate.
206
        size, err := calcSweepTxWeight(
14✔
207
                r.Inputs, sweepAddrs,
14✔
208
        )
14✔
209
        if err != nil {
15✔
210
                return 0, err
1✔
211
        }
1✔
212

213
        // Use the budget and MaxFeeRate to decide the max allowed fee rate.
214
        // This is needed as, when the input has a large value and the user
215
        // sets the budget to be proportional to the input value, the fee rate
216
        // can be very high and we need to make sure it doesn't exceed the max
217
        // fee rate.
218
        maxFeeRateAllowed := chainfee.NewSatPerKWeight(r.Budget, size)
13✔
219
        if maxFeeRateAllowed > r.MaxFeeRate {
19✔
220
                log.Debugf("Budget feerate %v exceeds MaxFeeRate %v, use "+
6✔
221
                        "MaxFeeRate instead, txWeight=%v", maxFeeRateAllowed,
6✔
222
                        r.MaxFeeRate, size)
6✔
223

6✔
224
                return r.MaxFeeRate, nil
6✔
225
        }
6✔
226

227
        log.Debugf("Budget feerate %v below MaxFeeRate %v, use budget feerate "+
10✔
228
                "instead, txWeight=%v", maxFeeRateAllowed, r.MaxFeeRate, size)
10✔
229

10✔
230
        return maxFeeRateAllowed, nil
10✔
231
}
232

233
// calcSweepTxWeight calculates the weight of the sweep tx. It assumes a
234
// sweeping tx always has a single output(change).
235
func calcSweepTxWeight(inputs []input.Input,
236
        outputPkScript [][]byte) (lntypes.WeightUnit, error) {
17✔
237

17✔
238
        // Use a const fee rate as we only use the weight estimator to
17✔
239
        // calculate the size.
17✔
240
        const feeRate = 1
17✔
241

17✔
242
        // Initialize the tx weight estimator with,
17✔
243
        // - nil outputs as we only have one single change output.
17✔
244
        // - const fee rate as we don't care about the fees here.
17✔
245
        // - 0 maxfeerate as we don't care about fees here.
17✔
246
        //
17✔
247
        // TODO(yy): we should refactor the weight estimator to not require a
17✔
248
        // fee rate and max fee rate and make it a pure tx weight calculator.
17✔
249
        _, estimator, err := getWeightEstimate(
17✔
250
                inputs, nil, feeRate, 0, outputPkScript,
17✔
251
        )
17✔
252
        if err != nil {
19✔
253
                return 0, err
2✔
254
        }
2✔
255

256
        return estimator.weight(), nil
15✔
257
}
258

259
// BumpResult is used by the Bumper to send updates about the tx being
260
// broadcast.
261
type BumpResult struct {
262
        // Event is the type of event that the result is for.
263
        Event BumpEvent
264

265
        // Tx is the tx being broadcast.
266
        Tx *wire.MsgTx
267

268
        // ReplacedTx is the old, replaced tx if a fee bump is attempted.
269
        ReplacedTx *wire.MsgTx
270

271
        // FeeRate is the fee rate used for the new tx.
272
        FeeRate chainfee.SatPerKWeight
273

274
        // Fee is the fee paid by the new tx.
275
        Fee btcutil.Amount
276

277
        // Err is the error that occurred during the broadcast.
278
        Err error
279

280
        // SpentInputs are the inputs spent by another tx which caused the
281
        // current tx to be failed.
282
        SpentInputs map[wire.OutPoint]*wire.MsgTx
283

284
        // requestID is the ID of the request that created this record.
285
        requestID uint64
286
}
287

288
// String returns a human-readable string for the result.
289
func (b *BumpResult) String() string {
3✔
290
        desc := fmt.Sprintf("Event=%v", b.Event)
3✔
291
        if b.Tx != nil {
6✔
292
                desc += fmt.Sprintf(", Tx=%v", b.Tx.TxHash())
3✔
293
        }
3✔
294

295
        return fmt.Sprintf("[%s]", desc)
3✔
296
}
297

298
// Validate validates the BumpResult so it's safe to use.
299
func (b *BumpResult) Validate() error {
14✔
300
        isFailureEvent := b.Event == TxFailed || b.Event == TxFatal ||
14✔
301
                b.Event == TxUnknownSpend
14✔
302

14✔
303
        // Every result must have a tx, except for failed, fatal, or unknown
        // spend events.
14✔
304
        if b.Tx == nil && !isFailureEvent {
15✔
305
                return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult)
1✔
306
        }
1✔
307

308
        // Every result must have a known event.
309
        if b.Event.Unknown() {
14✔
310
                return fmt.Errorf("%w: unknown event", ErrInvalidBumpResult)
1✔
311
        }
1✔
312

313
        // If it's a replacing event, it must have a replaced tx.
314
        if b.Event == TxReplaced && b.ReplacedTx == nil {
13✔
315
                return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult)
1✔
316
        }
1✔
317

318
        // If it's a failed, fatal, or unknown spend event, it must have an
        // error.
319
        if isFailureEvent && b.Err == nil {
13✔
320
                return fmt.Errorf("%w: nil error", ErrInvalidBumpResult)
2✔
321
        }
2✔
322

323
        return nil
9✔
324
}
325

326
// TxPublisherConfig is the config used to create a new TxPublisher.
327
type TxPublisherConfig struct {
328
        // Signer is used to create the tx signature.
329
        Signer input.Signer
330

331
        // Wallet is used primarily to publish the tx.
332
        Wallet Wallet
333

334
        // Estimator is used to estimate the fee rate for the new tx based on
335
        // its deadline conf target.
336
        Estimator chainfee.Estimator
337

338
        // Notifier is used to monitor the confirmation status of the tx.
339
        Notifier chainntnfs.ChainNotifier
340

341
        // AuxSweeper is an optional interface that can be used to modify the
342
        // way sweep transaction are generated.
343
        AuxSweeper fn.Option[AuxSweeper]
344
}
345

346
// TxPublisher is an implementation of the Bumper interface. It utilizes the
347
// `testmempoolaccept` RPC to bump the fee of txns it created based on
348
// the fee function selected or configured by the caller. Its purpose is to
349
// take a list of inputs specified, and create a tx that spends them to a
350
// specified output. It will then monitor the confirmation status of the tx,
351
// and if it's not confirmed within a certain time frame, it will attempt to
352
// bump the fee of the tx by creating a new tx that spends the same inputs to
353
// the same output, but with a higher fee rate. It will continue to do this
354
// until the tx is confirmed or the fee rate reaches the maximum fee rate
355
// specified by the caller.
356
type TxPublisher struct {
357
        started atomic.Bool
358
        stopped atomic.Bool
359

360
        // Embed the blockbeat consumer struct to get access to the method
361
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
362
        chainio.BeatConsumer
363

364
        wg sync.WaitGroup
365

366
        // cfg specifies the configuration of the TxPublisher.
367
        cfg *TxPublisherConfig
368

369
        // currentHeight is the current block height.
370
        currentHeight atomic.Int32
371

372
        // records is a map keyed by the requestCounter and the value is the tx
373
        // being monitored.
374
        records lnutils.SyncMap[uint64, *monitorRecord]
375

376
        // requestCounter is a monotonically increasing counter used to keep
377
        // track of how many requests have been made.
378
        requestCounter atomic.Uint64
379

380
        // subscriberChans is a map keyed by the requestCounter, each item is
381
        // the chan that the publisher sends the fee bump result to.
382
        subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]
383

384
        // quit is used to signal the publisher to stop.
385
        quit chan struct{}
386
}
387

388
// Compile-time constraint to ensure TxPublisher implements Bumper.
389
var _ Bumper = (*TxPublisher)(nil)
390

391
// Compile-time check for the chainio.Consumer interface.
392
var _ chainio.Consumer = (*TxPublisher)(nil)
393

394
// NewTxPublisher creates a new TxPublisher.
395
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
24✔
396
        tp := &TxPublisher{
24✔
397
                cfg:             &cfg,
24✔
398
                records:         lnutils.SyncMap[uint64, *monitorRecord]{},
24✔
399
                subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
24✔
400
                quit:            make(chan struct{}),
24✔
401
        }
24✔
402

24✔
403
        // Mount the block consumer.
24✔
404
        tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())
24✔
405

24✔
406
        return tp
24✔
407
}
24✔
408

409
// isNeutrinoBackend checks if the wallet backend is neutrino.
410
func (t *TxPublisher) isNeutrinoBackend() bool {
27✔
411
        return t.cfg.Wallet.BackEnd() == "neutrino"
27✔
412
}
27✔
413

414
// Broadcast is used to publish the tx created from the given inputs. It will
415
// register the broadcast request and return a chan to the caller to subscribe
416
// to the broadcast result. The initial broadcast is guaranteed to be
417
// RBF-compliant unless the budget specified cannot cover the fee.
418
//
419
// NOTE: part of the Bumper interface.
420
func (t *TxPublisher) Broadcast(req *BumpRequest) <-chan *BumpResult {
8✔
421
        log.Tracef("Received broadcast request: %s",
8✔
422
                lnutils.SpewLogClosure(req))
8✔
423

8✔
424
        // Store the request.
8✔
425
        record := t.storeInitialRecord(req)
8✔
426

8✔
427
        // Create a chan to send the result to the caller.
8✔
428
        subscriber := make(chan *BumpResult, 1)
8✔
429
        t.subscriberChans.Store(record.requestID, subscriber)
8✔
430

8✔
431
        // Publish the tx immediately if specified.
8✔
432
        if req.Immediate {
12✔
433
                t.handleInitialBroadcast(record)
4✔
434
        }
4✔
435

436
        return subscriber
8✔
437
}
438

439
// storeInitialRecord initializes a monitor record and saves it in the map.
440
func (t *TxPublisher) storeInitialRecord(req *BumpRequest) *monitorRecord {
8✔
441
        // Increase the request counter.
8✔
442
        //
8✔
443
        // NOTE: this is the only place where we increase the counter.
8✔
444
        requestID := t.requestCounter.Add(1)
8✔
445

8✔
446
        // Register the record.
8✔
447
        record := &monitorRecord{
8✔
448
                requestID: requestID,
8✔
449
                req:       req,
8✔
450
        }
8✔
451
        t.records.Store(requestID, record)
8✔
452

8✔
453
        return record
8✔
454
}
8✔
455

456
// updateRecord updates the given record's tx and fee, and saves it in the
457
// records map.
458
func (t *TxPublisher) updateRecord(r *monitorRecord,
459
        sweepCtx *sweepTxCtx) *monitorRecord {
22✔
460

22✔
461
        r.tx = sweepCtx.tx
22✔
462
        r.fee = sweepCtx.fee
22✔
463
        r.outpointToTxIndex = sweepCtx.outpointToTxIndex
22✔
464

22✔
465
        // Register the record.
22✔
466
        t.records.Store(r.requestID, r)
22✔
467

22✔
468
        return r
22✔
469
}
22✔
470

471
// Name returns the human-readable name of the TxPublisher.
//
// NOTE: part of the `chainio.Consumer` interface.
472
func (t *TxPublisher) Name() string {
24✔
473
        return "TxPublisher"
24✔
474
}
24✔
475

476
// initializeTx initializes a fee function and creates an RBF-compliant tx. If
477
// succeeded, the initial tx is stored in the records map.
478
func (t *TxPublisher) initializeTx(r *monitorRecord) (*monitorRecord, error) {
8✔
479
        // Create a fee bumping algorithm to be used for future RBF.
8✔
480
        feeAlgo, err := t.initializeFeeFunction(r.req)
8✔
481
        if err != nil {
12✔
482
                return nil, fmt.Errorf("init fee function: %w", err)
4✔
483
        }
4✔
484

485
        // Attach the newly created fee function.
486
        //
487
        // TODO(yy): currently we initialize a monitorRecord before creating the
488
        // fee function, while we could instead create the fee function first
489
        // then save it to the record. To make this happen we need to change the
490
        // conf target calculation below since we would be initializing the fee
491
        // function one block before.
492
        r.feeFunction = feeAlgo
7✔
493

7✔
494
        // Create the initial tx to be broadcasted. This tx is guaranteed to
7✔
495
        // comply with the RBF restrictions.
7✔
496
        record, err := t.createRBFCompliantTx(r)
7✔
497
        if err != nil {
11✔
498
                return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
4✔
499
        }
4✔
500

501
        return record, nil
6✔
502
}
503

504
// initializeFeeFunction initializes a fee function to be used for this request
505
// for future fee bumping.
506
func (t *TxPublisher) initializeFeeFunction(
507
        req *BumpRequest) (FeeFunction, error) {
11✔
508

11✔
509
        // Get the max allowed feerate.
11✔
510
        maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
11✔
511
        if err != nil {
11✔
512
                return nil, err
×
513
        }
×
514

515
        // Get the initial conf target.
516
        confTarget := calcCurrentConfTarget(
11✔
517
                t.currentHeight.Load(), req.DeadlineHeight,
11✔
518
        )
11✔
519

11✔
520
        log.Debugf("Initializing fee function with conf target=%v, budget=%v, "+
11✔
521
                "maxFeeRateAllowed=%v", confTarget, req.Budget,
11✔
522
                maxFeeRateAllowed)
11✔
523

11✔
524
        // Initialize the fee function and return it.
11✔
525
        //
11✔
526
        // TODO(yy): return based on different req.Strategy?
11✔
527
        return NewLinearFeeFunction(
11✔
528
                maxFeeRateAllowed, confTarget, t.cfg.Estimator,
11✔
529
                req.StartingFeeRate,
11✔
530
        )
11✔
531
}
532

533
// createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
534
// so by creating a tx, validating it using `TestMempoolAccept`, and bumping its
535
// fee and redoing the process until the tx is valid, or returning an error when
536
// non-RBF related errors occur or the budget has been used up.
537
//
538
// NOTE: For neutrino backend, it will use PublishTransaction instead due to
539
// CheckMempoolAcceptance not available.
540
func (t *TxPublisher) createRBFCompliantTx(
541
        r *monitorRecord) (*monitorRecord, error) {
13✔
542

13✔
543
        f := r.feeFunction
13✔
544

13✔
545
        for {
29✔
546
                // Create a new tx with the given fee rate and check its
16✔
547
                // mempool acceptance.
16✔
548
                sweepCtx, err := t.createAndCheckTx(r)
16✔
549

16✔
550
                switch {
16✔
551
                case err == nil:
10✔
552
                        // The tx is valid, store it.
10✔
553
                        record := t.updateRecord(r, sweepCtx)
10✔
554

10✔
555
                        log.Infof("Created initial sweep tx=%v for %v inputs: "+
10✔
556
                                "feerate=%v, fee=%v, inputs:\n%v",
10✔
557
                                sweepCtx.tx.TxHash(), len(r.req.Inputs),
10✔
558
                                f.FeeRate(), sweepCtx.fee,
10✔
559
                                inputTypeSummary(r.req.Inputs))
10✔
560

10✔
561
                        return record, nil
10✔
562

563
                // If the error indicates the fees paid is not enough, we will
564
                // ask the fee function to increase the fee rate and retry.
565
                case errors.Is(err, lnwallet.ErrMempoolFee):
2✔
566
                        // We should at least start with a feerate above the
2✔
567
                        // mempool min feerate, so if we get this error, it
2✔
568
                        // means something is wrong earlier in the pipeline.
2✔
569
                        log.Errorf("Current fee=%v, feerate=%v, %v",
2✔
570
                                sweepCtx.fee, f.FeeRate(), err)
2✔
571

2✔
572
                        fallthrough
2✔
573

574
                // We are not paying enough fees so we increase it.
575
                case errors.Is(err, chain.ErrInsufficientFee):
7✔
576
                        increased := false
7✔
577

7✔
578
                        // Keep calling the fee function until the fee rate is
7✔
579
                        // increased or maxed out.
7✔
580
                        for !increased {
15✔
581
                                log.Debugf("Increasing fee for next round, "+
8✔
582
                                        "current fee=%v, feerate=%v",
8✔
583
                                        sweepCtx.fee, f.FeeRate())
8✔
584

8✔
585
                                // If the fee function tells us that we have
8✔
586
                                // used up the budget, we will return an error
8✔
587
                                // indicating this tx cannot be made. The
8✔
588
                                // sweeper should handle this error and try to
8✔
589
                                // cluster these inputs differently.
8✔
590
                                increased, err = f.Increment()
8✔
591
                                if err != nil {
11✔
592
                                        return nil, err
3✔
593
                                }
3✔
594
                        }
595

596
                // TODO(yy): suppose there's only one bad input, we can do a
597
                // binary search to find out which input is causing this error
598
                // by recreating a tx using half of the inputs and check its
599
                // mempool acceptance.
600
                default:
5✔
601
                        log.Debugf("Failed to create RBF-compliant tx: %v", err)
5✔
602
                        return nil, err
5✔
603
                }
604
        }
605
}
606

607
// createAndCheckTx creates a tx based on the given inputs, change output
608
// script, and the fee rate. In addition, it validates the tx's mempool
609
// acceptance before returning a tx that can be published directly, along with
610
// its fee.
611
//
612
// NOTE: For neutrino backend, it will use PublishTransaction instead due to
613
// CheckMempoolAcceptance not available.
614
func (t *TxPublisher) createAndCheckTx(r *monitorRecord) (*sweepTxCtx, error) {
26✔
615
        req := r.req
26✔
616
        f := r.feeFunction
26✔
617

26✔
618
        // Create the sweep tx with max fee rate of 0 as the fee function
26✔
619
        // guarantees the fee rate used here won't exceed the max fee rate.
26✔
620
        sweepCtx, err := t.createSweepTx(
26✔
621
                req.Inputs, req.DeliveryAddress, f.FeeRate(),
26✔
622
        )
26✔
623
        if err != nil {
29✔
624
                return sweepCtx, fmt.Errorf("create sweep tx: %w", err)
3✔
625
        }
3✔
626

627
        // Sanity check the budget still covers the fee.
628
        if sweepCtx.fee > req.Budget {
28✔
629
                return sweepCtx, fmt.Errorf("%w: budget=%v, fee=%v",
2✔
630
                        ErrNotEnoughBudget, req.Budget, sweepCtx.fee)
2✔
631
        }
2✔
632

633
        // If we had an extra txOut, then we'll update the result to include
634
        // it.
635
        req.ExtraTxOut = sweepCtx.extraTxOut
24✔
636

24✔
637
        // Validate the tx's mempool acceptance.
24✔
638
        //
24✔
639
        // For neutrino, we will publish tx instead as it's the only way to know
24✔
640
        // whether the RBF requirements are met or not.
24✔
641
        if t.isNeutrinoBackend() {
25✔
642
                err = t.cfg.Wallet.PublishTransaction(
1✔
643
                        sweepCtx.tx,
1✔
644
                        labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
1✔
645
                )
1✔
646
        } else {
24✔
647
                err = t.cfg.Wallet.CheckMempoolAcceptance(sweepCtx.tx)
23✔
648
        }
23✔
649

650
        // Exit early if the tx is valid.
651
        if err == nil {
39✔
652
                return sweepCtx, nil
15✔
653
        }
15✔
654

655
        // Print an error log if the chain backend doesn't support the mempool
656
        // acceptance test RPC.
657
        if errors.Is(err, rpcclient.ErrBackendVersion) {
12✔
658
                log.Errorf("TestMempoolAccept not supported by backend, " +
×
659
                        "consider upgrading it to a newer version")
×
660
                return sweepCtx, nil
×
661
        }
×
662

663
        // We are running on a backend that doesn't implement the RPC
664
        // testmempoolaccept, eg, neutrino, so we'll skip the check.
665
        if errors.Is(err, chain.ErrUnimplemented) {
12✔
666
                log.Debug("Skipped testmempoolaccept due to not implemented")
×
667
                return sweepCtx, nil
×
668
        }
×
669

670
        // If the inputs are spent by another tx, we will exit with the latest
671
        // sweepCtx and an error.
672
        if errors.Is(err, chain.ErrMissingInputs) {
14✔
673
                log.Debugf("Tx %v missing inputs, it's likely the input has "+
2✔
674
                        "been spent by others", sweepCtx.tx.TxHash())
2✔
675

2✔
676
                t.handleMissingInputs(r, sweepCtx)
2✔
677

2✔
678
                return sweepCtx, ErrInputMissing
2✔
679
        }
2✔
680

681
        return sweepCtx, fmt.Errorf("tx=%v failed mempool check: %w",
12✔
682
                sweepCtx.tx.TxHash(), err)
12✔
683
}
684

685
// handleMissingInputs handles the case when the chain backend reports back a
686
// missing inputs error, which could happen when one of the input has been spent
687
// in another tx, or the input is referencing an orphan. When the input is
688
// spent, it will be handled via the TxUnknownSpend flow, otherwise, a TxFatal
689
// event is notified.
690
func (t *TxPublisher) handleMissingInputs(record *monitorRecord,
691
        sweepCtx *sweepTxCtx) {
2✔
692

2✔
693
        // Make sure to update the record with the latest attempt.
2✔
694
        r := t.updateRecord(record, sweepCtx)
2✔
695

2✔
696
        // Get the spending txns.
2✔
697
        spends := t.hasInputSpent(r)
2✔
698

2✔
699
        // Attach the spending txns.
2✔
700
        r.spentInputs = spends
2✔
701

2✔
702
        // If there are no spending txns found and the input is missing, the
2✔
703
        // input is referencing an orphan tx that's no longer valid, e.g., the
2✔
704
        // spending the anchor output from the remote commitment after the local
2✔
705
        // commitment has confirmed. In this case we will mark it as fatal and
2✔
706
        // exit.
2✔
707
        if len(spends) == 0 {
4✔
708
                log.Warnf("Failing record=%v: found orphan inputs: %v\n",
2✔
709
                        r.requestID, inputTypeSummary(r.req.Inputs))
2✔
710

2✔
711
                // Create a result that will be sent to the resultChan which is
2✔
712
                // listened by the caller.
2✔
713
                result := &BumpResult{
2✔
714
                        Event:     TxFatal,
2✔
715
                        Tx:        r.tx,
2✔
716
                        requestID: r.requestID,
2✔
717
                        Err:       ErrInputMissing,
2✔
718
                }
2✔
719

2✔
720
                // Notify that this tx is confirmed and remove the record from
2✔
721
                // the map.
2✔
722
                t.handleResult(result)
2✔
723

2✔
724
                return
2✔
725
        }
2✔
726

727
        // Check there the spending tx matches the sweeping tx - given the input
728
        // is missing from the sweeping tx, it's impossible to match. We perform
729
        // a sanity to chech here to catch the unexpected state.
730
        if !t.isUnknownSpent(r, spends) {
4✔
731
                log.Errorf("Sweeping tx %v has missing inputs, yet the "+
2✔
732
                        "spending tx is the sweeping tx itself: %v",
2✔
733
                        r.tx.TxHash(), r.spentInputs)
2✔
734
        }
2✔
735

736
        t.wg.Add(1)
2✔
737
        go t.handleUnknownSpent(r)
2✔
738
}
739

740
// broadcast takes a monitored tx and publishes it to the network. Prior to the
741
// broadcast, it will subscribe the tx's confirmation notification and attach
742
// the event channel to the record. Any broadcast-related errors will not be
743
// returned here, instead, they will be put inside the `BumpResult` and
744
// returned to the caller.
745
func (t *TxPublisher) broadcast(record *monitorRecord) (*BumpResult, error) {
12✔
746
        txid := record.tx.TxHash()
12✔
747

12✔
748
        tx := record.tx
12✔
749
        log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
12✔
750
                txid, len(tx.TxIn), t.currentHeight.Load())
12✔
751

12✔
752
        // Before we go to broadcast, we'll notify the aux sweeper, if it's
12✔
753
        // present of this new broadcast attempt.
12✔
754
        err := fn.MapOptionZ(t.cfg.AuxSweeper, func(aux AuxSweeper) error {
21✔
755
                return aux.NotifyBroadcast(
9✔
756
                        record.req, tx, record.fee, record.outpointToTxIndex,
9✔
757
                )
9✔
758
        })
9✔
759
        if err != nil {
12✔
760
                return nil, fmt.Errorf("unable to notify aux sweeper: %w", err)
×
761
        }
×
762

763
        // Set the event, and change it to TxFailed if the wallet fails to
764
        // publish it.
765
        event := TxPublished
12✔
766

12✔
767
        // Publish the sweeping tx with customized label. If the publish fails,
12✔
768
        // this error will be saved in the `BumpResult` and it will be removed
12✔
769
        // from being monitored.
12✔
770
        err = t.cfg.Wallet.PublishTransaction(
12✔
771
                tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
12✔
772
        )
12✔
773
        if err != nil {
17✔
774
                // NOTE: we decide to attach this error to the result instead
5✔
775
                // of returning it here because by the time the tx reaches
5✔
776
                // here, it should have passed the mempool acceptance check. If
5✔
777
                // it still fails to be broadcast, it's likely a non-RBF
5✔
778
                // related error happened. So we send this error back to the
5✔
779
                // caller so that it can handle it properly.
5✔
780
                //
5✔
781
                // TODO(yy): find out which input is causing the failure.
5✔
782
                log.Errorf("Failed to publish tx %v: %v", txid, err)
5✔
783
                event = TxFailed
5✔
784
        }
5✔
785

786
        result := &BumpResult{
12✔
787
                Event:     event,
12✔
788
                Tx:        record.tx,
12✔
789
                Fee:       record.fee,
12✔
790
                FeeRate:   record.feeFunction.FeeRate(),
12✔
791
                Err:       err,
12✔
792
                requestID: record.requestID,
12✔
793
        }
12✔
794

12✔
795
        return result, nil
12✔
796
}
797

798
// notifyResult sends the result to the resultChan specified by the requestID.
799
// This channel is expected to be read by the caller.
800
func (t *TxPublisher) notifyResult(result *BumpResult) {
17✔
801
        id := result.requestID
17✔
802
        subscriber, ok := t.subscriberChans.Load(id)
17✔
803
        if !ok {
18✔
804
                log.Errorf("Result chan for id=%v not found", id)
1✔
805
                return
1✔
806
        }
1✔
807

808
        log.Debugf("Sending result %v for requestID=%v", result, id)
17✔
809

17✔
810
        select {
17✔
811
        // Send the result to the subscriber.
812
        //
813
        // TODO(yy): Add timeout in case it's blocking?
814
        case subscriber <- result:
16✔
815
        case <-t.quit:
3✔
816
                log.Debug("Fee bumper stopped")
3✔
817
        }
818
}
819

820
// removeResult removes the tracking of the result if the result contains a
821
// non-nil error, or the tx is confirmed, the record will be removed from the
822
// maps.
823
func (t *TxPublisher) removeResult(result *BumpResult) {
17✔
824
        id := result.requestID
17✔
825

17✔
826
        var txid chainhash.Hash
17✔
827
        if result.Tx != nil {
31✔
828
                txid = result.Tx.TxHash()
14✔
829
        }
14✔
830

831
        // Remove the record from the maps if there's an error or the tx is
832
        // confirmed. When there's an error, it means this tx has failed its
833
        // broadcast and cannot be retried. There are two cases it may fail,
834
        // - when the budget cannot cover the increased fee calculated by the
835
        //   fee function, hence the budget is used up.
836
        // - when a non-fee related error returned from PublishTransaction.
837
        switch result.Event {
17✔
838
        case TxFailed:
5✔
839
                log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
5✔
840
                        id, txid, result.Err)
5✔
841

842
        case TxConfirmed:
6✔
843
                // Remove the record if the tx is confirmed.
6✔
844
                log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
6✔
845
                        txid)
6✔
846

847
        case TxFatal:
4✔
848
                // Remove the record if there's an error.
4✔
849
                log.Debugf("Removing monitor record=%v due to fatal err: %v",
4✔
850
                        id, result.Err)
4✔
851

852
        case TxUnknownSpend:
5✔
853
                // Remove the record if there's an unknown spend.
5✔
854
                log.Debugf("Removing monitor record=%v due unknown spent: "+
5✔
855
                        "%v", id, result.Err)
5✔
856

857
        // Do nothing if it's neither failed or confirmed.
858
        default:
8✔
859
                log.Tracef("Skipping record removal for id=%v, event=%v", id,
8✔
860
                        result.Event)
8✔
861

8✔
862
                return
8✔
863
        }
864

865
        t.records.Delete(id)
12✔
866
        t.subscriberChans.Delete(id)
12✔
867
}
868

869
// handleResult handles the result of a tx broadcast. It will notify the
870
// subscriber and remove the record if the tx is confirmed or failed to be
871
// broadcast.
872
func (t *TxPublisher) handleResult(result *BumpResult) {
14✔
873
        // Notify the subscriber.
14✔
874
        t.notifyResult(result)
14✔
875

14✔
876
        // Remove the record if it's failed or confirmed.
14✔
877
        t.removeResult(result)
14✔
878
}
14✔
879

880
// monitorRecord is used to keep track of the tx being monitored by the
881
// publisher internally.
882
type monitorRecord struct {
883
        // requestID is the ID of the request that created this record.
884
        requestID uint64
885

886
        // tx is the tx being monitored.
887
        tx *wire.MsgTx
888

889
        // req is the original request.
890
        req *BumpRequest
891

892
        // feeFunction is the fee bumping algorithm used by the publisher.
893
        feeFunction FeeFunction
894

895
        // fee is the fee paid by the tx.
896
        fee btcutil.Amount
897

898
        // outpointToTxIndex is a map of outpoint to tx index.
899
        outpointToTxIndex map[wire.OutPoint]int
900

901
        // spentInputs are the inputs spent by another tx which caused the
902
        // current tx failed.
903
        spentInputs map[wire.OutPoint]*wire.MsgTx
904
}
905

906
// Start starts the publisher by subscribing to block epoch updates and kicking
907
// off the monitor loop.
908
func (t *TxPublisher) Start(beat chainio.Blockbeat) error {
3✔
909
        log.Info("TxPublisher starting...")
3✔
910

3✔
911
        if t.started.Swap(true) {
3✔
912
                return fmt.Errorf("TxPublisher started more than once")
×
913
        }
×
914

915
        // Set the current height.
916
        t.currentHeight.Store(beat.Height())
3✔
917

3✔
918
        t.wg.Add(1)
3✔
919
        go t.monitor()
3✔
920

3✔
921
        log.Debugf("TxPublisher started")
3✔
922

3✔
923
        return nil
3✔
924
}
925

926
// Stop stops the publisher and waits for the monitor loop to exit.
927
func (t *TxPublisher) Stop() error {
3✔
928
        log.Info("TxPublisher stopping...")
3✔
929

3✔
930
        if t.stopped.Swap(true) {
3✔
931
                return fmt.Errorf("TxPublisher stopped more than once")
×
932
        }
×
933

934
        close(t.quit)
3✔
935
        t.wg.Wait()
3✔
936

3✔
937
        log.Debug("TxPublisher stopped")
3✔
938

3✔
939
        return nil
3✔
940
}
941

942
// monitor is the main loop driven by new blocks. Whevenr a new block arrives,
943
// it will examine all the txns being monitored, and check if any of them needs
944
// to be bumped. If so, it will attempt to bump the fee of the tx.
945
//
946
// NOTE: Must be run as a goroutine.
947
func (t *TxPublisher) monitor() {
3✔
948
        defer t.wg.Done()
3✔
949

3✔
950
        for {
6✔
951
                select {
3✔
952
                case beat := <-t.BlockbeatChan:
3✔
953
                        height := beat.Height()
3✔
954
                        log.Debugf("TxPublisher received new block: %v", height)
3✔
955

3✔
956
                        // Update the best known height for the publisher.
3✔
957
                        t.currentHeight.Store(height)
3✔
958

3✔
959
                        // Check all monitored txns to see if any of them needs
3✔
960
                        // to be bumped.
3✔
961
                        t.processRecords()
3✔
962

3✔
963
                        // Notify we've processed the block.
3✔
964
                        t.NotifyBlockProcessed(beat, nil)
3✔
965

966
                case <-t.quit:
3✔
967
                        log.Debug("Fee bumper stopped, exit monitor")
3✔
968
                        return
3✔
969
                }
970
        }
971
}
972

973
// processRecords checks all the txns being monitored, and checks if any of
974
// them needs to be bumped. If so, it will attempt to bump the fee of the tx.
975
func (t *TxPublisher) processRecords() {
8✔
976
        // confirmedRecords stores a map of the records which have been
8✔
977
        // confirmed.
8✔
978
        confirmedRecords := make(map[uint64]*monitorRecord)
8✔
979

8✔
980
        // feeBumpRecords stores a map of records which need to be bumped.
8✔
981
        feeBumpRecords := make(map[uint64]*monitorRecord)
8✔
982

8✔
983
        // failedRecords stores a map of records which has inputs being spent
8✔
984
        // by a third party.
8✔
985
        failedRecords := make(map[uint64]*monitorRecord)
8✔
986

8✔
987
        // initialRecords stores a map of records which are being created and
8✔
988
        // published for the first time.
8✔
989
        initialRecords := make(map[uint64]*monitorRecord)
8✔
990

8✔
991
        // visitor is a helper closure that visits each record and divides them
8✔
992
        // into two groups.
8✔
993
        visitor := func(requestID uint64, r *monitorRecord) error {
16✔
994
                log.Tracef("Checking monitor recordID=%v", requestID)
8✔
995

8✔
996
                // Check whether the inputs have already been spent.
8✔
997
                spends := t.hasInputSpent(r)
8✔
998

8✔
999
                // If the any of the inputs has been spent, the record will be
8✔
1000
                // marked as failed or confirmed.
8✔
1001
                if len(spends) != 0 {
14✔
1002
                        // Attach the spending txns.
6✔
1003
                        r.spentInputs = spends
6✔
1004

6✔
1005
                        // Check whether the inputs has been spent by a unknown
6✔
1006
                        // tx. Note the r.tx can be nil, which means we haven't
6✔
1007
                        // tried the initial broadcast yet the input is already
6✔
1008
                        // spent. This could happen when the node shuts down, a
6✔
1009
                        // previous sweeping tx confirmed, then the node comes
6✔
1010
                        // back online and reoffers the inputs. Another case is
6✔
1011
                        // the remote node spends the input quickly before we
6✔
1012
                        // even attempt the sweep. In either case we will fail
6✔
1013
                        // the record and let the sweeper handles it.
6✔
1014
                        if t.isUnknownSpent(r, spends) {
11✔
1015
                                failedRecords[requestID] = r
5✔
1016

5✔
1017
                                // Move to the next record.
5✔
1018
                                return nil
5✔
1019
                        }
5✔
1020

1021
                        // The tx is ours, we can move it to the confirmed queue
1022
                        // and stop monitoring it.
1023
                        confirmedRecords[requestID] = r
4✔
1024

4✔
1025
                        // Move to the next record.
4✔
1026
                        return nil
4✔
1027
                }
1028

1029
                // This is the first time we see this record, so we put it in
1030
                // the initial queue.
1031
                if r.tx == nil {
9✔
1032
                        initialRecords[requestID] = r
4✔
1033

4✔
1034
                        return nil
4✔
1035
                }
4✔
1036

1037
                // We can only get here when the inputs are not spent and a
1038
                // previous sweeping tx has been attempted. In this case we will
1039
                // perform an RBF on it in the current block.
1040
                feeBumpRecords[requestID] = r
4✔
1041

4✔
1042
                // Return nil to move to the next record.
4✔
1043
                return nil
4✔
1044
        }
1045

1046
        // Iterate through all the records and divide them into four groups.
1047
        t.records.ForEach(visitor)
8✔
1048

8✔
1049
        // Handle the initial broadcast.
8✔
1050
        for _, r := range initialRecords {
12✔
1051
                t.handleInitialBroadcast(r)
4✔
1052
        }
4✔
1053

1054
        // For records that are confirmed, we'll notify the caller about this
1055
        // result.
1056
        for _, r := range confirmedRecords {
12✔
1057
                t.wg.Add(1)
4✔
1058
                go t.handleTxConfirmed(r)
4✔
1059
        }
4✔
1060

1061
        // Get the current height to be used in the following goroutines.
1062
        currentHeight := t.currentHeight.Load()
8✔
1063

8✔
1064
        // For records that are not confirmed, we perform a fee bump if needed.
8✔
1065
        for _, r := range feeBumpRecords {
12✔
1066
                t.wg.Add(1)
4✔
1067
                go t.handleFeeBumpTx(r, currentHeight)
4✔
1068
        }
4✔
1069

1070
        // For records that are failed, we'll notify the caller about this
1071
        // result.
1072
        for _, r := range failedRecords {
13✔
1073
                t.wg.Add(1)
5✔
1074
                go t.handleUnknownSpent(r)
5✔
1075
        }
5✔
1076
}
1077

1078
// handleTxConfirmed is called when a monitored tx is confirmed. It will
1079
// notify the subscriber then remove the record from the maps .
1080
//
1081
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1082
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) {
5✔
1083
        defer t.wg.Done()
5✔
1084

5✔
1085
        log.Debugf("Record %v is spent in tx=%v", r.requestID, r.tx.TxHash())
5✔
1086

5✔
1087
        // Create a result that will be sent to the resultChan which is
5✔
1088
        // listened by the caller.
5✔
1089
        result := &BumpResult{
5✔
1090
                Event:     TxConfirmed,
5✔
1091
                Tx:        r.tx,
5✔
1092
                requestID: r.requestID,
5✔
1093
                Fee:       r.fee,
5✔
1094
        }
5✔
1095

5✔
1096
        // When the record is failed before the initial broadcast is attempted,
5✔
1097
        // it will have a nil fee func.
5✔
1098
        //
5✔
1099
        // TODO(yy): can instead calculate the fee rate from the tx.
5✔
1100
        if r.feeFunction != nil {
10✔
1101
                result.FeeRate = r.feeFunction.FeeRate()
5✔
1102
        }
5✔
1103

1104
        // Notify that this tx is confirmed and remove the record from the map.
1105
        t.handleResult(result)
5✔
1106
}
1107

1108
// handleInitialTxError takes the error from `initializeTx` and decides the
1109
// bump event. It will construct a BumpResult and handles it.
1110
func (t *TxPublisher) handleInitialTxError(requestID uint64, err error) {
5✔
1111
        // We now decide what type of event to send.
5✔
1112
        var event BumpEvent
5✔
1113

5✔
1114
        switch {
5✔
1115
        // When the error is due to a dust output, we'll send a TxFailed so
1116
        // these inputs can be retried with a different group in the next
1117
        // block.
1118
        case errors.Is(err, ErrTxNoOutput):
3✔
1119
                event = TxFailed
3✔
1120

1121
        // When the error is due to budget being used up, we'll send a TxFailed
1122
        // so these inputs can be retried with a different group in the next
1123
        // block.
1124
        case errors.Is(err, ErrMaxPosition):
2✔
1125
                event = TxFailed
2✔
1126

1127
        // When the error is due to zero fee rate delta, we'll send a TxFailed
1128
        // so these inputs can be retried in the next block.
1129
        case errors.Is(err, ErrZeroFeeRateDelta):
3✔
1130
                event = TxFailed
3✔
1131

1132
        // Exit when there are missing inputs - we'll send a TxUnknownSpend
1133
        // event in `createAndCheckTx`` so the rest of the inputs can be
1134
        // retried.
1135
        case errors.Is(err, ErrInputMissing):
2✔
1136
                return
2✔
1137

1138
        // Otherwise this is not a fee-related error and the tx cannot be
1139
        // retried. In that case we will fail ALL the inputs in this tx, which
1140
        // means they will be removed from the sweeper and never be tried
1141
        // again.
1142
        //
1143
        // TODO(yy): Find out which input is causing the failure and fail that
1144
        // one only.
1145
        default:
2✔
1146
                event = TxFatal
2✔
1147
        }
1148

1149
        result := &BumpResult{
5✔
1150
                Event:     event,
5✔
1151
                Err:       err,
5✔
1152
                requestID: requestID,
5✔
1153
        }
5✔
1154

5✔
1155
        t.handleResult(result)
5✔
1156
}
1157

1158
// handleInitialBroadcast is called when a new request is received. It will
1159
// handle the initial tx creation and broadcast. In details,
1160
// 1. init a fee function based on the given strategy.
1161
// 2. create an RBF-compliant tx and monitor it for confirmation.
1162
// 3. notify the initial broadcast result back to the caller.
1163
func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) {
8✔
1164
        log.Debugf("Initial broadcast for requestID=%v", r.requestID)
8✔
1165

8✔
1166
        var (
8✔
1167
                result *BumpResult
8✔
1168
                err    error
8✔
1169
        )
8✔
1170

8✔
1171
        // Attempt an initial broadcast which is guaranteed to comply with the
8✔
1172
        // RBF rules.
8✔
1173
        //
8✔
1174
        // Create the initial tx to be broadcasted.
8✔
1175
        record, err := t.initializeTx(r)
8✔
1176
        if err != nil {
13✔
1177
                log.Errorf("Initial broadcast failed: %v", err)
5✔
1178

5✔
1179
                // We now handle the initialization error and exit.
5✔
1180
                t.handleInitialTxError(r.requestID, err)
5✔
1181

5✔
1182
                return
5✔
1183
        }
5✔
1184

1185
        // For neutrino, initializeTx has already published the tx so we can
1186
        // return early.
1187
        if t.isNeutrinoBackend() {
7✔
1188
                result = &BumpResult{
1✔
1189
                        Event:     TxPublished,
1✔
1190
                        Tx:        record.tx,
1✔
1191
                        Fee:       record.fee,
1✔
1192
                        FeeRate:   record.feeFunction.FeeRate(),
1✔
1193
                        Err:       err,
1✔
1194
                        requestID: record.requestID,
1✔
1195
                }
1✔
1196
                t.handleResult(result)
1✔
1197

1✔
1198
                return
1✔
1199
        }
1✔
1200

1201
        // Successfully created the first tx, now broadcast it.
1202
        result, err = t.broadcast(record)
5✔
1203
        if err != nil {
5✔
1204
                // The broadcast failed, which can only happen if the tx record
×
1205
                // cannot be found or the aux sweeper returns an error. In
×
1206
                // either case, we will send back a TxFail event so these
×
1207
                // inputs can be retried.
×
1208
                result = &BumpResult{
×
1209
                        Event:     TxFailed,
×
1210
                        Err:       err,
×
1211
                        requestID: r.requestID,
×
1212
                }
×
1213
        }
×
1214

1215
        t.handleResult(result)
5✔
1216
}
1217

1218
// handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
1219
// attempt to bump the fee of the tx.
1220
//
1221
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1222
func (t *TxPublisher) handleFeeBumpTx(r *monitorRecord, currentHeight int32) {
7✔
1223
        defer t.wg.Done()
7✔
1224

7✔
1225
        log.Debugf("Attempting to fee bump tx=%v in record %v", r.tx.TxHash(),
7✔
1226
                r.requestID)
7✔
1227

7✔
1228
        oldTxid := r.tx.TxHash()
7✔
1229

7✔
1230
        // Get the current conf target for this record.
7✔
1231
        confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
7✔
1232

7✔
1233
        // Ask the fee function whether a bump is needed. We expect the fee
7✔
1234
        // function to increase its returned fee rate after calling this
7✔
1235
        // method.
7✔
1236
        increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
7✔
1237
        if err != nil {
11✔
1238
                // TODO(yy): send this error back to the sweeper so it can
4✔
1239
                // re-group the inputs?
4✔
1240
                log.Errorf("Failed to increase fee rate for tx %v at "+
4✔
1241
                        "height=%v: %v", oldTxid, t.currentHeight.Load(), err)
4✔
1242

4✔
1243
                return
4✔
1244
        }
4✔
1245

1246
        // If the fee rate was not increased, there's no need to bump the fee.
1247
        if !increased {
7✔
1248
                log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
1✔
1249
                        t.currentHeight.Load())
1✔
1250

1✔
1251
                return
1✔
1252
        }
1✔
1253

1254
        // The fee function now has a new fee rate, we will use it to bump the
1255
        // fee of the tx.
1256
        resultOpt := t.createAndPublishTx(r)
5✔
1257

5✔
1258
        // If there's a result, we will notify the caller about the result.
5✔
1259
        resultOpt.WhenSome(func(result BumpResult) {
10✔
1260
                // Notify the new result.
5✔
1261
                t.handleResult(&result)
5✔
1262
        })
5✔
1263
}
1264

1265
// handleUnknownSpent is called when the inputs are spent by a unknown tx. It
1266
// will notify the subscriber then remove the record from the maps and send a
1267
// TxFailed event to the subscriber.
1268
//
1269
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1270
func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) {
5✔
1271
        defer t.wg.Done()
5✔
1272

5✔
1273
        log.Debugf("Record %v has inputs spent by a tx unknown to the fee "+
5✔
1274
                "bumper, failing it now:\n%v", r.requestID,
5✔
1275
                inputTypeSummary(r.req.Inputs))
5✔
1276

5✔
1277
        // Get the fee function, which will be used to decided the next fee rate
5✔
1278
        // to use if the sweeper decides to retry sweeping this input.
5✔
1279
        feeFunc := r.feeFunction
5✔
1280

5✔
1281
        // When the record is failed before the initial broadcast is attempted,
5✔
1282
        // it will have a nil fee func. In this case, we'll create the fee func
5✔
1283
        // here.
5✔
1284
        //
5✔
1285
        // NOTE: Since the current record is failed and will be deleted, we
5✔
1286
        // don't need to update the record on this fee function. We only need
5✔
1287
        // the fee rate data so the sweeper can pick up where we left off.
5✔
1288
        if feeFunc == nil {
9✔
1289
                f, err := t.initializeFeeFunction(r.req)
4✔
1290
                if err != nil {
4✔
NEW
1291
                        log.Errorf("Failed to create fee func for record %v: "+
×
NEW
1292
                                "%v", r.requestID, err)
×
NEW
1293

×
NEW
1294
                        return
×
NEW
1295
                }
×
1296

1297
                feeFunc = f
4✔
1298
        }
1299

1300
        // Since the sweeping tx has been replaced by another party's tx, we
1301
        // missed this block window to increase its fee rate. To make sure the
1302
        // fee rate stays in the initial line, we now ask the fee function to
1303
        // give us the next fee rate as if the sweeping tx were RBFed. This new
1304
        // fee rate will be used as the starting fee rate if the upper system
1305
        // decides to continue sweeping the rest of the inputs.
1306
        _, err := feeFunc.Increment()
5✔
1307
        if err != nil {
6✔
1308
                // The fee function has reached its max position - nothing we
1✔
1309
                // can do here other than letting the user increase the budget.
1✔
1310
                log.Errorf("Failed to calculate the next fee rate for "+
1✔
1311
                        "Record(%v): %v", r.requestID, err)
1✔
1312
        }
1✔
1313

1314
        // Create a result that will be sent to the resultChan which is
1315
        // listened by the caller.
1316
        result := &BumpResult{
5✔
1317
                Event:       TxUnknownSpend,
5✔
1318
                Tx:          r.tx,
5✔
1319
                requestID:   r.requestID,
5✔
1320
                Err:         ErrUnknownSpent,
5✔
1321
                SpentInputs: r.spentInputs,
5✔
1322
                FeeRate:     feeFunc.FeeRate(),
5✔
1323
        }
5✔
1324

5✔
1325
        // Notify that this tx is confirmed and remove the record from the map.
5✔
1326
        t.handleResult(result)
5✔
1327
}
1328

1329
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
1330
// to the network. It will update the record with the new tx and fee rate if
1331
// successfully created, and return the result when published successfully.
1332
func (t *TxPublisher) createAndPublishTx(
1333
        r *monitorRecord) fn.Option[BumpResult] {
10✔
1334

10✔
1335
        // Fetch the old tx.
10✔
1336
        oldTx := r.tx
10✔
1337

10✔
1338
        // Create a new tx with the new fee rate.
10✔
1339
        //
10✔
1340
        // NOTE: The fee function is expected to have increased its returned
10✔
1341
        // fee rate after calling the SkipFeeBump method. So we can use it
10✔
1342
        // directly here.
10✔
1343
        sweepCtx, err := t.createAndCheckTx(r)
10✔
1344

10✔
1345
        // If the error is fee related, we will return no error and let the fee
10✔
1346
        // bumper retry it at next block.
10✔
1347
        //
10✔
1348
        // NOTE: we can check the RBF error here and ask the fee function to
10✔
1349
        // recalculate the fee rate. However, this would defeat the purpose of
10✔
1350
        // using a deadline based fee function:
10✔
1351
        // - if the deadline is far away, there's no rush to RBF the tx.
10✔
1352
        // - if the deadline is close, we expect the fee function to give us a
10✔
1353
        //   higher fee rate. If the fee rate cannot satisfy the RBF rules, it
10✔
1354
        //   means the budget is not enough.
10✔
1355
        if errors.Is(err, chain.ErrInsufficientFee) ||
10✔
1356
                errors.Is(err, lnwallet.ErrMempoolFee) {
15✔
1357

5✔
1358
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
5✔
1359
                return fn.None[BumpResult]()
5✔
1360
        }
5✔
1361

1362
        // If the error is not fee related, we will return a `TxFailed` event
1363
        // so this input can be retried, or `TxUnknownSpend` if the mempool
1364
        // gives us missing inputs error.
1365
        if err != nil {
9✔
1366
                switch {
1✔
1367
                // At least one of the inputs is missing, which means it has
1368
                // already been spent by another tx and confirmed. In this case
1369
                // we will handle it by spending the TxUnknownSpend event.
NEW
1370
                case errors.Is(err, ErrInputMissing):
×
NEW
1371
                        log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(),
×
NEW
1372
                                err)
×
NEW
1373

×
NEW
1374
                        return fn.None[BumpResult]()
×
1375

1376
                // If the tx doesn't not have enought budget, we will return a
1377
                // result so the sweeper can handle it by re-clustering the
1378
                // utxos.
1379
                case errors.Is(err, ErrNotEnoughBudget):
1✔
1380
                        log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(),
1✔
1381
                                err)
1✔
1382

NEW
1383
                default:
×
1384
                        // Otherwise, an unexpected error occurred, we will
×
1385
                        // fail the tx and let the sweeper retry the whole
×
1386
                        // process.
×
1387
                        log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(),
×
1388
                                err)
×
1389
                }
1390

1391
                return fn.Some(BumpResult{
1✔
1392
                        Event:     TxFailed,
1✔
1393
                        Tx:        oldTx,
1✔
1394
                        Err:       err,
1✔
1395
                        requestID: r.requestID,
1✔
1396
                })
1✔
1397
        }
1398

1399
        // The tx has been created without any errors, we now register a new
1400
        // record by overwriting the same requestID.
1401
        record := t.updateRecord(r, sweepCtx)
7✔
1402

7✔
1403
        // Attempt to broadcast this new tx.
7✔
1404
        result, err := t.broadcast(record)
7✔
1405
        if err != nil {
7✔
1406
                log.Infof("Failed to broadcast replacement tx %v: %v",
×
1407
                        sweepCtx.tx.TxHash(), err)
×
1408

×
1409
                return fn.None[BumpResult]()
×
1410
        }
×
1411

1412
        // If the result error is fee related, we will return no error and let
1413
        // the fee bumper retry it at next block.
1414
        //
1415
        // NOTE: we may get this error if we've bypassed the mempool check,
1416
        // which means we are suing neutrino backend.
1417
        if errors.Is(result.Err, chain.ErrInsufficientFee) ||
7✔
1418
                errors.Is(result.Err, lnwallet.ErrMempoolFee) {
9✔
1419

2✔
1420
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
2✔
1421
                return fn.None[BumpResult]()
2✔
1422
        }
2✔
1423

1424
        // A successful replacement tx is created, attach the old tx.
1425
        result.ReplacedTx = oldTx
7✔
1426

7✔
1427
        // If the new tx failed to be published, we will return the result so
7✔
1428
        // the caller can handle it.
7✔
1429
        if result.Event == TxFailed {
8✔
1430
                return fn.Some(*result)
1✔
1431
        }
1✔
1432

1433
        log.Infof("Replaced tx=%v with new tx=%v", oldTx.TxHash(),
6✔
1434
                sweepCtx.tx.TxHash())
6✔
1435

6✔
1436
        // Otherwise, it's a successful RBF, set the event and return.
6✔
1437
        result.Event = TxReplaced
6✔
1438

6✔
1439
        return fn.Some(*result)
6✔
1440
}
1441

1442
// isUnknownSpent checks whether the inputs of the tx has already been spent by
1443
// a tx not known to us. When a tx is not confirmed, yet its inputs has been
1444
// spent, then it must be spent by a different tx other than the sweeping tx
1445
// here.
1446
//
1447
// NOTE: This method will update `r.tx` if a confirmed wallet tx is found.
1448
func (t *TxPublisher) isUnknownSpent(r *monitorRecord,
1449
        spends map[wire.OutPoint]*wire.MsgTx) bool {
6✔
1450

6✔
1451
        // walletTx is the confirmed tx found in our wallet. When it's set, it
6✔
1452
        // means  a previous tx has spent the inputs, which could happen during
6✔
1453
        // restarts.
6✔
1454
        var walletTx *wire.MsgTx
6✔
1455

6✔
1456
        // Iterate all the spending txns and check if they match the sweeping
6✔
1457
        // tx.
6✔
1458
        for op, spendingTx := range spends {
12✔
1459
                spendingTxid := spendingTx.TxHash()
6✔
1460

6✔
1461
                // If the spending tx is the same as the sweeping tx then we are
6✔
1462
                // good.
6✔
1463
                if r.tx != nil && spendingTxid == r.tx.TxHash() {
10✔
1464
                        continue
4✔
1465
                }
1466

1467
                // Fetch the wallet to see if it's our previous sweeping tx.
1468
                tx, err := t.cfg.Wallet.FetchTx(spendingTxid)
5✔
1469
                if err != nil {
5✔
NEW
1470
                        log.Errorf("Failed to FetchTx for %v: %v", spendingTxid,
×
NEW
1471
                                err)
×
NEW
1472

×
NEW
1473
                        continue
×
1474
                }
1475

1476
                // Found a wallet tx, which means a previous sweeping tx is now
1477
                // confirmed.
1478
                if tx != nil {
8✔
1479
                        // Assign the tx for the first time.
3✔
1480
                        if walletTx == nil {
6✔
1481
                                walletTx = tx
3✔
1482
                                continue
3✔
1483
                        }
1484

1485
                        // Inputs are spent via the same tx, which is the
1486
                        // expected case.
1487
                        if walletTx.TxHash() == tx.TxHash() {
2✔
1488
                                continue
1✔
1489
                        }
1490

1491
                        // We'd end up here if the inputs are spent by at least
1492
                        // two previous sweeping txns, which could happen if the
1493
                        // two sweeping txns are spending non-time sensitive
1494
                        // inputs:
1495
                        // 1. sweeping tx 1 published and stays in the mempool.
1496
                        // 2. sweeping tx 2 published and stays in the mempool.
1497
                        // 3. the node shuts down.
1498
                        // 4. the two sweeping txns confirmed.
1499
                        // 5. the node comes back and reoffers the inputs, which
1500
                        //    could be grouped together if they all have non
1501
                        //    deadline heights.
1502
                        //
1503
                        // This is fine as we will return the first tx and retry
1504
                        // the sweeping process, which will end up here again
1505
                        // with a deducted input set.
NEW
1506
                        log.Warnf("Found tx %v and %v spending the same input "+
×
NEW
1507
                                "set in record %v", walletTx.TxHash(),
×
NEW
1508
                                tx.TxHash(), r.requestID)
×
1509
                }
1510

1511
                log.Warnf("Detected unknown spent of input=%v in tx=%v", op,
5✔
1512
                        spendingTx.TxHash())
5✔
1513

5✔
1514
                return true
5✔
1515
        }
1516

1517
        // Found a previous tx, we now change our record to use this one.
1518
        if walletTx != nil {
7✔
1519
                txid := walletTx.TxHash()
3✔
1520

3✔
1521
                log.Debugf("Overwriting tx for record %v, old tx: %v, new tx: "+
3✔
1522
                        "%v", r.requestID, lnutils.NewLogClosure(func() string {
6✔
1523
                        if r.tx == nil {
6✔
1524
                                return "nil"
3✔
1525
                        }
3✔
1526

1527
                        return r.tx.TxHash().String()
3✔
1528
                }), txid)
1529

1530
                r.tx = walletTx
3✔
1531
        }
1532

1533
        return false
4✔
1534
}
1535

1536
// hasInputSpent performs a non-blocking read on the spending subscriptions to
1537
// see whether any of the monitored inputs has been spent. A map of inputs with
1538
// their spending txns are returned if found.
1539
func (t *TxPublisher) hasInputSpent(
1540
        r *monitorRecord) map[wire.OutPoint]*wire.MsgTx {
9✔
1541

9✔
1542
        // Create a slice to record the inputs spent.
9✔
1543
        spentInputs := make(map[wire.OutPoint]*wire.MsgTx, len(r.req.Inputs))
9✔
1544

9✔
1545
        // Iterate all the inputs and check if they have been spent already.
9✔
1546
        for _, inp := range r.req.Inputs {
20✔
1547
                op := inp.OutPoint()
11✔
1548

11✔
1549
                // For wallet utxos, the height hint is not set - we don't need
11✔
1550
                // to monitor them for third party spend.
11✔
1551
                //
11✔
1552
                // TODO(yy): We need to properly lock wallet utxos before
11✔
1553
                // skipping this check as the same wallet utxo can be used by
11✔
1554
                // different sweeping txns.
11✔
1555
                heightHint := inp.HeightHint()
11✔
1556
                if heightHint == 0 {
15✔
1557
                        heightHint = uint32(t.currentHeight.Load())
4✔
1558
                        log.Debugf("Checking wallet input %v using heightHint "+
4✔
1559
                                "%v", op, heightHint)
4✔
1560
                }
4✔
1561

1562
                // If the input has already been spent after the height hint, a
1563
                // spend event is sent back immediately.
1564
                spendEvent, err := t.cfg.Notifier.RegisterSpendNtfn(
11✔
1565
                        &op, inp.SignDesc().Output.PkScript, heightHint,
11✔
1566
                )
11✔
1567
                if err != nil {
11✔
1568
                        log.Criticalf("Failed to register spend ntfn for "+
×
1569
                                "input=%v: %v", op, err)
×
1570

×
1571
                        return nil
×
1572
                }
×
1573

1574
                // Remove the subscription when exit.
1575
                defer spendEvent.Cancel()
11✔
1576

11✔
1577
                // Do a non-blocking read to see if the output has been spent.
11✔
1578
                select {
11✔
1579
                case spend, ok := <-spendEvent.Spend:
7✔
1580
                        if !ok {
7✔
1581
                                log.Debugf("Spend ntfn for %v canceled", op)
×
1582

×
1583
                                continue
×
1584
                        }
1585

1586
                        spendingTx := spend.SpendingTx
7✔
1587

7✔
1588
                        log.Debugf("Detected spent of input=%v in tx=%v", op,
7✔
1589
                                spendingTx.TxHash())
7✔
1590

7✔
1591
                        spentInputs[op] = spendingTx
7✔
1592

1593
                // Move to the next input.
1594
                default:
7✔
1595
                        log.Tracef("Input %v not spent yet", op)
7✔
1596
                }
1597
        }
1598

1599
        return spentInputs
9✔
1600
}
1601

1602
// calcCurrentConfTarget calculates the current confirmation target based on
1603
// the deadline height. The conf target is capped at 0 if the deadline has
1604
// already been past.
1605
func calcCurrentConfTarget(currentHeight, deadline int32) uint32 {
17✔
1606
        var confTarget uint32
17✔
1607

17✔
1608
        // Calculate how many blocks left until the deadline.
17✔
1609
        deadlineDelta := deadline - currentHeight
17✔
1610

17✔
1611
        // If we are already past the deadline, we will set the conf target to
17✔
1612
        // be 1.
17✔
1613
        if deadlineDelta < 0 {
24✔
1614
                log.Warnf("Deadline is %d blocks behind current height %v",
7✔
1615
                        -deadlineDelta, currentHeight)
7✔
1616

7✔
1617
                confTarget = 0
7✔
1618
        } else {
20✔
1619
                confTarget = uint32(deadlineDelta)
13✔
1620
        }
13✔
1621

1622
        return confTarget
17✔
1623
}
1624

1625
// sweepTxCtx houses a sweep transaction with additional context. It is the
// value returned by createSweepTx.
1626
type sweepTxCtx struct {
1627
        // tx is the sweep transaction.
        tx *wire.MsgTx
1628

1629
        // fee is the tx fee that prepareSweepTx calculated for tx.
        fee btcutil.Amount
1630

1631
        // extraTxOut is the change output flagged as IsExtra, if there is
        // one.
        extraTxOut fn.Option[SweepOutput]
1632

1633
        // outpointToTxIndex maps the outpoint of each input that carries a
        // required output to the index of that output in the sweep
        // transaction, as filled in by createSweepTx. Inputs without a
        // required output have no entry.
1634
1635
        outpointToTxIndex map[wire.OutPoint]int
1636
}
1637

1638
// createSweepTx creates a sweeping tx based on the given inputs, change
1639
// address and fee rate.
1640
func (t *TxPublisher) createSweepTx(inputs []input.Input,
1641
        changePkScript lnwallet.AddrWithKey,
1642
        feeRate chainfee.SatPerKWeight) (*sweepTxCtx, error) {
26✔
1643

26✔
1644
        // Validate and calculate the fee and change amount.
26✔
1645
        txFee, changeOutputsOpt, locktimeOpt, err := prepareSweepTx(
26✔
1646
                inputs, changePkScript, feeRate, t.currentHeight.Load(),
26✔
1647
                t.cfg.AuxSweeper,
26✔
1648
        )
26✔
1649
        if err != nil {
29✔
1650
                return nil, err
3✔
1651
        }
3✔
1652

1653
        var (
26✔
1654
                // Create the sweep transaction that we will be building. We
26✔
1655
                // use version 2 as it is required for CSV.
26✔
1656
                sweepTx = wire.NewMsgTx(2)
26✔
1657

26✔
1658
                // We'll add the inputs as we go so we know the final ordering
26✔
1659
                // of inputs to sign.
26✔
1660
                idxs []input.Input
26✔
1661
        )
26✔
1662

26✔
1663
        // We start by adding all inputs that commit to an output. We do this
26✔
1664
        // since the input and output index must stay the same for the
26✔
1665
        // signatures to be valid.
26✔
1666
        outpointToTxIndex := make(map[wire.OutPoint]int)
26✔
1667
        for _, o := range inputs {
52✔
1668
                if o.RequiredTxOut() == nil {
52✔
1669
                        continue
26✔
1670
                }
1671

1672
                idxs = append(idxs, o)
3✔
1673
                sweepTx.AddTxIn(&wire.TxIn{
3✔
1674
                        PreviousOutPoint: o.OutPoint(),
3✔
1675
                        Sequence:         o.BlocksToMaturity(),
3✔
1676
                })
3✔
1677
                sweepTx.AddTxOut(o.RequiredTxOut())
3✔
1678

3✔
1679
                outpointToTxIndex[o.OutPoint()] = len(sweepTx.TxOut) - 1
3✔
1680
        }
1681

1682
        // Sum up the value contained in the remaining inputs, and add them to
1683
        // the sweep transaction.
1684
        for _, o := range inputs {
52✔
1685
                if o.RequiredTxOut() != nil {
29✔
1686
                        continue
3✔
1687
                }
1688

1689
                idxs = append(idxs, o)
26✔
1690
                sweepTx.AddTxIn(&wire.TxIn{
26✔
1691
                        PreviousOutPoint: o.OutPoint(),
26✔
1692
                        Sequence:         o.BlocksToMaturity(),
26✔
1693
                })
26✔
1694
        }
1695

1696
        // If we have change outputs to add, then add it the sweep transaction
1697
        // here.
1698
        changeOutputsOpt.WhenSome(func(changeOuts []SweepOutput) {
52✔
1699
                for i := range changeOuts {
75✔
1700
                        sweepTx.AddTxOut(&changeOuts[i].TxOut)
49✔
1701
                }
49✔
1702
        })
1703

1704
        // We'll default to using the current block height as locktime, if none
1705
        // of the inputs commits to a different locktime.
1706
        sweepTx.LockTime = uint32(locktimeOpt.UnwrapOr(t.currentHeight.Load()))
26✔
1707

26✔
1708
        prevInputFetcher, err := input.MultiPrevOutFetcher(inputs)
26✔
1709
        if err != nil {
26✔
1710
                return nil, fmt.Errorf("error creating prev input fetcher "+
×
1711
                        "for hash cache: %v", err)
×
1712
        }
×
1713
        hashCache := txscript.NewTxSigHashes(sweepTx, prevInputFetcher)
26✔
1714

26✔
1715
        // With all the inputs in place, use each output's unique input script
26✔
1716
        // function to generate the final witness required for spending.
26✔
1717
        addInputScript := func(idx int, tso input.Input) error {
52✔
1718
                inputScript, err := tso.CraftInputScript(
26✔
1719
                        t.cfg.Signer, sweepTx, hashCache, prevInputFetcher, idx,
26✔
1720
                )
26✔
1721
                if err != nil {
26✔
1722
                        return err
×
1723
                }
×
1724

1725
                sweepTx.TxIn[idx].Witness = inputScript.Witness
26✔
1726

26✔
1727
                if len(inputScript.SigScript) == 0 {
52✔
1728
                        return nil
26✔
1729
                }
26✔
1730

1731
                sweepTx.TxIn[idx].SignatureScript = inputScript.SigScript
×
1732

×
1733
                return nil
×
1734
        }
1735

1736
        for idx, inp := range idxs {
52✔
1737
                if err := addInputScript(idx, inp); err != nil {
26✔
1738
                        return nil, err
×
1739
                }
×
1740
        }
1741

1742
        log.Debugf("Created sweep tx %v for inputs:\n%v", sweepTx.TxHash(),
26✔
1743
                inputTypeSummary(inputs))
26✔
1744

26✔
1745
        // Try to locate the extra change output, though there might be None.
26✔
1746
        extraTxOut := fn.MapOption(
26✔
1747
                func(sweepOuts []SweepOutput) fn.Option[SweepOutput] {
52✔
1748
                        for _, sweepOut := range sweepOuts {
75✔
1749
                                if !sweepOut.IsExtra {
98✔
1750
                                        continue
49✔
1751
                                }
1752

1753
                                // If we sweep outputs of a custom channel, the
1754
                                // custom leaves in those outputs will be merged
1755
                                // into a single output, even if we sweep
1756
                                // multiple outputs (e.g. to_remote and breached
1757
                                // to_local of a breached channel) at the same
1758
                                // time. So there will only ever be one extra
1759
                                // output.
1760
                                log.Debugf("Sweep produced extra_sweep_out=%v",
×
1761
                                        lnutils.SpewLogClosure(sweepOut))
×
1762

×
1763
                                return fn.Some(sweepOut)
×
1764
                        }
1765

1766
                        return fn.None[SweepOutput]()
26✔
1767
                },
1768
        )(changeOutputsOpt)
1769

1770
        return &sweepTxCtx{
26✔
1771
                tx:                sweepTx,
26✔
1772
                fee:               txFee,
26✔
1773
                extraTxOut:        fn.FlattenOption(extraTxOut),
26✔
1774
                outpointToTxIndex: outpointToTxIndex,
26✔
1775
        }, nil
26✔
1776
}
1777

1778
// prepareSweepTx returns the tx fee, a set of optional change outputs and an
1779
// optional locktime after a series of validations:
1780
// 1. check the locktime has been reached.
1781
// 2. check the locktimes are the same.
1782
// 3. check the inputs cover the outputs.
1783
//
1784
// NOTE: if the change amount is below dust, it will be added to the tx fee.
1785
func prepareSweepTx(inputs []input.Input, changePkScript lnwallet.AddrWithKey,
1786
        feeRate chainfee.SatPerKWeight, currentHeight int32,
1787
        auxSweeper fn.Option[AuxSweeper]) (
1788
        btcutil.Amount, fn.Option[[]SweepOutput], fn.Option[int32], error) {
26✔
1789

26✔
1790
        noChange := fn.None[[]SweepOutput]()
26✔
1791
        noLocktime := fn.None[int32]()
26✔
1792

26✔
1793
        // Given the set of inputs we have, if we have an aux sweeper, then
26✔
1794
        // we'll attempt to see if we have any other change outputs we'll need
26✔
1795
        // to add to the sweep transaction.
26✔
1796
        changePkScripts := [][]byte{changePkScript.DeliveryAddress}
26✔
1797

26✔
1798
        var extraChangeOut fn.Option[SweepOutput]
26✔
1799
        err := fn.MapOptionZ(
26✔
1800
                auxSweeper, func(aux AuxSweeper) error {
49✔
1801
                        extraOut := aux.DeriveSweepAddr(inputs, changePkScript)
23✔
1802
                        if err := extraOut.Err(); err != nil {
23✔
1803
                                return err
×
1804
                        }
×
1805

1806
                        extraChangeOut = extraOut.LeftToSome()
23✔
1807

23✔
1808
                        return nil
23✔
1809
                },
1810
        )
1811
        if err != nil {
26✔
1812
                return 0, noChange, noLocktime, err
×
1813
        }
×
1814

1815
        // Creating a weight estimator with nil outputs and zero max fee rate.
1816
        // We don't allow adding customized outputs in the sweeping tx, and the
1817
        // fee rate is already being managed before we get here.
1818
        inputs, estimator, err := getWeightEstimate(
26✔
1819
                inputs, nil, feeRate, 0, changePkScripts,
26✔
1820
        )
26✔
1821
        if err != nil {
26✔
1822
                return 0, noChange, noLocktime, err
×
1823
        }
×
1824

1825
        txFee := estimator.fee()
26✔
1826

26✔
1827
        var (
26✔
1828
                // Track whether any of the inputs require a certain locktime.
26✔
1829
                locktime = int32(-1)
26✔
1830

26✔
1831
                // We keep track of total input amount, and required output
26✔
1832
                // amount to use for calculating the change amount below.
26✔
1833
                totalInput     btcutil.Amount
26✔
1834
                requiredOutput btcutil.Amount
26✔
1835
        )
26✔
1836

26✔
1837
        // If we have an extra change output, then we'll add it as a required
26✔
1838
        // output amt.
26✔
1839
        extraChangeOut.WhenSome(func(o SweepOutput) {
49✔
1840
                requiredOutput += btcutil.Amount(o.Value)
23✔
1841
        })
23✔
1842

1843
        // Go through each input and check if the required lock times have
1844
        // reached and are the same.
1845
        for _, o := range inputs {
52✔
1846
                // If the input has a required output, we'll add it to the
26✔
1847
                // required output amount.
26✔
1848
                if o.RequiredTxOut() != nil {
29✔
1849
                        requiredOutput += btcutil.Amount(
3✔
1850
                                o.RequiredTxOut().Value,
3✔
1851
                        )
3✔
1852
                }
3✔
1853

1854
                // Update the total input amount.
1855
                totalInput += btcutil.Amount(o.SignDesc().Output.Value)
26✔
1856

26✔
1857
                lt, ok := o.RequiredLockTime()
26✔
1858

26✔
1859
                // Skip if the input doesn't require a lock time.
26✔
1860
                if !ok {
52✔
1861
                        continue
26✔
1862
                }
1863

1864
                // Check if the lock time has reached
1865
                if lt > uint32(currentHeight) {
3✔
1866
                        return 0, noChange, noLocktime,
×
1867
                                fmt.Errorf("%w: current height is %v, "+
×
1868
                                        "locktime is %v", ErrLocktimeImmature,
×
1869
                                        currentHeight, lt)
×
1870
                }
×
1871

1872
                // If another input commits to a different locktime, they
1873
                // cannot be combined in the same transaction.
1874
                if locktime != -1 && locktime != int32(lt) {
3✔
1875
                        return 0, noChange, noLocktime, ErrLocktimeConflict
×
1876
                }
×
1877

1878
                // Update the locktime for next iteration.
1879
                locktime = int32(lt)
3✔
1880
        }
1881

1882
        // Make sure total output amount is less than total input amount.
1883
        if requiredOutput+txFee > totalInput {
26✔
1884
                return 0, noChange, noLocktime, fmt.Errorf("insufficient "+
×
1885
                        "input to create sweep tx: input_sum=%v, "+
×
1886
                        "output_sum=%v", totalInput, requiredOutput+txFee)
×
1887
        }
×
1888

1889
        // The value remaining after the required output and fees is the
1890
        // change output.
1891
        changeAmt := totalInput - requiredOutput - txFee
26✔
1892
        changeOuts := make([]SweepOutput, 0, 2)
26✔
1893

26✔
1894
        extraChangeOut.WhenSome(func(o SweepOutput) {
49✔
1895
                changeOuts = append(changeOuts, o)
23✔
1896
        })
23✔
1897

1898
        // We'll calculate the dust limit for the given changePkScript since it
1899
        // is variable.
1900
        changeFloor := lnwallet.DustLimitForSize(
26✔
1901
                len(changePkScript.DeliveryAddress),
26✔
1902
        )
26✔
1903

26✔
1904
        switch {
26✔
1905
        // If the change amount is dust, we'll move it into the fees, and
1906
        // ignore it.
1907
        case changeAmt < changeFloor:
3✔
1908
                log.Infof("Change amt %v below dustlimit %v, not adding "+
3✔
1909
                        "change output", changeAmt, changeFloor)
3✔
1910

3✔
1911
                // If there's no required output, and the change output is a
3✔
1912
                // dust, it means we are creating a tx without any outputs. In
3✔
1913
                // this case we'll return an error. This could happen when
3✔
1914
                // creating a tx that has an anchor as the only input.
3✔
1915
                if requiredOutput == 0 {
6✔
1916
                        return 0, noChange, noLocktime, ErrTxNoOutput
3✔
1917
                }
3✔
1918

1919
                // The dust amount is added to the fee.
1920
                txFee += changeAmt
×
1921

1922
        // Otherwise, we'll actually recognize it as a change output.
1923
        default:
26✔
1924
                changeOuts = append(changeOuts, SweepOutput{
26✔
1925
                        TxOut: wire.TxOut{
26✔
1926
                                Value:    int64(changeAmt),
26✔
1927
                                PkScript: changePkScript.DeliveryAddress,
26✔
1928
                        },
26✔
1929
                        IsExtra:     false,
26✔
1930
                        InternalKey: changePkScript.InternalKey,
26✔
1931
                })
26✔
1932
        }
1933

1934
        // Optionally set the locktime.
1935
        locktimeOpt := fn.Some(locktime)
26✔
1936
        if locktime == -1 {
52✔
1937
                locktimeOpt = noLocktime
26✔
1938
        }
26✔
1939

1940
        var changeOutsOpt fn.Option[[]SweepOutput]
26✔
1941
        if len(changeOuts) > 0 {
52✔
1942
                changeOutsOpt = fn.Some(changeOuts)
26✔
1943
        }
26✔
1944

1945
        log.Debugf("Creating sweep tx for %v inputs (%s) using %v, "+
26✔
1946
                "tx_weight=%v, tx_fee=%v, locktime=%v, parents_count=%v, "+
26✔
1947
                "parents_fee=%v, parents_weight=%v, current_height=%v",
26✔
1948
                len(inputs), inputTypeSummary(inputs), feeRate,
26✔
1949
                estimator.weight(), txFee, locktimeOpt, len(estimator.parents),
26✔
1950
                estimator.parentsFee, estimator.parentsWeight, currentHeight)
26✔
1951

26✔
1952
        return txFee, changeOutsOpt, locktimeOpt, nil
26✔
1953
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc