• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 9915780197

13 Jul 2024 12:30AM UTC coverage: 49.268% (-9.1%) from 58.413%
9915780197

push

github

web-flow
Merge pull request #8653 from ProofOfKeags/fn-prim

DynComms [0/n]: `fn` package additions

92837 of 188433 relevant lines covered (49.27%)

1.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.73
/channeldb/payment_control.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "io"
9
        "sync"
10

11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
)
14

15
// paymentSeqBlockSize is the number of payment sequence numbers reserved at
// once whenever a new batch is allocated for future payments.
const paymentSeqBlockSize = 1000
20

21
var (
	// ErrAlreadyPaid signals we have already paid this payment hash.
	ErrAlreadyPaid = errors.New("invoice is already paid")

	// ErrPaymentInFlight signals that payment for this payment hash is
	// already "in flight" on the network.
	ErrPaymentInFlight = errors.New("payment is in transition")

	// ErrPaymentExists is returned when we try to initialize an already
	// existing payment that is not failed.
	ErrPaymentExists = errors.New("payment already exists")

	// ErrPaymentInternal is returned when performing the payment has a
	// conflicting state, such as,
	// - payment has StatusSucceeded but remaining amount is not zero.
	// - payment has StatusInitiated but remaining amount is zero.
	// - payment has StatusFailed but remaining amount is zero.
	ErrPaymentInternal = errors.New("internal error")

	// ErrPaymentNotInitiated is returned if the payment wasn't initiated.
	ErrPaymentNotInitiated = errors.New("payment isn't initiated")

	// ErrPaymentAlreadySucceeded is returned in the event we attempt to
	// change the status of a payment already succeeded.
	ErrPaymentAlreadySucceeded = errors.New("payment is already succeeded")

	// ErrPaymentAlreadyFailed is returned in the event we attempt to alter
	// a failed payment.
	ErrPaymentAlreadyFailed = errors.New("payment has already failed")

	// ErrUnknownPaymentStatus is returned when we do not recognize the
	// existing state of a payment.
	ErrUnknownPaymentStatus = errors.New("unknown payment status")

	// ErrPaymentTerminal is returned if we attempt to alter a payment that
	// already has reached a terminal condition.
	ErrPaymentTerminal = errors.New("payment has reached terminal " +
		"condition")

	// ErrAttemptAlreadySettled is returned if we try to alter an already
	// settled HTLC attempt.
	ErrAttemptAlreadySettled = errors.New("attempt already settled")

	// ErrAttemptAlreadyFailed is returned if we try to alter an already
	// failed HTLC attempt.
	ErrAttemptAlreadyFailed = errors.New("attempt already failed")

	// ErrValueMismatch is returned if we try to register a non-MPP attempt
	// with an amount that doesn't match the payment amount.
	ErrValueMismatch = errors.New("attempted value doesn't match " +
		"payment amount")

	// ErrValueExceedsAmt is returned if we try to register an attempt that
	// would take the total sent amount above the payment amount.
	ErrValueExceedsAmt = errors.New("attempted value exceeds " +
		"payment amount")

	// ErrNonMPPayment is returned if we try to register an MPP attempt for
	// a payment that already has a non-MPP attempt registered.
	ErrNonMPPayment = errors.New("payment has non-MPP attempts")

	// ErrMPPayment is returned if we try to register a non-MPP attempt for
	// a payment that already has an MPP attempt registered.
	ErrMPPayment = errors.New("payment has MPP attempts")

	// ErrMPPPaymentAddrMismatch is returned if we try to register an MPP
	// shard where the payment address doesn't match existing shards.
	ErrMPPPaymentAddrMismatch = errors.New("payment address mismatch")

	// ErrMPPTotalAmountMismatch is returned if we try to register an MPP
	// shard where the total amount doesn't match existing shards.
	ErrMPPTotalAmountMismatch = errors.New("mp payment total amount " +
		"mismatch")

	// ErrPaymentPendingSettled is returned when we try to add a new
	// attempt to a payment that has at least one of its HTLCs settled.
	ErrPaymentPendingSettled = errors.New("payment has settled htlcs")

	// ErrPaymentPendingFailed is returned when we try to add a new attempt
	// to a payment that already has a failure reason.
	ErrPaymentPendingFailed = errors.New("payment has failure reason")

	// ErrSentExceedsTotal is returned if the payment's current total sent
	// amount exceeds the total amount.
	ErrSentExceedsTotal = errors.New("total sent exceeds total amount")

	// errNoAttemptInfo is returned when no attempt info is stored yet.
	errNoAttemptInfo = errors.New("unable to find attempt info for " +
		"inflight payment")

	// errNoSequenceNrIndex is returned when an attempt to lookup a payment
	// index is made for a sequence number that is not indexed.
	errNoSequenceNrIndex = errors.New("payment sequence number index " +
		"does not exist")
)
116

117
// PaymentControl implements persistence for payments and payment attempts.
118
type PaymentControl struct {
119
        paymentSeqMx     sync.Mutex
120
        currPaymentSeq   uint64
121
        storedPaymentSeq uint64
122
        db               *DB
123
}
124

125
// NewPaymentControl creates a new instance of the PaymentControl.
126
func NewPaymentControl(db *DB) *PaymentControl {
3✔
127
        return &PaymentControl{
3✔
128
                db: db,
3✔
129
        }
3✔
130
}
3✔
131

132
// InitPayment checks or records the given PaymentCreationInfo with the DB,
133
// making sure it does not already exist as an in-flight payment. When this
134
// method returns successfully, the payment is guaranteed to be in the InFlight
135
// state.
136
func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash,
137
        info *PaymentCreationInfo) error {
3✔
138

3✔
139
        // Obtain a new sequence number for this payment. This is used
3✔
140
        // to sort the payments in order of creation, and also acts as
3✔
141
        // a unique identifier for each payment.
3✔
142
        sequenceNum, err := p.nextPaymentSequence()
3✔
143
        if err != nil {
3✔
144
                return err
×
145
        }
×
146

147
        var b bytes.Buffer
3✔
148
        if err := serializePaymentCreationInfo(&b, info); err != nil {
3✔
149
                return err
×
150
        }
×
151
        infoBytes := b.Bytes()
3✔
152

3✔
153
        var updateErr error
3✔
154
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
155
                // Reset the update error, to avoid carrying over an error
3✔
156
                // from a previous execution of the batched db transaction.
3✔
157
                updateErr = nil
3✔
158

3✔
159
                prefetchPayment(tx, paymentHash)
3✔
160
                bucket, err := createPaymentBucket(tx, paymentHash)
3✔
161
                if err != nil {
3✔
162
                        return err
×
163
                }
×
164

165
                // Get the existing status of this payment, if any.
166
                paymentStatus, err := fetchPaymentStatus(bucket)
3✔
167

3✔
168
                switch {
3✔
169
                // If no error is returned, it means we already have this
170
                // payment. We'll check the status to decide whether we allow
171
                // retrying the payment or return a specific error.
172
                case err == nil:
3✔
173
                        if err := paymentStatus.initializable(); err != nil {
6✔
174
                                updateErr = err
3✔
175
                                return nil
3✔
176
                        }
3✔
177

178
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
179
                // we'll return the error.
180
                case !errors.Is(err, ErrPaymentNotInitiated):
×
181
                        return err
×
182
                }
183

184
                // Before we set our new sequence number, we check whether this
185
                // payment has a previously set sequence number and remove its
186
                // index entry if it exists. This happens in the case where we
187
                // have a previously attempted payment which was left in a state
188
                // where we can retry.
189
                seqBytes := bucket.Get(paymentSequenceKey)
3✔
190
                if seqBytes != nil {
6✔
191
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
192
                        if err := indexBucket.Delete(seqBytes); err != nil {
3✔
193
                                return err
×
194
                        }
×
195
                }
196

197
                // Once we have obtained a sequence number, we add an entry
198
                // to our index bucket which will map the sequence number to
199
                // our payment identifier.
200
                err = createPaymentIndexEntry(
3✔
201
                        tx, sequenceNum, info.PaymentIdentifier,
3✔
202
                )
3✔
203
                if err != nil {
3✔
204
                        return err
×
205
                }
×
206

207
                err = bucket.Put(paymentSequenceKey, sequenceNum)
3✔
208
                if err != nil {
3✔
209
                        return err
×
210
                }
×
211

212
                // Add the payment info to the bucket, which contains the
213
                // static information for this payment
214
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
3✔
215
                if err != nil {
3✔
216
                        return err
×
217
                }
×
218

219
                // We'll delete any lingering HTLCs to start with, in case we
220
                // are initializing a payment that was attempted earlier, but
221
                // left in a state where we could retry.
222
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
3✔
223
                if err != nil && err != kvdb.ErrBucketNotFound {
3✔
224
                        return err
×
225
                }
×
226

227
                // Also delete any lingering failure info now that we are
228
                // re-attempting.
229
                return bucket.Delete(paymentFailInfoKey)
3✔
230
        })
231
        if err != nil {
3✔
232
                return fmt.Errorf("unable to init payment: %w", err)
×
233
        }
×
234

235
        return updateErr
3✔
236
}
237

238
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
239
// by the PaymentControl db.
240
func (p *PaymentControl) DeleteFailedAttempts(hash lntypes.Hash) error {
3✔
241
        if !p.db.keepFailedPaymentAttempts {
3✔
242
                const failedHtlcsOnly = true
×
243
                err := p.db.DeletePayment(hash, failedHtlcsOnly)
×
244
                if err != nil {
×
245
                        return err
×
246
                }
×
247
        }
248
        return nil
3✔
249
}
250

251
// paymentIndexType is the type tag written at the start of every payment
// index entry. It allows different kinds of index entries to be told apart.
type paymentIndexType uint8

// paymentIndexTypeHash is the payment index type used for entries that map a
// payment sequence number to a payment hash.
const paymentIndexTypeHash = paymentIndexType(0)
258

259
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
260
// index produced contains a payment index type (which can be used in future to
261
// signal different payment index types) and the payment identifier.
262
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
263
        id lntypes.Hash) error {
3✔
264

3✔
265
        var b bytes.Buffer
3✔
266
        if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
3✔
267
                return err
×
268
        }
×
269

270
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
271
        return indexes.Put(sequenceNumber, b.Bytes())
3✔
272
}
273

274
// deserializePaymentIndex deserializes a payment index entry. This function
275
// currently only supports deserialization of payment hash indexes, and will
276
// fail for other types.
277
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
3✔
278
        var (
3✔
279
                indexType   paymentIndexType
3✔
280
                paymentHash []byte
3✔
281
        )
3✔
282

3✔
283
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
3✔
284
                return lntypes.Hash{}, err
×
285
        }
×
286

287
        // While we only have on payment index type, we do not need to use our
288
        // index type to deserialize the index. However, we sanity check that
289
        // this type is as expected, since we had to read it out anyway.
290
        if indexType != paymentIndexTypeHash {
3✔
291
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
292
                        "type: %v", indexType)
×
293
        }
×
294

295
        hash, err := lntypes.MakeHash(paymentHash)
3✔
296
        if err != nil {
3✔
297
                return lntypes.Hash{}, err
×
298
        }
×
299

300
        return hash, nil
3✔
301
}
302

303
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
304
// DB.
305
func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash,
306
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
3✔
307

3✔
308
        // Serialize the information before opening the db transaction.
3✔
309
        var a bytes.Buffer
3✔
310
        err := serializeHTLCAttemptInfo(&a, attempt)
3✔
311
        if err != nil {
3✔
312
                return nil, err
×
313
        }
×
314
        htlcInfoBytes := a.Bytes()
3✔
315

3✔
316
        htlcIDBytes := make([]byte, 8)
3✔
317
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
3✔
318

3✔
319
        var payment *MPPayment
3✔
320
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
321
                prefetchPayment(tx, paymentHash)
3✔
322
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
323
                if err != nil {
3✔
324
                        return err
×
325
                }
×
326

327
                payment, err = fetchPayment(bucket)
3✔
328
                if err != nil {
3✔
329
                        return err
×
330
                }
×
331

332
                // Check if registering a new attempt is allowed.
333
                if err := payment.Registrable(); err != nil {
3✔
334
                        return err
×
335
                }
×
336

337
                // Make sure any existing shards match the new one with regards
338
                // to MPP options.
339
                mpp := attempt.Route.FinalHop().MPP
3✔
340
                for _, h := range payment.InFlightHTLCs() {
6✔
341
                        hMpp := h.Route.FinalHop().MPP
3✔
342

3✔
343
                        switch {
3✔
344
                        // We tried to register a non-MPP attempt for a MPP
345
                        // payment.
346
                        case mpp == nil && hMpp != nil:
×
347
                                return ErrMPPayment
×
348

349
                        // We tried to register a MPP shard for a non-MPP
350
                        // payment.
351
                        case mpp != nil && hMpp == nil:
×
352
                                return ErrNonMPPayment
×
353

354
                        // Non-MPP payment, nothing more to validate.
355
                        case mpp == nil:
×
356
                                continue
×
357
                        }
358

359
                        // Check that MPP options match.
360
                        if mpp.PaymentAddr() != hMpp.PaymentAddr() {
3✔
361
                                return ErrMPPPaymentAddrMismatch
×
362
                        }
×
363

364
                        if mpp.TotalMsat() != hMpp.TotalMsat() {
3✔
365
                                return ErrMPPTotalAmountMismatch
×
366
                        }
×
367
                }
368

369
                // If this is a non-MPP attempt, it must match the total amount
370
                // exactly.
371
                amt := attempt.Route.ReceiverAmt()
3✔
372
                if mpp == nil && amt != payment.Info.Value {
3✔
373
                        return ErrValueMismatch
×
374
                }
×
375

376
                // Ensure we aren't sending more than the total payment amount.
377
                sentAmt, _ := payment.SentAmt()
3✔
378
                if sentAmt+amt > payment.Info.Value {
3✔
379
                        return ErrValueExceedsAmt
×
380
                }
×
381

382
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
3✔
383
                        paymentHtlcsBucket,
3✔
384
                )
3✔
385
                if err != nil {
3✔
386
                        return err
×
387
                }
×
388

389
                err = htlcsBucket.Put(
3✔
390
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
3✔
391
                        htlcInfoBytes,
3✔
392
                )
3✔
393
                if err != nil {
3✔
394
                        return err
×
395
                }
×
396

397
                // Retrieve attempt info for the notification.
398
                payment, err = fetchPayment(bucket)
3✔
399
                return err
3✔
400
        })
401
        if err != nil {
3✔
402
                return nil, err
×
403
        }
×
404

405
        return payment, err
3✔
406
}
407

408
// SettleAttempt marks the given attempt settled with the preimage. If this is
409
// a multi shard payment, this might implicitly mean that the full payment
410
// succeeded.
411
//
412
// After invoking this method, InitPayment should always return an error to
413
// prevent us from making duplicate payments to the same payment hash. The
414
// provided preimage is atomically saved to the DB for record keeping.
415
func (p *PaymentControl) SettleAttempt(hash lntypes.Hash,
416
        attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
3✔
417

3✔
418
        var b bytes.Buffer
3✔
419
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
3✔
420
                return nil, err
×
421
        }
×
422
        settleBytes := b.Bytes()
3✔
423

3✔
424
        return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
3✔
425
}
426

427
// FailAttempt marks the given payment attempt failed.
428
func (p *PaymentControl) FailAttempt(hash lntypes.Hash,
429
        attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
3✔
430

3✔
431
        var b bytes.Buffer
3✔
432
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
3✔
433
                return nil, err
×
434
        }
×
435
        failBytes := b.Bytes()
3✔
436

3✔
437
        return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
3✔
438
}
439

440
// updateHtlcKey updates a database key for the specified htlc.
441
func (p *PaymentControl) updateHtlcKey(paymentHash lntypes.Hash,
442
        attemptID uint64, key, value []byte) (*MPPayment, error) {
3✔
443

3✔
444
        aid := make([]byte, 8)
3✔
445
        binary.BigEndian.PutUint64(aid, attemptID)
3✔
446

3✔
447
        var payment *MPPayment
3✔
448
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
449
                payment = nil
3✔
450

3✔
451
                prefetchPayment(tx, paymentHash)
3✔
452
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
453
                if err != nil {
3✔
454
                        return err
×
455
                }
×
456

457
                p, err := fetchPayment(bucket)
3✔
458
                if err != nil {
3✔
459
                        return err
×
460
                }
×
461

462
                // We can only update keys of in-flight payments. We allow
463
                // updating keys even if the payment has reached a terminal
464
                // condition, since the HTLC outcomes must still be updated.
465
                if err := p.Status.updatable(); err != nil {
3✔
466
                        return err
×
467
                }
×
468

469
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
3✔
470
                if htlcsBucket == nil {
3✔
471
                        return fmt.Errorf("htlcs bucket not found")
×
472
                }
×
473

474
                if htlcsBucket.Get(htlcBucketKey(htlcAttemptInfoKey, aid)) == nil {
3✔
475
                        return fmt.Errorf("HTLC with ID %v not registered",
×
476
                                attemptID)
×
477
                }
×
478

479
                // Make sure the shard is not already failed or settled.
480
                if htlcsBucket.Get(htlcBucketKey(htlcFailInfoKey, aid)) != nil {
3✔
481
                        return ErrAttemptAlreadyFailed
×
482
                }
×
483

484
                if htlcsBucket.Get(htlcBucketKey(htlcSettleInfoKey, aid)) != nil {
3✔
485
                        return ErrAttemptAlreadySettled
×
486
                }
×
487

488
                // Add or update the key for this htlc.
489
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
3✔
490
                if err != nil {
3✔
491
                        return err
×
492
                }
×
493

494
                // Retrieve attempt info for the notification.
495
                payment, err = fetchPayment(bucket)
3✔
496
                return err
3✔
497
        })
498
        if err != nil {
3✔
499
                return nil, err
×
500
        }
×
501

502
        return payment, err
3✔
503
}
504

505
// Fail transitions a payment into the Failed state, and records the reason the
506
// payment failed. After invoking this method, InitPayment should return nil on
507
// its next call for this payment hash, allowing the switch to make a
508
// subsequent payment.
509
func (p *PaymentControl) Fail(paymentHash lntypes.Hash,
510
        reason FailureReason) (*MPPayment, error) {
3✔
511

3✔
512
        var (
3✔
513
                updateErr error
3✔
514
                payment   *MPPayment
3✔
515
        )
3✔
516
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
517
                // Reset the update error, to avoid carrying over an error
3✔
518
                // from a previous execution of the batched db transaction.
3✔
519
                updateErr = nil
3✔
520
                payment = nil
3✔
521

3✔
522
                prefetchPayment(tx, paymentHash)
3✔
523
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
524
                if err == ErrPaymentNotInitiated {
3✔
525
                        updateErr = ErrPaymentNotInitiated
×
526
                        return nil
×
527
                } else if err != nil {
3✔
528
                        return err
×
529
                }
×
530

531
                // We mark the payment as failed as long as it is known. This
532
                // lets the last attempt to fail with a terminal write its
533
                // failure to the PaymentControl without synchronizing with
534
                // other attempts.
535
                _, err = fetchPaymentStatus(bucket)
3✔
536
                if errors.Is(err, ErrPaymentNotInitiated) {
3✔
537
                        updateErr = ErrPaymentNotInitiated
×
538
                        return nil
×
539
                } else if err != nil {
3✔
540
                        return err
×
541
                }
×
542

543
                // Put the failure reason in the bucket for record keeping.
544
                v := []byte{byte(reason)}
3✔
545
                err = bucket.Put(paymentFailInfoKey, v)
3✔
546
                if err != nil {
3✔
547
                        return err
×
548
                }
×
549

550
                // Retrieve attempt info for the notification, if available.
551
                payment, err = fetchPayment(bucket)
3✔
552
                if err != nil {
3✔
553
                        return err
×
554
                }
×
555

556
                return nil
3✔
557
        })
558
        if err != nil {
3✔
559
                return nil, err
×
560
        }
×
561

562
        return payment, updateErr
3✔
563
}
564

565
// FetchPayment returns information about a payment from the database.
566
func (p *PaymentControl) FetchPayment(paymentHash lntypes.Hash) (
567
        *MPPayment, error) {
3✔
568

3✔
569
        var payment *MPPayment
3✔
570
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
571
                prefetchPayment(tx, paymentHash)
3✔
572
                bucket, err := fetchPaymentBucket(tx, paymentHash)
3✔
573
                if err != nil {
3✔
574
                        return err
×
575
                }
×
576

577
                payment, err = fetchPayment(bucket)
3✔
578

3✔
579
                return err
3✔
580
        }, func() {
3✔
581
                payment = nil
3✔
582
        })
3✔
583
        if err != nil {
3✔
584
                return nil, err
×
585
        }
×
586

587
        return payment, nil
3✔
588
}
589

590
// prefetchPayment attempts to prefetch as much of the payment as possible to
591
// reduce DB roundtrips.
592
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
3✔
593
        rb := kvdb.RootBucket(tx)
3✔
594
        kvdb.Prefetch(
3✔
595
                rb,
3✔
596
                []string{
3✔
597
                        // Prefetch all keys in the payment's bucket.
3✔
598
                        string(paymentsRootBucket),
3✔
599
                        string(paymentHash[:]),
3✔
600
                },
3✔
601
                []string{
3✔
602
                        // Prefetch all keys in the payment's htlc bucket.
3✔
603
                        string(paymentsRootBucket),
3✔
604
                        string(paymentHash[:]),
3✔
605
                        string(paymentHtlcsBucket),
3✔
606
                },
3✔
607
        )
3✔
608
}
3✔
609

610
// createPaymentBucket creates or fetches the sub-bucket assigned to this
611
// payment hash.
612
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
613
        kvdb.RwBucket, error) {
3✔
614

3✔
615
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
3✔
616
        if err != nil {
3✔
617
                return nil, err
×
618
        }
×
619

620
        return payments.CreateBucketIfNotExists(paymentHash[:])
3✔
621
}
622

623
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
624
// the bucket does not exist, it returns ErrPaymentNotInitiated.
625
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
626
        kvdb.RBucket, error) {
3✔
627

3✔
628
        payments := tx.ReadBucket(paymentsRootBucket)
3✔
629
        if payments == nil {
3✔
630
                return nil, ErrPaymentNotInitiated
×
631
        }
×
632

633
        bucket := payments.NestedReadBucket(paymentHash[:])
3✔
634
        if bucket == nil {
3✔
635
                return nil, ErrPaymentNotInitiated
×
636
        }
×
637

638
        return bucket, nil
3✔
639

640
}
641

642
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
643
// bucket that can be written to.
644
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
645
        kvdb.RwBucket, error) {
3✔
646

3✔
647
        payments := tx.ReadWriteBucket(paymentsRootBucket)
3✔
648
        if payments == nil {
3✔
649
                return nil, ErrPaymentNotInitiated
×
650
        }
×
651

652
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
3✔
653
        if bucket == nil {
3✔
654
                return nil, ErrPaymentNotInitiated
×
655
        }
×
656

657
        return bucket, nil
3✔
658
}
659

660
// nextPaymentSequence returns the next sequence number to store for a new
661
// payment.
662
func (p *PaymentControl) nextPaymentSequence() ([]byte, error) {
3✔
663
        p.paymentSeqMx.Lock()
3✔
664
        defer p.paymentSeqMx.Unlock()
3✔
665

3✔
666
        // Set a new upper bound in the DB every 1000 payments to avoid
3✔
667
        // conflicts on the sequence when using etcd.
3✔
668
        if p.currPaymentSeq == p.storedPaymentSeq {
6✔
669
                var currPaymentSeq, newUpperBound uint64
3✔
670
                if err := kvdb.Update(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
671
                        paymentsBucket, err := tx.CreateTopLevelBucket(
3✔
672
                                paymentsRootBucket,
3✔
673
                        )
3✔
674
                        if err != nil {
3✔
675
                                return err
×
676
                        }
×
677

678
                        currPaymentSeq = paymentsBucket.Sequence()
3✔
679
                        newUpperBound = currPaymentSeq + paymentSeqBlockSize
3✔
680
                        return paymentsBucket.SetSequence(newUpperBound)
3✔
681
                }, func() {}); err != nil {
3✔
682
                        return nil, err
×
683
                }
×
684

685
                // We lazy initialize the cached currPaymentSeq here using the
686
                // first nextPaymentSequence() call. This if statement will auto
687
                // initialize our stored currPaymentSeq, since by default both
688
                // this variable and storedPaymentSeq are zero which in turn
689
                // will have us fetch the current values from the DB.
690
                if p.currPaymentSeq == 0 {
6✔
691
                        p.currPaymentSeq = currPaymentSeq
3✔
692
                }
3✔
693

694
                p.storedPaymentSeq = newUpperBound
3✔
695
        }
696

697
        p.currPaymentSeq++
3✔
698
        b := make([]byte, 8)
3✔
699
        binary.BigEndian.PutUint64(b, p.currPaymentSeq)
3✔
700

3✔
701
        return b, nil
3✔
702
}
703

704
// fetchPaymentStatus fetches the payment status of the payment. If the payment
705
// isn't found, it will return error `ErrPaymentNotInitiated`.
706
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
3✔
707
        // Creation info should be set for all payments, regardless of state.
3✔
708
        // If not, it is unknown.
3✔
709
        if bucket.Get(paymentCreationInfoKey) == nil {
6✔
710
                return 0, ErrPaymentNotInitiated
3✔
711
        }
3✔
712

713
        payment, err := fetchPayment(bucket)
3✔
714
        if err != nil {
3✔
715
                return 0, err
×
716
        }
×
717

718
        return payment.Status, nil
3✔
719
}
720

721
// FetchInFlightPayments returns all payments with status InFlight.
722
func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) {
3✔
723
        var inFlights []*MPPayment
3✔
724
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
725
                payments := tx.ReadBucket(paymentsRootBucket)
3✔
726
                if payments == nil {
6✔
727
                        return nil
3✔
728
                }
3✔
729

730
                return payments.ForEach(func(k, _ []byte) error {
6✔
731
                        bucket := payments.NestedReadBucket(k)
3✔
732
                        if bucket == nil {
3✔
733
                                return fmt.Errorf("non bucket element")
×
734
                        }
×
735

736
                        p, err := fetchPayment(bucket)
3✔
737
                        if err != nil {
3✔
738
                                return err
×
739
                        }
×
740

741
                        // Skip the payment if it's terminated.
742
                        if p.Terminated() {
6✔
743
                                return nil
3✔
744
                        }
3✔
745

746
                        inFlights = append(inFlights, p)
3✔
747
                        return nil
3✔
748
                })
749
        }, func() {
3✔
750
                inFlights = nil
3✔
751
        })
3✔
752
        if err != nil {
3✔
753
                return nil, err
×
754
        }
×
755

756
        return inFlights, nil
3✔
757
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc