• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12986279612

27 Jan 2025 09:51AM UTC coverage: 57.652% (-1.1%) from 58.788%
12986279612

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19578 existing lines in 256 files now uncovered.

103448 of 179434 relevant lines covered (57.65%)

24884.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.98
/sweep/fee_bumper.go
1
package sweep
2

3
import (
4
        "errors"
5
        "fmt"
6
        "sync"
7
        "sync/atomic"
8

9
        "github.com/btcsuite/btcd/btcutil"
10
        "github.com/btcsuite/btcd/chaincfg/chainhash"
11
        "github.com/btcsuite/btcd/rpcclient"
12
        "github.com/btcsuite/btcd/txscript"
13
        "github.com/btcsuite/btcd/wire"
14
        "github.com/btcsuite/btcwallet/chain"
15
        "github.com/lightningnetwork/lnd/chainio"
16
        "github.com/lightningnetwork/lnd/chainntnfs"
17
        "github.com/lightningnetwork/lnd/fn/v2"
18
        "github.com/lightningnetwork/lnd/input"
19
        "github.com/lightningnetwork/lnd/labels"
20
        "github.com/lightningnetwork/lnd/lntypes"
21
        "github.com/lightningnetwork/lnd/lnutils"
22
        "github.com/lightningnetwork/lnd/lnwallet"
23
        "github.com/lightningnetwork/lnd/lnwallet/chainfee"
24
        "github.com/lightningnetwork/lnd/tlv"
25
)
26

27
var (
28
        // ErrInvalidBumpResult is returned when the bump result is invalid.
29
        ErrInvalidBumpResult = errors.New("invalid bump result")
30

31
        // ErrNotEnoughBudget is returned when the fee bumper decides the
32
        // current budget cannot cover the fee.
33
        ErrNotEnoughBudget = errors.New("not enough budget")
34

35
        // ErrLocktimeImmature is returned when sweeping an input whose
36
        // locktime is not reached.
37
        ErrLocktimeImmature = errors.New("immature input")
38

39
        // ErrTxNoOutput is returned when an output cannot be created during tx
40
        // preparation, usually due to the output being dust.
41
        ErrTxNoOutput = errors.New("tx has no output")
42

43
        // Err*ErrUnknownSpent is returned when a unknown tx has spent the input
44
        // in the sweeping tx.
45
        ErrUnknownSpent = errors.New("unknown spent of input")
46
)
47

48
var (
49
        // dummyChangePkScript is a dummy tapscript change script that's used
50
        // when we don't need a real address, just something that can be used
51
        // for fee estimation.
52
        dummyChangePkScript = []byte{
53
                0x51, 0x20,
54
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
55
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
56
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
57
                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
58
        }
59
)
60

61
// Bumper defines an interface that can be used by other subsystems for fee
62
// bumping.
63
type Bumper interface {
64
        // Broadcast is used to publish the tx created from the given inputs
65
        // specified in the request. It handles the tx creation, broadcasts it,
66
        // and monitors its confirmation status for potential fee bumping. It
67
        // returns a chan that the caller can use to receive updates about the
68
        // broadcast result and potential RBF attempts.
69
        Broadcast(req *BumpRequest) <-chan *BumpResult
70
}
71

72
// BumpEvent represents the event of a fee bumping attempt.
73
type BumpEvent uint8
74

75
const (
76
        // TxPublished is sent when the broadcast attempt is finished.
77
        TxPublished BumpEvent = iota
78

79
        // TxFailed is sent when the tx has encountered a fee-related error
80
        // during its creation or broadcast, or an internal error from the fee
81
        // bumper. In either case the inputs in this tx should be retried with
82
        // either a different grouping strategy or an increased budget.
83
        //
84
        // TODO(yy): Remove the above usage once we remove sweeping non-CPFP
85
        // anchors.
86
        TxFailed
87

88
        // TxReplaced is sent when the original tx is replaced by a new one.
89
        TxReplaced
90

91
        // TxConfirmed is sent when the tx is confirmed.
92
        TxConfirmed
93

94
        // TxUnknownSpend is sent when at least one of the inputs is spent but
95
        // not by the current sweeping tx, this can happen when,
96
        // - a remote party has replaced our sweeping tx by spending the
97
        //   input(s), e.g., via the direct preimage spend on our outgoing HTLC.
98
        // - a third party has replaced our sweeping tx, e.g., the anchor output
99
        //   after 16 blocks.
100
        // - A previous sweeping tx has confirmed but the fee bumper is not
101
        //   aware of it, e.g., a restart happens right after the sweeping tx is
102
        //   broadcast and confirmed.
103
        TxUnknownSpend
104

105
        // TxFatal is sent when the inputs in this tx cannot be retried. Txns
106
        // will end up in this state if they have encountered a non-fee related
107
        // error, which means they cannot be retried with increased budget.
108
        TxFatal
109

110
        // sentinalEvent is used to check if an event is unknown.
111
        sentinalEvent
112
)
113

114
// String returns a human-readable string for the event.
UNCOV
115
func (e BumpEvent) String() string {
×
UNCOV
116
        switch e {
×
UNCOV
117
        case TxPublished:
×
UNCOV
118
                return "Published"
×
UNCOV
119
        case TxFailed:
×
UNCOV
120
                return "Failed"
×
UNCOV
121
        case TxReplaced:
×
UNCOV
122
                return "Replaced"
×
UNCOV
123
        case TxConfirmed:
×
UNCOV
124
                return "Confirmed"
×
UNCOV
125
        case TxFatal:
×
UNCOV
126
                return "Fatal"
×
NEW
UNCOV
127
        case TxUnknownSpend:
×
NEW
UNCOV
128
                return "UnknownSpend"
×
129
        default:
×
130
                return "Unknown"
×
131
        }
132
}
133

134
// Unknown returns true if the event is unknown.
135
func (e BumpEvent) Unknown() bool {
11✔
136
        return e >= sentinalEvent
11✔
137
}
11✔
138

139
// BumpRequest is used by the caller to give the Bumper the necessary info to
140
// create and manage potential fee bumps for a set of inputs.
141
type BumpRequest struct {
142
        // Budget givens the total amount that can be used as fees by these
143
        // inputs.
144
        Budget btcutil.Amount
145

146
        // Inputs is the set of inputs to sweep.
147
        Inputs []input.Input
148

149
        // DeadlineHeight is the block height at which the tx should be
150
        // confirmed.
151
        DeadlineHeight int32
152

153
        // DeliveryAddress is the script to send the change output to.
154
        DeliveryAddress lnwallet.AddrWithKey
155

156
        // MaxFeeRate is the maximum fee rate that can be used for fee bumping.
157
        MaxFeeRate chainfee.SatPerKWeight
158

159
        // StartingFeeRate is an optional parameter that can be used to specify
160
        // the initial fee rate to use for the fee function.
161
        StartingFeeRate fn.Option[chainfee.SatPerKWeight]
162

163
        // ExtraTxOut tracks if this bump request has an optional set of extra
164
        // outputs to add to the transaction.
165
        ExtraTxOut fn.Option[SweepOutput]
166

167
        // Immediate is used to specify that the tx should be broadcast
168
        // immediately.
169
        Immediate bool
170
}
171

172
// MaxFeeRateAllowed returns the maximum fee rate allowed for the given
173
// request. It calculates the feerate using the supplied budget and the weight,
174
// compares it with the specified MaxFeeRate, and returns the smaller of the
175
// two.
176
func (r *BumpRequest) MaxFeeRateAllowed() (chainfee.SatPerKWeight, error) {
10✔
177
        // We'll want to know if we have any blobs, as we need to factor this
10✔
178
        // into the max fee rate for this bump request.
10✔
179
        hasBlobs := fn.Any(r.Inputs, func(i input.Input) bool {
19✔
180
                return fn.MapOptionZ(
9✔
181
                        i.ResolutionBlob(), func(b tlv.Blob) bool {
9✔
UNCOV
182
                                return len(b) > 0
×
UNCOV
183
                        },
×
184
                )
185
        })
186

187
        sweepAddrs := [][]byte{
10✔
188
                r.DeliveryAddress.DeliveryAddress,
10✔
189
        }
10✔
190

10✔
191
        // If we have blobs, then we'll add an extra sweep addr for the size
10✔
192
        // estimate below. We know that these blobs will also always be based on
10✔
193
        // p2tr addrs.
10✔
194
        if hasBlobs {
10✔
UNCOV
195
                // We need to pass in a real address, so we'll use a dummy
×
UNCOV
196
                // tapscript change script that's used elsewhere for tests.
×
UNCOV
197
                sweepAddrs = append(sweepAddrs, dummyChangePkScript)
×
UNCOV
198
        }
×
199

200
        // Get the size of the sweep tx, which will be used to calculate the
201
        // budget fee rate.
202
        size, err := calcSweepTxWeight(
10✔
203
                r.Inputs, sweepAddrs,
10✔
204
        )
10✔
205
        if err != nil {
11✔
206
                return 0, err
1✔
207
        }
1✔
208

209
        // Use the budget and MaxFeeRate to decide the max allowed fee rate.
210
        // This is needed as, when the input has a large value and the user
211
        // sets the budget to be proportional to the input value, the fee rate
212
        // can be very high and we need to make sure it doesn't exceed the max
213
        // fee rate.
214
        maxFeeRateAllowed := chainfee.NewSatPerKWeight(r.Budget, size)
9✔
215
        if maxFeeRateAllowed > r.MaxFeeRate {
11✔
216
                log.Debugf("Budget feerate %v exceeds MaxFeeRate %v, use "+
2✔
217
                        "MaxFeeRate instead, txWeight=%v", maxFeeRateAllowed,
2✔
218
                        r.MaxFeeRate, size)
2✔
219

2✔
220
                return r.MaxFeeRate, nil
2✔
221
        }
2✔
222

223
        log.Debugf("Budget feerate %v below MaxFeeRate %v, use budget feerate "+
7✔
224
                "instead, txWeight=%v", maxFeeRateAllowed, r.MaxFeeRate, size)
7✔
225

7✔
226
        return maxFeeRateAllowed, nil
7✔
227
}
228

229
// calcSweepTxWeight calculates the weight of the sweep tx. It assumes a
230
// sweeping tx always has a single output(change).
231
func calcSweepTxWeight(inputs []input.Input,
232
        outputPkScript [][]byte) (lntypes.WeightUnit, error) {
13✔
233

13✔
234
        // Use a const fee rate as we only use the weight estimator to
13✔
235
        // calculate the size.
13✔
236
        const feeRate = 1
13✔
237

13✔
238
        // Initialize the tx weight estimator with,
13✔
239
        // - nil outputs as we only have one single change output.
13✔
240
        // - const fee rate as we don't care about the fees here.
13✔
241
        // - 0 maxfeerate as we don't care about fees here.
13✔
242
        //
13✔
243
        // TODO(yy): we should refactor the weight estimator to not require a
13✔
244
        // fee rate and max fee rate and make it a pure tx weight calculator.
13✔
245
        _, estimator, err := getWeightEstimate(
13✔
246
                inputs, nil, feeRate, 0, outputPkScript,
13✔
247
        )
13✔
248
        if err != nil {
15✔
249
                return 0, err
2✔
250
        }
2✔
251

252
        return estimator.weight(), nil
11✔
253
}
254

255
// BumpResult is used by the Bumper to send updates about the tx being
256
// broadcast.
257
type BumpResult struct {
258
        // Event is the type of event that the result is for.
259
        Event BumpEvent
260

261
        // Tx is the tx being broadcast.
262
        Tx *wire.MsgTx
263

264
        // ReplacedTx is the old, replaced tx if a fee bump is attempted.
265
        ReplacedTx *wire.MsgTx
266

267
        // FeeRate is the fee rate used for the new tx.
268
        FeeRate chainfee.SatPerKWeight
269

270
        // Fee is the fee paid by the new tx.
271
        Fee btcutil.Amount
272

273
        // Err is the error that occurred during the broadcast.
274
        Err error
275

276
        // requestID is the ID of the request that created this record.
277
        requestID uint64
278
}
279

280
// String returns a human-readable string for the result.
281
func (b *BumpResult) String() string {
×
282
        desc := fmt.Sprintf("Event=%v", b.Event)
×
283
        if b.Tx != nil {
×
UNCOV
284
                desc += fmt.Sprintf(", Tx=%v", b.Tx.TxHash())
×
UNCOV
285
        }
×
286

UNCOV
287
        return fmt.Sprintf("[%s]", desc)
×
288
}
289

290
// Validate validates the BumpResult so it's safe to use.
291
func (b *BumpResult) Validate() error {
12✔
292
        isFailureEvent := b.Event == TxFailed || b.Event == TxFatal ||
12✔
293
                b.Event == TxUnknownSpend
12✔
294

12✔
295
        // Every result must have a tx except the fatal or failed case.
12✔
296
        if b.Tx == nil && !isFailureEvent {
13✔
297
                return fmt.Errorf("%w: nil tx", ErrInvalidBumpResult)
1✔
298
        }
1✔
299

300
        // Every result must have a known event.
301
        if b.Event.Unknown() {
12✔
302
                return fmt.Errorf("%w: unknown event", ErrInvalidBumpResult)
1✔
303
        }
1✔
304

305
        // If it's a replacing event, it must have a replaced tx.
306
        if b.Event == TxReplaced && b.ReplacedTx == nil {
11✔
307
                return fmt.Errorf("%w: nil replacing tx", ErrInvalidBumpResult)
1✔
308
        }
1✔
309

310
        // If it's a failed or fatal event, it must have an error.
311
        if isFailureEvent && b.Err == nil {
11✔
312
                return fmt.Errorf("%w: nil error", ErrInvalidBumpResult)
2✔
313
        }
2✔
314

315
        // If it's a confirmed event, it must have a fee rate and fee.
316
        if b.Event == TxConfirmed && (b.FeeRate == 0 || b.Fee == 0) {
8✔
317
                return fmt.Errorf("%w: missing fee rate or fee",
1✔
318
                        ErrInvalidBumpResult)
1✔
319
        }
1✔
320

321
        return nil
6✔
322
}
323

324
// TxPublisherConfig is the config used to create a new TxPublisher.
325
type TxPublisherConfig struct {
326
        // Signer is used to create the tx signature.
327
        Signer input.Signer
328

329
        // Wallet is used primarily to publish the tx.
330
        Wallet Wallet
331

332
        // Estimator is used to estimate the fee rate for the new tx based on
333
        // its deadline conf target.
334
        Estimator chainfee.Estimator
335

336
        // Notifier is used to monitor the confirmation status of the tx.
337
        Notifier chainntnfs.ChainNotifier
338

339
        // AuxSweeper is an optional interface that can be used to modify the
340
        // way sweep transaction are generated.
341
        AuxSweeper fn.Option[AuxSweeper]
342
}
343

344
// TxPublisher is an implementation of the Bumper interface. It utilizes the
345
// `testmempoolaccept` RPC to bump the fee of txns it created based on
346
// different fee function selected or configed by the caller. Its purpose is to
347
// take a list of inputs specified, and create a tx that spends them to a
348
// specified output. It will then monitor the confirmation status of the tx,
349
// and if it's not confirmed within a certain time frame, it will attempt to
350
// bump the fee of the tx by creating a new tx that spends the same inputs to
351
// the same output, but with a higher fee rate. It will continue to do this
352
// until the tx is confirmed or the fee rate reaches the maximum fee rate
353
// specified by the caller.
354
type TxPublisher struct {
355
        started atomic.Bool
356
        stopped atomic.Bool
357

358
        // Embed the blockbeat consumer struct to get access to the method
359
        // `NotifyBlockProcessed` and the `BlockbeatChan`.
360
        chainio.BeatConsumer
361

362
        wg sync.WaitGroup
363

364
        // cfg specifies the configuration of the TxPublisher.
365
        cfg *TxPublisherConfig
366

367
        // currentHeight is the current block height.
368
        currentHeight atomic.Int32
369

370
        // records is a map keyed by the requestCounter and the value is the tx
371
        // being monitored.
372
        records lnutils.SyncMap[uint64, *monitorRecord]
373

374
        // requestCounter is a monotonically increasing counter used to keep
375
        // track of how many requests have been made.
376
        requestCounter atomic.Uint64
377

378
        // subscriberChans is a map keyed by the requestCounter, each item is
379
        // the chan that the publisher sends the fee bump result to.
380
        subscriberChans lnutils.SyncMap[uint64, chan *BumpResult]
381

382
        // quit is used to signal the publisher to stop.
383
        quit chan struct{}
384
}
385

386
// Compile-time constraint to ensure TxPublisher implements Bumper.
387
var _ Bumper = (*TxPublisher)(nil)
388

389
// Compile-time check for the chainio.Consumer interface.
390
var _ chainio.Consumer = (*TxPublisher)(nil)
391

392
// NewTxPublisher creates a new TxPublisher.
393
func NewTxPublisher(cfg TxPublisherConfig) *TxPublisher {
21✔
394
        tp := &TxPublisher{
21✔
395
                cfg:             &cfg,
21✔
396
                records:         lnutils.SyncMap[uint64, *monitorRecord]{},
21✔
397
                subscriberChans: lnutils.SyncMap[uint64, chan *BumpResult]{},
21✔
398
                quit:            make(chan struct{}),
21✔
399
        }
21✔
400

21✔
401
        // Mount the block consumer.
21✔
402
        tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())
21✔
403

21✔
404
        return tp
21✔
405
}
21✔
406

407
// isNeutrinoBackend checks if the wallet backend is neutrino.
UNCOV
408
func (t *TxPublisher) isNeutrinoBackend() bool {
×
UNCOV
409
        return t.cfg.Wallet.BackEnd() == "neutrino"
×
UNCOV
410
}
×
411

412
// Broadcast is used to publish the tx created from the given inputs. It will
413
// register the broadcast request and return a chan to the caller to subscribe
414
// the broadcast result. The initial broadcast is guaranteed to be
415
// RBF-compliant unless the budget specified cannot cover the fee.
416
//
417
// NOTE: part of the Bumper interface.
418
func (t *TxPublisher) Broadcast(req *BumpRequest) <-chan *BumpResult {
5✔
419
        log.Tracef("Received broadcast request: %s",
5✔
420
                lnutils.SpewLogClosure(req))
5✔
421

5✔
422
        // Store the request.
5✔
423
        record := t.storeInitialRecord(req)
5✔
424

5✔
425
        // Create a chan to send the result to the caller.
5✔
426
        subscriber := make(chan *BumpResult, 1)
5✔
427
        t.subscriberChans.Store(record.requestID, subscriber)
5✔
428

5✔
429
        // Publish the tx immediately if specified.
5✔
430
        if req.Immediate {
6✔
431
                t.handleInitialBroadcast(record)
1✔
432
        }
1✔
433

434
        return subscriber
5✔
435
}
436

437
// storeInitialRecord initializes a monitor record and saves it in the map.
438
func (t *TxPublisher) storeInitialRecord(req *BumpRequest) *monitorRecord {
5✔
439
        // Increase the request counter.
5✔
440
        //
5✔
441
        // NOTE: this is the only place where we increase the counter.
5✔
442
        requestID := t.requestCounter.Add(1)
5✔
443

5✔
444
        // Register the record.
5✔
445
        record := &monitorRecord{
5✔
446
                requestID: requestID,
5✔
447
                req:       req,
5✔
448
        }
5✔
449
        t.records.Store(requestID, record)
5✔
450

5✔
451
        return record
5✔
452
}
5✔
453

454
// updateRecord updates the given record's tx and fee, and saves it in the
455
// records map.
456
func (t *TxPublisher) updateRecord(r *monitorRecord,
457
        sweepCtx *sweepTxCtx) *monitorRecord {
19✔
458

19✔
459
        r.tx = sweepCtx.tx
19✔
460
        r.fee = sweepCtx.fee
19✔
461
        r.outpointToTxIndex = sweepCtx.outpointToTxIndex
19✔
462

19✔
463
        // Register the record.
19✔
464
        t.records.Store(r.requestID, r)
19✔
465

19✔
466
        return r
19✔
467
}
19✔
468

469
// NOTE: part of the `chainio.Consumer` interface.
470
func (t *TxPublisher) Name() string {
21✔
471
        return "TxPublisher"
21✔
472
}
21✔
473

474
// initializeTx initializes a fee function and creates an RBF-compliant tx. If
475
// succeeded, the initial tx is stored in the records map.
476
func (t *TxPublisher) initializeTx(r *monitorRecord) (*monitorRecord, error) {
5✔
477
        // Create a fee bumping algorithm to be used for future RBF.
5✔
478
        feeAlgo, err := t.initializeFeeFunction(r.req)
5✔
479
        if err != nil {
6✔
480
                return nil, fmt.Errorf("init fee function: %w", err)
1✔
481
        }
1✔
482

483
        // Attach the newly created fee function.
484
        //
485
        // TODO(yy): current we'd initialize a monitorRecord before creating the
486
        // fee function, while we could instead create the fee function first
487
        // then save it to the record. To make this happen we need to change the
488
        // conf target calculation below since we would be initializing the fee
489
        // function one block before.
490
        r.feeFunction = feeAlgo
4✔
491

4✔
492
        // Create the initial tx to be broadcasted. This tx is guaranteed to
4✔
493
        // comply with the RBF restrictions.
4✔
494
        record, err := t.createRBFCompliantTx(r)
4✔
495
        if err != nil {
5✔
496
                return nil, fmt.Errorf("create RBF-compliant tx: %w", err)
1✔
497
        }
1✔
498

499
        return record, nil
3✔
500
}
501

502
// initializeFeeFunction initializes a fee function to be used for this request
503
// for future fee bumping.
504
func (t *TxPublisher) initializeFeeFunction(
505
        req *BumpRequest) (FeeFunction, error) {
7✔
506

7✔
507
        // Get the max allowed feerate.
7✔
508
        maxFeeRateAllowed, err := req.MaxFeeRateAllowed()
7✔
509
        if err != nil {
7✔
UNCOV
510
                return nil, err
×
UNCOV
511
        }
×
512

513
        // Get the initial conf target.
514
        confTarget := calcCurrentConfTarget(
7✔
515
                t.currentHeight.Load(), req.DeadlineHeight,
7✔
516
        )
7✔
517

7✔
518
        log.Debugf("Initializing fee function with conf target=%v, budget=%v, "+
7✔
519
                "maxFeeRateAllowed=%v", confTarget, req.Budget,
7✔
520
                maxFeeRateAllowed)
7✔
521

7✔
522
        // Initialize the fee function and return it.
7✔
523
        //
7✔
524
        // TODO(yy): return based on differet req.Strategy?
7✔
525
        return NewLinearFeeFunction(
7✔
526
                maxFeeRateAllowed, confTarget, t.cfg.Estimator,
7✔
527
                req.StartingFeeRate,
7✔
528
        )
7✔
529
}
530

531
// createRBFCompliantTx creates a tx that is compliant with RBF rules. It does
532
// so by creating a tx, validate it using `TestMempoolAccept`, and bump its fee
533
// and redo the process until the tx is valid, or return an error when non-RBF
534
// related errors occur or the budget has been used up.
535
func (t *TxPublisher) createRBFCompliantTx(
536
        r *monitorRecord) (*monitorRecord, error) {
10✔
537

10✔
538
        f := r.feeFunction
10✔
539

10✔
540
        for {
23✔
541
                // Create a new tx with the given fee rate and check its
13✔
542
                // mempool acceptance.
13✔
543
                sweepCtx, err := t.createAndCheckTx(r.req, f)
13✔
544

13✔
545
                switch {
13✔
546
                case err == nil:
7✔
547
                        // The tx is valid, store it.
7✔
548
                        record := t.updateRecord(r, sweepCtx)
7✔
549

7✔
550
                        log.Infof("Created initial sweep tx=%v for %v inputs: "+
7✔
551
                                "feerate=%v, fee=%v, inputs:\n%v",
7✔
552
                                sweepCtx.tx.TxHash(), len(r.req.Inputs),
7✔
553
                                f.FeeRate(), sweepCtx.fee,
7✔
554
                                inputTypeSummary(r.req.Inputs))
7✔
555

7✔
556
                        return record, nil
7✔
557

558
                // If the error indicates the fees paid is not enough, we will
559
                // ask the fee function to increase the fee rate and retry.
560
                case errors.Is(err, lnwallet.ErrMempoolFee):
2✔
561
                        // We should at least start with a feerate above the
2✔
562
                        // mempool min feerate, so if we get this error, it
2✔
563
                        // means something is wrong earlier in the pipeline.
2✔
564
                        log.Errorf("Current fee=%v, feerate=%v, %v",
2✔
565
                                sweepCtx.fee, f.FeeRate(), err)
2✔
566

2✔
567
                        fallthrough
2✔
568

569
                // We are not paying enough fees so we increase it.
570
                case errors.Is(err, chain.ErrInsufficientFee):
4✔
571
                        increased := false
4✔
572

4✔
573
                        // Keep calling the fee function until the fee rate is
4✔
574
                        // increased or maxed out.
4✔
575
                        for !increased {
9✔
576
                                log.Debugf("Increasing fee for next round, "+
5✔
577
                                        "current fee=%v, feerate=%v",
5✔
578
                                        sweepCtx.fee, f.FeeRate())
5✔
579

5✔
580
                                // If the fee function tells us that we have
5✔
581
                                // used up the budget, we will return an error
5✔
582
                                // indicating this tx cannot be made. The
5✔
583
                                // sweeper should handle this error and try to
5✔
584
                                // cluster these inputs differetly.
5✔
585
                                increased, err = f.Increment()
5✔
586
                                if err != nil {
6✔
587
                                        return nil, err
1✔
588
                                }
1✔
589
                        }
590

591
                // TODO(yy): suppose there's only one bad input, we can do a
592
                // binary search to find out which input is causing this error
593
                // by recreating a tx using half of the inputs and check its
594
                // mempool acceptance.
595
                default:
2✔
596
                        log.Debugf("Failed to create RBF-compliant tx: %v", err)
2✔
597
                        return nil, err
2✔
598
                }
599
        }
600
}
601

602
// createAndCheckTx creates a tx based on the given inputs, change output
603
// script, and the fee rate. In addition, it validates the tx's mempool
604
// acceptance before returning a tx that can be published directly, along with
605
// its fee.
606
func (t *TxPublisher) createAndCheckTx(req *BumpRequest,
607
        f FeeFunction) (*sweepTxCtx, error) {
23✔
608

23✔
609
        // Create the sweep tx with max fee rate of 0 as the fee function
23✔
610
        // guarantees the fee rate used here won't exceed the max fee rate.
23✔
611
        sweepCtx, err := t.createSweepTx(
23✔
612
                req.Inputs, req.DeliveryAddress, f.FeeRate(),
23✔
613
        )
23✔
614
        if err != nil {
23✔
615
                return sweepCtx, fmt.Errorf("create sweep tx: %w", err)
×
616
        }
×
617

618
        // Sanity check the budget still covers the fee.
619
        if sweepCtx.fee > req.Budget {
25✔
620
                return sweepCtx, fmt.Errorf("%w: budget=%v, fee=%v",
2✔
621
                        ErrNotEnoughBudget, req.Budget, sweepCtx.fee)
2✔
622
        }
2✔
623

624
        // If we had an extra txOut, then we'll update the result to include
625
        // it.
626
        req.ExtraTxOut = sweepCtx.extraTxOut
21✔
627

21✔
628
        // Validate the tx's mempool acceptance.
21✔
629
        err = t.cfg.Wallet.CheckMempoolAcceptance(sweepCtx.tx)
21✔
630

21✔
631
        // Exit early if the tx is valid.
21✔
632
        if err == nil {
33✔
633
                return sweepCtx, nil
12✔
634
        }
12✔
635

636
        // Print an error log if the chain backend doesn't support the mempool
637
        // acceptance test RPC.
638
        if errors.Is(err, rpcclient.ErrBackendVersion) {
9✔
UNCOV
639
                log.Errorf("TestMempoolAccept not supported by backend, " +
×
UNCOV
640
                        "consider upgrading it to a newer version")
×
UNCOV
641
                return sweepCtx, nil
×
UNCOV
642
        }
×
643

644
        // We are running on a backend that doesn't implement the RPC
645
        // testmempoolaccept, eg, neutrino, so we'll skip the check.
646
        if errors.Is(err, chain.ErrUnimplemented) {
9✔
UNCOV
647
                log.Debug("Skipped testmempoolaccept due to not implemented")
×
648
                return sweepCtx, nil
×
649
        }
×
650

651
        return sweepCtx, fmt.Errorf("tx=%v failed mempool check: %w",
9✔
652
                sweepCtx.tx.TxHash(), err)
9✔
653
}
654

655
// broadcast takes a monitored tx and publishes it to the network. Prior to the
656
// broadcast, it will subscribe the tx's confirmation notification and attach
657
// the event channel to the record. Any broadcast-related errors will not be
658
// returned here, instead, they will be put inside the `BumpResult` and
659
// returned to the caller.
660
func (t *TxPublisher) broadcast(record *monitorRecord) (*BumpResult, error) {
9✔
661
        txid := record.tx.TxHash()
9✔
662

9✔
663
        tx := record.tx
9✔
664
        log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
9✔
665
                txid, len(tx.TxIn), t.currentHeight.Load())
9✔
666

9✔
667
        // Before we go to broadcast, we'll notify the aux sweeper, if it's
9✔
668
        // present of this new broadcast attempt.
9✔
669
        err := fn.MapOptionZ(t.cfg.AuxSweeper, func(aux AuxSweeper) error {
18✔
670
                return aux.NotifyBroadcast(
9✔
671
                        record.req, tx, record.fee, record.outpointToTxIndex,
9✔
672
                )
9✔
673
        })
9✔
674
        if err != nil {
9✔
UNCOV
675
                return nil, fmt.Errorf("unable to notify aux sweeper: %w", err)
×
676
        }
×
677

678
        // Set the event, and change it to TxFailed if the wallet fails to
679
        // publish it.
680
        event := TxPublished
9✔
681

9✔
682
        // Publish the sweeping tx with customized label. If the publish fails,
9✔
683
        // this error will be saved in the `BumpResult` and it will be removed
9✔
684
        // from being monitored.
9✔
685
        err = t.cfg.Wallet.PublishTransaction(
9✔
686
                tx, labels.MakeLabel(labels.LabelTypeSweepTransaction, nil),
9✔
687
        )
9✔
688
        if err != nil {
12✔
689
                // NOTE: we decide to attach this error to the result instead
3✔
690
                // of returning it here because by the time the tx reaches
3✔
691
                // here, it should have passed the mempool acceptance check. If
3✔
692
                // it still fails to be broadcast, it's likely a non-RBF
3✔
693
                // related error happened. So we send this error back to the
3✔
694
                // caller so that it can handle it properly.
3✔
695
                //
3✔
696
                // TODO(yy): find out which input is causing the failure.
3✔
697
                log.Errorf("Failed to publish tx %v: %v", txid, err)
3✔
698
                event = TxFailed
3✔
699
        }
3✔
700

701
        result := &BumpResult{
9✔
702
                Event:     event,
9✔
703
                Tx:        record.tx,
9✔
704
                Fee:       record.fee,
9✔
705
                FeeRate:   record.feeFunction.FeeRate(),
9✔
706
                Err:       err,
9✔
707
                requestID: record.requestID,
9✔
708
        }
9✔
709

9✔
710
        return result, nil
9✔
711
}
712

713
// notifyResult sends the result to the resultChan specified by the requestID.
714
// This channel is expected to be read by the caller.
715
func (t *TxPublisher) notifyResult(result *BumpResult) {
14✔
716
        id := result.requestID
14✔
717
        subscriber, ok := t.subscriberChans.Load(id)
14✔
718
        if !ok {
14✔
719
                log.Errorf("Result chan for id=%v not found", id)
×
720
                return
×
721
        }
×
722

723
        log.Debugf("Sending result %v for requestID=%v", result, id)
14✔
724

14✔
725
        select {
14✔
726
        // Send the result to the subscriber.
727
        //
728
        // TODO(yy): Add timeout in case it's blocking?
729
        case subscriber <- result:
13✔
730
        case <-t.quit:
1✔
731
                log.Debug("Fee bumper stopped")
1✔
732
        }
733
}
734

735
// removeResult removes the tracking of the result if the result contains a
736
// non-nil error, or the tx is confirmed, the record will be removed from the
737
// maps.
738
func (t *TxPublisher) removeResult(result *BumpResult) {
14✔
739
        id := result.requestID
14✔
740

14✔
741
        var txid chainhash.Hash
14✔
742
        if result.Tx != nil {
25✔
743
                txid = result.Tx.TxHash()
11✔
744
        }
11✔
745

746
        // Remove the record from the maps if there's an error or the tx is
747
        // confirmed. When there's an error, it means this tx has failed its
748
        // broadcast and cannot be retried. There are two cases it may fail,
749
        // - when the budget cannot cover the increased fee calculated by the
750
        //   fee function, hence the budget is used up.
751
        // - when a non-fee related error returned from PublishTransaction.
752
        switch result.Event {
14✔
753
        case TxFailed:
2✔
754
                log.Errorf("Removing monitor record=%v, tx=%v, due to err: %v",
2✔
755
                        id, txid, result.Err)
2✔
756

757
        case TxConfirmed:
3✔
758
                // Remove the record if the tx is confirmed.
3✔
759
                log.Debugf("Removing confirmed monitor record=%v, tx=%v", id,
3✔
760
                        txid)
3✔
761

762
        case TxFatal:
2✔
763
                // Remove the record if there's an error.
2✔
764
                log.Debugf("Removing monitor record=%v due to fatal err: %v",
2✔
765
                        id, result.Err)
2✔
766

767
        case TxUnknownSpend:
2✔
768
                // Remove the record if there's an unknown spend.
2✔
769
                log.Debugf("Removing monitor record=%v due unknown spent: "+
2✔
770
                        "%v", id, result.Err)
2✔
771

772
        // Do nothing if it's neither failed or confirmed.
773
        default:
5✔
774
                log.Tracef("Skipping record removal for id=%v, event=%v", id,
5✔
775
                        result.Event)
5✔
776

5✔
777
                return
5✔
778
        }
779

780
        t.records.Delete(id)
9✔
781
        t.subscriberChans.Delete(id)
9✔
782
}
783

784
// handleResult handles the result of a tx broadcast. It will notify the
785
// subscriber and remove the record if the tx is confirmed or failed to be
786
// broadcast.
787
func (t *TxPublisher) handleResult(result *BumpResult) {
11✔
788
        // Notify the subscriber.
11✔
789
        t.notifyResult(result)
11✔
790

11✔
791
        // Remove the record if it's failed or confirmed.
11✔
792
        t.removeResult(result)
11✔
793
}
11✔
794

795
// monitorRecord is used to keep track of the tx being monitored by the
796
// publisher internally.
797
type monitorRecord struct {
798
        // requestID is the ID of the request that created this record.
799
        requestID uint64
800

801
        // tx is the tx being monitored.
802
        tx *wire.MsgTx
803

804
        // req is the original request.
805
        req *BumpRequest
806

807
        // feeFunction is the fee bumping algorithm used by the publisher.
808
        feeFunction FeeFunction
809

810
        // fee is the fee paid by the tx.
811
        fee btcutil.Amount
812

813
        // outpointToTxIndex is a map of outpoint to tx index.
814
        outpointToTxIndex map[wire.OutPoint]int
815
}
816

817
// Start starts the publisher by subscribing to block epoch updates and kicking
818
// off the monitor loop.
UNCOV
819
func (t *TxPublisher) Start(beat chainio.Blockbeat) error {
×
UNCOV
820
        log.Info("TxPublisher starting...")
×
UNCOV
821

×
UNCOV
822
        if t.started.Swap(true) {
×
UNCOV
823
                return fmt.Errorf("TxPublisher started more than once")
×
UNCOV
824
        }
×
825

826
        // Set the current height.
UNCOV
827
        t.currentHeight.Store(beat.Height())
×
UNCOV
828

×
UNCOV
829
        t.wg.Add(1)
×
UNCOV
830
        go t.monitor()
×
UNCOV
831

×
UNCOV
832
        log.Debugf("TxPublisher started")
×
UNCOV
833

×
834
        return nil
×
835
}
836

837
// Stop stops the publisher and waits for the monitor loop to exit.
838
func (t *TxPublisher) Stop() error {
×
UNCOV
839
        log.Info("TxPublisher stopping...")
×
UNCOV
840

×
UNCOV
841
        if t.stopped.Swap(true) {
×
UNCOV
842
                return fmt.Errorf("TxPublisher stopped more than once")
×
UNCOV
843
        }
×
844

UNCOV
845
        close(t.quit)
×
UNCOV
846
        t.wg.Wait()
×
UNCOV
847

×
UNCOV
848
        log.Debug("TxPublisher stopped")
×
UNCOV
849

×
UNCOV
850
        return nil
×
851
}
852

853
// monitor is the main loop driven by new blocks. Whevenr a new block arrives,
854
// it will examine all the txns being monitored, and check if any of them needs
855
// to be bumped. If so, it will attempt to bump the fee of the tx.
856
//
857
// NOTE: Must be run as a goroutine.
UNCOV
858
func (t *TxPublisher) monitor() {
×
UNCOV
859
        defer t.wg.Done()
×
UNCOV
860

×
UNCOV
861
        for {
×
UNCOV
862
                select {
×
UNCOV
863
                case beat := <-t.BlockbeatChan:
×
UNCOV
864
                        height := beat.Height()
×
UNCOV
865
                        log.Debugf("TxPublisher received new block: %v", height)
×
UNCOV
866

×
UNCOV
867
                        // Update the best known height for the publisher.
×
UNCOV
868
                        t.currentHeight.Store(height)
×
UNCOV
869

×
UNCOV
870
                        // Check all monitored txns to see if any of them needs
×
UNCOV
871
                        // to be bumped.
×
UNCOV
872
                        t.processRecords()
×
UNCOV
873

×
UNCOV
874
                        // Notify we've processed the block.
×
UNCOV
875
                        t.NotifyBlockProcessed(beat, nil)
×
876

UNCOV
877
                case <-t.quit:
×
UNCOV
878
                        log.Debug("Fee bumper stopped, exit monitor")
×
UNCOV
879
                        return
×
880
                }
881
        }
882
}
883

884
// processRecords checks all the txns being monitored, and checks if any of
885
// them needs to be bumped. If so, it will attempt to bump the fee of the tx.
886
func (t *TxPublisher) processRecords() {
5✔
887
        // confirmedRecords stores a map of the records which have been
5✔
888
        // confirmed.
5✔
889
        confirmedRecords := make(map[uint64]*monitorRecord)
5✔
890

5✔
891
        // feeBumpRecords stores a map of records which need to be bumped.
5✔
892
        feeBumpRecords := make(map[uint64]*monitorRecord)
5✔
893

5✔
894
        // failedRecords stores a map of records which has inputs being spent
5✔
895
        // by a third party.
5✔
896
        failedRecords := make(map[uint64]*monitorRecord)
5✔
897

5✔
898
        // initialRecords stores a map of records which are being created and
5✔
899
        // published for the first time.
5✔
900
        initialRecords := make(map[uint64]*monitorRecord)
5✔
901

5✔
902
        // visitor is a helper closure that visits each record and divides them
5✔
903
        // into two groups.
5✔
904
        visitor := func(requestID uint64, r *monitorRecord) error {
10✔
905
                log.Tracef("Checking monitor recordID=%v", requestID)
5✔
906

5✔
907
                // Check whether the inputs have already been spent.
5✔
908
                spends := t.hasInputSpent(r)
5✔
909

5✔
910
                // If the any of the inputs has been spent, the record will be
5✔
911
                // marked as failed or confirmed.
5✔
912
                if len(spends) != 0 {
8✔
913
                        // When tx is nil, it means we haven't tried the initial
3✔
914
                        // broadcast yet the input is already spent. This could
3✔
915
                        // happen when the node shuts down, a previous sweeping
3✔
916
                        // tx confirmed, then the node comes back online and
3✔
917
                        // reoffers the inputs. Another case is the remote node
3✔
918
                        // spends the input quickly before we even attempt the
3✔
919
                        // sweep. In either case we will fail the record and let
3✔
920
                        // the sweeper handles it.
3✔
921
                        if r.tx == nil {
4✔
922
                                failedRecords[requestID] = r
1✔
923
                                return nil
1✔
924
                        }
1✔
925

926
                        // Check whether the inputs has been spent by a unknown
927
                        // tx.
928
                        if t.isUnknownSpent(r, spends) {
3✔
929
                                failedRecords[requestID] = r
1✔
930

1✔
931
                                // Move to the next record.
1✔
932
                                return nil
1✔
933
                        }
1✔
934

935
                        // The tx is ours, we can move it to the confirmed queue
936
                        // and stop monitoring it.
937
                        confirmedRecords[requestID] = r
1✔
938

1✔
939
                        // Move to the next record.
1✔
940
                        return nil
1✔
941
                }
942

943
                // This is the first time we see this record, so we put it in
944
                // the initial queue.
945
                if r.tx == nil {
3✔
946
                        initialRecords[requestID] = r
1✔
947

1✔
948
                        return nil
1✔
949
                }
1✔
950

951
                // We can only get here when the inputs are not spent and a
952
                // previous sweeping tx has been attempted. In this case we will
953
                // perform an RBF on it in the current block.
954
                feeBumpRecords[requestID] = r
1✔
955

1✔
956
                // Return nil to move to the next record.
1✔
957
                return nil
1✔
958
        }
959

960
        // Iterate through all the records and divide them into four groups.
961
        t.records.ForEach(visitor)
5✔
962

5✔
963
        // Handle the initial broadcast.
5✔
964
        for _, r := range initialRecords {
6✔
965
                t.handleInitialBroadcast(r)
1✔
966
        }
1✔
967

968
        // For records that are confirmed, we'll notify the caller about this
969
        // result.
970
        for _, r := range confirmedRecords {
6✔
971
                t.wg.Add(1)
1✔
972
                go t.handleTxConfirmed(r)
1✔
973
        }
1✔
974

975
        // Get the current height to be used in the following goroutines.
976
        currentHeight := t.currentHeight.Load()
5✔
977

5✔
978
        // For records that are not confirmed, we perform a fee bump if needed.
5✔
979
        for _, r := range feeBumpRecords {
6✔
980
                t.wg.Add(1)
1✔
981
                go t.handleFeeBumpTx(r, currentHeight)
1✔
982
        }
1✔
983

984
        // For records that are failed, we'll notify the caller about this
985
        // result.
986
        for _, r := range failedRecords {
7✔
987
                t.wg.Add(1)
2✔
988
                go t.handleUnknownSpent(r)
2✔
989
        }
2✔
990
}
991

992
// handleTxConfirmed is called when a monitored tx is confirmed. It will
993
// notify the subscriber then remove the record from the maps .
994
//
995
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
996
func (t *TxPublisher) handleTxConfirmed(r *monitorRecord) {
2✔
997
        defer t.wg.Done()
2✔
998

2✔
999
        log.Debugf("Record %v is spent in tx=%v", r.requestID, r.tx.TxHash())
2✔
1000

2✔
1001
        // Create a result that will be sent to the resultChan which is
2✔
1002
        // listened by the caller.
2✔
1003
        result := &BumpResult{
2✔
1004
                Event:     TxConfirmed,
2✔
1005
                Tx:        r.tx,
2✔
1006
                requestID: r.requestID,
2✔
1007
                Fee:       r.fee,
2✔
1008
                FeeRate:   r.feeFunction.FeeRate(),
2✔
1009
        }
2✔
1010

2✔
1011
        // Notify that this tx is confirmed and remove the record from the map.
2✔
1012
        t.handleResult(result)
2✔
1013
}
2✔
1014

1015
// handleInitialTxError takes the error from `initializeTx` and decides the
1016
// bump event. It will construct a BumpResult and handles it.
1017
func (t *TxPublisher) handleInitialTxError(requestID uint64, err error) {
2✔
1018
        // We now decide what type of event to send.
2✔
1019
        var event BumpEvent
2✔
1020

2✔
1021
        switch {
2✔
1022
        // When the error is due to a dust output, we'll send a TxFailed so
1023
        // these inputs can be retried with a different group in the next
1024
        // block.
1025
        case errors.Is(err, ErrTxNoOutput):
×
1026
                event = TxFailed
×
1027

1028
        // When the error is due to budget being used up, we'll send a TxFailed
1029
        // so these inputs can be retried with a different group in the next
1030
        // block.
UNCOV
1031
        case errors.Is(err, ErrMaxPosition):
×
UNCOV
1032
                event = TxFailed
×
1033

1034
        // When the error is due to zero fee rate delta, we'll send a TxFailed
1035
        // so these inputs can be retried in the next block.
UNCOV
1036
        case errors.Is(err, ErrZeroFeeRateDelta):
×
UNCOV
1037
                event = TxFailed
×
1038

1039
        // Otherwise this is not a fee-related error and the tx cannot be
1040
        // retried. In that case we will fail ALL the inputs in this tx, which
1041
        // means they will be removed from the sweeper and never be tried
1042
        // again.
1043
        //
1044
        // TODO(yy): Find out which input is causing the failure and fail that
1045
        // one only.
1046
        default:
2✔
1047
                event = TxFatal
2✔
1048
        }
1049

1050
        result := &BumpResult{
2✔
1051
                Event:     event,
2✔
1052
                Err:       err,
2✔
1053
                requestID: requestID,
2✔
1054
        }
2✔
1055

2✔
1056
        t.handleResult(result)
2✔
1057
}
1058

1059
// handleInitialBroadcast is called when a new request is received. It will
1060
// handle the initial tx creation and broadcast. In details,
1061
// 1. init a fee function based on the given strategy.
1062
// 2. create an RBF-compliant tx and monitor it for confirmation.
1063
// 3. notify the initial broadcast result back to the caller.
1064
func (t *TxPublisher) handleInitialBroadcast(r *monitorRecord) {
5✔
1065
        log.Debugf("Initial broadcast for requestID=%v", r.requestID)
5✔
1066

5✔
1067
        var (
5✔
1068
                result *BumpResult
5✔
1069
                err    error
5✔
1070
        )
5✔
1071

5✔
1072
        // Attempt an initial broadcast which is guaranteed to comply with the
5✔
1073
        // RBF rules.
5✔
1074
        //
5✔
1075
        // Create the initial tx to be broadcasted.
5✔
1076
        record, err := t.initializeTx(r)
5✔
1077
        if err != nil {
7✔
1078
                log.Errorf("Initial broadcast failed: %v", err)
2✔
1079

2✔
1080
                // We now handle the initialization error and exit.
2✔
1081
                t.handleInitialTxError(r.requestID, err)
2✔
1082

2✔
1083
                return
2✔
1084
        }
2✔
1085

1086
        // Successfully created the first tx, now broadcast it.
1087
        result, err = t.broadcast(record)
3✔
1088
        if err != nil {
3✔
UNCOV
1089
                // The broadcast failed, which can only happen if the tx record
×
UNCOV
1090
                // cannot be found or the aux sweeper returns an error. In
×
1091
                // either case, we will send back a TxFail event so these
×
1092
                // inputs can be retried.
×
1093
                result = &BumpResult{
×
1094
                        Event:     TxFailed,
×
1095
                        Err:       err,
×
1096
                        requestID: r.requestID,
×
UNCOV
1097
                }
×
1098
        }
×
1099

1100
        t.handleResult(result)
3✔
1101
}
1102

1103
// handleFeeBumpTx checks if the tx needs to be bumped, and if so, it will
1104
// attempt to bump the fee of the tx.
1105
//
1106
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1107
func (t *TxPublisher) handleFeeBumpTx(r *monitorRecord, currentHeight int32) {
4✔
1108
        defer t.wg.Done()
4✔
1109

4✔
1110
        log.Debugf("Attempting to fee bump tx=%v in record %v", r.tx.TxHash(),
4✔
1111
                r.requestID)
4✔
1112

4✔
1113
        oldTxid := r.tx.TxHash()
4✔
1114

4✔
1115
        // Get the current conf target for this record.
4✔
1116
        confTarget := calcCurrentConfTarget(currentHeight, r.req.DeadlineHeight)
4✔
1117

4✔
1118
        // Ask the fee function whether a bump is needed. We expect the fee
4✔
1119
        // function to increase its returned fee rate after calling this
4✔
1120
        // method.
4✔
1121
        increased, err := r.feeFunction.IncreaseFeeRate(confTarget)
4✔
1122
        if err != nil {
5✔
1123
                // TODO(yy): send this error back to the sweeper so it can
1✔
1124
                // re-group the inputs?
1✔
1125
                log.Errorf("Failed to increase fee rate for tx %v at "+
1✔
1126
                        "height=%v: %v", oldTxid, t.currentHeight.Load(), err)
1✔
1127

1✔
1128
                return
1✔
1129
        }
1✔
1130

1131
        // If the fee rate was not increased, there's no need to bump the fee.
1132
        if !increased {
4✔
1133
                log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
1✔
1134
                        t.currentHeight.Load())
1✔
1135

1✔
1136
                return
1✔
1137
        }
1✔
1138

1139
        // The fee function now has a new fee rate, we will use it to bump the
1140
        // fee of the tx.
1141
        resultOpt := t.createAndPublishTx(r)
2✔
1142

2✔
1143
        // If there's a result, we will notify the caller about the result.
2✔
1144
        resultOpt.WhenSome(func(result BumpResult) {
4✔
1145
                // Notify the new result.
2✔
1146
                t.handleResult(&result)
2✔
1147
        })
2✔
1148
}
1149

1150
// handleUnknownSpent is called when the inputs are spent by a unknown tx. It
1151
// will notify the subscriber then remove the record from the maps and send a
1152
// TxFailed event to the subscriber.
1153
//
1154
// NOTE: Must be run as a goroutine to avoid blocking on sending the result.
1155
func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) {
2✔
1156
        defer t.wg.Done()
2✔
1157

2✔
1158
        log.Debugf("Record %v has inputs spent by a tx unknown to the fee "+
2✔
1159
                "bumper, failing it now:\n%v", r.requestID,
2✔
1160
                inputTypeSummary(r.req.Inputs))
2✔
1161

2✔
1162
        // Create a result that will be sent to the resultChan which is
2✔
1163
        // listened by the caller.
2✔
1164
        result := &BumpResult{
2✔
1165
                Event:     TxUnknownSpend,
2✔
1166
                Tx:        r.tx,
2✔
1167
                requestID: r.requestID,
2✔
1168
                Err:       ErrUnknownSpent,
2✔
1169
        }
2✔
1170

2✔
1171
        // Notify that this tx is confirmed and remove the record from the map.
2✔
1172
        t.handleResult(result)
2✔
1173
}
2✔
1174

1175
// createAndPublishTx creates a new tx with a higher fee rate and publishes it
1176
// to the network. It will update the record with the new tx and fee rate if
1177
// successfully created, and return the result when published successfully.
1178
func (t *TxPublisher) createAndPublishTx(
1179
        r *monitorRecord) fn.Option[BumpResult] {
7✔
1180

7✔
1181
        // Fetch the old tx.
7✔
1182
        oldTx := r.tx
7✔
1183

7✔
1184
        // Create a new tx with the new fee rate.
7✔
1185
        //
7✔
1186
        // NOTE: The fee function is expected to have increased its returned
7✔
1187
        // fee rate after calling the SkipFeeBump method. So we can use it
7✔
1188
        // directly here.
7✔
1189
        sweepCtx, err := t.createAndCheckTx(r.req, r.feeFunction)
7✔
1190

7✔
1191
        // If the error is fee related, we will return no error and let the fee
7✔
1192
        // bumper retry it at next block.
7✔
1193
        //
7✔
1194
        // NOTE: we can check the RBF error here and ask the fee function to
7✔
1195
        // recalculate the fee rate. However, this would defeat the purpose of
7✔
1196
        // using a deadline based fee function:
7✔
1197
        // - if the deadline is far away, there's no rush to RBF the tx.
7✔
1198
        // - if the deadline is close, we expect the fee function to give us a
7✔
1199
        //   higher fee rate. If the fee rate cannot satisfy the RBF rules, it
7✔
1200
        //   means the budget is not enough.
7✔
1201
        if errors.Is(err, chain.ErrInsufficientFee) ||
7✔
1202
                errors.Is(err, lnwallet.ErrMempoolFee) {
9✔
1203

2✔
1204
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
2✔
1205
                return fn.None[BumpResult]()
2✔
1206
        }
2✔
1207

1208
        // If the error is not fee related, we will return a `TxFailed` event
1209
        // so this input can be retried.
1210
        if err != nil {
6✔
1211
                // If the tx doesn't not have enought budget, we will return a
1✔
1212
                // result so the sweeper can handle it by re-clustering the
1✔
1213
                // utxos.
1✔
1214
                if errors.Is(err, ErrNotEnoughBudget) {
2✔
1215
                        log.Warnf("Fail to fee bump tx %v: %v", oldTx.TxHash(),
1✔
1216
                                err)
1✔
1217
                } else {
1✔
UNCOV
1218
                        // Otherwise, an unexpected error occurred, we will
×
UNCOV
1219
                        // fail the tx and let the sweeper retry the whole
×
UNCOV
1220
                        // process.
×
UNCOV
1221
                        log.Errorf("Failed to bump tx %v: %v", oldTx.TxHash(),
×
UNCOV
1222
                                err)
×
UNCOV
1223
                }
×
1224

1225
                return fn.Some(BumpResult{
1✔
1226
                        Event:     TxFailed,
1✔
1227
                        Tx:        oldTx,
1✔
1228
                        Err:       err,
1✔
1229
                        requestID: r.requestID,
1✔
1230
                })
1✔
1231
        }
1232

1233
        // The tx has been created without any errors, we now register a new
1234
        // record by overwriting the same requestID.
1235
        record := t.updateRecord(r, sweepCtx)
4✔
1236

4✔
1237
        // Attempt to broadcast this new tx.
4✔
1238
        result, err := t.broadcast(record)
4✔
1239
        if err != nil {
4✔
1240
                log.Infof("Failed to broadcast replacement tx %v: %v",
×
1241
                        sweepCtx.tx.TxHash(), err)
×
1242

×
1243
                return fn.None[BumpResult]()
×
1244
        }
×
1245

1246
        // If the result error is fee related, we will return no error and let
1247
        // the fee bumper retry it at next block.
1248
        //
1249
        // NOTE: we may get this error if we've bypassed the mempool check,
1250
        // which means we are suing neutrino backend.
1251
        if errors.Is(result.Err, chain.ErrInsufficientFee) ||
4✔
1252
                errors.Is(result.Err, lnwallet.ErrMempoolFee) {
4✔
1253

×
UNCOV
1254
                log.Debugf("Failed to bump tx %v: %v", oldTx.TxHash(), err)
×
1255
                return fn.None[BumpResult]()
×
1256
        }
×
1257

1258
        // A successful replacement tx is created, attach the old tx.
1259
        result.ReplacedTx = oldTx
4✔
1260

4✔
1261
        // If the new tx failed to be published, we will return the result so
4✔
1262
        // the caller can handle it.
4✔
1263
        if result.Event == TxFailed {
5✔
1264
                return fn.Some(*result)
1✔
1265
        }
1✔
1266

1267
        log.Infof("Replaced tx=%v with new tx=%v", oldTx.TxHash(),
3✔
1268
                sweepCtx.tx.TxHash())
3✔
1269

3✔
1270
        // Otherwise, it's a successful RBF, set the event and return.
3✔
1271
        result.Event = TxReplaced
3✔
1272

3✔
1273
        return fn.Some(*result)
3✔
1274
}
1275

1276
// isUnknownSpent checks whether the inputs of the tx has already been spent by
1277
// a tx not known to us. When a tx is not confirmed, yet its inputs has been
1278
// spent, then it must be spent by a different tx other than the sweeping tx
1279
// here.
1280
func (t *TxPublisher) isUnknownSpent(r *monitorRecord,
1281
        spends map[wire.OutPoint]*wire.MsgTx) bool {
2✔
1282

2✔
1283
        txid := r.tx.TxHash()
2✔
1284

2✔
1285
        // Iterate all the spending txns and check if they match the sweeping
2✔
1286
        // tx.
2✔
1287
        for op, spendingTx := range spends {
4✔
1288
                spendingTxID := spendingTx.TxHash()
2✔
1289

2✔
1290
                // If the spending tx is the same as the sweeping tx then we are
2✔
1291
                // good.
2✔
1292
                if spendingTxID == txid {
3✔
1293
                        continue
1✔
1294
                }
1295

1296
                log.Warnf("Detected unknown spent of input=%v in tx=%v", op,
1✔
1297
                        spendingTx.TxHash())
1✔
1298

1✔
1299
                return true
1✔
1300
        }
1301

1302
        return false
1✔
1303
}
1304

1305
// hasInputSpent performs a non-blocking read on the spending subscriptions to
1306
// see whether any of the monitored inputs has been spent. A map of inputs with
1307
// their spending txns are returned if found.
1308
func (t *TxPublisher) hasInputSpent(
1309
        r *monitorRecord) map[wire.OutPoint]*wire.MsgTx {
6✔
1310

6✔
1311
        // Create a slice to record the inputs spent.
6✔
1312
        inputsSpent := make(map[wire.OutPoint]*wire.MsgTx, len(r.req.Inputs))
6✔
1313

6✔
1314
        // Iterate all the inputs and check if they have been spent already.
6✔
1315
        for _, inp := range r.req.Inputs {
14✔
1316
                op := inp.OutPoint()
8✔
1317

8✔
1318
                // For wallet utxos, the height hint is not set - we don't need
8✔
1319
                // to monitor them for third party spend.
8✔
1320
                //
8✔
1321
                // TODO(yy): We need to properly lock wallet utxos before
8✔
1322
                // skipping this check as the same wallet utxo can be used by
8✔
1323
                // different sweeping txns.
8✔
1324
                heightHint := inp.HeightHint()
8✔
1325
                if heightHint == 0 {
9✔
1326
                        heightHint = uint32(t.currentHeight.Load())
1✔
1327
                        log.Debugf("Checking wallet input %v using heightHint "+
1✔
1328
                                "%v", op, heightHint)
1✔
1329
                }
1✔
1330

1331
                // If the input has already been spent after the height hint, a
1332
                // spend event is sent back immediately.
1333
                spendEvent, err := t.cfg.Notifier.RegisterSpendNtfn(
8✔
1334
                        &op, inp.SignDesc().Output.PkScript, heightHint,
8✔
1335
                )
8✔
1336
                if err != nil {
8✔
UNCOV
1337
                        log.Criticalf("Failed to register spend ntfn for "+
×
UNCOV
1338
                                "input=%v: %v", op, err)
×
NEW
UNCOV
1339

×
NEW
UNCOV
1340
                        return nil
×
UNCOV
1341
                }
×
1342

1343
                // Remove the subscription when exit.
1344
                defer spendEvent.Cancel()
8✔
1345

8✔
1346
                // Do a non-blocking read to see if the output has been spent.
8✔
1347
                select {
8✔
1348
                case spend, ok := <-spendEvent.Spend:
4✔
1349
                        if !ok {
4✔
UNCOV
1350
                                log.Debugf("Spend ntfn for %v canceled", op)
×
UNCOV
1351

×
UNCOV
1352
                                continue
×
1353
                        }
1354

1355
                        spendingTx := spend.SpendingTx
4✔
1356

4✔
1357
                        log.Debugf("Detected spent of input=%v in tx=%v", op,
4✔
1358
                                spendingTx.TxHash())
4✔
1359

4✔
1360
                        inputsSpent[op] = spendingTx
4✔
1361

1362
                // Move to the next input.
1363
                default:
4✔
1364
                        log.Tracef("Input %v not spent yet", op)
4✔
1365
                }
1366
        }
1367

1368
        return inputsSpent
6✔
1369
}
1370

1371
// calcCurrentConfTarget calculates the current confirmation target based on
1372
// the deadline height. The conf target is capped at 0 if the deadline has
1373
// already been past.
1374
func calcCurrentConfTarget(currentHeight, deadline int32) uint32 {
13✔
1375
        var confTarget uint32
13✔
1376

13✔
1377
        // Calculate how many blocks left until the deadline.
13✔
1378
        deadlineDelta := deadline - currentHeight
13✔
1379

13✔
1380
        // If we are already past the deadline, we will set the conf target to
13✔
1381
        // be 1.
13✔
1382
        if deadlineDelta < 0 {
17✔
1383
                log.Warnf("Deadline is %d blocks behind current height %v",
4✔
1384
                        -deadlineDelta, currentHeight)
4✔
1385

4✔
1386
                confTarget = 0
4✔
1387
        } else {
13✔
1388
                confTarget = uint32(deadlineDelta)
9✔
1389
        }
9✔
1390

1391
        return confTarget
13✔
1392
}
1393

1394
// sweepTxCtx houses a sweep transaction with additional context.
1395
type sweepTxCtx struct {
1396
        tx *wire.MsgTx
1397

1398
        fee btcutil.Amount
1399

1400
        extraTxOut fn.Option[SweepOutput]
1401

1402
        // outpointToTxIndex maps the outpoint of the inputs to their index in
1403
        // the sweep transaction.
1404
        outpointToTxIndex map[wire.OutPoint]int
1405
}
1406

1407
// createSweepTx creates a sweeping tx based on the given inputs, change
1408
// address and fee rate.
1409
func (t *TxPublisher) createSweepTx(inputs []input.Input,
1410
        changePkScript lnwallet.AddrWithKey,
1411
        feeRate chainfee.SatPerKWeight) (*sweepTxCtx, error) {
23✔
1412

23✔
1413
        // Validate and calculate the fee and change amount.
23✔
1414
        txFee, changeOutputsOpt, locktimeOpt, err := prepareSweepTx(
23✔
1415
                inputs, changePkScript, feeRate, t.currentHeight.Load(),
23✔
1416
                t.cfg.AuxSweeper,
23✔
1417
        )
23✔
1418
        if err != nil {
23✔
1419
                return nil, err
×
UNCOV
1420
        }
×
1421

1422
        var (
23✔
1423
                // Create the sweep transaction that we will be building. We
23✔
1424
                // use version 2 as it is required for CSV.
23✔
1425
                sweepTx = wire.NewMsgTx(2)
23✔
1426

23✔
1427
                // We'll add the inputs as we go so we know the final ordering
23✔
1428
                // of inputs to sign.
23✔
1429
                idxs []input.Input
23✔
1430
        )
23✔
1431

23✔
1432
        // We start by adding all inputs that commit to an output. We do this
23✔
1433
        // since the input and output index must stay the same for the
23✔
1434
        // signatures to be valid.
23✔
1435
        outpointToTxIndex := make(map[wire.OutPoint]int)
23✔
1436
        for _, o := range inputs {
46✔
1437
                if o.RequiredTxOut() == nil {
46✔
1438
                        continue
23✔
1439
                }
1440

UNCOV
1441
                idxs = append(idxs, o)
×
UNCOV
1442
                sweepTx.AddTxIn(&wire.TxIn{
×
UNCOV
1443
                        PreviousOutPoint: o.OutPoint(),
×
1444
                        Sequence:         o.BlocksToMaturity(),
×
1445
                })
×
1446
                sweepTx.AddTxOut(o.RequiredTxOut())
×
1447

×
UNCOV
1448
                outpointToTxIndex[o.OutPoint()] = len(sweepTx.TxOut) - 1
×
1449
        }
1450

1451
        // Sum up the value contained in the remaining inputs, and add them to
1452
        // the sweep transaction.
1453
        for _, o := range inputs {
46✔
1454
                if o.RequiredTxOut() != nil {
23✔
UNCOV
1455
                        continue
×
1456
                }
1457

1458
                idxs = append(idxs, o)
23✔
1459
                sweepTx.AddTxIn(&wire.TxIn{
23✔
1460
                        PreviousOutPoint: o.OutPoint(),
23✔
1461
                        Sequence:         o.BlocksToMaturity(),
23✔
1462
                })
23✔
1463
        }
1464

1465
        // If we have change outputs to add, then add it the sweep transaction
1466
        // here.
1467
        changeOutputsOpt.WhenSome(func(changeOuts []SweepOutput) {
46✔
1468
                for i := range changeOuts {
69✔
1469
                        sweepTx.AddTxOut(&changeOuts[i].TxOut)
46✔
1470
                }
46✔
1471
        })
1472

1473
        // We'll default to using the current block height as locktime, if none
1474
        // of the inputs commits to a different locktime.
1475
        sweepTx.LockTime = uint32(locktimeOpt.UnwrapOr(t.currentHeight.Load()))
23✔
1476

23✔
1477
        prevInputFetcher, err := input.MultiPrevOutFetcher(inputs)
23✔
1478
        if err != nil {
23✔
UNCOV
1479
                return nil, fmt.Errorf("error creating prev input fetcher "+
×
UNCOV
1480
                        "for hash cache: %v", err)
×
UNCOV
1481
        }
×
1482
        hashCache := txscript.NewTxSigHashes(sweepTx, prevInputFetcher)
23✔
1483

23✔
1484
        // With all the inputs in place, use each output's unique input script
23✔
1485
        // function to generate the final witness required for spending.
23✔
1486
        addInputScript := func(idx int, tso input.Input) error {
46✔
1487
                inputScript, err := tso.CraftInputScript(
23✔
1488
                        t.cfg.Signer, sweepTx, hashCache, prevInputFetcher, idx,
23✔
1489
                )
23✔
1490
                if err != nil {
23✔
UNCOV
1491
                        return err
×
UNCOV
1492
                }
×
1493

1494
                sweepTx.TxIn[idx].Witness = inputScript.Witness
23✔
1495

23✔
1496
                if len(inputScript.SigScript) == 0 {
46✔
1497
                        return nil
23✔
1498
                }
23✔
1499

UNCOV
1500
                sweepTx.TxIn[idx].SignatureScript = inputScript.SigScript
×
UNCOV
1501

×
UNCOV
1502
                return nil
×
1503
        }
1504

1505
        for idx, inp := range idxs {
46✔
1506
                if err := addInputScript(idx, inp); err != nil {
23✔
UNCOV
1507
                        return nil, err
×
UNCOV
1508
                }
×
1509
        }
1510

1511
        log.Debugf("Created sweep tx %v for inputs:\n%v", sweepTx.TxHash(),
23✔
1512
                inputTypeSummary(inputs))
23✔
1513

23✔
1514
        // Try to locate the extra change output, though there might be None.
23✔
1515
        extraTxOut := fn.MapOption(
23✔
1516
                func(sweepOuts []SweepOutput) fn.Option[SweepOutput] {
46✔
1517
                        for _, sweepOut := range sweepOuts {
69✔
1518
                                if !sweepOut.IsExtra {
92✔
1519
                                        continue
46✔
1520
                                }
1521

1522
                                // If we sweep outputs of a custom channel, the
1523
                                // custom leaves in those outputs will be merged
1524
                                // into a single output, even if we sweep
1525
                                // multiple outputs (e.g. to_remote and breached
1526
                                // to_local of a breached channel) at the same
1527
                                // time. So there will only ever be one extra
1528
                                // output.
UNCOV
1529
                                log.Debugf("Sweep produced extra_sweep_out=%v",
×
UNCOV
1530
                                        lnutils.SpewLogClosure(sweepOut))
×
UNCOV
1531

×
UNCOV
1532
                                return fn.Some(sweepOut)
×
1533
                        }
1534

1535
                        return fn.None[SweepOutput]()
23✔
1536
                },
1537
        )(changeOutputsOpt)
1538

1539
        return &sweepTxCtx{
23✔
1540
                tx:                sweepTx,
23✔
1541
                fee:               txFee,
23✔
1542
                extraTxOut:        fn.FlattenOption(extraTxOut),
23✔
1543
                outpointToTxIndex: outpointToTxIndex,
23✔
1544
        }, nil
23✔
1545
}
1546

1547
// prepareSweepTx returns the tx fee, a set of optional change outputs and an
1548
// optional locktime after a series of validations:
1549
// 1. check the locktime has been reached.
1550
// 2. check the locktimes are the same.
1551
// 3. check the inputs cover the outputs.
1552
//
1553
// NOTE: if the change amount is below dust, it will be added to the tx fee.
1554
func prepareSweepTx(inputs []input.Input, changePkScript lnwallet.AddrWithKey,
1555
        feeRate chainfee.SatPerKWeight, currentHeight int32,
1556
        auxSweeper fn.Option[AuxSweeper]) (
1557
        btcutil.Amount, fn.Option[[]SweepOutput], fn.Option[int32], error) {
23✔
1558

23✔
1559
        noChange := fn.None[[]SweepOutput]()
23✔
1560
        noLocktime := fn.None[int32]()
23✔
1561

23✔
1562
        // Given the set of inputs we have, if we have an aux sweeper, then
23✔
1563
        // we'll attempt to see if we have any other change outputs we'll need
23✔
1564
        // to add to the sweep transaction.
23✔
1565
        changePkScripts := [][]byte{changePkScript.DeliveryAddress}
23✔
1566

23✔
1567
        var extraChangeOut fn.Option[SweepOutput]
23✔
1568
        err := fn.MapOptionZ(
23✔
1569
                auxSweeper, func(aux AuxSweeper) error {
46✔
1570
                        extraOut := aux.DeriveSweepAddr(inputs, changePkScript)
23✔
1571
                        if err := extraOut.Err(); err != nil {
23✔
UNCOV
1572
                                return err
×
UNCOV
1573
                        }
×
1574

1575
                        extraChangeOut = extraOut.LeftToSome()
23✔
1576

23✔
1577
                        return nil
23✔
1578
                },
1579
        )
1580
        if err != nil {
23✔
UNCOV
1581
                return 0, noChange, noLocktime, err
×
UNCOV
1582
        }
×
1583

1584
        // Creating a weight estimator with nil outputs and zero max fee rate.
1585
        // We don't allow adding customized outputs in the sweeping tx, and the
1586
        // fee rate is already being managed before we get here.
1587
        inputs, estimator, err := getWeightEstimate(
23✔
1588
                inputs, nil, feeRate, 0, changePkScripts,
23✔
1589
        )
23✔
1590
        if err != nil {
23✔
UNCOV
1591
                return 0, noChange, noLocktime, err
×
UNCOV
1592
        }
×
1593

1594
        txFee := estimator.fee()
23✔
1595

23✔
1596
        var (
23✔
1597
                // Track whether any of the inputs require a certain locktime.
23✔
1598
                locktime = int32(-1)
23✔
1599

23✔
1600
                // We keep track of total input amount, and required output
23✔
1601
                // amount to use for calculating the change amount below.
23✔
1602
                totalInput     btcutil.Amount
23✔
1603
                requiredOutput btcutil.Amount
23✔
1604
        )
23✔
1605

23✔
1606
        // If we have an extra change output, then we'll add it as a required
23✔
1607
        // output amt.
23✔
1608
        extraChangeOut.WhenSome(func(o SweepOutput) {
46✔
1609
                requiredOutput += btcutil.Amount(o.Value)
23✔
1610
        })
23✔
1611

1612
        // Go through each input and check if the required lock times have
1613
        // reached and are the same.
1614
        for _, o := range inputs {
46✔
1615
                // If the input has a required output, we'll add it to the
23✔
1616
                // required output amount.
23✔
1617
                if o.RequiredTxOut() != nil {
23✔
UNCOV
1618
                        requiredOutput += btcutil.Amount(
×
UNCOV
1619
                                o.RequiredTxOut().Value,
×
UNCOV
1620
                        )
×
UNCOV
1621
                }
×
1622

1623
                // Update the total input amount.
1624
                totalInput += btcutil.Amount(o.SignDesc().Output.Value)
23✔
1625

23✔
1626
                lt, ok := o.RequiredLockTime()
23✔
1627

23✔
1628
                // Skip if the input doesn't require a lock time.
23✔
1629
                if !ok {
46✔
1630
                        continue
23✔
1631
                }
1632

1633
                // Check if the lock time has reached
UNCOV
1634
                if lt > uint32(currentHeight) {
×
UNCOV
1635
                        return 0, noChange, noLocktime,
×
1636
                                fmt.Errorf("%w: current height is %v, "+
×
1637
                                        "locktime is %v", ErrLocktimeImmature,
×
1638
                                        currentHeight, lt)
×
1639
                }
×
1640

1641
                // If another input commits to a different locktime, they
1642
                // cannot be combined in the same transaction.
UNCOV
1643
                if locktime != -1 && locktime != int32(lt) {
×
UNCOV
1644
                        return 0, noChange, noLocktime, ErrLocktimeConflict
×
UNCOV
1645
                }
×
1646

1647
                // Update the locktime for next iteration.
UNCOV
1648
                locktime = int32(lt)
×
1649
        }
1650

1651
        // Make sure total output amount is less than total input amount.
1652
        if requiredOutput+txFee > totalInput {
23✔
UNCOV
1653
                return 0, noChange, noLocktime, fmt.Errorf("insufficient "+
×
UNCOV
1654
                        "input to create sweep tx: input_sum=%v, "+
×
UNCOV
1655
                        "output_sum=%v", totalInput, requiredOutput+txFee)
×
UNCOV
1656
        }
×
1657

1658
        // The value remaining after the required output and fees is the
1659
        // change output.
1660
        changeAmt := totalInput - requiredOutput - txFee
23✔
1661
        changeOuts := make([]SweepOutput, 0, 2)
23✔
1662

23✔
1663
        extraChangeOut.WhenSome(func(o SweepOutput) {
46✔
1664
                changeOuts = append(changeOuts, o)
23✔
1665
        })
23✔
1666

1667
        // We'll calculate the dust limit for the given changePkScript since it
1668
        // is variable.
1669
        changeFloor := lnwallet.DustLimitForSize(
23✔
1670
                len(changePkScript.DeliveryAddress),
23✔
1671
        )
23✔
1672

23✔
1673
        switch {
23✔
1674
        // If the change amount is dust, we'll move it into the fees, and
1675
        // ignore it.
UNCOV
1676
        case changeAmt < changeFloor:
×
UNCOV
1677
                log.Infof("Change amt %v below dustlimit %v, not adding "+
×
UNCOV
1678
                        "change output", changeAmt, changeFloor)
×
1679

×
1680
                // If there's no required output, and the change output is a
×
UNCOV
1681
                // dust, it means we are creating a tx without any outputs. In
×
UNCOV
1682
                // this case we'll return an error. This could happen when
×
UNCOV
1683
                // creating a tx that has an anchor as the only input.
×
UNCOV
1684
                if requiredOutput == 0 {
×
UNCOV
1685
                        return 0, noChange, noLocktime, ErrTxNoOutput
×
UNCOV
1686
                }
×
1687

1688
                // The dust amount is added to the fee.
1689
                txFee += changeAmt
×
1690

1691
        // Otherwise, we'll actually recognize it as a change output.
1692
        default:
23✔
1693
                changeOuts = append(changeOuts, SweepOutput{
23✔
1694
                        TxOut: wire.TxOut{
23✔
1695
                                Value:    int64(changeAmt),
23✔
1696
                                PkScript: changePkScript.DeliveryAddress,
23✔
1697
                        },
23✔
1698
                        IsExtra:     false,
23✔
1699
                        InternalKey: changePkScript.InternalKey,
23✔
1700
                })
23✔
1701
        }
1702

1703
        // Optionally set the locktime.
1704
        locktimeOpt := fn.Some(locktime)
23✔
1705
        if locktime == -1 {
46✔
1706
                locktimeOpt = noLocktime
23✔
1707
        }
23✔
1708

1709
        var changeOutsOpt fn.Option[[]SweepOutput]
23✔
1710
        if len(changeOuts) > 0 {
46✔
1711
                changeOutsOpt = fn.Some(changeOuts)
23✔
1712
        }
23✔
1713

1714
        log.Debugf("Creating sweep tx for %v inputs (%s) using %v, "+
23✔
1715
                "tx_weight=%v, tx_fee=%v, locktime=%v, parents_count=%v, "+
23✔
1716
                "parents_fee=%v, parents_weight=%v, current_height=%v",
23✔
1717
                len(inputs), inputTypeSummary(inputs), feeRate,
23✔
1718
                estimator.weight(), txFee, locktimeOpt, len(estimator.parents),
23✔
1719
                estimator.parentsFee, estimator.parentsWeight, currentHeight)
23✔
1720

23✔
1721
        return txFee, changeOutsOpt, locktimeOpt, nil
23✔
1722
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc