• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 11216766535

07 Oct 2024 01:37PM UTC coverage: 57.817% (-1.0%) from 58.817%
11216766535

Pull #9148

github

ProofOfKeags
lnwire: remove kickoff feerate from propose/commit
Pull Request #9148: DynComms [2/n]: lnwire: add authenticated wire messages for Dyn*

571 of 879 new or added lines in 16 files covered. (64.96%)

23253 existing lines in 251 files now uncovered.

99022 of 171268 relevant lines covered (57.82%)

38420.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.97
/channeldb/payment_control.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "io"
9
        "sync"
10

11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
)
14

15
const (
16
        // paymentSeqBlockSize is the block size used when we batch allocate
17
        // payment sequences for future payments.
18
        paymentSeqBlockSize = 1000
19
)
20

21
var (
22
        // ErrAlreadyPaid signals we have already paid this payment hash.
23
        ErrAlreadyPaid = errors.New("invoice is already paid")
24

25
        // ErrPaymentInFlight signals that payment for this payment hash is
26
        // already "in flight" on the network.
27
        ErrPaymentInFlight = errors.New("payment is in transition")
28

29
        // ErrPaymentExists is returned when we try to initialize an already
30
        // existing payment that is not failed.
31
        ErrPaymentExists = errors.New("payment already exists")
32

33
        // ErrPaymentInternal is returned when performing the payment has a
34
        // conflicting state, such as,
35
        // - payment has StatusSucceeded but remaining amount is not zero.
36
        // - payment has StatusInitiated but remaining amount is zero.
37
        // - payment has StatusFailed but remaining amount is zero.
38
        ErrPaymentInternal = errors.New("internal error")
39

40
        // ErrPaymentNotInitiated is returned if the payment wasn't initiated.
41
        ErrPaymentNotInitiated = errors.New("payment isn't initiated")
42

43
        // ErrPaymentAlreadySucceeded is returned in the event we attempt to
44
        // change the status of a payment already succeeded.
45
        ErrPaymentAlreadySucceeded = errors.New("payment is already succeeded")
46

47
        // ErrPaymentAlreadyFailed is returned in the event we attempt to alter
48
        // a failed payment.
49
        ErrPaymentAlreadyFailed = errors.New("payment has already failed")
50

51
        // ErrUnknownPaymentStatus is returned when we do not recognize the
52
        // existing state of a payment.
53
        ErrUnknownPaymentStatus = errors.New("unknown payment status")
54

55
        // ErrPaymentTerminal is returned if we attempt to alter a payment that
56
        // already has reached a terminal condition.
57
        ErrPaymentTerminal = errors.New("payment has reached terminal " +
58
                "condition")
59

60
        // ErrAttemptAlreadySettled is returned if we try to alter an already
61
        // settled HTLC attempt.
62
        ErrAttemptAlreadySettled = errors.New("attempt already settled")
63

64
        // ErrAttemptAlreadyFailed is returned if we try to alter an already
65
        // failed HTLC attempt.
66
        ErrAttemptAlreadyFailed = errors.New("attempt already failed")
67

68
        // ErrValueMismatch is returned if we try to register a non-MPP attempt
69
        // with an amount that doesn't match the payment amount.
70
        ErrValueMismatch = errors.New("attempted value doesn't match payment " +
71
                "amount")
72

73
        // ErrValueExceedsAmt is returned if we try to register an attempt that
74
        // would take the total sent amount above the payment amount.
75
        ErrValueExceedsAmt = errors.New("attempted value exceeds payment " +
76
                "amount")
77

78
        // ErrNonMPPayment is returned if we try to register an MPP attempt for
79
        // a payment that already has a non-MPP attempt registered.
80
        ErrNonMPPayment = errors.New("payment has non-MPP attempts")
81

82
        // ErrMPPayment is returned if we try to register a non-MPP attempt for
83
        // a payment that already has an MPP attempt registered.
84
        ErrMPPayment = errors.New("payment has MPP attempts")
85

86
        // ErrMPPRecordInBlindedPayment is returned if we try to register an
87
        // attempt with an MPP record for a payment to a blinded path.
88
        ErrMPPRecordInBlindedPayment = errors.New("blinded payment cannot " +
89
                "contain MPP records")
90

91
        // ErrBlindedPaymentTotalAmountMismatch is returned if we try to
92
        // register an HTLC shard to a blinded route where the total amount
93
        // doesn't match existing shards.
94
        ErrBlindedPaymentTotalAmountMismatch = errors.New("blinded path " +
95
                "total amount mismatch")
96

97
        // ErrMPPPaymentAddrMismatch is returned if we try to register an MPP
98
        // shard where the payment address doesn't match existing shards.
99
        ErrMPPPaymentAddrMismatch = errors.New("payment address mismatch")
100

101
        // ErrMPPTotalAmountMismatch is returned if we try to register an MPP
102
        // shard where the total amount doesn't match existing shards.
103
        ErrMPPTotalAmountMismatch = errors.New("mp payment total amount " +
104
                "mismatch")
105

106
        // ErrPaymentPendingSettled is returned when we try to add a new
107
        // attempt to a payment that has at least one of its HTLCs settled.
108
        ErrPaymentPendingSettled = errors.New("payment has settled htlcs")
109

110
        // ErrPaymentPendingFailed is returned when we try to add a new attempt
111
        // to a payment that already has a failure reason.
112
        ErrPaymentPendingFailed = errors.New("payment has failure reason")
113

114
        // ErrSentExceedsTotal is returned if the payment's current total sent
115
        // amount exceed the total amount.
116
        ErrSentExceedsTotal = errors.New("total sent exceeds total amount")
117

118
        // errNoAttemptInfo is returned when no attempt info is stored yet.
119
        errNoAttemptInfo = errors.New("unable to find attempt info for " +
120
                "inflight payment")
121

122
        // errNoSequenceNrIndex is returned when an attempt to lookup a payment
123
        // index is made for a sequence number that is not indexed.
124
        errNoSequenceNrIndex = errors.New("payment sequence number index " +
125
                "does not exist")
126
)
127

128
// PaymentControl implements persistence for payments and payment attempts.
129
type PaymentControl struct {
130
        paymentSeqMx     sync.Mutex
131
        currPaymentSeq   uint64
132
        storedPaymentSeq uint64
133
        db               *DB
134
}
135

136
// NewPaymentControl creates a new instance of the PaymentControl.
137
func NewPaymentControl(db *DB) *PaymentControl {
42✔
138
        return &PaymentControl{
42✔
139
                db: db,
42✔
140
        }
42✔
141
}
42✔
142

143
// InitPayment checks or records the given PaymentCreationInfo with the DB,
144
// making sure it does not already exist as an in-flight payment. When this
145
// method returns successfully, the payment is guaranteed to be in the InFlight
146
// state.
147
func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash,
148
        info *PaymentCreationInfo) error {
149✔
149

149✔
150
        // Obtain a new sequence number for this payment. This is used
149✔
151
        // to sort the payments in order of creation, and also acts as
149✔
152
        // a unique identifier for each payment.
149✔
153
        sequenceNum, err := p.nextPaymentSequence()
149✔
154
        if err != nil {
149✔
155
                return err
×
156
        }
×
157

158
        var b bytes.Buffer
149✔
159
        if err := serializePaymentCreationInfo(&b, info); err != nil {
149✔
160
                return err
×
161
        }
×
162
        infoBytes := b.Bytes()
149✔
163

149✔
164
        var updateErr error
149✔
165
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
298✔
166
                // Reset the update error, to avoid carrying over an error
149✔
167
                // from a previous execution of the batched db transaction.
149✔
168
                updateErr = nil
149✔
169

149✔
170
                prefetchPayment(tx, paymentHash)
149✔
171
                bucket, err := createPaymentBucket(tx, paymentHash)
149✔
172
                if err != nil {
149✔
173
                        return err
×
174
                }
×
175

176
                // Get the existing status of this payment, if any.
177
                paymentStatus, err := fetchPaymentStatus(bucket)
149✔
178

149✔
179
                switch {
149✔
180
                // If no error is returned, it means we already have this
181
                // payment. We'll check the status to decide whether we allow
182
                // retrying the payment or return a specific error.
183
                case err == nil:
5✔
184
                        if err := paymentStatus.initializable(); err != nil {
9✔
185
                                updateErr = err
4✔
186
                                return nil
4✔
187
                        }
4✔
188

189
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
190
                // we'll return the error.
191
                case !errors.Is(err, ErrPaymentNotInitiated):
×
192
                        return err
×
193
                }
194

195
                // Before we set our new sequence number, we check whether this
196
                // payment has a previously set sequence number and remove its
197
                // index entry if it exists. This happens in the case where we
198
                // have a previously attempted payment which was left in a state
199
                // where we can retry.
200
                seqBytes := bucket.Get(paymentSequenceKey)
145✔
201
                if seqBytes != nil {
146✔
202
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
1✔
203
                        if err := indexBucket.Delete(seqBytes); err != nil {
1✔
204
                                return err
×
205
                        }
×
206
                }
207

208
                // Once we have obtained a sequence number, we add an entry
209
                // to our index bucket which will map the sequence number to
210
                // our payment identifier.
211
                err = createPaymentIndexEntry(
145✔
212
                        tx, sequenceNum, info.PaymentIdentifier,
145✔
213
                )
145✔
214
                if err != nil {
145✔
215
                        return err
×
216
                }
×
217

218
                err = bucket.Put(paymentSequenceKey, sequenceNum)
145✔
219
                if err != nil {
145✔
220
                        return err
×
221
                }
×
222

223
                // Add the payment info to the bucket, which contains the
224
                // static information for this payment
225
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
145✔
226
                if err != nil {
145✔
227
                        return err
×
228
                }
×
229

230
                // We'll delete any lingering HTLCs to start with, in case we
231
                // are initializing a payment that was attempted earlier, but
232
                // left in a state where we could retry.
233
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
145✔
234
                if err != nil && err != kvdb.ErrBucketNotFound {
145✔
235
                        return err
×
236
                }
×
237

238
                // Also delete any lingering failure info now that we are
239
                // re-attempting.
240
                return bucket.Delete(paymentFailInfoKey)
145✔
241
        })
242
        if err != nil {
149✔
243
                return fmt.Errorf("unable to init payment: %w", err)
×
244
        }
×
245

246
        return updateErr
149✔
247
}
248

249
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
250
// by the PaymentControl db.
251
func (p *PaymentControl) DeleteFailedAttempts(hash lntypes.Hash) error {
8✔
252
        if !p.db.keepFailedPaymentAttempts {
12✔
253
                const failedHtlcsOnly = true
4✔
254
                err := p.db.DeletePayment(hash, failedHtlcsOnly)
4✔
255
                if err != nil {
6✔
256
                        return err
2✔
257
                }
2✔
258
        }
259
        return nil
6✔
260
}
261

262
// paymentIndexTypeHash is a payment index type which indicates that we have
263
// created an index of payment sequence number to payment hash.
264
type paymentIndexType uint8
265

266
// paymentIndexTypeHash is a payment index type which indicates that we have
267
// created an index of payment sequence number to payment hash.
268
const paymentIndexTypeHash paymentIndexType = 0
269

270
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
271
// index produced contains a payment index type (which can be used in future to
272
// signal different payment index types) and the payment identifier.
273
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
274
        id lntypes.Hash) error {
166✔
275

166✔
276
        var b bytes.Buffer
166✔
277
        if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
166✔
278
                return err
×
279
        }
×
280

281
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
166✔
282
        return indexes.Put(sequenceNumber, b.Bytes())
166✔
283
}
284

285
// deserializePaymentIndex deserializes a payment index entry. This function
286
// currently only supports deserialization of payment hash indexes, and will
287
// fail for other types.
288
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
59✔
289
        var (
59✔
290
                indexType   paymentIndexType
59✔
291
                paymentHash []byte
59✔
292
        )
59✔
293

59✔
294
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
59✔
295
                return lntypes.Hash{}, err
×
296
        }
×
297

298
        // While we only have on payment index type, we do not need to use our
299
        // index type to deserialize the index. However, we sanity check that
300
        // this type is as expected, since we had to read it out anyway.
301
        if indexType != paymentIndexTypeHash {
59✔
302
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
303
                        "type: %v", indexType)
×
304
        }
×
305

306
        hash, err := lntypes.MakeHash(paymentHash)
59✔
307
        if err != nil {
59✔
308
                return lntypes.Hash{}, err
×
309
        }
×
310

311
        return hash, nil
59✔
312
}
313

314
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
315
// DB.
316
func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash,
317
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
70✔
318

70✔
319
        // Serialize the information before opening the db transaction.
70✔
320
        var a bytes.Buffer
70✔
321
        err := serializeHTLCAttemptInfo(&a, attempt)
70✔
322
        if err != nil {
70✔
323
                return nil, err
×
324
        }
×
325
        htlcInfoBytes := a.Bytes()
70✔
326

70✔
327
        htlcIDBytes := make([]byte, 8)
70✔
328
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
70✔
329

70✔
330
        var payment *MPPayment
70✔
331
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
156✔
332
                prefetchPayment(tx, paymentHash)
86✔
333
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
86✔
334
                if err != nil {
86✔
335
                        return err
×
336
                }
×
337

338
                payment, err = fetchPayment(bucket)
86✔
339
                if err != nil {
86✔
340
                        return err
×
341
                }
×
342

343
                // Check if registering a new attempt is allowed.
344
                if err := payment.Registrable(); err != nil {
102✔
345
                        return err
16✔
346
                }
16✔
347

348
                // If the final hop has encrypted data, then we know this is a
349
                // blinded payment. In blinded payments, MPP records are not set
350
                // for split payments and the recipient is responsible for using
351
                // a consistent PathID across the various encrypted data
352
                // payloads that we received from them for this payment. All we
353
                // need to check is that the total amount field for each HTLC
354
                // in the split payment is correct.
355
                isBlinded := len(attempt.Route.FinalHop().EncryptedData) != 0
70✔
356

70✔
357
                // Make sure any existing shards match the new one with regards
70✔
358
                // to MPP options.
70✔
359
                mpp := attempt.Route.FinalHop().MPP
70✔
360

70✔
361
                // MPP records should not be set for attempts to blinded paths.
70✔
362
                if isBlinded && mpp != nil {
70✔
363
                        return ErrMPPRecordInBlindedPayment
×
364
                }
×
365

366
                for _, h := range payment.InFlightHTLCs() {
114✔
367
                        hMpp := h.Route.FinalHop().MPP
44✔
368

44✔
369
                        // If this is a blinded payment, then no existing HTLCs
44✔
370
                        // should have MPP records.
44✔
371
                        if isBlinded && hMpp != nil {
44✔
372
                                return ErrMPPRecordInBlindedPayment
×
373
                        }
×
374

375
                        // If this is a blinded payment, then we just need to
376
                        // check that the TotalAmtMsat field for this shard
377
                        // is equal to that of any other shard in the same
378
                        // payment.
379
                        if isBlinded {
44✔
UNCOV
380
                                if attempt.Route.FinalHop().TotalAmtMsat !=
×
UNCOV
381
                                        h.Route.FinalHop().TotalAmtMsat {
×
382

×
383
                                        //nolint:lll
×
384
                                        return ErrBlindedPaymentTotalAmountMismatch
×
385
                                }
×
386

UNCOV
387
                                continue
×
388
                        }
389

390
                        switch {
44✔
391
                        // We tried to register a non-MPP attempt for a MPP
392
                        // payment.
393
                        case mpp == nil && hMpp != nil:
2✔
394
                                return ErrMPPayment
2✔
395

396
                        // We tried to register a MPP shard for a non-MPP
397
                        // payment.
398
                        case mpp != nil && hMpp == nil:
2✔
399
                                return ErrNonMPPayment
2✔
400

401
                        // Non-MPP payment, nothing more to validate.
402
                        case mpp == nil:
×
403
                                continue
×
404
                        }
405

406
                        // Check that MPP options match.
407
                        if mpp.PaymentAddr() != hMpp.PaymentAddr() {
42✔
408
                                return ErrMPPPaymentAddrMismatch
2✔
409
                        }
2✔
410

411
                        if mpp.TotalMsat() != hMpp.TotalMsat() {
40✔
412
                                return ErrMPPTotalAmountMismatch
2✔
413
                        }
2✔
414
                }
415

416
                // If this is a non-MPP attempt, it must match the total amount
417
                // exactly. Note that a blinded payment is considered an MPP
418
                // attempt.
419
                amt := attempt.Route.ReceiverAmt()
62✔
420
                if !isBlinded && mpp == nil && amt != payment.Info.Value {
62✔
421
                        return ErrValueMismatch
×
422
                }
×
423

424
                // Ensure we aren't sending more than the total payment amount.
425
                sentAmt, _ := payment.SentAmt()
62✔
426
                if sentAmt+amt > payment.Info.Value {
70✔
427
                        return fmt.Errorf("%w: attempted=%v, payment amount="+
8✔
428
                                "%v", ErrValueExceedsAmt, sentAmt+amt,
8✔
429
                                payment.Info.Value)
8✔
430
                }
8✔
431

432
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
54✔
433
                        paymentHtlcsBucket,
54✔
434
                )
54✔
435
                if err != nil {
54✔
436
                        return err
×
437
                }
×
438

439
                err = htlcsBucket.Put(
54✔
440
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
54✔
441
                        htlcInfoBytes,
54✔
442
                )
54✔
443
                if err != nil {
54✔
444
                        return err
×
445
                }
×
446

447
                // Retrieve attempt info for the notification.
448
                payment, err = fetchPayment(bucket)
54✔
449
                return err
54✔
450
        })
451
        if err != nil {
86✔
452
                return nil, err
16✔
453
        }
16✔
454

455
        return payment, err
54✔
456
}
457

458
// SettleAttempt marks the given attempt settled with the preimage. If this is
459
// a multi shard payment, this might implicitly mean that the full payment
460
// succeeded.
461
//
462
// After invoking this method, InitPayment should always return an error to
463
// prevent us from making duplicate payments to the same payment hash. The
464
// provided preimage is atomically saved to the DB for record keeping.
465
func (p *PaymentControl) SettleAttempt(hash lntypes.Hash,
466
        attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
16✔
467

16✔
468
        var b bytes.Buffer
16✔
469
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
16✔
470
                return nil, err
×
471
        }
×
472
        settleBytes := b.Bytes()
16✔
473

16✔
474
        return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
16✔
475
}
476

477
// FailAttempt marks the given payment attempt failed.
478
func (p *PaymentControl) FailAttempt(hash lntypes.Hash,
479
        attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
31✔
480

31✔
481
        var b bytes.Buffer
31✔
482
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
31✔
483
                return nil, err
×
484
        }
×
485
        failBytes := b.Bytes()
31✔
486

31✔
487
        return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
31✔
488
}
489

490
// updateHtlcKey updates a database key for the specified htlc.
491
func (p *PaymentControl) updateHtlcKey(paymentHash lntypes.Hash,
492
        attemptID uint64, key, value []byte) (*MPPayment, error) {
47✔
493

47✔
494
        aid := make([]byte, 8)
47✔
495
        binary.BigEndian.PutUint64(aid, attemptID)
47✔
496

47✔
497
        var payment *MPPayment
47✔
498
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
95✔
499
                payment = nil
48✔
500

48✔
501
                prefetchPayment(tx, paymentHash)
48✔
502
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
48✔
503
                if err != nil {
50✔
504
                        return err
2✔
505
                }
2✔
506

507
                p, err := fetchPayment(bucket)
46✔
508
                if err != nil {
46✔
509
                        return err
×
510
                }
×
511

512
                // We can only update keys of in-flight payments. We allow
513
                // updating keys even if the payment has reached a terminal
514
                // condition, since the HTLC outcomes must still be updated.
515
                if err := p.Status.updatable(); err != nil {
46✔
516
                        return err
×
517
                }
×
518

519
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
46✔
520
                if htlcsBucket == nil {
46✔
521
                        return fmt.Errorf("htlcs bucket not found")
×
522
                }
×
523

524
                if htlcsBucket.Get(htlcBucketKey(htlcAttemptInfoKey, aid)) == nil {
46✔
525
                        return fmt.Errorf("HTLC with ID %v not registered",
×
526
                                attemptID)
×
527
                }
×
528

529
                // Make sure the shard is not already failed or settled.
530
                if htlcsBucket.Get(htlcBucketKey(htlcFailInfoKey, aid)) != nil {
46✔
531
                        return ErrAttemptAlreadyFailed
×
532
                }
×
533

534
                if htlcsBucket.Get(htlcBucketKey(htlcSettleInfoKey, aid)) != nil {
46✔
535
                        return ErrAttemptAlreadySettled
×
536
                }
×
537

538
                // Add or update the key for this htlc.
539
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
46✔
540
                if err != nil {
46✔
541
                        return err
×
542
                }
×
543

544
                // Retrieve attempt info for the notification.
545
                payment, err = fetchPayment(bucket)
46✔
546
                return err
46✔
547
        })
548
        if err != nil {
48✔
549
                return nil, err
1✔
550
        }
1✔
551

552
        return payment, err
46✔
553
}
554

555
// Fail transitions a payment into the Failed state, and records the reason the
556
// payment failed. After invoking this method, InitPayment should return nil on
557
// its next call for this payment hash, allowing the switch to make a
558
// subsequent payment.
559
func (p *PaymentControl) Fail(paymentHash lntypes.Hash,
560
        reason FailureReason) (*MPPayment, error) {
16✔
561

16✔
562
        var (
16✔
563
                updateErr error
16✔
564
                payment   *MPPayment
16✔
565
        )
16✔
566
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
32✔
567
                // Reset the update error, to avoid carrying over an error
16✔
568
                // from a previous execution of the batched db transaction.
16✔
569
                updateErr = nil
16✔
570
                payment = nil
16✔
571

16✔
572
                prefetchPayment(tx, paymentHash)
16✔
573
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
16✔
574
                if err == ErrPaymentNotInitiated {
17✔
575
                        updateErr = ErrPaymentNotInitiated
1✔
576
                        return nil
1✔
577
                } else if err != nil {
16✔
578
                        return err
×
579
                }
×
580

581
                // We mark the payment as failed as long as it is known. This
582
                // lets the last attempt to fail with a terminal write its
583
                // failure to the PaymentControl without synchronizing with
584
                // other attempts.
585
                _, err = fetchPaymentStatus(bucket)
15✔
586
                if errors.Is(err, ErrPaymentNotInitiated) {
15✔
587
                        updateErr = ErrPaymentNotInitiated
×
588
                        return nil
×
589
                } else if err != nil {
15✔
590
                        return err
×
591
                }
×
592

593
                // Put the failure reason in the bucket for record keeping.
594
                v := []byte{byte(reason)}
15✔
595
                err = bucket.Put(paymentFailInfoKey, v)
15✔
596
                if err != nil {
15✔
597
                        return err
×
598
                }
×
599

600
                // Retrieve attempt info for the notification, if available.
601
                payment, err = fetchPayment(bucket)
15✔
602
                if err != nil {
15✔
603
                        return err
×
604
                }
×
605

606
                return nil
15✔
607
        })
608
        if err != nil {
16✔
609
                return nil, err
×
610
        }
×
611

612
        return payment, updateErr
16✔
613
}
614

615
// FetchPayment returns information about a payment from the database.
616
func (p *PaymentControl) FetchPayment(paymentHash lntypes.Hash) (
617
        *MPPayment, error) {
151✔
618

151✔
619
        var payment *MPPayment
151✔
620
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
302✔
621
                prefetchPayment(tx, paymentHash)
151✔
622
                bucket, err := fetchPaymentBucket(tx, paymentHash)
151✔
623
                if err != nil {
152✔
624
                        return err
1✔
625
                }
1✔
626

627
                payment, err = fetchPayment(bucket)
150✔
628

150✔
629
                return err
150✔
630
        }, func() {
151✔
631
                payment = nil
151✔
632
        })
151✔
633
        if err != nil {
152✔
634
                return nil, err
1✔
635
        }
1✔
636

637
        return payment, nil
150✔
638
}
639

640
// prefetchPayment attempts to prefetch as much of the payment as possible to
641
// reduce DB roundtrips.
642
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
450✔
643
        rb := kvdb.RootBucket(tx)
450✔
644
        kvdb.Prefetch(
450✔
645
                rb,
450✔
646
                []string{
450✔
647
                        // Prefetch all keys in the payment's bucket.
450✔
648
                        string(paymentsRootBucket),
450✔
649
                        string(paymentHash[:]),
450✔
650
                },
450✔
651
                []string{
450✔
652
                        // Prefetch all keys in the payment's htlc bucket.
450✔
653
                        string(paymentsRootBucket),
450✔
654
                        string(paymentHash[:]),
450✔
655
                        string(paymentHtlcsBucket),
450✔
656
                },
450✔
657
        )
450✔
658
}
450✔
659

660
// createPaymentBucket creates or fetches the sub-bucket assigned to this
661
// payment hash.
662
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
663
        kvdb.RwBucket, error) {
149✔
664

149✔
665
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
149✔
666
        if err != nil {
149✔
667
                return nil, err
×
668
        }
×
669

670
        return payments.CreateBucketIfNotExists(paymentHash[:])
149✔
671
}
672

673
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
674
// the bucket does not exist, it returns ErrPaymentNotInitiated.
675
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
676
        kvdb.RBucket, error) {
209✔
677

209✔
678
        payments := tx.ReadBucket(paymentsRootBucket)
209✔
679
        if payments == nil {
210✔
680
                return nil, ErrPaymentNotInitiated
1✔
681
        }
1✔
682

683
        bucket := payments.NestedReadBucket(paymentHash[:])
208✔
684
        if bucket == nil {
208✔
685
                return nil, ErrPaymentNotInitiated
×
686
        }
×
687

688
        return bucket, nil
208✔
689

690
}
691

692
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
693
// bucket that can be written to.
694
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
695
        kvdb.RwBucket, error) {
171✔
696

171✔
697
        payments := tx.ReadWriteBucket(paymentsRootBucket)
171✔
698
        if payments == nil {
174✔
699
                return nil, ErrPaymentNotInitiated
3✔
700
        }
3✔
701

702
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
168✔
703
        if bucket == nil {
168✔
704
                return nil, ErrPaymentNotInitiated
×
705
        }
×
706

707
        return bucket, nil
168✔
708
}
709

710
// nextPaymentSequence returns the next sequence number to store for a new
711
// payment.
712
func (p *PaymentControl) nextPaymentSequence() ([]byte, error) {
149✔
713
        p.paymentSeqMx.Lock()
149✔
714
        defer p.paymentSeqMx.Unlock()
149✔
715

149✔
716
        // Set a new upper bound in the DB every 1000 payments to avoid
149✔
717
        // conflicts on the sequence when using etcd.
149✔
718
        if p.currPaymentSeq == p.storedPaymentSeq {
188✔
719
                var currPaymentSeq, newUpperBound uint64
39✔
720
                if err := kvdb.Update(p.db.Backend, func(tx kvdb.RwTx) error {
78✔
721
                        paymentsBucket, err := tx.CreateTopLevelBucket(
39✔
722
                                paymentsRootBucket,
39✔
723
                        )
39✔
724
                        if err != nil {
39✔
725
                                return err
×
726
                        }
×
727

728
                        currPaymentSeq = paymentsBucket.Sequence()
39✔
729
                        newUpperBound = currPaymentSeq + paymentSeqBlockSize
39✔
730
                        return paymentsBucket.SetSequence(newUpperBound)
39✔
731
                }, func() {}); err != nil {
39✔
732
                        return nil, err
×
733
                }
×
734

735
                // We lazy initialize the cached currPaymentSeq here using the
736
                // first nextPaymentSequence() call. This if statement will auto
737
                // initialize our stored currPaymentSeq, since by default both
738
                // this variable and storedPaymentSeq are zero which in turn
739
                // will have us fetch the current values from the DB.
740
                if p.currPaymentSeq == 0 {
78✔
741
                        p.currPaymentSeq = currPaymentSeq
39✔
742
                }
39✔
743

744
                p.storedPaymentSeq = newUpperBound
39✔
745
        }
746

747
        p.currPaymentSeq++
149✔
748
        b := make([]byte, 8)
149✔
749
        binary.BigEndian.PutUint64(b, p.currPaymentSeq)
149✔
750

149✔
751
        return b, nil
149✔
752
}
753

754
// fetchPaymentStatus fetches the payment status of the payment. If the payment
755
// isn't found, it will return error `ErrPaymentNotInitiated`.
756
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
192✔
757
        // Creation info should be set for all payments, regardless of state.
192✔
758
        // If not, it is unknown.
192✔
759
        if bucket.Get(paymentCreationInfoKey) == nil {
336✔
760
                return 0, ErrPaymentNotInitiated
144✔
761
        }
144✔
762

763
        payment, err := fetchPayment(bucket)
48✔
764
        if err != nil {
48✔
765
                return 0, err
×
766
        }
×
767

768
        return payment.Status, nil
48✔
769
}
770

771
// FetchInFlightPayments returns all payments with status InFlight.
772
func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) {
4✔
773
        var inFlights []*MPPayment
4✔
774
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
8✔
775
                payments := tx.ReadBucket(paymentsRootBucket)
4✔
776
                if payments == nil {
6✔
777
                        return nil
2✔
778
                }
2✔
779

780
                return payments.ForEach(func(k, _ []byte) error {
4✔
781
                        bucket := payments.NestedReadBucket(k)
2✔
782
                        if bucket == nil {
2✔
783
                                return fmt.Errorf("non bucket element")
×
784
                        }
×
785

786
                        p, err := fetchPayment(bucket)
2✔
787
                        if err != nil {
2✔
788
                                return err
×
789
                        }
×
790

791
                        // Skip the payment if it's terminated.
792
                        if p.Terminated() {
2✔
UNCOV
793
                                return nil
×
UNCOV
794
                        }
×
795

796
                        inFlights = append(inFlights, p)
2✔
797
                        return nil
2✔
798
                })
799
        }, func() {
4✔
800
                inFlights = nil
4✔
801
        })
4✔
802
        if err != nil {
4✔
803
                return nil, err
×
804
        }
×
805

806
        return inFlights, nil
4✔
807
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc