• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 12986279612

27 Jan 2025 09:51AM UTC coverage: 57.652% (-1.1%) from 58.788%
12986279612

Pull #9447

github

yyforyongyu
sweep: rename methods for clarity

We now rename "third party" to "unknown" as the inputs can be spent via
an older sweeping tx, a third party (anchor), or a remote party (pin).
In fee bumper we don't have the info to distinguish the above cases, and
leave them to be further handled by the sweeper as it has more context.
Pull Request #9447: sweep: start tracking input spending status in the fee bumper

83 of 87 new or added lines in 2 files covered. (95.4%)

19578 existing lines in 256 files now uncovered.

103448 of 179434 relevant lines covered (57.65%)

24884.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.97
/channeldb/payment_control.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "io"
9
        "sync"
10

11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
)
14

15
const (
	// paymentSeqBlockSize is the block size used when we batch allocate
	// payment sequences for future payments.
	paymentSeqBlockSize = 1000
)
20

21
// Sentinel errors returned by the payment control logic. Callers are expected
// to compare against them with errors.Is.
var (
	// ErrAlreadyPaid signals we have already paid this payment hash.
	ErrAlreadyPaid = errors.New("invoice is already paid")

	// ErrPaymentInFlight signals that payment for this payment hash is
	// already "in flight" on the network.
	ErrPaymentInFlight = errors.New("payment is in transition")

	// ErrPaymentExists is returned when we try to initialize an already
	// existing payment that is not failed.
	ErrPaymentExists = errors.New("payment already exists")

	// ErrPaymentInternal is returned when performing the payment has a
	// conflicting state, such as,
	// - payment has StatusSucceeded but remaining amount is not zero.
	// - payment has StatusInitiated but remaining amount is zero.
	// - payment has StatusFailed but remaining amount is zero.
	ErrPaymentInternal = errors.New("internal error")

	// ErrPaymentNotInitiated is returned if the payment wasn't initiated.
	ErrPaymentNotInitiated = errors.New("payment isn't initiated")

	// ErrPaymentAlreadySucceeded is returned in the event we attempt to
	// change the status of a payment that has already succeeded.
	ErrPaymentAlreadySucceeded = errors.New("payment is already succeeded")

	// ErrPaymentAlreadyFailed is returned in the event we attempt to alter
	// a failed payment.
	ErrPaymentAlreadyFailed = errors.New("payment has already failed")

	// ErrUnknownPaymentStatus is returned when we do not recognize the
	// existing state of a payment.
	ErrUnknownPaymentStatus = errors.New("unknown payment status")

	// ErrPaymentTerminal is returned if we attempt to alter a payment that
	// already has reached a terminal condition.
	ErrPaymentTerminal = errors.New("payment has reached terminal " +
		"condition")

	// ErrAttemptAlreadySettled is returned if we try to alter an already
	// settled HTLC attempt.
	ErrAttemptAlreadySettled = errors.New("attempt already settled")

	// ErrAttemptAlreadyFailed is returned if we try to alter an already
	// failed HTLC attempt.
	ErrAttemptAlreadyFailed = errors.New("attempt already failed")

	// ErrValueMismatch is returned if we try to register a non-MPP attempt
	// with an amount that doesn't match the payment amount.
	ErrValueMismatch = errors.New("attempted value doesn't match payment " +
		"amount")

	// ErrValueExceedsAmt is returned if we try to register an attempt that
	// would take the total sent amount above the payment amount.
	ErrValueExceedsAmt = errors.New("attempted value exceeds payment " +
		"amount")

	// ErrNonMPPayment is returned if we try to register an MPP attempt for
	// a payment that already has a non-MPP attempt registered.
	ErrNonMPPayment = errors.New("payment has non-MPP attempts")

	// ErrMPPayment is returned if we try to register a non-MPP attempt for
	// a payment that already has an MPP attempt registered.
	ErrMPPayment = errors.New("payment has MPP attempts")

	// ErrMPPRecordInBlindedPayment is returned if we try to register an
	// attempt with an MPP record for a payment to a blinded path.
	ErrMPPRecordInBlindedPayment = errors.New("blinded payment cannot " +
		"contain MPP records")

	// ErrBlindedPaymentTotalAmountMismatch is returned if we try to
	// register an HTLC shard to a blinded route where the total amount
	// doesn't match existing shards.
	ErrBlindedPaymentTotalAmountMismatch = errors.New("blinded path " +
		"total amount mismatch")

	// ErrMPPPaymentAddrMismatch is returned if we try to register an MPP
	// shard where the payment address doesn't match existing shards.
	ErrMPPPaymentAddrMismatch = errors.New("payment address mismatch")

	// ErrMPPTotalAmountMismatch is returned if we try to register an MPP
	// shard where the total amount doesn't match existing shards.
	ErrMPPTotalAmountMismatch = errors.New("mp payment total amount " +
		"mismatch")

	// ErrPaymentPendingSettled is returned when we try to add a new
	// attempt to a payment that has at least one of its HTLCs settled.
	ErrPaymentPendingSettled = errors.New("payment has settled htlcs")

	// ErrPaymentPendingFailed is returned when we try to add a new attempt
	// to a payment that already has a failure reason.
	ErrPaymentPendingFailed = errors.New("payment has failure reason")

	// ErrSentExceedsTotal is returned if the payment's current total sent
	// amount exceeds the total amount.
	ErrSentExceedsTotal = errors.New("total sent exceeds total amount")

	// errNoAttemptInfo is returned when no attempt info is stored yet.
	errNoAttemptInfo = errors.New("unable to find attempt info for " +
		"inflight payment")

	// errNoSequenceNrIndex is returned when an attempt to lookup a payment
	// index is made for a sequence number that is not indexed.
	errNoSequenceNrIndex = errors.New("payment sequence number index " +
		"does not exist")
)
127

128
// PaymentControl implements persistence for payments and payment attempts.
129
type PaymentControl struct {
130
        paymentSeqMx     sync.Mutex
131
        currPaymentSeq   uint64
132
        storedPaymentSeq uint64
133
        db               *DB
134
}
135

136
// NewPaymentControl creates a new instance of the PaymentControl.
137
func NewPaymentControl(db *DB) *PaymentControl {
42✔
138
        return &PaymentControl{
42✔
139
                db: db,
42✔
140
        }
42✔
141
}
42✔
142

143
// InitPayment checks or records the given PaymentCreationInfo with the DB,
144
// making sure it does not already exist as an in-flight payment. When this
145
// method returns successfully, the payment is guaranteed to be in the InFlight
146
// state.
147
func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash,
148
        info *PaymentCreationInfo) error {
149✔
149

149✔
150
        // Obtain a new sequence number for this payment. This is used
149✔
151
        // to sort the payments in order of creation, and also acts as
149✔
152
        // a unique identifier for each payment.
149✔
153
        sequenceNum, err := p.nextPaymentSequence()
149✔
154
        if err != nil {
149✔
155
                return err
×
156
        }
×
157

158
        var b bytes.Buffer
149✔
159
        if err := serializePaymentCreationInfo(&b, info); err != nil {
149✔
160
                return err
×
161
        }
×
162
        infoBytes := b.Bytes()
149✔
163

149✔
164
        var updateErr error
149✔
165
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
298✔
166
                // Reset the update error, to avoid carrying over an error
149✔
167
                // from a previous execution of the batched db transaction.
149✔
168
                updateErr = nil
149✔
169

149✔
170
                prefetchPayment(tx, paymentHash)
149✔
171
                bucket, err := createPaymentBucket(tx, paymentHash)
149✔
172
                if err != nil {
149✔
173
                        return err
×
174
                }
×
175

176
                // Get the existing status of this payment, if any.
177
                paymentStatus, err := fetchPaymentStatus(bucket)
149✔
178

149✔
179
                switch {
149✔
180
                // If no error is returned, it means we already have this
181
                // payment. We'll check the status to decide whether we allow
182
                // retrying the payment or return a specific error.
183
                case err == nil:
5✔
184
                        if err := paymentStatus.initializable(); err != nil {
9✔
185
                                updateErr = err
4✔
186
                                return nil
4✔
187
                        }
4✔
188

189
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
190
                // we'll return the error.
191
                case !errors.Is(err, ErrPaymentNotInitiated):
×
192
                        return err
×
193
                }
194

195
                // Before we set our new sequence number, we check whether this
196
                // payment has a previously set sequence number and remove its
197
                // index entry if it exists. This happens in the case where we
198
                // have a previously attempted payment which was left in a state
199
                // where we can retry.
200
                seqBytes := bucket.Get(paymentSequenceKey)
145✔
201
                if seqBytes != nil {
146✔
202
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
1✔
203
                        if err := indexBucket.Delete(seqBytes); err != nil {
1✔
204
                                return err
×
205
                        }
×
206
                }
207

208
                // Once we have obtained a sequence number, we add an entry
209
                // to our index bucket which will map the sequence number to
210
                // our payment identifier.
211
                err = createPaymentIndexEntry(
145✔
212
                        tx, sequenceNum, info.PaymentIdentifier,
145✔
213
                )
145✔
214
                if err != nil {
145✔
215
                        return err
×
216
                }
×
217

218
                err = bucket.Put(paymentSequenceKey, sequenceNum)
145✔
219
                if err != nil {
145✔
220
                        return err
×
221
                }
×
222

223
                // Add the payment info to the bucket, which contains the
224
                // static information for this payment
225
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
145✔
226
                if err != nil {
145✔
227
                        return err
×
228
                }
×
229

230
                // We'll delete any lingering HTLCs to start with, in case we
231
                // are initializing a payment that was attempted earlier, but
232
                // left in a state where we could retry.
233
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
145✔
234
                if err != nil && err != kvdb.ErrBucketNotFound {
145✔
235
                        return err
×
236
                }
×
237

238
                // Also delete any lingering failure info now that we are
239
                // re-attempting.
240
                return bucket.Delete(paymentFailInfoKey)
145✔
241
        })
242
        if err != nil {
149✔
243
                return fmt.Errorf("unable to init payment: %w", err)
×
244
        }
×
245

246
        return updateErr
149✔
247
}
248

249
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
250
// by the PaymentControl db.
251
func (p *PaymentControl) DeleteFailedAttempts(hash lntypes.Hash) error {
8✔
252
        if !p.db.keepFailedPaymentAttempts {
12✔
253
                const failedHtlcsOnly = true
4✔
254
                err := p.db.DeletePayment(hash, failedHtlcsOnly)
4✔
255
                if err != nil {
6✔
256
                        return err
2✔
257
                }
2✔
258
        }
259
        return nil
6✔
260
}
261

262
// paymentIndexType is the type tag written at the start of every payment
// index entry. It can be used in future to signal different payment index
// types.
type paymentIndexType uint8

// paymentIndexTypeHash is a payment index type which indicates that we have
// created an index of payment sequence number to payment hash.
const paymentIndexTypeHash paymentIndexType = 0
269

270
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
271
// index produced contains a payment index type (which can be used in future to
272
// signal different payment index types) and the payment identifier.
273
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
274
        id lntypes.Hash) error {
166✔
275

166✔
276
        var b bytes.Buffer
166✔
277
        if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
166✔
278
                return err
×
279
        }
×
280

281
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
166✔
282
        return indexes.Put(sequenceNumber, b.Bytes())
166✔
283
}
284

285
// deserializePaymentIndex deserializes a payment index entry. This function
286
// currently only supports deserialization of payment hash indexes, and will
287
// fail for other types.
288
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
59✔
289
        var (
59✔
290
                indexType   paymentIndexType
59✔
291
                paymentHash []byte
59✔
292
        )
59✔
293

59✔
294
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
59✔
295
                return lntypes.Hash{}, err
×
296
        }
×
297

298
        // While we only have on payment index type, we do not need to use our
299
        // index type to deserialize the index. However, we sanity check that
300
        // this type is as expected, since we had to read it out anyway.
301
        if indexType != paymentIndexTypeHash {
59✔
302
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
303
                        "type: %v", indexType)
×
304
        }
×
305

306
        hash, err := lntypes.MakeHash(paymentHash)
59✔
307
        if err != nil {
59✔
308
                return lntypes.Hash{}, err
×
309
        }
×
310

311
        return hash, nil
59✔
312
}
313

314
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
315
// DB.
316
func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash,
317
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
70✔
318

70✔
319
        // Serialize the information before opening the db transaction.
70✔
320
        var a bytes.Buffer
70✔
321
        err := serializeHTLCAttemptInfo(&a, attempt)
70✔
322
        if err != nil {
70✔
323
                return nil, err
×
324
        }
×
325
        htlcInfoBytes := a.Bytes()
70✔
326

70✔
327
        htlcIDBytes := make([]byte, 8)
70✔
328
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
70✔
329

70✔
330
        var payment *MPPayment
70✔
331
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
156✔
332
                prefetchPayment(tx, paymentHash)
86✔
333
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
86✔
334
                if err != nil {
86✔
335
                        return err
×
336
                }
×
337

338
                payment, err = fetchPayment(bucket)
86✔
339
                if err != nil {
86✔
340
                        return err
×
341
                }
×
342

343
                // Check if registering a new attempt is allowed.
344
                if err := payment.Registrable(); err != nil {
102✔
345
                        return err
16✔
346
                }
16✔
347

348
                // If the final hop has encrypted data, then we know this is a
349
                // blinded payment. In blinded payments, MPP records are not set
350
                // for split payments and the recipient is responsible for using
351
                // a consistent PathID across the various encrypted data
352
                // payloads that we received from them for this payment. All we
353
                // need to check is that the total amount field for each HTLC
354
                // in the split payment is correct.
355
                isBlinded := len(attempt.Route.FinalHop().EncryptedData) != 0
70✔
356

70✔
357
                // Make sure any existing shards match the new one with regards
70✔
358
                // to MPP options.
70✔
359
                mpp := attempt.Route.FinalHop().MPP
70✔
360

70✔
361
                // MPP records should not be set for attempts to blinded paths.
70✔
362
                if isBlinded && mpp != nil {
70✔
363
                        return ErrMPPRecordInBlindedPayment
×
364
                }
×
365

366
                for _, h := range payment.InFlightHTLCs() {
114✔
367
                        hMpp := h.Route.FinalHop().MPP
44✔
368

44✔
369
                        // If this is a blinded payment, then no existing HTLCs
44✔
370
                        // should have MPP records.
44✔
371
                        if isBlinded && hMpp != nil {
44✔
372
                                return ErrMPPRecordInBlindedPayment
×
373
                        }
×
374

375
                        // If this is a blinded payment, then we just need to
376
                        // check that the TotalAmtMsat field for this shard
377
                        // is equal to that of any other shard in the same
378
                        // payment.
379
                        if isBlinded {
44✔
UNCOV
380
                                if attempt.Route.FinalHop().TotalAmtMsat !=
×
UNCOV
381
                                        h.Route.FinalHop().TotalAmtMsat {
×
382

×
383
                                        //nolint:ll
×
384
                                        return ErrBlindedPaymentTotalAmountMismatch
×
385
                                }
×
386

UNCOV
387
                                continue
×
388
                        }
389

390
                        switch {
44✔
391
                        // We tried to register a non-MPP attempt for a MPP
392
                        // payment.
393
                        case mpp == nil && hMpp != nil:
2✔
394
                                return ErrMPPayment
2✔
395

396
                        // We tried to register a MPP shard for a non-MPP
397
                        // payment.
398
                        case mpp != nil && hMpp == nil:
2✔
399
                                return ErrNonMPPayment
2✔
400

401
                        // Non-MPP payment, nothing more to validate.
402
                        case mpp == nil:
×
403
                                continue
×
404
                        }
405

406
                        // Check that MPP options match.
407
                        if mpp.PaymentAddr() != hMpp.PaymentAddr() {
42✔
408
                                return ErrMPPPaymentAddrMismatch
2✔
409
                        }
2✔
410

411
                        if mpp.TotalMsat() != hMpp.TotalMsat() {
40✔
412
                                return ErrMPPTotalAmountMismatch
2✔
413
                        }
2✔
414
                }
415

416
                // If this is a non-MPP attempt, it must match the total amount
417
                // exactly. Note that a blinded payment is considered an MPP
418
                // attempt.
419
                amt := attempt.Route.ReceiverAmt()
62✔
420
                if !isBlinded && mpp == nil && amt != payment.Info.Value {
62✔
421
                        return ErrValueMismatch
×
422
                }
×
423

424
                // Ensure we aren't sending more than the total payment amount.
425
                sentAmt, _ := payment.SentAmt()
62✔
426
                if sentAmt+amt > payment.Info.Value {
70✔
427
                        return fmt.Errorf("%w: attempted=%v, payment amount="+
8✔
428
                                "%v", ErrValueExceedsAmt, sentAmt+amt,
8✔
429
                                payment.Info.Value)
8✔
430
                }
8✔
431

432
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
54✔
433
                        paymentHtlcsBucket,
54✔
434
                )
54✔
435
                if err != nil {
54✔
436
                        return err
×
437
                }
×
438

439
                err = htlcsBucket.Put(
54✔
440
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
54✔
441
                        htlcInfoBytes,
54✔
442
                )
54✔
443
                if err != nil {
54✔
444
                        return err
×
445
                }
×
446

447
                // Retrieve attempt info for the notification.
448
                payment, err = fetchPayment(bucket)
54✔
449
                return err
54✔
450
        })
451
        if err != nil {
86✔
452
                return nil, err
16✔
453
        }
16✔
454

455
        return payment, err
54✔
456
}
457

458
// SettleAttempt marks the given attempt settled with the preimage. If this is
459
// a multi shard payment, this might implicitly mean that the full payment
460
// succeeded.
461
//
462
// After invoking this method, InitPayment should always return an error to
463
// prevent us from making duplicate payments to the same payment hash. The
464
// provided preimage is atomically saved to the DB for record keeping.
465
func (p *PaymentControl) SettleAttempt(hash lntypes.Hash,
466
        attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
16✔
467

16✔
468
        var b bytes.Buffer
16✔
469
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
16✔
470
                return nil, err
×
471
        }
×
472
        settleBytes := b.Bytes()
16✔
473

16✔
474
        return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
16✔
475
}
476

477
// FailAttempt marks the given payment attempt failed.
478
func (p *PaymentControl) FailAttempt(hash lntypes.Hash,
479
        attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
31✔
480

31✔
481
        var b bytes.Buffer
31✔
482
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
31✔
483
                return nil, err
×
484
        }
×
485
        failBytes := b.Bytes()
31✔
486

31✔
487
        return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
31✔
488
}
489

490
// updateHtlcKey updates a database key for the specified htlc.
491
func (p *PaymentControl) updateHtlcKey(paymentHash lntypes.Hash,
492
        attemptID uint64, key, value []byte) (*MPPayment, error) {
47✔
493

47✔
494
        aid := make([]byte, 8)
47✔
495
        binary.BigEndian.PutUint64(aid, attemptID)
47✔
496

47✔
497
        var payment *MPPayment
47✔
498
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
95✔
499
                payment = nil
48✔
500

48✔
501
                prefetchPayment(tx, paymentHash)
48✔
502
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
48✔
503
                if err != nil {
50✔
504
                        return err
2✔
505
                }
2✔
506

507
                p, err := fetchPayment(bucket)
46✔
508
                if err != nil {
46✔
509
                        return err
×
510
                }
×
511

512
                // We can only update keys of in-flight payments. We allow
513
                // updating keys even if the payment has reached a terminal
514
                // condition, since the HTLC outcomes must still be updated.
515
                if err := p.Status.updatable(); err != nil {
46✔
516
                        return err
×
517
                }
×
518

519
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
46✔
520
                if htlcsBucket == nil {
46✔
521
                        return fmt.Errorf("htlcs bucket not found")
×
522
                }
×
523

524
                if htlcsBucket.Get(htlcBucketKey(htlcAttemptInfoKey, aid)) == nil {
46✔
525
                        return fmt.Errorf("HTLC with ID %v not registered",
×
526
                                attemptID)
×
527
                }
×
528

529
                // Make sure the shard is not already failed or settled.
530
                if htlcsBucket.Get(htlcBucketKey(htlcFailInfoKey, aid)) != nil {
46✔
531
                        return ErrAttemptAlreadyFailed
×
532
                }
×
533

534
                if htlcsBucket.Get(htlcBucketKey(htlcSettleInfoKey, aid)) != nil {
46✔
535
                        return ErrAttemptAlreadySettled
×
536
                }
×
537

538
                // Add or update the key for this htlc.
539
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
46✔
540
                if err != nil {
46✔
541
                        return err
×
542
                }
×
543

544
                // Retrieve attempt info for the notification.
545
                payment, err = fetchPayment(bucket)
46✔
546
                return err
46✔
547
        })
548
        if err != nil {
48✔
549
                return nil, err
1✔
550
        }
1✔
551

552
        return payment, err
46✔
553
}
554

555
// Fail transitions a payment into the Failed state, and records the reason the
556
// payment failed. After invoking this method, InitPayment should return nil on
557
// its next call for this payment hash, allowing the switch to make a
558
// subsequent payment.
559
func (p *PaymentControl) Fail(paymentHash lntypes.Hash,
560
        reason FailureReason) (*MPPayment, error) {
16✔
561

16✔
562
        var (
16✔
563
                updateErr error
16✔
564
                payment   *MPPayment
16✔
565
        )
16✔
566
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
32✔
567
                // Reset the update error, to avoid carrying over an error
16✔
568
                // from a previous execution of the batched db transaction.
16✔
569
                updateErr = nil
16✔
570
                payment = nil
16✔
571

16✔
572
                prefetchPayment(tx, paymentHash)
16✔
573
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
16✔
574
                if err == ErrPaymentNotInitiated {
17✔
575
                        updateErr = ErrPaymentNotInitiated
1✔
576
                        return nil
1✔
577
                } else if err != nil {
16✔
578
                        return err
×
579
                }
×
580

581
                // We mark the payment as failed as long as it is known. This
582
                // lets the last attempt to fail with a terminal write its
583
                // failure to the PaymentControl without synchronizing with
584
                // other attempts.
585
                _, err = fetchPaymentStatus(bucket)
15✔
586
                if errors.Is(err, ErrPaymentNotInitiated) {
15✔
587
                        updateErr = ErrPaymentNotInitiated
×
588
                        return nil
×
589
                } else if err != nil {
15✔
590
                        return err
×
591
                }
×
592

593
                // Put the failure reason in the bucket for record keeping.
594
                v := []byte{byte(reason)}
15✔
595
                err = bucket.Put(paymentFailInfoKey, v)
15✔
596
                if err != nil {
15✔
597
                        return err
×
598
                }
×
599

600
                // Retrieve attempt info for the notification, if available.
601
                payment, err = fetchPayment(bucket)
15✔
602
                if err != nil {
15✔
603
                        return err
×
604
                }
×
605

606
                return nil
15✔
607
        })
608
        if err != nil {
16✔
609
                return nil, err
×
610
        }
×
611

612
        return payment, updateErr
16✔
613
}
614

615
// FetchPayment returns information about a payment from the database.
616
func (p *PaymentControl) FetchPayment(paymentHash lntypes.Hash) (
617
        *MPPayment, error) {
151✔
618

151✔
619
        var payment *MPPayment
151✔
620
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
302✔
621
                prefetchPayment(tx, paymentHash)
151✔
622
                bucket, err := fetchPaymentBucket(tx, paymentHash)
151✔
623
                if err != nil {
152✔
624
                        return err
1✔
625
                }
1✔
626

627
                payment, err = fetchPayment(bucket)
150✔
628

150✔
629
                return err
150✔
630
        }, func() {
151✔
631
                payment = nil
151✔
632
        })
151✔
633
        if err != nil {
152✔
634
                return nil, err
1✔
635
        }
1✔
636

637
        return payment, nil
150✔
638
}
639

640
// prefetchPayment attempts to prefetch as much of the payment as possible to
641
// reduce DB roundtrips.
642
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
450✔
643
        rb := kvdb.RootBucket(tx)
450✔
644
        kvdb.Prefetch(
450✔
645
                rb,
450✔
646
                []string{
450✔
647
                        // Prefetch all keys in the payment's bucket.
450✔
648
                        string(paymentsRootBucket),
450✔
649
                        string(paymentHash[:]),
450✔
650
                },
450✔
651
                []string{
450✔
652
                        // Prefetch all keys in the payment's htlc bucket.
450✔
653
                        string(paymentsRootBucket),
450✔
654
                        string(paymentHash[:]),
450✔
655
                        string(paymentHtlcsBucket),
450✔
656
                },
450✔
657
        )
450✔
658
}
450✔
659

660
// createPaymentBucket creates or fetches the sub-bucket assigned to this
661
// payment hash.
662
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
663
        kvdb.RwBucket, error) {
149✔
664

149✔
665
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
149✔
666
        if err != nil {
149✔
667
                return nil, err
×
668
        }
×
669

670
        return payments.CreateBucketIfNotExists(paymentHash[:])
149✔
671
}
672

673
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
674
// the bucket does not exist, it returns ErrPaymentNotInitiated.
675
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
676
        kvdb.RBucket, error) {
209✔
677

209✔
678
        payments := tx.ReadBucket(paymentsRootBucket)
209✔
679
        if payments == nil {
210✔
680
                return nil, ErrPaymentNotInitiated
1✔
681
        }
1✔
682

683
        bucket := payments.NestedReadBucket(paymentHash[:])
208✔
684
        if bucket == nil {
208✔
685
                return nil, ErrPaymentNotInitiated
×
686
        }
×
687

688
        return bucket, nil
208✔
689

690
}
691

692
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
693
// bucket that can be written to.
694
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
695
        kvdb.RwBucket, error) {
171✔
696

171✔
697
        payments := tx.ReadWriteBucket(paymentsRootBucket)
171✔
698
        if payments == nil {
174✔
699
                return nil, ErrPaymentNotInitiated
3✔
700
        }
3✔
701

702
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
168✔
703
        if bucket == nil {
168✔
704
                return nil, ErrPaymentNotInitiated
×
705
        }
×
706

707
        return bucket, nil
168✔
708
}
709

710
// nextPaymentSequence returns the next sequence number to store for a new
711
// payment.
712
func (p *PaymentControl) nextPaymentSequence() ([]byte, error) {
149✔
713
        p.paymentSeqMx.Lock()
149✔
714
        defer p.paymentSeqMx.Unlock()
149✔
715

149✔
716
        // Set a new upper bound in the DB every 1000 payments to avoid
149✔
717
        // conflicts on the sequence when using etcd.
149✔
718
        if p.currPaymentSeq == p.storedPaymentSeq {
188✔
719
                var currPaymentSeq, newUpperBound uint64
39✔
720
                if err := kvdb.Update(p.db.Backend, func(tx kvdb.RwTx) error {
78✔
721
                        paymentsBucket, err := tx.CreateTopLevelBucket(
39✔
722
                                paymentsRootBucket,
39✔
723
                        )
39✔
724
                        if err != nil {
39✔
725
                                return err
×
726
                        }
×
727

728
                        currPaymentSeq = paymentsBucket.Sequence()
39✔
729
                        newUpperBound = currPaymentSeq + paymentSeqBlockSize
39✔
730
                        return paymentsBucket.SetSequence(newUpperBound)
39✔
731
                }, func() {}); err != nil {
39✔
732
                        return nil, err
×
733
                }
×
734

735
                // We lazy initialize the cached currPaymentSeq here using the
736
                // first nextPaymentSequence() call. This if statement will auto
737
                // initialize our stored currPaymentSeq, since by default both
738
                // this variable and storedPaymentSeq are zero which in turn
739
                // will have us fetch the current values from the DB.
740
                if p.currPaymentSeq == 0 {
78✔
741
                        p.currPaymentSeq = currPaymentSeq
39✔
742
                }
39✔
743

744
                p.storedPaymentSeq = newUpperBound
39✔
745
        }
746

747
        p.currPaymentSeq++
149✔
748
        b := make([]byte, 8)
149✔
749
        binary.BigEndian.PutUint64(b, p.currPaymentSeq)
149✔
750

149✔
751
        return b, nil
149✔
752
}
753

754
// fetchPaymentStatus fetches the payment status of the payment. If the payment
755
// isn't found, it will return error `ErrPaymentNotInitiated`.
756
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
192✔
757
        // Creation info should be set for all payments, regardless of state.
192✔
758
        // If not, it is unknown.
192✔
759
        if bucket.Get(paymentCreationInfoKey) == nil {
336✔
760
                return 0, ErrPaymentNotInitiated
144✔
761
        }
144✔
762

763
        payment, err := fetchPayment(bucket)
48✔
764
        if err != nil {
48✔
765
                return 0, err
×
766
        }
×
767

768
        return payment.Status, nil
48✔
769
}
770

771
// FetchInFlightPayments returns all payments with status InFlight.
772
func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) {
4✔
773
        var inFlights []*MPPayment
4✔
774
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
8✔
775
                payments := tx.ReadBucket(paymentsRootBucket)
4✔
776
                if payments == nil {
6✔
777
                        return nil
2✔
778
                }
2✔
779

780
                return payments.ForEach(func(k, _ []byte) error {
4✔
781
                        bucket := payments.NestedReadBucket(k)
2✔
782
                        if bucket == nil {
2✔
783
                                return fmt.Errorf("non bucket element")
×
784
                        }
×
785

786
                        p, err := fetchPayment(bucket)
2✔
787
                        if err != nil {
2✔
788
                                return err
×
789
                        }
×
790

791
                        // Skip the payment if it's terminated.
792
                        if p.Terminated() {
2✔
UNCOV
793
                                return nil
×
UNCOV
794
                        }
×
795

796
                        inFlights = append(inFlights, p)
2✔
797
                        return nil
2✔
798
                })
799
        }, func() {
4✔
800
                inFlights = nil
4✔
801
        })
4✔
802
        if err != nil {
4✔
803
                return nil, err
×
804
        }
×
805

806
        return inFlights, nil
4✔
807
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc