• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13523316608

25 Feb 2025 02:12PM UTC coverage: 49.351% (-9.5%) from 58.835%
13523316608

Pull #9549

github

yyforyongyu
routing/chainview: refactor `TestFilteredChainView`

So each test has its own miner and chainView.
Pull Request #9549: Fix unit test flake `TestHistoricalConfDetailsTxIndex`

0 of 120 new or added lines in 1 file covered. (0.0%)

27196 existing lines in 434 files now uncovered.

100945 of 204543 relevant lines covered (49.35%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.11
/channeldb/payment_control.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "io"
9
        "sync"
10

11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
)
14

15
const (
16
        // paymentSeqBlockSize is the block size used when we batch allocate
17
        // payment sequences for future payments.
18
        paymentSeqBlockSize = 1000
19
)
20

21
var (
22
        // ErrAlreadyPaid signals we have already paid this payment hash.
23
        ErrAlreadyPaid = errors.New("invoice is already paid")
24

25
        // ErrPaymentInFlight signals that payment for this payment hash is
26
        // already "in flight" on the network.
27
        ErrPaymentInFlight = errors.New("payment is in transition")
28

29
        // ErrPaymentExists is returned when we try to initialize an already
30
        // existing payment that is not failed.
31
        ErrPaymentExists = errors.New("payment already exists")
32

33
        // ErrPaymentInternal is returned when performing the payment has a
34
        // conflicting state, such as,
35
        // - payment has StatusSucceeded but remaining amount is not zero.
36
        // - payment has StatusInitiated but remaining amount is zero.
37
        // - payment has StatusFailed but remaining amount is zero.
38
        ErrPaymentInternal = errors.New("internal error")
39

40
        // ErrPaymentNotInitiated is returned if the payment wasn't initiated.
41
        ErrPaymentNotInitiated = errors.New("payment isn't initiated")
42

43
        // ErrPaymentAlreadySucceeded is returned in the event we attempt to
44
        // change the status of a payment already succeeded.
45
        ErrPaymentAlreadySucceeded = errors.New("payment is already succeeded")
46

47
        // ErrPaymentAlreadyFailed is returned in the event we attempt to alter
48
        // a failed payment.
49
        ErrPaymentAlreadyFailed = errors.New("payment has already failed")
50

51
        // ErrUnknownPaymentStatus is returned when we do not recognize the
52
        // existing state of a payment.
53
        ErrUnknownPaymentStatus = errors.New("unknown payment status")
54

55
        // ErrPaymentTerminal is returned if we attempt to alter a payment that
56
        // already has reached a terminal condition.
57
        ErrPaymentTerminal = errors.New("payment has reached terminal " +
58
                "condition")
59

60
        // ErrAttemptAlreadySettled is returned if we try to alter an already
61
        // settled HTLC attempt.
62
        ErrAttemptAlreadySettled = errors.New("attempt already settled")
63

64
        // ErrAttemptAlreadyFailed is returned if we try to alter an already
65
        // failed HTLC attempt.
66
        ErrAttemptAlreadyFailed = errors.New("attempt already failed")
67

68
        // ErrValueMismatch is returned if we try to register a non-MPP attempt
69
        // with an amount that doesn't match the payment amount.
70
        ErrValueMismatch = errors.New("attempted value doesn't match payment " +
71
                "amount")
72

73
        // ErrValueExceedsAmt is returned if we try to register an attempt that
74
        // would take the total sent amount above the payment amount.
75
        ErrValueExceedsAmt = errors.New("attempted value exceeds payment " +
76
                "amount")
77

78
        // ErrNonMPPayment is returned if we try to register an MPP attempt for
79
        // a payment that already has a non-MPP attempt registered.
80
        ErrNonMPPayment = errors.New("payment has non-MPP attempts")
81

82
        // ErrMPPayment is returned if we try to register a non-MPP attempt for
83
        // a payment that already has an MPP attempt registered.
84
        ErrMPPayment = errors.New("payment has MPP attempts")
85

86
        // ErrMPPRecordInBlindedPayment is returned if we try to register an
87
        // attempt with an MPP record for a payment to a blinded path.
88
        ErrMPPRecordInBlindedPayment = errors.New("blinded payment cannot " +
89
                "contain MPP records")
90

91
        // ErrBlindedPaymentTotalAmountMismatch is returned if we try to
92
        // register an HTLC shard to a blinded route where the total amount
93
        // doesn't match existing shards.
94
        ErrBlindedPaymentTotalAmountMismatch = errors.New("blinded path " +
95
                "total amount mismatch")
96

97
        // ErrMPPPaymentAddrMismatch is returned if we try to register an MPP
98
        // shard where the payment address doesn't match existing shards.
99
        ErrMPPPaymentAddrMismatch = errors.New("payment address mismatch")
100

101
        // ErrMPPTotalAmountMismatch is returned if we try to register an MPP
102
        // shard where the total amount doesn't match existing shards.
103
        ErrMPPTotalAmountMismatch = errors.New("mp payment total amount " +
104
                "mismatch")
105

106
        // ErrPaymentPendingSettled is returned when we try to add a new
107
        // attempt to a payment that has at least one of its HTLCs settled.
108
        ErrPaymentPendingSettled = errors.New("payment has settled htlcs")
109

110
        // ErrPaymentPendingFailed is returned when we try to add a new attempt
111
        // to a payment that already has a failure reason.
112
        ErrPaymentPendingFailed = errors.New("payment has failure reason")
113

114
        // ErrSentExceedsTotal is returned if the payment's current total sent
115
        // amount exceed the total amount.
116
        ErrSentExceedsTotal = errors.New("total sent exceeds total amount")
117

118
        // errNoAttemptInfo is returned when no attempt info is stored yet.
119
        errNoAttemptInfo = errors.New("unable to find attempt info for " +
120
                "inflight payment")
121

122
        // errNoSequenceNrIndex is returned when an attempt to lookup a payment
123
        // index is made for a sequence number that is not indexed.
124
        errNoSequenceNrIndex = errors.New("payment sequence number index " +
125
                "does not exist")
126
)
127

128
// PaymentControl implements persistence for payments and payment attempts.
129
type PaymentControl struct {
130
        paymentSeqMx     sync.Mutex
131
        currPaymentSeq   uint64
132
        storedPaymentSeq uint64
133
        db               *DB
134
}
135

136
// NewPaymentControl creates a new instance of the PaymentControl.
137
func NewPaymentControl(db *DB) *PaymentControl {
3✔
138
        return &PaymentControl{
3✔
139
                db: db,
3✔
140
        }
3✔
141
}
3✔
142

143
// InitPayment checks or records the given PaymentCreationInfo with the DB,
144
// making sure it does not already exist as an in-flight payment. When this
145
// method returns successfully, the payment is guaranteed to be in the InFlight
146
// state.
147
func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash,
148
        info *PaymentCreationInfo) error {
3✔
149

3✔
150
        // Obtain a new sequence number for this payment. This is used
3✔
151
        // to sort the payments in order of creation, and also acts as
3✔
152
        // a unique identifier for each payment.
3✔
153
        sequenceNum, err := p.nextPaymentSequence()
3✔
154
        if err != nil {
3✔
155
                return err
×
156
        }
×
157

158
        var b bytes.Buffer
3✔
159
        if err := serializePaymentCreationInfo(&b, info); err != nil {
3✔
160
                return err
×
161
        }
×
162
        infoBytes := b.Bytes()
3✔
163

3✔
164
        var updateErr error
3✔
165
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
166
                // Reset the update error, to avoid carrying over an error
3✔
167
                // from a previous execution of the batched db transaction.
3✔
168
                updateErr = nil
3✔
169

3✔
170
                prefetchPayment(tx, paymentHash)
3✔
171
                bucket, err := createPaymentBucket(tx, paymentHash)
3✔
172
                if err != nil {
3✔
173
                        return err
×
174
                }
×
175

176
                // Get the existing status of this payment, if any.
177
                paymentStatus, err := fetchPaymentStatus(bucket)
3✔
178

3✔
179
                switch {
3✔
180
                // If no error is returned, it means we already have this
181
                // payment. We'll check the status to decide whether we allow
182
                // retrying the payment or return a specific error.
183
                case err == nil:
3✔
184
                        if err := paymentStatus.initializable(); err != nil {
6✔
185
                                updateErr = err
3✔
186
                                return nil
3✔
187
                        }
3✔
188

189
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
190
                // we'll return the error.
191
                case !errors.Is(err, ErrPaymentNotInitiated):
×
192
                        return err
×
193
                }
194

195
                // Before we set our new sequence number, we check whether this
196
                // payment has a previously set sequence number and remove its
197
                // index entry if it exists. This happens in the case where we
198
                // have a previously attempted payment which was left in a state
199
                // where we can retry.
200
                seqBytes := bucket.Get(paymentSequenceKey)
3✔
201
                if seqBytes != nil {
6✔
202
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
203
                        if err := indexBucket.Delete(seqBytes); err != nil {
3✔
204
                                return err
×
205
                        }
×
206
                }
207

208
                // Once we have obtained a sequence number, we add an entry
209
                // to our index bucket which will map the sequence number to
210
                // our payment identifier.
211
                err = createPaymentIndexEntry(
3✔
212
                        tx, sequenceNum, info.PaymentIdentifier,
3✔
213
                )
3✔
214
                if err != nil {
3✔
215
                        return err
×
216
                }
×
217

218
                err = bucket.Put(paymentSequenceKey, sequenceNum)
3✔
219
                if err != nil {
3✔
220
                        return err
×
221
                }
×
222

223
                // Add the payment info to the bucket, which contains the
224
                // static information for this payment
225
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
3✔
226
                if err != nil {
3✔
227
                        return err
×
228
                }
×
229

230
                // We'll delete any lingering HTLCs to start with, in case we
231
                // are initializing a payment that was attempted earlier, but
232
                // left in a state where we could retry.
233
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
3✔
234
                if err != nil && err != kvdb.ErrBucketNotFound {
3✔
235
                        return err
×
236
                }
×
237

238
                // Also delete any lingering failure info now that we are
239
                // re-attempting.
240
                return bucket.Delete(paymentFailInfoKey)
3✔
241
        })
242
        if err != nil {
3✔
243
                return fmt.Errorf("unable to init payment: %w", err)
×
244
        }
×
245

246
        return updateErr
3✔
247
}
248

249
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
250
// by the PaymentControl db.
251
func (p *PaymentControl) DeleteFailedAttempts(hash lntypes.Hash) error {
3✔
252
        if !p.db.keepFailedPaymentAttempts {
3✔
UNCOV
253
                const failedHtlcsOnly = true
×
UNCOV
254
                err := p.db.DeletePayment(hash, failedHtlcsOnly)
×
UNCOV
255
                if err != nil {
×
UNCOV
256
                        return err
×
UNCOV
257
                }
×
258
        }
259
        return nil
3✔
260
}
261

262
// paymentIndexTypeHash is a payment index type which indicates that we have
263
// created an index of payment sequence number to payment hash.
264
type paymentIndexType uint8
265

266
// paymentIndexTypeHash is a payment index type which indicates that we have
267
// created an index of payment sequence number to payment hash.
268
const paymentIndexTypeHash paymentIndexType = 0
269

270
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
271
// index produced contains a payment index type (which can be used in future to
272
// signal different payment index types) and the payment identifier.
273
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
274
        id lntypes.Hash) error {
3✔
275

3✔
276
        var b bytes.Buffer
3✔
277
        if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
3✔
278
                return err
×
279
        }
×
280

281
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
282
        return indexes.Put(sequenceNumber, b.Bytes())
3✔
283
}
284

285
// deserializePaymentIndex deserializes a payment index entry. This function
286
// currently only supports deserialization of payment hash indexes, and will
287
// fail for other types.
288
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
3✔
289
        var (
3✔
290
                indexType   paymentIndexType
3✔
291
                paymentHash []byte
3✔
292
        )
3✔
293

3✔
294
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
3✔
295
                return lntypes.Hash{}, err
×
296
        }
×
297

298
        // While we only have on payment index type, we do not need to use our
299
        // index type to deserialize the index. However, we sanity check that
300
        // this type is as expected, since we had to read it out anyway.
301
        if indexType != paymentIndexTypeHash {
3✔
302
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
303
                        "type: %v", indexType)
×
304
        }
×
305

306
        hash, err := lntypes.MakeHash(paymentHash)
3✔
307
        if err != nil {
3✔
308
                return lntypes.Hash{}, err
×
309
        }
×
310

311
        return hash, nil
3✔
312
}
313

314
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
315
// DB.
316
func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash,
317
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
3✔
318

3✔
319
        // Serialize the information before opening the db transaction.
3✔
320
        var a bytes.Buffer
3✔
321
        err := serializeHTLCAttemptInfo(&a, attempt)
3✔
322
        if err != nil {
3✔
323
                return nil, err
×
324
        }
×
325
        htlcInfoBytes := a.Bytes()
3✔
326

3✔
327
        htlcIDBytes := make([]byte, 8)
3✔
328
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
3✔
329

3✔
330
        var payment *MPPayment
3✔
331
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
332
                prefetchPayment(tx, paymentHash)
3✔
333
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
334
                if err != nil {
3✔
335
                        return err
×
336
                }
×
337

338
                payment, err = fetchPayment(bucket)
3✔
339
                if err != nil {
3✔
340
                        return err
×
341
                }
×
342

343
                // Check if registering a new attempt is allowed.
344
                if err := payment.Registrable(); err != nil {
3✔
UNCOV
345
                        return err
×
UNCOV
346
                }
×
347

348
                // If the final hop has encrypted data, then we know this is a
349
                // blinded payment. In blinded payments, MPP records are not set
350
                // for split payments and the recipient is responsible for using
351
                // a consistent PathID across the various encrypted data
352
                // payloads that we received from them for this payment. All we
353
                // need to check is that the total amount field for each HTLC
354
                // in the split payment is correct.
355
                isBlinded := len(attempt.Route.FinalHop().EncryptedData) != 0
3✔
356

3✔
357
                // Make sure any existing shards match the new one with regards
3✔
358
                // to MPP options.
3✔
359
                mpp := attempt.Route.FinalHop().MPP
3✔
360

3✔
361
                // MPP records should not be set for attempts to blinded paths.
3✔
362
                if isBlinded && mpp != nil {
3✔
363
                        return ErrMPPRecordInBlindedPayment
×
364
                }
×
365

366
                for _, h := range payment.InFlightHTLCs() {
6✔
367
                        hMpp := h.Route.FinalHop().MPP
3✔
368

3✔
369
                        // If this is a blinded payment, then no existing HTLCs
3✔
370
                        // should have MPP records.
3✔
371
                        if isBlinded && hMpp != nil {
3✔
372
                                return ErrMPPRecordInBlindedPayment
×
373
                        }
×
374

375
                        // If this is a blinded payment, then we just need to
376
                        // check that the TotalAmtMsat field for this shard
377
                        // is equal to that of any other shard in the same
378
                        // payment.
379
                        if isBlinded {
6✔
380
                                if attempt.Route.FinalHop().TotalAmtMsat !=
3✔
381
                                        h.Route.FinalHop().TotalAmtMsat {
3✔
382

×
383
                                        //nolint:ll
×
384
                                        return ErrBlindedPaymentTotalAmountMismatch
×
385
                                }
×
386

387
                                continue
3✔
388
                        }
389

390
                        switch {
3✔
391
                        // We tried to register a non-MPP attempt for a MPP
392
                        // payment.
UNCOV
393
                        case mpp == nil && hMpp != nil:
×
UNCOV
394
                                return ErrMPPayment
×
395

396
                        // We tried to register a MPP shard for a non-MPP
397
                        // payment.
UNCOV
398
                        case mpp != nil && hMpp == nil:
×
UNCOV
399
                                return ErrNonMPPayment
×
400

401
                        // Non-MPP payment, nothing more to validate.
402
                        case mpp == nil:
×
403
                                continue
×
404
                        }
405

406
                        // Check that MPP options match.
407
                        if mpp.PaymentAddr() != hMpp.PaymentAddr() {
3✔
UNCOV
408
                                return ErrMPPPaymentAddrMismatch
×
UNCOV
409
                        }
×
410

411
                        if mpp.TotalMsat() != hMpp.TotalMsat() {
3✔
UNCOV
412
                                return ErrMPPTotalAmountMismatch
×
UNCOV
413
                        }
×
414
                }
415

416
                // If this is a non-MPP attempt, it must match the total amount
417
                // exactly. Note that a blinded payment is considered an MPP
418
                // attempt.
419
                amt := attempt.Route.ReceiverAmt()
3✔
420
                if !isBlinded && mpp == nil && amt != payment.Info.Value {
3✔
421
                        return ErrValueMismatch
×
422
                }
×
423

424
                // Ensure we aren't sending more than the total payment amount.
425
                sentAmt, _ := payment.SentAmt()
3✔
426
                if sentAmt+amt > payment.Info.Value {
3✔
UNCOV
427
                        return fmt.Errorf("%w: attempted=%v, payment amount="+
×
UNCOV
428
                                "%v", ErrValueExceedsAmt, sentAmt+amt,
×
UNCOV
429
                                payment.Info.Value)
×
UNCOV
430
                }
×
431

432
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
3✔
433
                        paymentHtlcsBucket,
3✔
434
                )
3✔
435
                if err != nil {
3✔
436
                        return err
×
437
                }
×
438

439
                err = htlcsBucket.Put(
3✔
440
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
3✔
441
                        htlcInfoBytes,
3✔
442
                )
3✔
443
                if err != nil {
3✔
444
                        return err
×
445
                }
×
446

447
                // Retrieve attempt info for the notification.
448
                payment, err = fetchPayment(bucket)
3✔
449
                return err
3✔
450
        })
451
        if err != nil {
3✔
UNCOV
452
                return nil, err
×
UNCOV
453
        }
×
454

455
        return payment, err
3✔
456
}
457

458
// SettleAttempt marks the given attempt settled with the preimage. If this is
459
// a multi shard payment, this might implicitly mean that the full payment
460
// succeeded.
461
//
462
// After invoking this method, InitPayment should always return an error to
463
// prevent us from making duplicate payments to the same payment hash. The
464
// provided preimage is atomically saved to the DB for record keeping.
465
func (p *PaymentControl) SettleAttempt(hash lntypes.Hash,
466
        attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
3✔
467

3✔
468
        var b bytes.Buffer
3✔
469
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
3✔
470
                return nil, err
×
471
        }
×
472
        settleBytes := b.Bytes()
3✔
473

3✔
474
        return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
3✔
475
}
476

477
// FailAttempt marks the given payment attempt failed.
478
func (p *PaymentControl) FailAttempt(hash lntypes.Hash,
479
        attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
3✔
480

3✔
481
        var b bytes.Buffer
3✔
482
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
3✔
483
                return nil, err
×
484
        }
×
485
        failBytes := b.Bytes()
3✔
486

3✔
487
        return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
3✔
488
}
489

490
// updateHtlcKey updates a database key for the specified htlc.
491
func (p *PaymentControl) updateHtlcKey(paymentHash lntypes.Hash,
492
        attemptID uint64, key, value []byte) (*MPPayment, error) {
3✔
493

3✔
494
        aid := make([]byte, 8)
3✔
495
        binary.BigEndian.PutUint64(aid, attemptID)
3✔
496

3✔
497
        var payment *MPPayment
3✔
498
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
499
                payment = nil
3✔
500

3✔
501
                prefetchPayment(tx, paymentHash)
3✔
502
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
503
                if err != nil {
3✔
UNCOV
504
                        return err
×
UNCOV
505
                }
×
506

507
                p, err := fetchPayment(bucket)
3✔
508
                if err != nil {
3✔
509
                        return err
×
510
                }
×
511

512
                // We can only update keys of in-flight payments. We allow
513
                // updating keys even if the payment has reached a terminal
514
                // condition, since the HTLC outcomes must still be updated.
515
                if err := p.Status.updatable(); err != nil {
3✔
516
                        return err
×
517
                }
×
518

519
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
3✔
520
                if htlcsBucket == nil {
3✔
521
                        return fmt.Errorf("htlcs bucket not found")
×
522
                }
×
523

524
                if htlcsBucket.Get(htlcBucketKey(htlcAttemptInfoKey, aid)) == nil {
3✔
525
                        return fmt.Errorf("HTLC with ID %v not registered",
×
526
                                attemptID)
×
527
                }
×
528

529
                // Make sure the shard is not already failed or settled.
530
                if htlcsBucket.Get(htlcBucketKey(htlcFailInfoKey, aid)) != nil {
3✔
531
                        return ErrAttemptAlreadyFailed
×
532
                }
×
533

534
                if htlcsBucket.Get(htlcBucketKey(htlcSettleInfoKey, aid)) != nil {
3✔
535
                        return ErrAttemptAlreadySettled
×
536
                }
×
537

538
                // Add or update the key for this htlc.
539
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
3✔
540
                if err != nil {
3✔
541
                        return err
×
542
                }
×
543

544
                // Retrieve attempt info for the notification.
545
                payment, err = fetchPayment(bucket)
3✔
546
                return err
3✔
547
        })
548
        if err != nil {
3✔
UNCOV
549
                return nil, err
×
UNCOV
550
        }
×
551

552
        return payment, err
3✔
553
}
554

555
// Fail transitions a payment into the Failed state, and records the reason the
556
// payment failed. After invoking this method, InitPayment should return nil on
557
// its next call for this payment hash, allowing the switch to make a
558
// subsequent payment.
559
func (p *PaymentControl) Fail(paymentHash lntypes.Hash,
560
        reason FailureReason) (*MPPayment, error) {
3✔
561

3✔
562
        var (
3✔
563
                updateErr error
3✔
564
                payment   *MPPayment
3✔
565
        )
3✔
566
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
567
                // Reset the update error, to avoid carrying over an error
3✔
568
                // from a previous execution of the batched db transaction.
3✔
569
                updateErr = nil
3✔
570
                payment = nil
3✔
571

3✔
572
                prefetchPayment(tx, paymentHash)
3✔
573
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
574
                if err == ErrPaymentNotInitiated {
3✔
UNCOV
575
                        updateErr = ErrPaymentNotInitiated
×
UNCOV
576
                        return nil
×
577
                } else if err != nil {
3✔
578
                        return err
×
579
                }
×
580

581
                // We mark the payment as failed as long as it is known. This
582
                // lets the last attempt to fail with a terminal write its
583
                // failure to the PaymentControl without synchronizing with
584
                // other attempts.
585
                _, err = fetchPaymentStatus(bucket)
3✔
586
                if errors.Is(err, ErrPaymentNotInitiated) {
3✔
587
                        updateErr = ErrPaymentNotInitiated
×
588
                        return nil
×
589
                } else if err != nil {
3✔
590
                        return err
×
591
                }
×
592

593
                // Put the failure reason in the bucket for record keeping.
594
                v := []byte{byte(reason)}
3✔
595
                err = bucket.Put(paymentFailInfoKey, v)
3✔
596
                if err != nil {
3✔
597
                        return err
×
598
                }
×
599

600
                // Retrieve attempt info for the notification, if available.
601
                payment, err = fetchPayment(bucket)
3✔
602
                if err != nil {
3✔
603
                        return err
×
604
                }
×
605

606
                return nil
3✔
607
        })
608
        if err != nil {
3✔
609
                return nil, err
×
610
        }
×
611

612
        return payment, updateErr
3✔
613
}
614

615
// FetchPayment returns information about a payment from the database.
616
func (p *PaymentControl) FetchPayment(paymentHash lntypes.Hash) (
617
        *MPPayment, error) {
3✔
618

3✔
619
        var payment *MPPayment
3✔
620
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
621
                prefetchPayment(tx, paymentHash)
3✔
622
                bucket, err := fetchPaymentBucket(tx, paymentHash)
3✔
623
                if err != nil {
3✔
UNCOV
624
                        return err
×
UNCOV
625
                }
×
626

627
                payment, err = fetchPayment(bucket)
3✔
628

3✔
629
                return err
3✔
630
        }, func() {
3✔
631
                payment = nil
3✔
632
        })
3✔
633
        if err != nil {
3✔
UNCOV
634
                return nil, err
×
UNCOV
635
        }
×
636

637
        return payment, nil
3✔
638
}
639

640
// prefetchPayment attempts to prefetch as much of the payment as possible to
641
// reduce DB roundtrips.
642
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
3✔
643
        rb := kvdb.RootBucket(tx)
3✔
644
        kvdb.Prefetch(
3✔
645
                rb,
3✔
646
                []string{
3✔
647
                        // Prefetch all keys in the payment's bucket.
3✔
648
                        string(paymentsRootBucket),
3✔
649
                        string(paymentHash[:]),
3✔
650
                },
3✔
651
                []string{
3✔
652
                        // Prefetch all keys in the payment's htlc bucket.
3✔
653
                        string(paymentsRootBucket),
3✔
654
                        string(paymentHash[:]),
3✔
655
                        string(paymentHtlcsBucket),
3✔
656
                },
3✔
657
        )
3✔
658
}
3✔
659

660
// createPaymentBucket creates or fetches the sub-bucket assigned to this
661
// payment hash.
662
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
663
        kvdb.RwBucket, error) {
3✔
664

3✔
665
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
3✔
666
        if err != nil {
3✔
667
                return nil, err
×
668
        }
×
669

670
        return payments.CreateBucketIfNotExists(paymentHash[:])
3✔
671
}
672

673
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
674
// the bucket does not exist, it returns ErrPaymentNotInitiated.
675
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
676
        kvdb.RBucket, error) {
3✔
677

3✔
678
        payments := tx.ReadBucket(paymentsRootBucket)
3✔
679
        if payments == nil {
3✔
UNCOV
680
                return nil, ErrPaymentNotInitiated
×
UNCOV
681
        }
×
682

683
        bucket := payments.NestedReadBucket(paymentHash[:])
3✔
684
        if bucket == nil {
3✔
685
                return nil, ErrPaymentNotInitiated
×
686
        }
×
687

688
        return bucket, nil
3✔
689

690
}
691

692
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
693
// bucket that can be written to.
694
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
695
        kvdb.RwBucket, error) {
3✔
696

3✔
697
        payments := tx.ReadWriteBucket(paymentsRootBucket)
3✔
698
        if payments == nil {
3✔
UNCOV
699
                return nil, ErrPaymentNotInitiated
×
UNCOV
700
        }
×
701

702
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
3✔
703
        if bucket == nil {
3✔
704
                return nil, ErrPaymentNotInitiated
×
705
        }
×
706

707
        return bucket, nil
3✔
708
}
709

710
// nextPaymentSequence returns the next sequence number to store for a new
711
// payment.
712
func (p *PaymentControl) nextPaymentSequence() ([]byte, error) {
3✔
713
        p.paymentSeqMx.Lock()
3✔
714
        defer p.paymentSeqMx.Unlock()
3✔
715

3✔
716
        // Set a new upper bound in the DB every 1000 payments to avoid
3✔
717
        // conflicts on the sequence when using etcd.
3✔
718
        if p.currPaymentSeq == p.storedPaymentSeq {
6✔
719
                var currPaymentSeq, newUpperBound uint64
3✔
720
                if err := kvdb.Update(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
721
                        paymentsBucket, err := tx.CreateTopLevelBucket(
3✔
722
                                paymentsRootBucket,
3✔
723
                        )
3✔
724
                        if err != nil {
3✔
725
                                return err
×
726
                        }
×
727

728
                        currPaymentSeq = paymentsBucket.Sequence()
3✔
729
                        newUpperBound = currPaymentSeq + paymentSeqBlockSize
3✔
730
                        return paymentsBucket.SetSequence(newUpperBound)
3✔
731
                }, func() {}); err != nil {
3✔
732
                        return nil, err
×
733
                }
×
734

735
                // We lazy initialize the cached currPaymentSeq here using the
736
                // first nextPaymentSequence() call. This if statement will auto
737
                // initialize our stored currPaymentSeq, since by default both
738
                // this variable and storedPaymentSeq are zero which in turn
739
                // will have us fetch the current values from the DB.
740
                if p.currPaymentSeq == 0 {
6✔
741
                        p.currPaymentSeq = currPaymentSeq
3✔
742
                }
3✔
743

744
                p.storedPaymentSeq = newUpperBound
3✔
745
        }
746

747
        p.currPaymentSeq++
3✔
748
        b := make([]byte, 8)
3✔
749
        binary.BigEndian.PutUint64(b, p.currPaymentSeq)
3✔
750

3✔
751
        return b, nil
3✔
752
}
753

754
// fetchPaymentStatus fetches the payment status of the payment. If the payment
755
// isn't found, it will return error `ErrPaymentNotInitiated`.
756
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
3✔
757
        // Creation info should be set for all payments, regardless of state.
3✔
758
        // If not, it is unknown.
3✔
759
        if bucket.Get(paymentCreationInfoKey) == nil {
6✔
760
                return 0, ErrPaymentNotInitiated
3✔
761
        }
3✔
762

763
        payment, err := fetchPayment(bucket)
3✔
764
        if err != nil {
3✔
765
                return 0, err
×
766
        }
×
767

768
        return payment.Status, nil
3✔
769
}
770

771
// FetchInFlightPayments returns all payments with status InFlight.
772
func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) {
3✔
773
        var inFlights []*MPPayment
3✔
774
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
775
                payments := tx.ReadBucket(paymentsRootBucket)
3✔
776
                if payments == nil {
6✔
777
                        return nil
3✔
778
                }
3✔
779

780
                return payments.ForEach(func(k, _ []byte) error {
6✔
781
                        bucket := payments.NestedReadBucket(k)
3✔
782
                        if bucket == nil {
3✔
783
                                return fmt.Errorf("non bucket element")
×
784
                        }
×
785

786
                        p, err := fetchPayment(bucket)
3✔
787
                        if err != nil {
3✔
788
                                return err
×
789
                        }
×
790

791
                        // Skip the payment if it's terminated.
792
                        if p.Terminated() {
6✔
793
                                return nil
3✔
794
                        }
3✔
795

796
                        inFlights = append(inFlights, p)
3✔
797
                        return nil
3✔
798
                })
799
        }, func() {
3✔
800
                inFlights = nil
3✔
801
        })
3✔
802
        if err != nil {
3✔
803
                return nil, err
×
804
        }
×
805

806
        return inFlights, nil
3✔
807
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc