• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13567496470

27 Feb 2025 01:26PM UTC coverage: 58.757% (-0.1%) from 58.858%
13567496470

Pull #9555

github

ellemouton
graph/db: populate the graph cache in Start instead of during construction

In this commit, we move the graph cache population logic out of the
ChannelGraph constructor and into its Start method instead.
Pull Request #9555: graph: extract cache from CRUD [6]

40 of 54 new or added lines in 4 files covered. (74.07%)

307 existing lines in 27 files now uncovered.

136396 of 232137 relevant lines covered (58.76%)

19208.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.12
/sweep/fee_bumper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/rpcclient"
12
        "github.com/btcsuite/btcd/txscript"
13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/btcsuite/btcwallet/chain"
15
        "github.com/lightningnetwork/lnd/chainio"
16
        "github.com/lightningnetwork/lnd/chainntnfs"
17
        "github.com/lightningnetwork/lnd/fn/v2"
18
        "github.com/lightningnetwork/lnd/input"
19
        "github.com/lightningnetwork/lnd/labels"
20
        "github.com/lightningnetwork/lnd/lntypes"
21
        "github.com/lightningnetwork/lnd/lnutils"
22
        "github.com/lightningnetwork/lnd/lnwallet"
23
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
24
        "github.com/lightningnetwork/lnd/tlv"
25
)
26

27
// Sentinel errors used by the fee bumper. Several of them are wrapped with
// `%w` before being returned, so callers should match them with `errors.Is`
// rather than comparing directly.
var (
28
        // ErrInvalidBumpResult is returned when the bump result is invalid.
        // BumpResult.Validate wraps it with the specific reason.
29
        ErrInvalidBumpResult = errors.New("invalid bump result")
30

31
        // ErrNotEnoughBudget is returned when the fee bumper decides the
32
        // current budget cannot cover the fee. createAndCheckTx wraps it when
        // the fee of the created sweep tx exceeds the request's budget.
33
        ErrNotEnoughBudget = errors.New("not enough budget")
34

35
        // ErrLocktimeImmature is returned when sweeping an input whose
36
        // locktime is not reached.
37
        ErrLocktimeImmature = errors.New("immature input")
38

39
        // ErrTxNoOutput is returned when an output cannot be created during tx
40
        // preparation, usually due to the output being dust.
41
        ErrTxNoOutput = errors.New("tx has no output")
42

43
        // ErrUnknownSpent is returned when an unknown tx has spent an input in
44
        // the sweeping tx.
45
        ErrUnknownSpent = errors.New("unknown spend of input")
46

47
        // ErrInputMissing is returned when a given input no longer exists,
48
        // e.g., spending from an orphan tx. createAndCheckTx returns it when
        // the mempool acceptance check reports missing inputs.
49
        ErrInputMissing = errors.New("input no longer exists")
50
)
51

52
var (
53
        // dummyChangePkScript is a dummy tapscript change script that's used
54
        // when we don't need a real address, just something that can be used
55
        // for fee estimation.
        //
        // The script is 0x51 (OP_1) followed by a 32-byte push (0x20) of all
        // zero bytes, i.e. it has the shape of a p2tr output script.
        // MaxFeeRateAllowed appends it as an extra output script when any
        // input carries a non-empty resolution blob.
56
        dummyChangePkScript = []byte{
57
                0x51, 0x20,
58
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
59
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
60
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
61
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
62
        }
63
)
64

65
// Bumper defines an interface that can be used by other subsystems for fee
66
// bumping. TxPublisher is the implementation provided in this file.
67
type Bumper interface {
68
        // Broadcast is used to publish the tx created from the given inputs
69
        // specified in the request. It handles the tx creation, broadcasts it,
70
        // and monitors its confirmation status for potential fee bumping. It
71
        // returns a receive-only chan that the caller can use to receive
72
        // updates about the broadcast result and potential RBF attempts.
73
        Broadcast(req *BumpRequest) <-chan *BumpResult
74
}
75

76
// BumpEvent represents the event of a fee bumping attempt.
77
type BumpEvent uint8
78

79
// The event values are assigned via iota, so the declaration order below is
// significant: new events must be added before sentinalEvent, otherwise
// Unknown would treat them as unknown.
const (
80
        // TxPublished is sent when the broadcast attempt is finished.
81
        TxPublished BumpEvent = iota
82

83
        // TxFailed is sent when the tx has encountered a fee-related error
84
        // during its creation or broadcast, or an internal error from the fee
85
        // bumper. In either case the inputs in this tx should be retried with
86
        // either a different grouping strategy or an increased budget.
87
        //
88
        // TODO(yy): Remove the above usage once we remove sweeping non-CPFP
89
        // anchors.
90
        TxFailed
91

92
        // TxReplaced is sent when the original tx is replaced by a new one.
93
        TxReplaced
94

95
        // TxConfirmed is sent when the tx is confirmed.
96
        TxConfirmed
97

98
        // TxUnknownSpend is sent when at least one of the inputs is spent but
99
        // not by the current sweeping tx, this can happen when,
100
        // - a remote party has replaced our sweeping tx by spending the
101
        //   input(s), e.g., via the direct preimage spend on our outgoing HTLC.
102
        // - a third party has replaced our sweeping tx, e.g., the anchor output
103
        //   after 16 blocks.
104
        // - A previous sweeping tx has confirmed but the fee bumper is not
105
        //   aware of it, e.g., a restart happens right after the sweeping tx is
106
        //   broadcast and confirmed.
107
        TxUnknownSpend
108

109
        // TxFatal is sent when the inputs in this tx cannot be retried. Txns
110
        // will end up in this state if they have encountered a non-fee related
111
        // error, which means they cannot be retried with increased budget.
112
        TxFatal
113

114
        // sentinalEvent marks the end of the known events. Any value greater
        // than or equal to it is reported as unknown by BumpEvent.Unknown.
115
        sentinalEvent
116
)
117

118
// String returns a human-readable string for the event.
119
func (e BumpEvent) String() string {
2✔
120
        switch e {
2✔
121
        case TxPublished:
2✔
122
                return "Published"
2✔
123
        case TxFailed:
2✔
124
                return "Failed"
2✔
125
        case TxReplaced:
2✔
126
                return "Replaced"
2✔
127
        case TxConfirmed:
2✔
128
                return "Confirmed"
2✔
129
        case TxUnknownSpend:
2✔
130
                return "UnknownSpend"
2✔
131
        case TxFatal:
1✔
132
                return "Fatal"
1✔
133
        default:
×
134
                return "Unknown"
×
135
        }
136
}
137

138
// Unknown returns true if the event is unknown.
139
func (e BumpEvent) Unknown() bool {
13✔
140
        return e >= sentinalEvent
13✔
141
}
13✔
142

143
// BumpRequest is used by the caller to give the Bumper the necessary info to
144
// create and manage potential fee bumps for a set of inputs.
145
type BumpRequest struct {
146
        // Budget gives the total amount that can be used as fees by these
147
        // inputs.
148
        Budget btcutil.Amount
149

150
        // Inputs is the set of inputs to sweep.
151
        Inputs []input.Input
152

153
        // DeadlineHeight is the block height at which the tx should be
154
        // confirmed.
155
        DeadlineHeight int32
156

157
        // DeliveryAddress is the script to send the change output to.
158
        DeliveryAddress lnwallet.AddrWithKey
159

160
        // MaxFeeRate is the maximum fee rate that can be used for fee bumping.
161
        MaxFeeRate chainfee.SatPerKWeight
162

163
        // StartingFeeRate is an optional parameter that can be used to specify
164
        // the initial fee rate to use for the fee function.
165
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
166

167
        // ExtraTxOut tracks if this bump request has an optional set of extra
168
        // outputs to add to the transaction.
        //
        // NOTE: createAndCheckTx overwrites this field with the extra output
        // of the sweep tx it has just created.
169
        ExtraTxOut fn.Option[SweepOutput]
170

171
        // Immediate is used to specify that the tx should be broadcast
172
        // immediately. When set, Broadcast performs the initial broadcast
        // before returning the result chan.
173
        Immediate bool
174
}
175

176
// MaxFeeRateAllowed returns the maximum fee rate allowed for the given
177
// request. It calculates the feerate using the supplied budget and the weight,
178
// compares it with the specified MaxFeeRate, and returns the smaller of the
179
// two.
180
func (r *BumpRequest) MaxFeeRateAllowed() (chainfee.SatPerKWeight, error) {
13✔
181
        // We'll want to know if we have any blobs, as we need to factor this
13✔
182
        // into the max fee rate for this bump request.
13✔
183
        hasBlobs := fn.Any(r.Inputs, func(i input.Input) bool {
25✔
184
                return fn.MapOptionZ(
12✔
185
                        i.ResolutionBlob(), func(b tlv.Blob) bool {
14✔
186
                                return len(b) > 0
2✔
187
                        },
2✔
188
                )
189
        })
190

191
        sweepAddrs := [][]byte{
13✔
192
                r.DeliveryAddress.DeliveryAddress,
13✔
193
        }
13✔
194

13✔
195
        // If we have blobs, then we'll add an extra sweep addr for the size
13✔
196
        // estimate below. We know that these blobs will also always be based on
13✔
197
        // p2tr addrs.
13✔
198
        if hasBlobs {
13✔
199
                // We need to pass in a real address, so we'll use a dummy
×
200
                // tapscript change script that's used elsewhere for tests.
×
201
                sweepAddrs = append(sweepAddrs, dummyChangePkScript)
×
202
        }
×
203

204
        // Get the size of the sweep tx, which will be used to calculate the
205
        // budget fee rate.
206
        size, err := calcSweepTxWeight(
13✔
207
                r.Inputs, sweepAddrs,
13✔
208
        )
13✔
209
        if err != nil {
14✔
210
                return 0, err
1✔
211
        }
1✔
212

213
        // Use the budget and MaxFeeRate to decide the max allowed fee rate.
214
        // This is needed as, when the input has a large value and the user
215
        // sets the budget to be proportional to the input value, the fee rate
216
        // can be very high and we need to make sure it doesn't exceed the max
217
        // fee rate.
218
        maxFeeRateAllowed := chainfee.NewSatPerKWeight(r.Budget, size)
12✔
219
        if maxFeeRateAllowed > r.MaxFeeRate {
17✔
220
                log.Debugf("Budget feerate %v exceeds MaxFeeRate %v, use "+
5✔
221
                        "MaxFeeRate instead, txWeight=%v", maxFeeRateAllowed,
5✔
222
                        r.MaxFeeRate, size)
5✔
223

5✔
224
                return r.MaxFeeRate, nil
5✔
225
        }
5✔
226

227
        log.Debugf("Budget feerate %v below MaxFeeRate %v, use budget feerate "+
9✔
228
                "instead, txWeight=%v", maxFeeRateAllowed, r.MaxFeeRate, size)
9✔
229

9✔
230
        return maxFeeRateAllowed, nil
9✔
231
}
232

233
// calcSweepTxWeight calculates the weight of the sweep tx. It assumes a
234
// sweeping tx always has a single output(change).
235
func calcSweepTxWeight(inputs []input.Input,
236
        outputPkScript [][]byte) (lntypes.WeightUnit, error) {
16✔
237

16✔
238
        // Use a const fee rate as we only use the weight estimator to
16✔
239
        // calculate the size.
16✔
240
        const feeRate = 1
16✔
241

16✔
242
        // Initialize the tx weight estimator with,
16✔
243
        // - nil outputs as we only have one single change output.
16✔
244
        // - const fee rate as we don't care about the fees here.
16✔
245
        // - 0 maxfeerate as we don't care about fees here.
16✔
246
        //
16✔
247
        // TODO(yy): we should refactor the weight estimator to not require a
16✔
248
        // fee rate and max fee rate and make it a pure tx weight calculator.
16✔
249
        _, estimator, err := getWeightEstimate(
16✔
250
                inputs, nil, feeRate, 0, outputPkScript,
16✔
251
        )
16✔
252
        if err != nil {
18✔
253
                return 0, err
2✔
254
        }
2✔
255

256
        return estimator.weight(), nil
14✔
257
}
258

259
// BumpResult is used by the Bumper to send updates about the tx being
260
// broadcast. Use Validate to check that the fields required by the result's
// event are populated.
261
type BumpResult struct {
262
        // Event is the type of event that the result is for.
263
        Event BumpEvent
264

265
        // Tx is the tx being broadcast. Validate requires it to be set unless
        // the event is TxFailed, TxFatal or TxUnknownSpend.
266
        Tx *wire.MsgTx
267

268
        // ReplacedTx is the old, replaced tx if a fee bump is attempted.
        // Validate requires it to be set for the TxReplaced event.
269
        ReplacedTx *wire.MsgTx
270

271
        // FeeRate is the fee rate used for the new tx. Validate requires it
        // to be non-zero for the TxConfirmed event.
272
        FeeRate chainfee.SatPerKWeight
273

274
        // Fee is the fee paid by the new tx. Validate requires it to be
        // non-zero for the TxConfirmed event.
275
        Fee btcutil.Amount
276

277
        // Err is the error that occurred during the broadcast. Validate
        // requires it to be set for the TxFailed, TxFatal and TxUnknownSpend
        // events.
278
        Err error
279

280
        // SpentInputs are the inputs spent by another tx which caused the
281
        // current tx to be failed.
282
        SpentInputs map[wire.OutPoint]*wire.MsgTx
283

284
        // requestID is the ID of the request that created this record.
285
        requestID uint64
286
}
287

288
// String returns a human-readable string for the result.
289
func (b *BumpResult) String() string {
2✔
290
        desc := fmt.Sprintf("Event=%v", b.Event)
2✔
291
        if b.Tx != nil {
4✔
292
                desc += fmt.Sprintf(", Tx=%v", b.Tx.TxHash())
2✔
293
        }
2✔
294

295
        return fmt.Sprintf("[%s]", desc)
2✔
296
}
297

298
// Validate validates the BumpResult so it's safe to use.
299
func (b *BumpResult) Validate() error {
14✔
300
        isFailureEvent := b.Event == TxFailed || b.Event == TxFatal ||
14✔
301
                b.Event == TxUnknownSpend
14✔
302

14✔
303
        // Every result must have a tx except the fatal or failed case.
14✔
304
        if b.Tx == nil && !isFailureEvent {
15✔
305
                return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult)
1✔
306
        }
1✔
307

308
        // Every result must have a known event.
309
        if b.Event.Unknown() {
14✔
310
                return fmt.Errorf("%w: unknown event", ErrInvalidBumpResult)
1✔
311
        }
1✔
312

313
        // If it's a replacing event, it must have a replaced tx.
314
        if b.Event == TxReplaced && b.ReplacedTx == nil {
13✔
315
                return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult)
1✔
316
        }
1✔
317

318
        // If it's a failed or fatal event, it must have an error.
319
        if isFailureEvent && b.Err == nil {
13✔
320
                return fmt.Errorf("%w: nil error", ErrInvalidBumpResult)
2✔
321
        }
2✔
322

323
        // If it's a confirmed event, it must have a fee rate and fee.
324
        if b.Event == TxConfirmed && (b.FeeRate == 0 || b.Fee == 0) {
10✔
325
                return fmt.Errorf("%w: missing fee rate or fee",
1✔
326
                        ErrInvalidBumpResult)
1✔
327
        }
1✔
328

329
        return nil
8✔
330
}
331

332
// TxPublisherConfig is the config used to create a new TxPublisher.
333
type TxPublisherConfig struct {
334
        // Signer is used to create the tx signature.
335
        Signer input.Signer
336

337
        // Wallet is used primarily to publish the tx. It is also used to check
        // the tx's mempool acceptance and to query the backend type.
338
        Wallet Wallet
339

340
        // Estimator is used to estimate the fee rate for the new tx based on
341
        // its deadline conf target.
342
        Estimator chainfee.Estimator
343

344
        // Notifier is used to monitor the confirmation status of the tx.
345
        Notifier chainntnfs.ChainNotifier
346

347
        // AuxSweeper is an optional interface that can be used to modify the
348
        // way sweep transactions are generated.
349
        AuxSweeper fn.Option[AuxSweeper]
350
}
351

352
// TxPublisher is an implementation of the Bumper interface. It utilizes the
353
// `testmempoolaccept` RPC to bump the fee of txns it created based on
354
// different fee function selected or configured by the caller. Its purpose is to
355
// take a list of inputs specified, and create a tx that spends them to a
356
// specified output. It will then monitor the confirmation status of the tx,
357
// and if it's not confirmed within a certain time frame, it will attempt to
358
// bump the fee of the tx by creating a new tx that spends the same inputs to
359
// the same output, but with a higher fee rate. It will continue to do this
360
// until the tx is confirmed or the fee rate reaches the maximum fee rate
361
// specified by the caller.
362
type TxPublisher struct {
363
        // NOTE(review): started and stopped presumably guard the start/stop
        // lifecycle; their use is not visible in this part of the file.
        started atomic.Bool
364
        stopped atomic.Bool
365

366
        // Embed the blockbeat consumer struct to get access to the method
367
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
368
        chainio.BeatConsumer
369

370
        wg sync.WaitGroup
371

372
        // cfg specifies the configuration of the TxPublisher.
373
        cfg *TxPublisherConfig
374

375
        // currentHeight is the current block height.
376
        currentHeight atomic.Int32
377

378
        // records is a map keyed by the requestCounter and the value is the tx
379
        // being monitored. Entries are created by storeInitialRecord and
        // refreshed by updateRecord.
380
        records lnutils.SyncMap[uint64, *monitorRecord]
381

382
        // requestCounter is a monotonically increasing counter used to keep
383
        // track of how many requests have been made. It is only incremented in
        // storeInitialRecord, and the incremented value is the request ID.
384
        requestCounter atomic.Uint64
385

386
        // subscriberChans is a map keyed by the requestCounter, each item is
387
        // the chan that the publisher sends the fee bump result to. The chans
        // are created in Broadcast with a buffer size of one.
388
        subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]
389

390
        // quit is used to signal the publisher to stop. It is also handed to
        // the embedded BeatConsumer upon construction.
391
        quit chan struct{}
392
}
393

394
// Compile-time assertion that *TxPublisher satisfies the Bumper interface.
395
var _ Bumper = (*TxPublisher)(nil)
396

397
// Compile-time assertion that *TxPublisher satisfies the chainio.Consumer
// interface.
398
var _ chainio.Consumer = (*TxPublisher)(nil)
399

400
// NewTxPublisher creates a new TxPublisher.
401
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
23✔
402
        tp := &TxPublisher{
23✔
403
                cfg:             &cfg,
23✔
404
                records:         lnutils.SyncMap[uint64, *monitorRecord]{},
23✔
405
                subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
23✔
406
                quit:            make(chan struct{}),
23✔
407
        }
23✔
408

23✔
409
        // Mount the block consumer.
23✔
410
        tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())
23✔
411

23✔
412
        return tp
23✔
413
}
23✔
414

415
// isNeutrinoBackend checks if the wallet backend is neutrino.
416
func (t *TxPublisher) isNeutrinoBackend() bool {
×
417
        return t.cfg.Wallet.BackEnd() == "neutrino"
×
418
}
×
419

420
// Broadcast is used to publish the tx created from the given inputs. It will
421
// register the broadcast request and return a chan to the caller to subscribe
422
// the broadcast result. The initial broadcast is guaranteed to be
423
// RBF-compliant unless the budget specified cannot cover the fee.
424
//
425
// NOTE: part of the Bumper interface.
426
func (t *TxPublisher) Broadcast(req *BumpRequest) <-chan *BumpResult {
7✔
427
        log.Tracef("Received broadcast request: %s",
7✔
428
                lnutils.SpewLogClosure(req))
7✔
429

7✔
430
        // Store the request.
7✔
431
        record := t.storeInitialRecord(req)
7✔
432

7✔
433
        // Create a chan to send the result to the caller.
7✔
434
        subscriber := make(chan *BumpResult, 1)
7✔
435
        t.subscriberChans.Store(record.requestID, subscriber)
7✔
436

7✔
437
        // Publish the tx immediately if specified.
7✔
438
        if req.Immediate {
10✔
439
                t.handleInitialBroadcast(record)
3✔
440
        }
3✔
441

442
        return subscriber
7✔
443
}
444

445
// storeInitialRecord initializes a monitor record and saves it in the map.
446
func (t *TxPublisher) storeInitialRecord(req *BumpRequest) *monitorRecord {
7✔
447
        // Increase the request counter.
7✔
448
        //
7✔
449
        // NOTE: this is the only place where we increase the counter.
7✔
450
        requestID := t.requestCounter.Add(1)
7✔
451

7✔
452
        // Register the record.
7✔
453
        record := &monitorRecord{
7✔
454
                requestID: requestID,
7✔
455
                req:       req,
7✔
456
        }
7✔
457
        t.records.Store(requestID, record)
7✔
458

7✔
459
        return record
7✔
460
}
7✔
461

462
// updateRecord updates the given record's tx and fee, and saves it in the
463
// records map.
464
func (t *TxPublisher) updateRecord(r *monitorRecord,
465
        sweepCtx *sweepTxCtx) *monitorRecord {
21✔
466

21✔
467
        r.tx = sweepCtx.tx
21✔
468
        r.fee = sweepCtx.fee
21✔
469
        r.outpointToTxIndex = sweepCtx.outpointToTxIndex
21✔
470

21✔
471
        // Register the record.
21✔
472
        t.records.Store(r.requestID, r)
21✔
473

21✔
474
        return r
21✔
475
}
21✔
476

477
// NOTE: part of the `chainio.Consumer` interface.
478
func (t *TxPublisher) Name() string {
23✔
479
        return "TxPublisher"
23✔
480
}
23✔
481

482
// initializeTx initializes a fee function and creates an RBF-compliant tx. If
483
// succeeded, the initial tx is stored in the records map.
484
func (t *TxPublisher) initializeTx(r *monitorRecord) (*monitorRecord, error) {
7✔
485
        // Create a fee bumping algorithm to be used for future RBF.
7✔
486
        feeAlgo, err := t.initializeFeeFunction(r.req)
7✔
487
        if err != nil {
10✔
488
                return nil, fmt.Errorf("init fee function: %w", err)
3✔
489
        }
3✔
490

491
        // Attach the newly created fee function.
492
        //
493
        // TODO(yy): current we'd initialize a monitorRecord before creating the
494
        // fee function, while we could instead create the fee function first
495
        // then save it to the record. To make this happen we need to change the
496
        // conf target calculation below since we would be initializing the fee
497
        // function one block before.
498
        r.feeFunction = feeAlgo
6✔
499

6✔
500
        // Create the initial tx to be broadcasted. This tx is guaranteed to
6✔
501
        // comply with the RBF restrictions.
6✔
502
        record, err := t.createRBFCompliantTx(r)
6✔
503
        if err != nil {
9✔
504
                return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
3✔
505
        }
3✔
506

507
        return record, nil
5✔
508
}
509

510
// initializeFeeFunction initializes a fee function to be used for this request
511
// for future fee bumping.
512
func (t *TxPublisher) initializeFeeFunction(
513
        req *BumpRequest) (FeeFunction, error) {
10✔
514

10✔
515
        // Get the max allowed feerate.
10✔
516
        maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
10✔
517
        if err != nil {
10✔
518
                return nil, err
×
519
        }
×
520

521
        // Get the initial conf target.
522
        confTarget := calcCurrentConfTarget(
10✔
523
                t.currentHeight.Load(), req.DeadlineHeight,
10✔
524
        )
10✔
525

10✔
526
        log.Debugf("Initializing fee function with conf target=%v, budget=%v, "+
10✔
527
                "maxFeeRateAllowed=%v", confTarget, req.Budget,
10✔
528
                maxFeeRateAllowed)
10✔
529

10✔
530
        // Initialize the fee function and return it.
10✔
531
        //
10✔
532
        // TODO(yy): return based on differet req.Strategy?
10✔
533
        return NewLinearFeeFunction(
10✔
534
                maxFeeRateAllowed, confTarget, t.cfg.Estimator,
10✔
535
                req.StartingFeeRate,
10✔
536
        )
10✔
537
}
538

539
// createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
540
// so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee
541
// and redo the process until the tx is valid, or return an error when non-RBF
542
// related errors occur or the budget has been used up.
543
func (t *TxPublisher) createRBFCompliantTx(
544
        r *monitorRecord) (*monitorRecord, error) {
12✔
545

12✔
546
        f := r.feeFunction
12✔
547

12✔
548
        for {
27✔
549
                // Create a new tx with the given fee rate and check its
15✔
550
                // mempool acceptance.
15✔
551
                sweepCtx, err := t.createAndCheckTx(r)
15✔
552

15✔
553
                switch {
15✔
554
                case err == nil:
9✔
555
                        // The tx is valid, store it.
9✔
556
                        record := t.updateRecord(r, sweepCtx)
9✔
557

9✔
558
                        log.Infof("Created initial sweep tx=%v for %v inputs: "+
9✔
559
                                "feerate=%v, fee=%v, inputs:\n%v",
9✔
560
                                sweepCtx.tx.TxHash(), len(r.req.Inputs),
9✔
561
                                f.FeeRate(), sweepCtx.fee,
9✔
562
                                inputTypeSummary(r.req.Inputs))
9✔
563

9✔
564
                        return record, nil
9✔
565

566
                // If the error indicates the fees paid is not enough, we will
567
                // ask the fee function to increase the fee rate and retry.
568
                case errors.Is(err, lnwallet.ErrMempoolFee):
2✔
569
                        // We should at least start with a feerate above the
2✔
570
                        // mempool min feerate, so if we get this error, it
2✔
571
                        // means something is wrong earlier in the pipeline.
2✔
572
                        log.Errorf("Current fee=%v, feerate=%v, %v",
2✔
573
                                sweepCtx.fee, f.FeeRate(), err)
2✔
574

2✔
575
                        fallthrough
2✔
576

577
                // We are not paying enough fees so we increase it.
578
                case errors.Is(err, chain.ErrInsufficientFee):
5✔
579
                        increased := false
5✔
580

5✔
581
                        // Keep calling the fee function until the fee rate is
5✔
582
                        // increased or maxed out.
5✔
583
                        for !increased {
11✔
584
                                log.Debugf("Increasing fee for next round, "+
6✔
585
                                        "current fee=%v, feerate=%v",
6✔
586
                                        sweepCtx.fee, f.FeeRate())
6✔
587

6✔
588
                                // If the fee function tells us that we have
6✔
589
                                // used up the budget, we will return an error
6✔
590
                                // indicating this tx cannot be made. The
6✔
591
                                // sweeper should handle this error and try to
6✔
592
                                // cluster these inputs differetly.
6✔
593
                                increased, err = f.Increment()
6✔
594
                                if err != nil {
8✔
595
                                        return nil, err
2✔
596
                                }
2✔
597
                        }
598

599
                // TODO(yy): suppose there's only one bad input, we can do a
600
                // binary search to find out which input is causing this error
601
                // by recreating a tx using half of the inputs and check its
602
                // mempool acceptance.
603
                default:
4✔
604
                        log.Debugf("Failed to create RBF-compliant tx: %v", err)
4✔
605
                        return nil, err
4✔
606
                }
607
        }
608
}
609

610
// createAndCheckTx creates a tx based on the given inputs, change output
611
// script, and the fee rate. In addition, it validates the tx's mempool
612
// acceptance before returning a tx that can be published directly, along with
613
// its fee.
614
func (t *TxPublisher) createAndCheckTx(r *monitorRecord) (*sweepTxCtx, error) {
25✔
615
        req := r.req
25✔
616
        f := r.feeFunction
25✔
617

25✔
618
        // Create the sweep tx with max fee rate of 0 as the fee function
25✔
619
        // guarantees the fee rate used here won't exceed the max fee rate.
25✔
620
        sweepCtx, err := t.createSweepTx(
25✔
621
                req.Inputs, req.DeliveryAddress, f.FeeRate(),
25✔
622
        )
25✔
623
        if err != nil {
27✔
624
                return sweepCtx, fmt.Errorf("create sweep tx: %w", err)
2✔
625
        }
2✔
626

627
        // Sanity check the budget still covers the fee.
628
        if sweepCtx.fee > req.Budget {
27✔
629
                return sweepCtx, fmt.Errorf("%w: budget=%v, fee=%v",
2✔
630
                        ErrNotEnoughBudget, req.Budget, sweepCtx.fee)
2✔
631
        }
2✔
632

633
        // If we had an extra txOut, then we'll update the result to include
634
        // it.
635
        req.ExtraTxOut = sweepCtx.extraTxOut
23✔
636

23✔
637
        // Validate the tx's mempool acceptance.
23✔
638
        err = t.cfg.Wallet.CheckMempoolAcceptance(sweepCtx.tx)
23✔
639

23✔
640
        // Exit early if the tx is valid.
23✔
641
        if err == nil {
36✔
642
                return sweepCtx, nil
13✔
643
        }
13✔
644

645
        // Print an error log if the chain backend doesn't support the mempool
646
        // acceptance test RPC.
647
        if errors.Is(err, rpcclient.ErrBackendVersion) {
11✔
648
                log.Errorf("TestMempoolAccept not supported by backend, " +
×
649
                        "consider upgrading it to a newer version")
×
650
                return sweepCtx, nil
×
651
        }
×
652

653
        // We are running on a backend that doesn't implement the RPC
654
        // testmempoolaccept, eg, neutrino, so we'll skip the check.
655
        if errors.Is(err, chain.ErrUnimplemented) {
12✔
656
                log.Debug("Skipped testmempoolaccept due to not implemented")
1✔
657
                return sweepCtx, nil
1✔
658
        }
1✔
659

660
        // If the inputs are spent by another tx, we will exit with the latest
661
        // sweepCtx and an error.
662
        if errors.Is(err, chain.ErrMissingInputs) {
11✔
663
                log.Debugf("Tx %v missing inputs, it's likely the input has "+
1✔
664
                        "been spent by others", sweepCtx.tx.TxHash())
1✔
665

1✔
666
                // Make sure to update the record with the latest attempt.
1✔
667
                t.updateRecord(r, sweepCtx)
1✔
668

1✔
669
                return sweepCtx, ErrInputMissing
1✔
670
        }
1✔
671

672
        return sweepCtx, fmt.Errorf("tx=%v failed mempool check: %w",
10✔
673
                sweepCtx.tx.TxHash(), err)
10✔
674
}
675

676
// handleMissingInputs handles the case when the chain backend reports back a
677
// missing inputs error, which could happen when one of the input has been spent
678
// in another tx, or the input is referencing an orphan. When the input is
679
// spent, it will be handled via the TxUnknownSpend flow by creating a
680
// TxUnknownSpend bump result, otherwise, a TxFatal bump result is returned.
681
func (t *TxPublisher) handleMissingInputs(r *monitorRecord) *BumpResult {
1✔
682
        // Get the spending txns.
1✔
683
        spends := t.getSpentInputs(r)
1✔
684

1✔
685
        // Attach the spending txns.
1✔
686
        r.spentInputs = spends
1✔
687

1✔
688
        // If there are no spending txns found and the input is missing, the
1✔
689
        // input is referencing an orphan tx that's no longer valid, e.g., the
1✔
690
        // spending the anchor output from the remote commitment after the local
1✔
691
        // commitment has confirmed. In this case we will mark it as fatal and
1✔
692
        // exit.
1✔
693
        if len(spends) == 0 {
2✔
694
                log.Warnf("Failing record=%v: found orphan inputs: %v\n",
1✔
695
                        r.requestID, inputTypeSummary(r.req.Inputs))
1✔
696

1✔
697
                // Create a result that will be sent to the resultChan which is
1✔
698
                // listened by the caller.
1✔
699
                result := &BumpResult{
1✔
700
                        Event:     TxFatal,
1✔
701
                        Tx:        r.tx,
1✔
702
                        requestID: r.requestID,
1✔
703
                        Err:       ErrInputMissing,
1✔
704
                }
1✔
705

1✔
706
                return result
1✔
707
        }
1✔
708

709
        // Check that the spending tx matches the sweeping tx - given that the
710
        // current sweeping tx has been failed due to missing inputs, the
711
        // spending tx must be a different tx, thus it should NOT be matched. We
712
        // perform a sanity check here to catch the unexpected state.
UNCOV
713
        if !t.isUnknownSpent(r, spends) {
×
714
                log.Errorf("Sweeping tx %v has missing inputs, yet the "+
×
715
                        "spending tx is the sweeping tx itself: %v",
×
716
                        r.tx.TxHash(), r.spentInputs)
×
717
        }
×
718

UNCOV
719
        return t.createUnknownSpentBumpResult(r)
×
720
}
721

722
// broadcast takes a monitored tx and publishes it to the network. Prior to the
723
// broadcast, it will subscribe the tx's confirmation notification and attach
724
// the event channel to the record. Any broadcast-related errors will not be
725
// returned here, instead, they will be put inside the `BumpResult` and
726
// returned to the caller.
727
func (t *TxPublisher) broadcast(record *monitorRecord) (*BumpResult, error) {
11✔
728
        txid := record.tx.TxHash()
11✔
729

11✔
730
        tx := record.tx
11✔
731
        log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
11✔
732
                txid, len(tx.TxIn), t.currentHeight.Load())
11✔
733

11✔
734
        // Before we go to broadcast, we'll notify the aux sweeper, if it's
11✔
735
        // present of this new broadcast attempt.
11✔
736
        err := fn.MapOptionZ(t.cfg.AuxSweeper, func(aux AuxSweeper) error {
20✔
737
                return aux.NotifyBroadcast(
9✔
738
                        record.req, tx, record.fee, record.outpointToTxIndex,
9✔
739
                )
9✔
740
        })
9✔
741
        if err != nil {
11✔
742
                return nil, fmt.Errorf("unable to notify aux sweeper: %w", err)
×
743
        }
×
744

745
        // Set the event, and change it to TxFailed if the wallet fails to
746
        // publish it.
747
        event := TxPublished
11✔
748

11✔
749
        // Publish the sweeping tx with customized label. If the publish fails,
11✔
750
        // this error will be saved in the `BumpResult` and it will be removed
11✔
751
        // from being monitored.
11✔
752
        err = t.cfg.Wallet.PublishTransaction(
11✔
753
                tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
11✔
754
        )
11✔
755
        if err != nil {
15✔
756
                // NOTE: we decide to attach this error to the result instead
4✔
757
                // of returning it here because by the time the tx reaches
4✔
758
                // here, it should have passed the mempool acceptance check. If
4✔
759
                // it still fails to be broadcast, it's likely a non-RBF
4✔
760
                // related error happened. So we send this error back to the
4✔
761
                // caller so that it can handle it properly.
4✔
762
                //
4✔
763
                // TODO(yy): find out which input is causing the failure.
4✔
764
                log.Errorf("Failed to publish tx %v: %v", txid, err)
4✔
765
                event = TxFailed
4✔
766
        }
4✔
767

768
        result := &BumpResult{
11✔
769
                Event:     event,
11✔
770
                Tx:        record.tx,
11✔
771
                Fee:       record.fee,
11✔
772
                FeeRate:   record.feeFunction.FeeRate(),
11✔
773
                Err:       err,
11✔
774
                requestID: record.requestID,
11✔
775
        }
11✔
776

11✔
777
        return result, nil
11✔
778
}
779

780
// notifyResult sends the result to the resultChan specified by the requestID.
781
// This channel is expected to be read by the caller.
782
func (t *TxPublisher) notifyResult(result *BumpResult) {
16✔
783
        id := result.requestID
16✔
784
        subscriber, ok := t.subscriberChans.Load(id)
16✔
785
        if !ok {
17✔
786
                log.Errorf("Result chan for id=%v not found", id)
1✔
787
                return
1✔
788
        }
1✔
789

790
        log.Debugf("Sending result %v for requestID=%v", result, id)
16✔
791

16✔
792
        select {
16✔
793
        // Send the result to the subscriber.
794
        //
795
        // TODO(yy): Add timeout in case it's blocking?
796
        case subscriber <- result:
15✔
797
        case <-t.quit:
1✔
798
                log.Debug("Fee bumper stopped")
1✔
799
        }
800
}
801

802
// removeResult removes the tracking of the result if the result contains a
803
// non-nil error, or the tx is confirmed, the record will be removed from the
804
// maps.
805
func (t *TxPublisher) removeResult(result *BumpResult) {
16✔
806
        id := result.requestID
16✔
807

16✔
808
        var txid chainhash.Hash
16✔
809
        if result.Tx != nil {
29✔
810
                txid = result.Tx.TxHash()
13✔
811
        }
13✔
812

813
        // Remove the record from the maps if there's an error or the tx is
814
        // confirmed. When there's an error, it means this tx has failed its
815
        // broadcast and cannot be retried. There are two cases it may fail,
816
        // - when the budget cannot cover the increased fee calculated by the
817
        //   fee function, hence the budget is used up.
818
        // - when a non-fee related error returned from PublishTransaction.
819
        switch result.Event {
16✔
820
        case TxFailed:
4✔
821
                log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
4✔
822
                        id, txid, result.Err)
4✔
823

824
        case TxConfirmed:
5✔
825
                // Remove the record if the tx is confirmed.
5✔
826
                log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
5✔
827
                        txid)
5✔
828

829
        case TxFatal:
3✔
830
                // Remove the record if there's an error.
3✔
831
                log.Debugf("Removing monitor record=%v due to fatal err: %v",
3✔
832
                        id, result.Err)
3✔
833

834
        case TxUnknownSpend:
4✔
835
                // Remove the record if there's an unknown spend.
4✔
836
                log.Debugf("Removing monitor record=%v due unknown spent: "+
4✔
837
                        "%v", id, result.Err)
4✔
838

839
        // Do nothing if it's neither failed or confirmed.
840
        default:
7✔
841
                log.Tracef("Skipping record removal for id=%v, event=%v", id,
7✔
842
                        result.Event)
7✔
843

7✔
844
                return
7✔
845
        }
846

847
        t.records.Delete(id)
11✔
848
        t.subscriberChans.Delete(id)
11✔
849
}
850

851
// handleResult handles the result of a tx broadcast. It will notify the
852
// subscriber and remove the record if the tx is confirmed or failed to be
853
// broadcast.
854
func (t *TxPublisher) handleResult(result *BumpResult) {
13✔
855
        // Notify the subscriber.
13✔
856
        t.notifyResult(result)
13✔
857

13✔
858
        // Remove the record if it's failed or confirmed.
13✔
859
        t.removeResult(result)
13✔
860
}
13✔
861

862
// monitorRecord is used to keep track of the tx being monitored by the
863
// publisher internally.
864
type monitorRecord struct {
865
        // requestID is the ID of the request that created this record.
866
        requestID uint64
867

868
        // tx is the tx being monitored.
869
        tx *wire.MsgTx
870

871
        // req is the original request.
872
        req *BumpRequest
873

874
        // feeFunction is the fee bumping algorithm used by the publisher.
875
        feeFunction FeeFunction
876

877
        // fee is the fee paid by the tx.
878
        fee btcutil.Amount
879

880
        // outpointToTxIndex is a map of outpoint to tx index.
881
        outpointToTxIndex map[wire.OutPoint]int
882

883
        // spentInputs are the inputs spent by another tx which caused the
884
        // current tx failed.
885
        spentInputs map[wire.OutPoint]*wire.MsgTx
886
}
887

888
// Start starts the publisher by subscribing to block epoch updates and kicking
889
// off the monitor loop.
890
func (t *TxPublisher) Start(beat chainio.Blockbeat) error {
2✔
891
        log.Info("TxPublisher starting...")
2✔
892

2✔
893
        if t.started.Swap(true) {
2✔
894
                return fmt.Errorf("TxPublisher started more than once")
×
895
        }
×
896

897
        // Set the current height.
898
        t.currentHeight.Store(beat.Height())
2✔
899

2✔
900
        t.wg.Add(1)
2✔
901
        go t.monitor()
2✔
902

2✔
903
        log.Debugf("TxPublisher started")
2✔
904

2✔
905
        return nil
2✔
906
}
907

908
// Stop stops the publisher and waits for the monitor loop to exit.
909
func (t *TxPublisher) Stop() error {
2✔
910
        log.Info("TxPublisher stopping...")
2✔
911

2✔
912
        if t.stopped.Swap(true) {
2✔
913
                return fmt.Errorf("TxPublisher stopped more than once")
×
914
        }
×
915

916
        close(t.quit)
2✔
917
        t.wg.Wait()
2✔
918

2✔
919
        log.Debug("TxPublisher stopped")
2✔
920

2✔
921
        return nil
2✔
922
}
923

924
// monitor is the main loop driven by new blocks. Whevenr a new block arrives,
925
// it will examine all the txns being monitored, and check if any of them needs
926
// to be bumped. If so, it will attempt to bump the fee of the tx.
927
//
928
// NOTE: Must be run as a goroutine.
929
func (t *TxPublisher) monitor() {
2✔
930
        defer t.wg.Done()
2✔
931

2✔
932
        for {
4✔
933
                select {
2✔
934
                case beat := <-t.BlockbeatChan:
2✔
935
                        height := beat.Height()
2✔
936
                        log.Debugf("TxPublisher received new block: %v", height)
2✔
937

2✔
938
                        // Update the best known height for the publisher.
2✔
939
                        t.currentHeight.Store(height)
2✔
940

2✔
941
                        // Check all monitored txns to see if any of them needs
2✔
942
                        // to be bumped.
2✔
943
                        t.processRecords()
2✔
944

2✔
945
                        // Notify we've processed the block.
2✔
946
                        t.NotifyBlockProcessed(beat, nil)
2✔
947

948
                case <-t.quit:
2✔
949
                        log.Debug("Fee bumper stopped, exit monitor")
2✔
950
                        return
2✔
951
                }
952
        }
953
}
954

955
// processRecords checks all the txns being monitored, and checks if any of
956
// them needs to be bumped. If so, it will attempt to bump the fee of the tx.
957
func (t *TxPublisher) processRecords() {
7✔
958
        // confirmedRecords stores a map of the records which have been
7✔
959
        // confirmed.
7✔
960
        confirmedRecords := make(map[uint64]*monitorRecord)
7✔
961

7✔
962
        // feeBumpRecords stores a map of records which need to be bumped.
7✔
963
        feeBumpRecords := make(map[uint64]*monitorRecord)
7✔
964

7✔
965
        // failedRecords stores a map of records which has inputs being spent
7✔
966
        // by a third party.
7✔
967
        failedRecords := make(map[uint64]*monitorRecord)
7✔
968

7✔
969
        // initialRecords stores a map of records which are being created and
7✔
970
        // published for the first time.
7✔
971
        initialRecords := make(map[uint64]*monitorRecord)
7✔
972

7✔
973
        // visitor is a helper closure that visits each record and divides them
7✔
974
        // into two groups.
7✔
975
        visitor := func(requestID uint64, r *monitorRecord) error {
14✔
976
                log.Tracef("Checking monitor recordID=%v", requestID)
7✔
977

7✔
978
                // Check whether the inputs have already been spent.
7✔
979
                spends := t.getSpentInputs(r)
7✔
980

7✔
981
                // If the any of the inputs has been spent, the record will be
7✔
982
                // marked as failed or confirmed.
7✔
983
                if len(spends) != 0 {
12✔
984
                        // Attach the spending txns.
5✔
985
                        r.spentInputs = spends
5✔
986

5✔
987
                        // When tx is nil, it means we haven't tried the initial
5✔
988
                        // broadcast yet the input is already spent. This could
5✔
989
                        // happen when the node shuts down, a previous sweeping
5✔
990
                        // tx confirmed, then the node comes back online and
5✔
991
                        // reoffers the inputs. Another case is the remote node
5✔
992
                        // spends the input quickly before we even attempt the
5✔
993
                        // sweep. In either case we will fail the record and let
5✔
994
                        // the sweeper handles it.
5✔
995
                        if r.tx == nil {
6✔
996
                                failedRecords[requestID] = r
1✔
997
                                return nil
1✔
998
                        }
1✔
999

1000
                        // Check whether the inputs has been spent by a unknown
1001
                        // tx.
1002
                        if t.isUnknownSpent(r, spends) {
7✔
1003
                                failedRecords[requestID] = r
3✔
1004

3✔
1005
                                // Move to the next record.
3✔
1006
                                return nil
3✔
1007
                        }
3✔
1008

1009
                        // The tx is ours, we can move it to the confirmed queue
1010
                        // and stop monitoring it.
1011
                        confirmedRecords[requestID] = r
3✔
1012

3✔
1013
                        // Move to the next record.
3✔
1014
                        return nil
3✔
1015
                }
1016

1017
                // This is the first time we see this record, so we put it in
1018
                // the initial queue.
1019
                if r.tx == nil {
7✔
1020
                        initialRecords[requestID] = r
3✔
1021

3✔
1022
                        return nil
3✔
1023
                }
3✔
1024

1025
                // We can only get here when the inputs are not spent and a
1026
                // previous sweeping tx has been attempted. In this case we will
1027
                // perform an RBF on it in the current block.
1028
                feeBumpRecords[requestID] = r
3✔
1029

3✔
1030
                // Return nil to move to the next record.
3✔
1031
                return nil
3✔
1032
        }
1033

1034
        // Iterate through all the records and divide them into four groups.
1035
        t.records.ForEach(visitor)
7✔
1036

7✔
1037
        // Handle the initial broadcast.
7✔
1038
        for _, r := range initialRecords {
10✔
1039
                t.handleInitialBroadcast(r)
3✔
1040
        }
3✔
1041

1042
        // For records that are confirmed, we'll notify the caller about this
1043
        // result.
1044
        for _, r := range confirmedRecords {
10✔
1045
                t.wg.Add(1)
3✔
1046
                go t.handleTxConfirmed(r)
3✔
1047
        }
3✔
1048

1049
        // Get the current height to be used in the following goroutines.
1050
        currentHeight := t.currentHeight.Load()
7✔
1051

7✔
1052
        // For records that are not confirmed, we perform a fee bump if needed.
7✔
1053
        for _, r := range feeBumpRecords {
10✔
1054
                t.wg.Add(1)
3✔
1055
                go t.handleFeeBumpTx(r, currentHeight)
3✔
1056
        }
3✔
1057

1058
        // For records that are failed, we'll notify the caller about this
1059
        // result.
1060
        for _, r := range failedRecords {
11✔
1061
                t.wg.Add(1)
4✔
1062
                go t.handleUnknownSpent(r)
4✔
1063
        }
4✔
1064
}
1065

1066
// handleTxConfirmed is called when a monitored tx is confirmed. It will
1067
// notify the subscriber then remove the record from the maps .
1068
//
1069
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1070
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) {
4✔
1071
        defer t.wg.Done()
4✔
1072

4✔
1073
        log.Debugf("Record %v is spent in tx=%v", r.requestID, r.tx.TxHash())
4✔
1074

4✔
1075
        // Create a result that will be sent to the resultChan which is
4✔
1076
        // listened by the caller.
4✔
1077
        result := &BumpResult{
4✔
1078
                Event:     TxConfirmed,
4✔
1079
                Tx:        r.tx,
4✔
1080
                requestID: r.requestID,
4✔
1081
                Fee:       r.fee,
4✔
1082
                FeeRate:   r.feeFunction.FeeRate(),
4✔
1083
        }
4✔
1084

4✔
1085
        // Notify that this tx is confirmed and remove the record from the map.
4✔
1086
        t.handleResult(result)
4✔
1087
}
4✔
1088

1089
// handleInitialTxError takes the error from `initializeTx` and decides the
1090
// bump event. It will construct a BumpResult and handles it.
1091
func (t *TxPublisher) handleInitialTxError(r *monitorRecord, err error) {
4✔
1092
        // Create a bump result to be sent to the sweeper.
4✔
1093
        result := &BumpResult{
4✔
1094
                Err:       err,
4✔
1095
                requestID: r.requestID,
4✔
1096
        }
4✔
1097

4✔
1098
        // We now decide what type of event to send.
4✔
1099
        switch {
4✔
1100
        // When the error is due to a dust output, we'll send a TxFailed so
1101
        // these inputs can be retried with a different group in the next
1102
        // block.
1103
        case errors.Is(err, ErrTxNoOutput):
2✔
1104
                result.Event = TxFailed
2✔
1105

1106
        // When the error is due to budget being used up, we'll send a TxFailed
1107
        // so these inputs can be retried with a different group in the next
1108
        // block.
1109
        case errors.Is(err, ErrMaxPosition):
1✔
1110
                result.Event = TxFailed
1✔
1111

1112
        // When the error is due to zero fee rate delta, we'll send a TxFailed
1113
        // so these inputs can be retried in the next block.
1114
        case errors.Is(err, ErrZeroFeeRateDelta):
2✔
1115
                result.Event = TxFailed
2✔
1116

1117
        // When there are missing inputs, we'll create a TxUnknownSpend bump
1118
        // result here so the rest of the inputs can be retried.
1119
        case errors.Is(err, ErrInputMissing):
1✔
1120
                result = t.handleMissingInputs(r)
1✔
1121

1122
        // Otherwise this is not a fee-related error and the tx cannot be
1123
        // retried. In that case we will fail ALL the inputs in this tx, which
1124
        // means they will be removed from the sweeper and never be tried
1125
        // again.
1126
        //
1127
        // TODO(yy): Find out which input is causing the failure and fail that
1128
        // one only.
1129
        default:
2✔
1130
                result.Event = TxFatal
2✔
1131
        }
1132

1133
        t.handleResult(result)
4✔
1134
}
1135

1136
// handleInitialBroadcast is called when a new request is received. It will
1137
// handle the initial tx creation and broadcast. In details,
1138
// 1. init a fee function based on the given strategy.
1139
// 2. create an RBF-compliant tx and monitor it for confirmation.
1140
// 3. notify the initial broadcast result back to the caller.
1141
func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) {
7✔
1142
        log.Debugf("Initial broadcast for requestID=%v", r.requestID)
7✔
1143

7✔
1144
        var (
7✔
1145
                result *BumpResult
7✔
1146
                err    error
7✔
1147
        )
7✔
1148

7✔
1149
        // Attempt an initial broadcast which is guaranteed to comply with the
7✔
1150
        // RBF rules.
7✔
1151
        //
7✔
1152
        // Create the initial tx to be broadcasted.
7✔
1153
        record, err := t.initializeTx(r)
7✔
1154
        if err != nil {
11✔
1155
                log.Errorf("Initial broadcast failed: %v", err)
4✔
1156

4✔
1157
                // We now handle the initialization error and exit.
4✔
1158
                t.handleInitialTxError(r, err)
4✔
1159

4✔
1160
                return
4✔
1161
        }
4✔
1162

1163
        // Successfully created the first tx, now broadcast it.
1164
        result, err = t.broadcast(record)
5✔
1165
        if err != nil {
5✔
1166
                // The broadcast failed, which can only happen if the tx record
×
1167
                // cannot be found or the aux sweeper returns an error. In
×
1168
                // either case, we will send back a TxFail event so these
×
1169
                // inputs can be retried.
×
1170
                result = &BumpResult{
×
1171
                        Event:     TxFailed,
×
1172
                        Err:       err,
×
1173
                        requestID: r.requestID,
×
1174
                }
×
1175
        }
×
1176

1177
        t.handleResult(result)
5✔
1178
}
1179

1180
// handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
1181
// attempt to bump the fee of the tx.
1182
//
1183
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1184
func (t *TxPublisher) handleFeeBumpTx(r *monitorRecord, currentHeight int32) {
6✔
1185
        defer t.wg.Done()
6✔
1186

6✔
1187
        log.Debugf("Attempting to fee bump tx=%v in record %v", r.tx.TxHash(),
6✔
1188
                r.requestID)
6✔
1189

6✔
1190
        oldTxid := r.tx.TxHash()
6✔
1191

6✔
1192
        // Get the current conf target for this record.
6✔
1193
        confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
6✔
1194

6✔
1195
        // Ask the fee function whether a bump is needed. We expect the fee
6✔
1196
        // function to increase its returned fee rate after calling this
6✔
1197
        // method.
6✔
1198
        increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
6✔
1199
        if err != nil {
9✔
1200
                // TODO(yy): send this error back to the sweeper so it can
3✔
1201
                // re-group the inputs?
3✔
1202
                log.Errorf("Failed to increase fee rate for tx %v at "+
3✔
1203
                        "height=%v: %v", oldTxid, t.currentHeight.Load(), err)
3✔
1204

3✔
1205
                return
3✔
1206
        }
3✔
1207

1208
        // If the fee rate was not increased, there's no need to bump the fee.
1209
        if !increased {
6✔
1210
                log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
1✔
1211
                        t.currentHeight.Load())
1✔
1212

1✔
1213
                return
1✔
1214
        }
1✔
1215

1216
        // The fee function now has a new fee rate, we will use it to bump the
1217
        // fee of the tx.
1218
        resultOpt := t.createAndPublishTx(r)
4✔
1219

4✔
1220
        // If there's a result, we will notify the caller about the result.
4✔
1221
        resultOpt.WhenSome(func(result BumpResult) {
8✔
1222
                // Notify the new result.
4✔
1223
                t.handleResult(&result)
4✔
1224
        })
4✔
1225
}
1226

1227
// handleUnknownSpent is called when the inputs are spent by a unknown tx. It
1228
// will notify the subscriber then remove the record from the maps and send a
1229
// TxUnknownSpend event to the subscriber.
1230
//
1231
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1232
func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) {
4✔
1233
        defer t.wg.Done()
4✔
1234

4✔
1235
        log.Debugf("Record %v has inputs spent by a tx unknown to the fee "+
4✔
1236
                "bumper, failing it now:\n%v", r.requestID,
4✔
1237
                inputTypeSummary(r.req.Inputs))
4✔
1238

4✔
1239
        // Create a result that will be sent to the resultChan which is listened
4✔
1240
        // by the caller.
4✔
1241
        result := t.createUnknownSpentBumpResult(r)
4✔
1242

4✔
1243
        // Notify the sweeper about this result in the end.
4✔
1244
        t.handleResult(result)
4✔
1245
}
4✔
1246

1247
// createUnknownSpentBumpResult creates and returns a BumpResult given the
1248
// monitored record has unknown spends.
1249
func (t *TxPublisher) createUnknownSpentBumpResult(
1250
        r *monitorRecord) *BumpResult {
4✔
1251

4✔
1252
        // Create a result that will be sent to the resultChan which is listened
4✔
1253
        // by the caller.
4✔
1254
        result := &BumpResult{
4✔
1255
                Event:       TxUnknownSpend,
4✔
1256
                Tx:          r.tx,
4✔
1257
                requestID:   r.requestID,
4✔
1258
                Err:         ErrUnknownSpent,
4✔
1259
                SpentInputs: r.spentInputs,
4✔
1260
        }
4✔
1261

4✔
1262
        // Get the fee function, which will be used to decided the next fee rate
4✔
1263
        // to use if the sweeper decides to retry sweeping this input.
4✔
1264
        feeFunc := r.feeFunction
4✔
1265

4✔
1266
        // When the record is failed before the initial broadcast is attempted,
4✔
1267
        // it will have a nil fee func. In this case, we'll create the fee func
4✔
1268
        // here.
4✔
1269
        //
4✔
1270
        // NOTE: Since the current record is failed and will be deleted, we
4✔
1271
        // don't need to update the record on this fee function. We only need
4✔
1272
        // the fee rate data so the sweeper can pick up where we left off.
4✔
1273
        if feeFunc == nil {
5✔
1274
                f, err := t.initializeFeeFunction(r.req)
1✔
1275
                // TODO(yy): The only error we would receive here is when the
1✔
1276
                // pkScript is not recognized by the weightEstimator. What we
1✔
1277
                // should do instead is to check the pkScript immediately after
1✔
1278
                // receiving a sweep request so we don't need to check it again,
1✔
1279
                // which will also save us from error checking from several
1✔
1280
                // callsites.
1✔
1281
                if err != nil {
1✔
1282
                        log.Errorf("Failed to create fee func for record %v: "+
×
1283
                                "%v", r.requestID, err)
×
1284

×
1285
                        // Overwrite the event and error so the sweeper will
×
1286
                        // remove this input.
×
1287
                        result.Event = TxFatal
×
1288
                        result.Err = err
×
1289

×
1290
                        return result
×
1291
                }
×
1292

1293
                feeFunc = f
1✔
1294
        }
1295

1296
        // Since the sweeping tx has been replaced by another party's tx, we
1297
        // missed this block window to increase its fee rate. To make sure the
1298
        // fee rate stays in the initial line, we now ask the fee function to
1299
        // give us the next fee rate as if the sweeping tx were RBFed. This new
1300
        // fee rate will be used as the starting fee rate if the upper system
1301
        // decides to continue sweeping the rest of the inputs.
1302
        _, err := feeFunc.Increment()
4✔
1303
        if err != nil {
6✔
1304
                // The fee function has reached its max position - nothing we
2✔
1305
                // can do here other than letting the user increase the budget.
2✔
1306
                log.Errorf("Failed to calculate the next fee rate for "+
2✔
1307
                        "Record(%v): %v", r.requestID, err)
2✔
1308
        }
2✔
1309

1310
        // Attach the new fee rate to be used for the next sweeping attempt.
1311
        result.FeeRate = feeFunc.FeeRate()
4✔
1312

4✔
1313
        return result
4✔
1314
}
1315

1316
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
1317
// to the network. It will update the record with the new tx and fee rate if
1318
// successfully created, and return the result when published successfully.
1319
func (t *TxPublisher) createAndPublishTx(
1320
        r *monitorRecord) fn.Option[BumpResult] {
9✔
1321

9✔
1322
        // Fetch the old tx.
9✔
1323
        oldTx := r.tx
9✔
1324

9✔
1325
        // Create a new tx with the new fee rate.
9✔
1326
        //
9✔
1327
        // NOTE: The fee function is expected to have increased its returned
9✔
1328
        // fee rate after calling the SkipFeeBump method. So we can use it
9✔
1329
        // directly here.
9✔
1330
        sweepCtx, err := t.createAndCheckTx(r)
9✔
1331

9✔
1332
        // If there's an error creating the replacement tx, we need to abort the
9✔
1333
        // flow and handle it.
9✔
1334
        if err != nil {
13✔
1335
                return t.handleReplacementTxError(r, oldTx, err)
4✔
1336
        }
4✔
1337

1338
        // The tx has been created without any errors, we now register a new
1339
        // record by overwriting the same requestID.
1340
        record := t.updateRecord(r, sweepCtx)
6✔
1341

6✔
1342
        // Attempt to broadcast this new tx.
6✔
1343
        result, err := t.broadcast(record)
6✔
1344
        if err != nil {
6✔
1345
                log.Infof("Failed to broadcast replacement tx %v: %v",
×
1346
                        sweepCtx.tx.TxHash(), err)
×
1347

×
1348
                return fn.None[BumpResult]()
×
1349
        }
×
1350

1351
        // If the result error is fee related, we will return no error and let
1352
        // the fee bumper retry it at next block.
1353
        //
1354
        // NOTE: we may get this error if we've bypassed the mempool check,
1355
        // which means we are suing neutrino backend.
1356
        if errors.Is(result.Err, chain.ErrInsufficientFee) ||
6✔
1357
                errors.Is(result.Err, lnwallet.ErrMempoolFee) {
7✔
1358

1✔
1359
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(),
1✔
1360
                        result.Err)
1✔
1361

1✔
1362
                return fn.None[BumpResult]()
1✔
1363
        }
1✔
1364

1365
        // A successful replacement tx is created, attach the old tx.
1366
        result.ReplacedTx = oldTx
6✔
1367

6✔
1368
        // If the new tx failed to be published, we will return the result so
6✔
1369
        // the caller can handle it.
6✔
1370
        if result.Event == TxFailed {
7✔
1371
                return fn.Some(*result)
1✔
1372
        }
1✔
1373

1374
        log.Infof("Replaced tx=%v with new tx=%v", oldTx.TxHash(),
5✔
1375
                sweepCtx.tx.TxHash())
5✔
1376

5✔
1377
        // Otherwise, it's a successful RBF, set the event and return.
5✔
1378
        result.Event = TxReplaced
5✔
1379

5✔
1380
        return fn.Some(*result)
5✔
1381
}
1382

1383
// isUnknownSpent checks whether the inputs of the tx has already been spent by
1384
// a tx not known to us. When a tx is not confirmed, yet its inputs has been
1385
// spent, then it must be spent by a different tx other than the sweeping tx
1386
// here.
1387
func (t *TxPublisher) isUnknownSpent(r *monitorRecord,
1388
        spends map[wire.OutPoint]*wire.MsgTx) bool {
4✔
1389

4✔
1390
        txid := r.tx.TxHash()
4✔
1391

4✔
1392
        // Iterate all the spending txns and check if they match the sweeping
4✔
1393
        // tx.
4✔
1394
        for op, spendingTx := range spends {
8✔
1395
                spendingTxID := spendingTx.TxHash()
4✔
1396

4✔
1397
                // If the spending tx is the same as the sweeping tx then we are
4✔
1398
                // good.
4✔
1399
                if spendingTxID == txid {
7✔
1400
                        continue
3✔
1401
                }
1402

1403
                log.Warnf("Detected unknown spend of input=%v in tx=%v", op,
3✔
1404
                        spendingTx.TxHash())
3✔
1405

3✔
1406
                return true
3✔
1407
        }
1408

1409
        return false
3✔
1410
}
1411

1412
// getSpentInputs performs a non-blocking read on the spending subscriptions to
1413
// see whether any of the monitored inputs has been spent. A map of inputs with
1414
// their spending txns are returned if found.
1415
func (t *TxPublisher) getSpentInputs(
1416
        r *monitorRecord) map[wire.OutPoint]*wire.MsgTx {
8✔
1417

8✔
1418
        // Create a map to record the inputs spent.
8✔
1419
        spentInputs := make(map[wire.OutPoint]*wire.MsgTx, len(r.req.Inputs))
8✔
1420

8✔
1421
        // Iterate all the inputs and check if they have been spent already.
8✔
1422
        for _, inp := range r.req.Inputs {
18✔
1423
                op := inp.OutPoint()
10✔
1424

10✔
1425
                // For wallet utxos, the height hint is not set - we fall back
10✔
1426
                // to the current height when checking for a third party spend.
10✔
1427
                //
10✔
1428
                // TODO(yy): We need to properly lock wallet utxos before
10✔
1429
                // skipping this check as the same wallet utxo can be used by
10✔
1430
                // different sweeping txns.
10✔
1431
                heightHint := inp.HeightHint()
10✔
1432
                if heightHint == 0 {
13✔
1433
                        heightHint = uint32(t.currentHeight.Load())
3✔
1434
                        log.Debugf("Checking wallet input %v using heightHint "+
3✔
1435
                                "%v", op, heightHint)
3✔
1436
                }
3✔
1437

1438
                // If the input has already been spent after the height hint, a
1439
                // spend event is sent back immediately.
1440
                spendEvent, err := t.cfg.Notifier.RegisterSpendNtfn(
10✔
1441
                        &op, inp.SignDesc().Output.PkScript, heightHint,
10✔
1442
                )
10✔
1443
                if err != nil {
10✔
1444
                        log.Criticalf("Failed to register spend ntfn for "+
×
1445
                                "input=%v: %v", op, err)
×
1446

×
1447
                        return nil
×
1448
                }
×
1449

1450
                // Remove the subscription when this function returns.
1451
                defer spendEvent.Cancel()
10✔
1452

10✔
1453
                // Do a non-blocking read to see if the output has been spent.
10✔
1454
                select {
10✔
1455
                case spend, ok := <-spendEvent.Spend:
6✔
1456
                        if !ok {
6✔
1457
                                log.Debugf("Spend ntfn for %v canceled", op)
×
1458

×
1459
                                continue
×
1460
                        }
1461

1462
                        spendingTx := spend.SpendingTx
6✔
1463

6✔
1464
                        log.Debugf("Detected spent of input=%v in tx=%v", op,
6✔
1465
                                spendingTx.TxHash())
6✔
1466

6✔
1467
                        spentInputs[op] = spendingTx
6✔
1468

1469
                // Move to the next input.
1470
                default:
6✔
1471
                        log.Tracef("Input %v not spent yet", op)
6✔
1472
                }
1473
        }
1474

1475
        return spentInputs
8✔
1476
}
1477

1478
// calcCurrentConfTarget calculates the current confirmation target based on
1479
// the deadline height. The conf target is capped at 0 if the deadline has
1480
// already passed.
1481
func calcCurrentConfTarget(currentHeight, deadline int32) uint32 {
16✔
1482
        var confTarget uint32
16✔
1483

16✔
1484
        // Calculate how many blocks left until the deadline.
16✔
1485
        deadlineDelta := deadline - currentHeight
16✔
1486

16✔
1487
        // If we are already past the deadline, we will set the conf target to
16✔
1488
        // be 0.
16✔
1489
        if deadlineDelta < 0 {
22✔
1490
                log.Warnf("Deadline is %d blocks behind current height %v",
6✔
1491
                        -deadlineDelta, currentHeight)
6✔
1492

6✔
1493
                confTarget = 0
6✔
1494
        } else {
18✔
1495
                confTarget = uint32(deadlineDelta)
12✔
1496
        }
12✔
1497

1498
        return confTarget
16✔
1499
}
1500

1501
// sweepTxCtx houses a sweep transaction with additional context.
1502
type sweepTxCtx struct {
1503
        tx *wire.MsgTx
1504

1505
        fee btcutil.Amount
1506

1507
        extraTxOut fn.Option[SweepOutput]
1508

1509
        // outpointToTxIndex maps the outpoint of the inputs to their index in
1510
        // the sweep transaction.
1511
        outpointToTxIndex map[wire.OutPoint]int
1512
}
1513

1514
// createSweepTx creates a sweeping tx based on the given inputs, change
1515
// address and fee rate.
1516
func (t *TxPublisher) createSweepTx(inputs []input.Input,
1517
        changePkScript lnwallet.AddrWithKey,
1518
        feeRate chainfee.SatPerKWeight) (*sweepTxCtx, error) {
25✔
1519

25✔
1520
        // Validate and calculate the fee and change amount.
25✔
1521
        txFee, changeOutputsOpt, locktimeOpt, err := prepareSweepTx(
25✔
1522
                inputs, changePkScript, feeRate, t.currentHeight.Load(),
25✔
1523
                t.cfg.AuxSweeper,
25✔
1524
        )
25✔
1525
        if err != nil {
27✔
1526
                return nil, err
2✔
1527
        }
2✔
1528

1529
        var (
25✔
1530
                // Create the sweep transaction that we will be building. We
25✔
1531
                // use version 2 as it is required for CSV.
25✔
1532
                sweepTx = wire.NewMsgTx(2)
25✔
1533

25✔
1534
                // We'll add the inputs as we go so we know the final ordering
25✔
1535
                // of inputs to sign.
25✔
1536
                idxs []input.Input
25✔
1537
        )
25✔
1538

25✔
1539
        // We start by adding all inputs that commit to an output. We do this
25✔
1540
        // since the input and output index must stay the same for the
25✔
1541
        // signatures to be valid.
25✔
1542
        outpointToTxIndex := make(map[wire.OutPoint]int)
25✔
1543
        for _, o := range inputs {
50✔
1544
                if o.RequiredTxOut() == nil {
50✔
1545
                        continue
25✔
1546
                }
1547

1548
                idxs = append(idxs, o)
2✔
1549
                sweepTx.AddTxIn(&wire.TxIn{
2✔
1550
                        PreviousOutPoint: o.OutPoint(),
2✔
1551
                        Sequence:         o.BlocksToMaturity(),
2✔
1552
                })
2✔
1553
                sweepTx.AddTxOut(o.RequiredTxOut())
2✔
1554

2✔
1555
                outpointToTxIndex[o.OutPoint()] = len(sweepTx.TxOut) - 1
2✔
1556
        }
1557

1558
        // Add the remaining inputs, which have no required output, to
1559
        // the sweep transaction.
1560
        for _, o := range inputs {
50✔
1561
                if o.RequiredTxOut() != nil {
27✔
1562
                        continue
2✔
1563
                }
1564

1565
                idxs = append(idxs, o)
25✔
1566
                sweepTx.AddTxIn(&wire.TxIn{
25✔
1567
                        PreviousOutPoint: o.OutPoint(),
25✔
1568
                        Sequence:         o.BlocksToMaturity(),
25✔
1569
                })
25✔
1570
        }
1571

1572
        // If we have change outputs to add, then add them to the sweep
1573
        // transaction here.
1574
        changeOutputsOpt.WhenSome(func(changeOuts []SweepOutput) {
50✔
1575
                for i := range changeOuts {
73✔
1576
                        sweepTx.AddTxOut(&changeOuts[i].TxOut)
48✔
1577
                }
48✔
1578
        })
1579

1580
        // We'll default to using the current block height as locktime, if none
1581
        // of the inputs commits to a different locktime.
1582
        sweepTx.LockTime = uint32(locktimeOpt.UnwrapOr(t.currentHeight.Load()))
25✔
1583

25✔
1584
        prevInputFetcher, err := input.MultiPrevOutFetcher(inputs)
25✔
1585
        if err != nil {
25✔
1586
                return nil, fmt.Errorf("error creating prev input fetcher "+
×
1587
                        "for hash cache: %v", err)
×
1588
        }
×
1589
        hashCache := txscript.NewTxSigHashes(sweepTx, prevInputFetcher)
25✔
1590

25✔
1591
        // With all the inputs in place, use each output's unique input script
25✔
1592
        // function to generate the final witness required for spending.
25✔
1593
        addInputScript := func(idx int, tso input.Input) error {
50✔
1594
                inputScript, err := tso.CraftInputScript(
25✔
1595
                        t.cfg.Signer, sweepTx, hashCache, prevInputFetcher, idx,
25✔
1596
                )
25✔
1597
                if err != nil {
25✔
1598
                        return err
×
1599
                }
×
1600

1601
                sweepTx.TxIn[idx].Witness = inputScript.Witness
25✔
1602

25✔
1603
                if len(inputScript.SigScript) == 0 {
50✔
1604
                        return nil
25✔
1605
                }
25✔
1606

1607
                sweepTx.TxIn[idx].SignatureScript = inputScript.SigScript
×
1608

×
1609
                return nil
×
1610
        }
1611

1612
        for idx, inp := range idxs {
50✔
1613
                if err := addInputScript(idx, inp); err != nil {
25✔
1614
                        return nil, err
×
1615
                }
×
1616
        }
1617

1618
        log.Debugf("Created sweep tx %v for inputs:\n%v", sweepTx.TxHash(),
25✔
1619
                inputTypeSummary(inputs))
25✔
1620

25✔
1621
        // Try to locate the extra change output, though there might be None.
25✔
1622
        extraTxOut := fn.MapOption(
25✔
1623
                func(sweepOuts []SweepOutput) fn.Option[SweepOutput] {
50✔
1624
                        for _, sweepOut := range sweepOuts {
73✔
1625
                                if !sweepOut.IsExtra {
96✔
1626
                                        continue
48✔
1627
                                }
1628

1629
                                // If we sweep outputs of a custom channel, the
1630
                                // custom leaves in those outputs will be merged
1631
                                // into a single output, even if we sweep
1632
                                // multiple outputs (e.g. to_remote and breached
1633
                                // to_local of a breached channel) at the same
1634
                                // time. So there will only ever be one extra
1635
                                // output.
1636
                                log.Debugf("Sweep produced extra_sweep_out=%v",
×
1637
                                        lnutils.SpewLogClosure(sweepOut))
×
1638

×
1639
                                return fn.Some(sweepOut)
×
1640
                        }
1641

1642
                        return fn.None[SweepOutput]()
25✔
1643
                },
1644
        )(changeOutputsOpt)
1645

1646
        return &sweepTxCtx{
25✔
1647
                tx:                sweepTx,
25✔
1648
                fee:               txFee,
25✔
1649
                extraTxOut:        fn.FlattenOption(extraTxOut),
25✔
1650
                outpointToTxIndex: outpointToTxIndex,
25✔
1651
        }, nil
25✔
1652
}
1653

1654
// prepareSweepTx returns the tx fee, a set of optional change outputs and an
1655
// optional locktime after a series of validations:
1656
// 1. check the locktime has been reached.
1657
// 2. check the locktimes are the same.
1658
// 3. check the inputs cover the outputs.
1659
//
1660
// NOTE: if the change amount is below dust, it will be added to the tx fee.
1661
func prepareSweepTx(inputs []input.Input, changePkScript lnwallet.AddrWithKey,
1662
        feeRate chainfee.SatPerKWeight, currentHeight int32,
1663
        auxSweeper fn.Option[AuxSweeper]) (
1664
        btcutil.Amount, fn.Option[[]SweepOutput], fn.Option[int32], error) {
25✔
1665

25✔
1666
        noChange := fn.None[[]SweepOutput]()
25✔
1667
        noLocktime := fn.None[int32]()
25✔
1668

25✔
1669
        // Given the set of inputs we have, if we have an aux sweeper, then
25✔
1670
        // we'll attempt to see if we have any other change outputs we'll need
25✔
1671
        // to add to the sweep transaction.
25✔
1672
        changePkScripts := [][]byte{changePkScript.DeliveryAddress}
25✔
1673

25✔
1674
        var extraChangeOut fn.Option[SweepOutput]
25✔
1675
        err := fn.MapOptionZ(
25✔
1676
                auxSweeper, func(aux AuxSweeper) error {
48✔
1677
                        extraOut := aux.DeriveSweepAddr(inputs, changePkScript)
23✔
1678
                        if err := extraOut.Err(); err != nil {
23✔
1679
                                return err
×
1680
                        }
×
1681

1682
                        extraChangeOut = extraOut.LeftToSome()
23✔
1683

23✔
1684
                        return nil
23✔
1685
                },
1686
        )
1687
        if err != nil {
25✔
1688
                return 0, noChange, noLocktime, err
×
1689
        }
×
1690

1691
        // Creating a weight estimator with nil outputs and zero max fee rate.
1692
        // We don't allow adding customized outputs in the sweeping tx, and the
1693
        // fee rate is already being managed before we get here.
1694
        inputs, estimator, err := getWeightEstimate(
25✔
1695
                inputs, nil, feeRate, 0, changePkScripts,
25✔
1696
        )
25✔
1697
        if err != nil {
25✔
1698
                return 0, noChange, noLocktime, err
×
1699
        }
×
1700

1701
        txFee := estimator.fee()
25✔
1702

25✔
1703
        var (
25✔
1704
                // Track whether any of the inputs require a certain locktime.
25✔
1705
                locktime = int32(-1)
25✔
1706

25✔
1707
                // We keep track of total input amount, and required output
25✔
1708
                // amount to use for calculating the change amount below.
25✔
1709
                totalInput     btcutil.Amount
25✔
1710
                requiredOutput btcutil.Amount
25✔
1711
        )
25✔
1712

25✔
1713
        // If we have an extra change output, then we'll add it as a required
25✔
1714
        // output amt.
25✔
1715
        extraChangeOut.WhenSome(func(o SweepOutput) {
48✔
1716
                requiredOutput += btcutil.Amount(o.Value)
23✔
1717
        })
23✔
1718

1719
        // Go through each input and check if the required lock times have
1720
        // been reached and are the same.
1721
        for _, o := range inputs {
50✔
1722
                // If the input has a required output, we'll add it to the
25✔
1723
                // required output amount.
25✔
1724
                if o.RequiredTxOut() != nil {
27✔
1725
                        requiredOutput += btcutil.Amount(
2✔
1726
                                o.RequiredTxOut().Value,
2✔
1727
                        )
2✔
1728
                }
2✔
1729

1730
                // Update the total input amount.
1731
                totalInput += btcutil.Amount(o.SignDesc().Output.Value)
25✔
1732

25✔
1733
                lt, ok := o.RequiredLockTime()
25✔
1734

25✔
1735
                // Skip if the input doesn't require a lock time.
25✔
1736
                if !ok {
50✔
1737
                        continue
25✔
1738
                }
1739

1740
                // Check if the lock time has been reached.
1741
                if lt > uint32(currentHeight) {
2✔
1742
                        return 0, noChange, noLocktime,
×
1743
                                fmt.Errorf("%w: current height is %v, "+
×
1744
                                        "locktime is %v", ErrLocktimeImmature,
×
1745
                                        currentHeight, lt)
×
1746
                }
×
1747

1748
                // If another input commits to a different locktime, they
1749
                // cannot be combined in the same transaction.
1750
                if locktime != -1 && locktime != int32(lt) {
2✔
1751
                        return 0, noChange, noLocktime, ErrLocktimeConflict
×
1752
                }
×
1753

1754
                // Update the locktime for next iteration.
1755
                locktime = int32(lt)
2✔
1756
        }
1757

1758
        // Make sure the required outputs plus fee do not exceed total input.
1759
        if requiredOutput+txFee > totalInput {
25✔
1760
                return 0, noChange, noLocktime, fmt.Errorf("insufficient "+
×
1761
                        "input to create sweep tx: input_sum=%v, "+
×
1762
                        "output_sum=%v", totalInput, requiredOutput+txFee)
×
1763
        }
×
1764

1765
        // The value remaining after the required output and fees is the
1766
        // change output.
1767
        changeAmt := totalInput - requiredOutput - txFee
25✔
1768
        changeOuts := make([]SweepOutput, 0, 2)
25✔
1769

25✔
1770
        extraChangeOut.WhenSome(func(o SweepOutput) {
48✔
1771
                changeOuts = append(changeOuts, o)
23✔
1772
        })
23✔
1773

1774
        // We'll calculate the dust limit for the given changePkScript since it
1775
        // is variable.
1776
        changeFloor := lnwallet.DustLimitForSize(
25✔
1777
                len(changePkScript.DeliveryAddress),
25✔
1778
        )
25✔
1779

25✔
1780
        switch {
25✔
1781
        // If the change amount is dust, we'll move it into the fees, and
1782
        // ignore it.
1783
        case changeAmt < changeFloor:
2✔
1784
                log.Infof("Change amt %v below dustlimit %v, not adding "+
2✔
1785
                        "change output", changeAmt, changeFloor)
2✔
1786

2✔
1787
                // If there's no required output, and the change output is
2✔
1788
                // dust, it means we are creating a tx without any outputs. In
2✔
1789
                // this case we'll return an error. This could happen when
2✔
1790
                // creating a tx that has an anchor as the only input.
2✔
1791
                if requiredOutput == 0 {
4✔
1792
                        return 0, noChange, noLocktime, ErrTxNoOutput
2✔
1793
                }
2✔
1794

1795
                // The dust amount is added to the fee.
1796
                txFee += changeAmt
×
1797

1798
        // Otherwise, we'll actually recognize it as a change output.
1799
        default:
25✔
1800
                changeOuts = append(changeOuts, SweepOutput{
25✔
1801
                        TxOut: wire.TxOut{
25✔
1802
                                Value:    int64(changeAmt),
25✔
1803
                                PkScript: changePkScript.DeliveryAddress,
25✔
1804
                        },
25✔
1805
                        IsExtra:     false,
25✔
1806
                        InternalKey: changePkScript.InternalKey,
25✔
1807
                })
25✔
1808
        }
1809

1810
        // Optionally set the locktime.
1811
        locktimeOpt := fn.Some(locktime)
25✔
1812
        if locktime == -1 {
50✔
1813
                locktimeOpt = noLocktime
25✔
1814
        }
25✔
1815

1816
        var changeOutsOpt fn.Option[[]SweepOutput]
25✔
1817
        if len(changeOuts) > 0 {
50✔
1818
                changeOutsOpt = fn.Some(changeOuts)
25✔
1819
        }
25✔
1820

1821
        log.Debugf("Creating sweep tx for %v inputs (%s) using %v, "+
25✔
1822
                "tx_weight=%v, tx_fee=%v, locktime=%v, parents_count=%v, "+
25✔
1823
                "parents_fee=%v, parents_weight=%v, current_height=%v",
25✔
1824
                len(inputs), inputTypeSummary(inputs), feeRate,
25✔
1825
                estimator.weight(), txFee, locktimeOpt, len(estimator.parents),
25✔
1826
                estimator.parentsFee, estimator.parentsWeight, currentHeight)
25✔
1827

25✔
1828
        return txFee, changeOutsOpt, locktimeOpt, nil
25✔
1829
}
1830

1831
// handleReplacementTxError handles the error returned from creating the
1832
// replacement tx. It returns a BumpResult that should be notified to the
1833
// sweeper.
1834
func (t *TxPublisher) handleReplacementTxError(r *monitorRecord,
1835
        oldTx *wire.MsgTx, err error) fn.Option[BumpResult] {
4✔
1836

4✔
1837
        // If the error is fee related, we will return no result and let the fee
4✔
1838
        // bumper retry it at next block.
4✔
1839
        //
4✔
1840
        // NOTE: we can check the RBF error here and ask the fee function to
4✔
1841
        // recalculate the fee rate. However, this would defeat the purpose of
4✔
1842
        // using a deadline based fee function:
4✔
1843
        // - if the deadline is far away, there's no rush to RBF the tx.
4✔
1844
        // - if the deadline is close, we expect the fee function to give us a
4✔
1845
        //   higher fee rate. If the fee rate cannot satisfy the RBF rules, it
4✔
1846
        //   means the budget is not enough.
4✔
1847
        if errors.Is(err, chain.ErrInsufficientFee) ||
4✔
1848
                errors.Is(err, lnwallet.ErrMempoolFee) {
7✔
1849

3✔
1850
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
3✔
1851
                return fn.None[BumpResult]()
3✔
1852
        }
3✔
1853

1854
        // At least one of the inputs is missing, which means it has already
1855
        // been spent by another tx and confirmed. In this case we will handle
1856
        // it by returning a TxUnknownSpend bump result.
1857
        if errors.Is(err, ErrInputMissing) {
1✔
1858
                log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
×
1859
                bumpResult := t.handleMissingInputs(r)
×
1860

×
1861
                return fn.Some(*bumpResult)
×
1862
        }
×
1863

1864
        // If the error is not fee related, we will return a `TxFailed` event
1865
        // so this input can be retried.
1866
        result := fn.Some(BumpResult{
1✔
1867
                Event:     TxFailed,
1✔
1868
                Tx:        oldTx,
1✔
1869
                Err:       err,
1✔
1870
                requestID: r.requestID,
1✔
1871
        })
1✔
1872

1✔
1873
        // If the tx does not have enough budget, we will return a result so
1✔
1874
        // the sweeper can handle it by re-clustering the utxos.
1✔
1875
        if errors.Is(err, ErrNotEnoughBudget) {
2✔
1876
                log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(), err)
1✔
1877
                return result
1✔
1878
        }
1✔
1879

1880
        // Otherwise, an unexpected error occurred, we will log an error and let
1881
        // the sweeper retry the whole process.
1882
        log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
×
1883

×
1884
        return result
×
1885
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc