• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11170835610

03 Oct 2024 10:41PM UTC coverage: 49.188% (-9.6%) from 58.738%
11170835610

push

github

web-flow
Merge pull request #9154 from ziggie1984/master

multi: bump btcd version.

3 of 6 new or added lines in 6 files covered. (50.0%)

26110 existing lines in 428 files now uncovered.

97359 of 197934 relevant lines covered (49.19%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.11
/channeldb/payment_control.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "io"
9
        "sync"
10

11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
)
14

15
const (
16
        // paymentSeqBlockSize is the block size used when we batch allocate
17
        // payment sequences for future payments.
18
        paymentSeqBlockSize = 1000
19
)
20

21
var (
22
        // ErrAlreadyPaid signals we have already paid this payment hash.
23
        ErrAlreadyPaid = errors.New("invoice is already paid")
24

25
        // ErrPaymentInFlight signals that payment for this payment hash is
26
        // already "in flight" on the network.
27
        ErrPaymentInFlight = errors.New("payment is in transition")
28

29
        // ErrPaymentExists is returned when we try to initialize an already
30
        // existing payment that is not failed.
31
        ErrPaymentExists = errors.New("payment already exists")
32

33
        // ErrPaymentInternal is returned when performing the payment has a
34
        // conflicting state, such as,
35
        // - payment has StatusSucceeded but remaining amount is not zero.
36
        // - payment has StatusInitiated but remaining amount is zero.
37
        // - payment has StatusFailed but remaining amount is zero.
38
        ErrPaymentInternal = errors.New("internal error")
39

40
        // ErrPaymentNotInitiated is returned if the payment wasn't initiated.
41
        ErrPaymentNotInitiated = errors.New("payment isn't initiated")
42

43
        // ErrPaymentAlreadySucceeded is returned in the event we attempt to
44
        // change the status of a payment already succeeded.
45
        ErrPaymentAlreadySucceeded = errors.New("payment is already succeeded")
46

47
        // ErrPaymentAlreadyFailed is returned in the event we attempt to alter
48
        // a failed payment.
49
        ErrPaymentAlreadyFailed = errors.New("payment has already failed")
50

51
        // ErrUnknownPaymentStatus is returned when we do not recognize the
52
        // existing state of a payment.
53
        ErrUnknownPaymentStatus = errors.New("unknown payment status")
54

55
        // ErrPaymentTerminal is returned if we attempt to alter a payment that
56
        // already has reached a terminal condition.
57
        ErrPaymentTerminal = errors.New("payment has reached terminal " +
58
                "condition")
59

60
        // ErrAttemptAlreadySettled is returned if we try to alter an already
61
        // settled HTLC attempt.
62
        ErrAttemptAlreadySettled = errors.New("attempt already settled")
63

64
        // ErrAttemptAlreadyFailed is returned if we try to alter an already
65
        // failed HTLC attempt.
66
        ErrAttemptAlreadyFailed = errors.New("attempt already failed")
67

68
        // ErrValueMismatch is returned if we try to register a non-MPP attempt
69
        // with an amount that doesn't match the payment amount.
70
        ErrValueMismatch = errors.New("attempted value doesn't match payment " +
71
                "amount")
72

73
        // ErrValueExceedsAmt is returned if we try to register an attempt that
74
        // would take the total sent amount above the payment amount.
75
        ErrValueExceedsAmt = errors.New("attempted value exceeds payment " +
76
                "amount")
77

78
        // ErrNonMPPayment is returned if we try to register an MPP attempt for
79
        // a payment that already has a non-MPP attempt registered.
80
        ErrNonMPPayment = errors.New("payment has non-MPP attempts")
81

82
        // ErrMPPayment is returned if we try to register a non-MPP attempt for
83
        // a payment that already has an MPP attempt registered.
84
        ErrMPPayment = errors.New("payment has MPP attempts")
85

86
        // ErrMPPRecordInBlindedPayment is returned if we try to register an
87
        // attempt with an MPP record for a payment to a blinded path.
88
        ErrMPPRecordInBlindedPayment = errors.New("blinded payment cannot " +
89
                "contain MPP records")
90

91
        // ErrBlindedPaymentTotalAmountMismatch is returned if we try to
92
        // register an HTLC shard to a blinded route where the total amount
93
        // doesn't match existing shards.
94
        ErrBlindedPaymentTotalAmountMismatch = errors.New("blinded path " +
95
                "total amount mismatch")
96

97
        // ErrMPPPaymentAddrMismatch is returned if we try to register an MPP
98
        // shard where the payment address doesn't match existing shards.
99
        ErrMPPPaymentAddrMismatch = errors.New("payment address mismatch")
100

101
        // ErrMPPTotalAmountMismatch is returned if we try to register an MPP
102
        // shard where the total amount doesn't match existing shards.
103
        ErrMPPTotalAmountMismatch = errors.New("mp payment total amount " +
104
                "mismatch")
105

106
        // ErrPaymentPendingSettled is returned when we try to add a new
107
        // attempt to a payment that has at least one of its HTLCs settled.
108
        ErrPaymentPendingSettled = errors.New("payment has settled htlcs")
109

110
        // ErrPaymentPendingFailed is returned when we try to add a new attempt
111
        // to a payment that already has a failure reason.
112
        ErrPaymentPendingFailed = errors.New("payment has failure reason")
113

114
        // ErrSentExceedsTotal is returned if the payment's current total sent
115
        // amount exceed the total amount.
116
        ErrSentExceedsTotal = errors.New("total sent exceeds total amount")
117

118
        // errNoAttemptInfo is returned when no attempt info is stored yet.
119
        errNoAttemptInfo = errors.New("unable to find attempt info for " +
120
                "inflight payment")
121

122
        // errNoSequenceNrIndex is returned when an attempt to lookup a payment
123
        // index is made for a sequence number that is not indexed.
124
        errNoSequenceNrIndex = errors.New("payment sequence number index " +
125
                "does not exist")
126
)
127

128
// PaymentControl implements persistence for payments and payment attempts.
129
type PaymentControl struct {
130
        paymentSeqMx     sync.Mutex
131
        currPaymentSeq   uint64
132
        storedPaymentSeq uint64
133
        db               *DB
134
}
135

136
// NewPaymentControl creates a new instance of the PaymentControl.
137
func NewPaymentControl(db *DB) *PaymentControl {
2✔
138
        return &PaymentControl{
2✔
139
                db: db,
2✔
140
        }
2✔
141
}
2✔
142

143
// InitPayment checks or records the given PaymentCreationInfo with the DB,
144
// making sure it does not already exist as an in-flight payment. When this
145
// method returns successfully, the payment is guaranteed to be in the InFlight
146
// state.
147
func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash,
148
        info *PaymentCreationInfo) error {
2✔
149

2✔
150
        // Obtain a new sequence number for this payment. This is used
2✔
151
        // to sort the payments in order of creation, and also acts as
2✔
152
        // a unique identifier for each payment.
2✔
153
        sequenceNum, err := p.nextPaymentSequence()
2✔
154
        if err != nil {
2✔
155
                return err
×
156
        }
×
157

158
        var b bytes.Buffer
2✔
159
        if err := serializePaymentCreationInfo(&b, info); err != nil {
2✔
160
                return err
×
161
        }
×
162
        infoBytes := b.Bytes()
2✔
163

2✔
164
        var updateErr error
2✔
165
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
4✔
166
                // Reset the update error, to avoid carrying over an error
2✔
167
                // from a previous execution of the batched db transaction.
2✔
168
                updateErr = nil
2✔
169

2✔
170
                prefetchPayment(tx, paymentHash)
2✔
171
                bucket, err := createPaymentBucket(tx, paymentHash)
2✔
172
                if err != nil {
2✔
173
                        return err
×
174
                }
×
175

176
                // Get the existing status of this payment, if any.
177
                paymentStatus, err := fetchPaymentStatus(bucket)
2✔
178

2✔
179
                switch {
2✔
180
                // If no error is returned, it means we already have this
181
                // payment. We'll check the status to decide whether we allow
182
                // retrying the payment or return a specific error.
183
                case err == nil:
2✔
184
                        if err := paymentStatus.initializable(); err != nil {
4✔
185
                                updateErr = err
2✔
186
                                return nil
2✔
187
                        }
2✔
188

189
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
190
                // we'll return the error.
191
                case !errors.Is(err, ErrPaymentNotInitiated):
×
192
                        return err
×
193
                }
194

195
                // Before we set our new sequence number, we check whether this
196
                // payment has a previously set sequence number and remove its
197
                // index entry if it exists. This happens in the case where we
198
                // have a previously attempted payment which was left in a state
199
                // where we can retry.
200
                seqBytes := bucket.Get(paymentSequenceKey)
2✔
201
                if seqBytes != nil {
4✔
202
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
2✔
203
                        if err := indexBucket.Delete(seqBytes); err != nil {
2✔
204
                                return err
×
205
                        }
×
206
                }
207

208
                // Once we have obtained a sequence number, we add an entry
209
                // to our index bucket which will map the sequence number to
210
                // our payment identifier.
211
                err = createPaymentIndexEntry(
2✔
212
                        tx, sequenceNum, info.PaymentIdentifier,
2✔
213
                )
2✔
214
                if err != nil {
2✔
215
                        return err
×
216
                }
×
217

218
                err = bucket.Put(paymentSequenceKey, sequenceNum)
2✔
219
                if err != nil {
2✔
220
                        return err
×
221
                }
×
222

223
                // Add the payment info to the bucket, which contains the
224
                // static information for this payment
225
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
2✔
226
                if err != nil {
2✔
227
                        return err
×
228
                }
×
229

230
                // We'll delete any lingering HTLCs to start with, in case we
231
                // are initializing a payment that was attempted earlier, but
232
                // left in a state where we could retry.
233
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
2✔
234
                if err != nil && err != kvdb.ErrBucketNotFound {
2✔
235
                        return err
×
236
                }
×
237

238
                // Also delete any lingering failure info now that we are
239
                // re-attempting.
240
                return bucket.Delete(paymentFailInfoKey)
2✔
241
        })
242
        if err != nil {
2✔
243
                return fmt.Errorf("unable to init payment: %w", err)
×
244
        }
×
245

246
        return updateErr
2✔
247
}
248

249
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
250
// by the PaymentControl db.
251
func (p *PaymentControl) DeleteFailedAttempts(hash lntypes.Hash) error {
2✔
252
        if !p.db.keepFailedPaymentAttempts {
2✔
UNCOV
253
                const failedHtlcsOnly = true
×
UNCOV
254
                err := p.db.DeletePayment(hash, failedHtlcsOnly)
×
UNCOV
255
                if err != nil {
×
UNCOV
256
                        return err
×
UNCOV
257
                }
×
258
        }
259
        return nil
2✔
260
}
261

262
// paymentIndexTypeHash is a payment index type which indicates that we have
263
// created an index of payment sequence number to payment hash.
264
type paymentIndexType uint8
265

266
// paymentIndexTypeHash is a payment index type which indicates that we have
267
// created an index of payment sequence number to payment hash.
268
const paymentIndexTypeHash paymentIndexType = 0
269

270
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
271
// index produced contains a payment index type (which can be used in future to
272
// signal different payment index types) and the payment identifier.
273
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
274
        id lntypes.Hash) error {
2✔
275

2✔
276
        var b bytes.Buffer
2✔
277
        if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
2✔
278
                return err
×
279
        }
×
280

281
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
2✔
282
        return indexes.Put(sequenceNumber, b.Bytes())
2✔
283
}
284

285
// deserializePaymentIndex deserializes a payment index entry. This function
286
// currently only supports deserialization of payment hash indexes, and will
287
// fail for other types.
288
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
2✔
289
        var (
2✔
290
                indexType   paymentIndexType
2✔
291
                paymentHash []byte
2✔
292
        )
2✔
293

2✔
294
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
2✔
295
                return lntypes.Hash{}, err
×
296
        }
×
297

298
        // While we only have on payment index type, we do not need to use our
299
        // index type to deserialize the index. However, we sanity check that
300
        // this type is as expected, since we had to read it out anyway.
301
        if indexType != paymentIndexTypeHash {
2✔
302
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
303
                        "type: %v", indexType)
×
304
        }
×
305

306
        hash, err := lntypes.MakeHash(paymentHash)
2✔
307
        if err != nil {
2✔
308
                return lntypes.Hash{}, err
×
309
        }
×
310

311
        return hash, nil
2✔
312
}
313

314
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
315
// DB.
316
func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash,
317
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
2✔
318

2✔
319
        // Serialize the information before opening the db transaction.
2✔
320
        var a bytes.Buffer
2✔
321
        err := serializeHTLCAttemptInfo(&a, attempt)
2✔
322
        if err != nil {
2✔
323
                return nil, err
×
324
        }
×
325
        htlcInfoBytes := a.Bytes()
2✔
326

2✔
327
        htlcIDBytes := make([]byte, 8)
2✔
328
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
2✔
329

2✔
330
        var payment *MPPayment
2✔
331
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
4✔
332
                prefetchPayment(tx, paymentHash)
2✔
333
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
2✔
334
                if err != nil {
2✔
335
                        return err
×
336
                }
×
337

338
                payment, err = fetchPayment(bucket)
2✔
339
                if err != nil {
2✔
340
                        return err
×
341
                }
×
342

343
                // Check if registering a new attempt is allowed.
344
                if err := payment.Registrable(); err != nil {
2✔
UNCOV
345
                        return err
×
UNCOV
346
                }
×
347

348
                // If the final hop has encrypted data, then we know this is a
349
                // blinded payment. In blinded payments, MPP records are not set
350
                // for split payments and the recipient is responsible for using
351
                // a consistent PathID across the various encrypted data
352
                // payloads that we received from them for this payment. All we
353
                // need to check is that the total amount field for each HTLC
354
                // in the split payment is correct.
355
                isBlinded := len(attempt.Route.FinalHop().EncryptedData) != 0
2✔
356

2✔
357
                // Make sure any existing shards match the new one with regards
2✔
358
                // to MPP options.
2✔
359
                mpp := attempt.Route.FinalHop().MPP
2✔
360

2✔
361
                // MPP records should not be set for attempts to blinded paths.
2✔
362
                if isBlinded && mpp != nil {
2✔
363
                        return ErrMPPRecordInBlindedPayment
×
364
                }
×
365

366
                for _, h := range payment.InFlightHTLCs() {
4✔
367
                        hMpp := h.Route.FinalHop().MPP
2✔
368

2✔
369
                        // If this is a blinded payment, then no existing HTLCs
2✔
370
                        // should have MPP records.
2✔
371
                        if isBlinded && hMpp != nil {
2✔
372
                                return ErrMPPRecordInBlindedPayment
×
373
                        }
×
374

375
                        // If this is a blinded payment, then we just need to
376
                        // check that the TotalAmtMsat field for this shard
377
                        // is equal to that of any other shard in the same
378
                        // payment.
379
                        if isBlinded {
4✔
380
                                if attempt.Route.FinalHop().TotalAmtMsat !=
2✔
381
                                        h.Route.FinalHop().TotalAmtMsat {
2✔
382

×
383
                                        //nolint:lll
×
384
                                        return ErrBlindedPaymentTotalAmountMismatch
×
385
                                }
×
386

387
                                continue
2✔
388
                        }
389

390
                        switch {
2✔
391
                        // We tried to register a non-MPP attempt for a MPP
392
                        // payment.
UNCOV
393
                        case mpp == nil && hMpp != nil:
×
UNCOV
394
                                return ErrMPPayment
×
395

396
                        // We tried to register a MPP shard for a non-MPP
397
                        // payment.
UNCOV
398
                        case mpp != nil && hMpp == nil:
×
UNCOV
399
                                return ErrNonMPPayment
×
400

401
                        // Non-MPP payment, nothing more to validate.
402
                        case mpp == nil:
×
403
                                continue
×
404
                        }
405

406
                        // Check that MPP options match.
407
                        if mpp.PaymentAddr() != hMpp.PaymentAddr() {
2✔
UNCOV
408
                                return ErrMPPPaymentAddrMismatch
×
UNCOV
409
                        }
×
410

411
                        if mpp.TotalMsat() != hMpp.TotalMsat() {
2✔
UNCOV
412
                                return ErrMPPTotalAmountMismatch
×
UNCOV
413
                        }
×
414
                }
415

416
                // If this is a non-MPP attempt, it must match the total amount
417
                // exactly. Note that a blinded payment is considered an MPP
418
                // attempt.
419
                amt := attempt.Route.ReceiverAmt()
2✔
420
                if !isBlinded && mpp == nil && amt != payment.Info.Value {
2✔
421
                        return ErrValueMismatch
×
422
                }
×
423

424
                // Ensure we aren't sending more than the total payment amount.
425
                sentAmt, _ := payment.SentAmt()
2✔
426
                if sentAmt+amt > payment.Info.Value {
2✔
UNCOV
427
                        return fmt.Errorf("%w: attempted=%v, payment amount="+
×
UNCOV
428
                                "%v", ErrValueExceedsAmt, sentAmt+amt,
×
UNCOV
429
                                payment.Info.Value)
×
UNCOV
430
                }
×
431

432
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
2✔
433
                        paymentHtlcsBucket,
2✔
434
                )
2✔
435
                if err != nil {
2✔
436
                        return err
×
437
                }
×
438

439
                err = htlcsBucket.Put(
2✔
440
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
2✔
441
                        htlcInfoBytes,
2✔
442
                )
2✔
443
                if err != nil {
2✔
444
                        return err
×
445
                }
×
446

447
                // Retrieve attempt info for the notification.
448
                payment, err = fetchPayment(bucket)
2✔
449
                return err
2✔
450
        })
451
        if err != nil {
2✔
UNCOV
452
                return nil, err
×
UNCOV
453
        }
×
454

455
        return payment, err
2✔
456
}
457

458
// SettleAttempt marks the given attempt settled with the preimage. If this is
459
// a multi shard payment, this might implicitly mean that the full payment
460
// succeeded.
461
//
462
// After invoking this method, InitPayment should always return an error to
463
// prevent us from making duplicate payments to the same payment hash. The
464
// provided preimage is atomically saved to the DB for record keeping.
465
func (p *PaymentControl) SettleAttempt(hash lntypes.Hash,
466
        attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
2✔
467

2✔
468
        var b bytes.Buffer
2✔
469
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
2✔
470
                return nil, err
×
471
        }
×
472
        settleBytes := b.Bytes()
2✔
473

2✔
474
        return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
2✔
475
}
476

477
// FailAttempt marks the given payment attempt failed.
478
func (p *PaymentControl) FailAttempt(hash lntypes.Hash,
479
        attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
2✔
480

2✔
481
        var b bytes.Buffer
2✔
482
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
2✔
483
                return nil, err
×
484
        }
×
485
        failBytes := b.Bytes()
2✔
486

2✔
487
        return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
2✔
488
}
489

490
// updateHtlcKey updates a database key for the specified htlc.
491
func (p *PaymentControl) updateHtlcKey(paymentHash lntypes.Hash,
492
        attemptID uint64, key, value []byte) (*MPPayment, error) {
2✔
493

2✔
494
        aid := make([]byte, 8)
2✔
495
        binary.BigEndian.PutUint64(aid, attemptID)
2✔
496

2✔
497
        var payment *MPPayment
2✔
498
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
4✔
499
                payment = nil
2✔
500

2✔
501
                prefetchPayment(tx, paymentHash)
2✔
502
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
2✔
503
                if err != nil {
2✔
UNCOV
504
                        return err
×
UNCOV
505
                }
×
506

507
                p, err := fetchPayment(bucket)
2✔
508
                if err != nil {
2✔
509
                        return err
×
510
                }
×
511

512
                // We can only update keys of in-flight payments. We allow
513
                // updating keys even if the payment has reached a terminal
514
                // condition, since the HTLC outcomes must still be updated.
515
                if err := p.Status.updatable(); err != nil {
2✔
516
                        return err
×
517
                }
×
518

519
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
2✔
520
                if htlcsBucket == nil {
2✔
521
                        return fmt.Errorf("htlcs bucket not found")
×
522
                }
×
523

524
                if htlcsBucket.Get(htlcBucketKey(htlcAttemptInfoKey, aid)) == nil {
2✔
525
                        return fmt.Errorf("HTLC with ID %v not registered",
×
526
                                attemptID)
×
527
                }
×
528

529
                // Make sure the shard is not already failed or settled.
530
                if htlcsBucket.Get(htlcBucketKey(htlcFailInfoKey, aid)) != nil {
2✔
531
                        return ErrAttemptAlreadyFailed
×
532
                }
×
533

534
                if htlcsBucket.Get(htlcBucketKey(htlcSettleInfoKey, aid)) != nil {
2✔
535
                        return ErrAttemptAlreadySettled
×
536
                }
×
537

538
                // Add or update the key for this htlc.
539
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
2✔
540
                if err != nil {
2✔
541
                        return err
×
542
                }
×
543

544
                // Retrieve attempt info for the notification.
545
                payment, err = fetchPayment(bucket)
2✔
546
                return err
2✔
547
        })
548
        if err != nil {
2✔
UNCOV
549
                return nil, err
×
UNCOV
550
        }
×
551

552
        return payment, err
2✔
553
}
554

555
// Fail transitions a payment into the Failed state, and records the reason the
556
// payment failed. After invoking this method, InitPayment should return nil on
557
// its next call for this payment hash, allowing the switch to make a
558
// subsequent payment.
559
func (p *PaymentControl) Fail(paymentHash lntypes.Hash,
560
        reason FailureReason) (*MPPayment, error) {
2✔
561

2✔
562
        var (
2✔
563
                updateErr error
2✔
564
                payment   *MPPayment
2✔
565
        )
2✔
566
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
4✔
567
                // Reset the update error, to avoid carrying over an error
2✔
568
                // from a previous execution of the batched db transaction.
2✔
569
                updateErr = nil
2✔
570
                payment = nil
2✔
571

2✔
572
                prefetchPayment(tx, paymentHash)
2✔
573
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
2✔
574
                if err == ErrPaymentNotInitiated {
2✔
UNCOV
575
                        updateErr = ErrPaymentNotInitiated
×
UNCOV
576
                        return nil
×
577
                } else if err != nil {
2✔
578
                        return err
×
579
                }
×
580

581
                // We mark the payment as failed as long as it is known. This
582
                // lets the last attempt to fail with a terminal write its
583
                // failure to the PaymentControl without synchronizing with
584
                // other attempts.
585
                _, err = fetchPaymentStatus(bucket)
2✔
586
                if errors.Is(err, ErrPaymentNotInitiated) {
2✔
587
                        updateErr = ErrPaymentNotInitiated
×
588
                        return nil
×
589
                } else if err != nil {
2✔
590
                        return err
×
591
                }
×
592

593
                // Put the failure reason in the bucket for record keeping.
594
                v := []byte{byte(reason)}
2✔
595
                err = bucket.Put(paymentFailInfoKey, v)
2✔
596
                if err != nil {
2✔
597
                        return err
×
598
                }
×
599

600
                // Retrieve attempt info for the notification, if available.
601
                payment, err = fetchPayment(bucket)
2✔
602
                if err != nil {
2✔
603
                        return err
×
604
                }
×
605

606
                return nil
2✔
607
        })
608
        if err != nil {
2✔
609
                return nil, err
×
610
        }
×
611

612
        return payment, updateErr
2✔
613
}
614

615
// FetchPayment returns information about a payment from the database.
616
func (p *PaymentControl) FetchPayment(paymentHash lntypes.Hash) (
617
        *MPPayment, error) {
2✔
618

2✔
619
        var payment *MPPayment
2✔
620
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
4✔
621
                prefetchPayment(tx, paymentHash)
2✔
622
                bucket, err := fetchPaymentBucket(tx, paymentHash)
2✔
623
                if err != nil {
2✔
UNCOV
624
                        return err
×
UNCOV
625
                }
×
626

627
                payment, err = fetchPayment(bucket)
2✔
628

2✔
629
                return err
2✔
630
        }, func() {
2✔
631
                payment = nil
2✔
632
        })
2✔
633
        if err != nil {
2✔
UNCOV
634
                return nil, err
×
UNCOV
635
        }
×
636

637
        return payment, nil
2✔
638
}
639

640
// prefetchPayment attempts to prefetch as much of the payment as possible to
641
// reduce DB roundtrips.
642
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
2✔
643
        rb := kvdb.RootBucket(tx)
2✔
644
        kvdb.Prefetch(
2✔
645
                rb,
2✔
646
                []string{
2✔
647
                        // Prefetch all keys in the payment's bucket.
2✔
648
                        string(paymentsRootBucket),
2✔
649
                        string(paymentHash[:]),
2✔
650
                },
2✔
651
                []string{
2✔
652
                        // Prefetch all keys in the payment's htlc bucket.
2✔
653
                        string(paymentsRootBucket),
2✔
654
                        string(paymentHash[:]),
2✔
655
                        string(paymentHtlcsBucket),
2✔
656
                },
2✔
657
        )
2✔
658
}
2✔
659

660
// createPaymentBucket creates or fetches the sub-bucket assigned to this
661
// payment hash.
662
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
663
        kvdb.RwBucket, error) {
2✔
664

2✔
665
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
2✔
666
        if err != nil {
2✔
667
                return nil, err
×
668
        }
×
669

670
        return payments.CreateBucketIfNotExists(paymentHash[:])
2✔
671
}
672

673
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
674
// the bucket does not exist, it returns ErrPaymentNotInitiated.
675
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
676
        kvdb.RBucket, error) {
2✔
677

2✔
678
        payments := tx.ReadBucket(paymentsRootBucket)
2✔
679
        if payments == nil {
2✔
UNCOV
680
                return nil, ErrPaymentNotInitiated
×
UNCOV
681
        }
×
682

683
        bucket := payments.NestedReadBucket(paymentHash[:])
2✔
684
        if bucket == nil {
2✔
685
                return nil, ErrPaymentNotInitiated
×
686
        }
×
687

688
        return bucket, nil
2✔
689

690
}
691

692
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
693
// bucket that can be written to.
694
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
695
        kvdb.RwBucket, error) {
2✔
696

2✔
697
        payments := tx.ReadWriteBucket(paymentsRootBucket)
2✔
698
        if payments == nil {
2✔
UNCOV
699
                return nil, ErrPaymentNotInitiated
×
UNCOV
700
        }
×
701

702
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
2✔
703
        if bucket == nil {
2✔
704
                return nil, ErrPaymentNotInitiated
×
705
        }
×
706

707
        return bucket, nil
2✔
708
}
709

710
// nextPaymentSequence returns the next sequence number to store for a new
711
// payment.
712
func (p *PaymentControl) nextPaymentSequence() ([]byte, error) {
2✔
713
        p.paymentSeqMx.Lock()
2✔
714
        defer p.paymentSeqMx.Unlock()
2✔
715

2✔
716
        // Set a new upper bound in the DB every 1000 payments to avoid
2✔
717
        // conflicts on the sequence when using etcd.
2✔
718
        if p.currPaymentSeq == p.storedPaymentSeq {
4✔
719
                var currPaymentSeq, newUpperBound uint64
2✔
720
                if err := kvdb.Update(p.db.Backend, func(tx kvdb.RwTx) error {
4✔
721
                        paymentsBucket, err := tx.CreateTopLevelBucket(
2✔
722
                                paymentsRootBucket,
2✔
723
                        )
2✔
724
                        if err != nil {
2✔
725
                                return err
×
726
                        }
×
727

728
                        currPaymentSeq = paymentsBucket.Sequence()
2✔
729
                        newUpperBound = currPaymentSeq + paymentSeqBlockSize
2✔
730
                        return paymentsBucket.SetSequence(newUpperBound)
2✔
731
                }, func() {}); err != nil {
2✔
732
                        return nil, err
×
733
                }
×
734

735
                // We lazy initialize the cached currPaymentSeq here using the
736
                // first nextPaymentSequence() call. This if statement will auto
737
                // initialize our stored currPaymentSeq, since by default both
738
                // this variable and storedPaymentSeq are zero which in turn
739
                // will have us fetch the current values from the DB.
740
                if p.currPaymentSeq == 0 {
4✔
741
                        p.currPaymentSeq = currPaymentSeq
2✔
742
                }
2✔
743

744
                p.storedPaymentSeq = newUpperBound
2✔
745
        }
746

747
        p.currPaymentSeq++
2✔
748
        b := make([]byte, 8)
2✔
749
        binary.BigEndian.PutUint64(b, p.currPaymentSeq)
2✔
750

2✔
751
        return b, nil
2✔
752
}
753

754
// fetchPaymentStatus fetches the payment status of the payment. If the payment
755
// isn't found, it will return error `ErrPaymentNotInitiated`.
756
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
2✔
757
        // Creation info should be set for all payments, regardless of state.
2✔
758
        // If not, it is unknown.
2✔
759
        if bucket.Get(paymentCreationInfoKey) == nil {
4✔
760
                return 0, ErrPaymentNotInitiated
2✔
761
        }
2✔
762

763
        payment, err := fetchPayment(bucket)
2✔
764
        if err != nil {
2✔
765
                return 0, err
×
766
        }
×
767

768
        return payment.Status, nil
2✔
769
}
770

771
// FetchInFlightPayments returns all payments with status InFlight.
772
func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) {
2✔
773
        var inFlights []*MPPayment
2✔
774
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
4✔
775
                payments := tx.ReadBucket(paymentsRootBucket)
2✔
776
                if payments == nil {
4✔
777
                        return nil
2✔
778
                }
2✔
779

780
                return payments.ForEach(func(k, _ []byte) error {
4✔
781
                        bucket := payments.NestedReadBucket(k)
2✔
782
                        if bucket == nil {
2✔
783
                                return fmt.Errorf("non bucket element")
×
784
                        }
×
785

786
                        p, err := fetchPayment(bucket)
2✔
787
                        if err != nil {
2✔
788
                                return err
×
789
                        }
×
790

791
                        // Skip the payment if it's terminated.
792
                        if p.Terminated() {
4✔
793
                                return nil
2✔
794
                        }
2✔
795

796
                        inFlights = append(inFlights, p)
2✔
797
                        return nil
2✔
798
                })
799
        }, func() {
2✔
800
                inFlights = nil
2✔
801
        })
2✔
802
        if err != nil {
2✔
803
                return nil, err
×
804
        }
×
805

806
        return inFlights, nil
2✔
807
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc