• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15561477203

10 Jun 2025 01:54PM UTC coverage: 58.351% (-10.1%) from 68.487%
15561477203

Pull #9356

github

web-flow
Merge 6440b25db into c6d6d4c0b
Pull Request #9356: lnrpc: add incoming/outgoing channel ids filter to forwarding history request

33 of 36 new or added lines in 2 files covered. (91.67%)

28366 existing lines in 455 files now uncovered.

97715 of 167461 relevant lines covered (58.35%)

1.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.91
/channeldb/payment_control.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "io"
9
        "sync"
10
        "time"
11

12
        "github.com/lightningnetwork/lnd/kvdb"
13
        "github.com/lightningnetwork/lnd/lntypes"
14
)
15

16
const (
	// paymentSeqBlockSize is the size of the block of payment sequence
	// numbers that is allocated in one batch for future payments.
	paymentSeqBlockSize = 1000

	// paymentProgressLogInterval is the interval used to limit the logging
	// output of payment processing.
	paymentProgressLogInterval = 30 * time.Second
)
25

26
var (
	// ErrAlreadyPaid is returned when the payment hash has already been
	// paid.
	ErrAlreadyPaid = errors.New("invoice is already paid")

	// ErrPaymentInFlight is returned when a payment for the payment hash
	// is already "in flight" on the network.
	ErrPaymentInFlight = errors.New("payment is in transition")

	// ErrPaymentExists is returned when initializing a payment that
	// already exists and has not failed.
	ErrPaymentExists = errors.New("payment already exists")

	// ErrPaymentInternal is returned when a payment is found in a
	// conflicting state, such as,
	// - payment has StatusSucceeded but remaining amount is not zero.
	// - payment has StatusInitiated but remaining amount is zero.
	// - payment has StatusFailed but remaining amount is zero.
	ErrPaymentInternal = errors.New("internal error")

	// ErrPaymentNotInitiated is returned when the payment has not been
	// initiated.
	ErrPaymentNotInitiated = errors.New("payment isn't initiated")

	// ErrPaymentAlreadySucceeded is returned on an attempt to change the
	// status of a payment that has already succeeded.
	ErrPaymentAlreadySucceeded = errors.New("payment is already succeeded")

	// ErrPaymentAlreadyFailed is returned on an attempt to alter a payment
	// that has already failed.
	ErrPaymentAlreadyFailed = errors.New("payment has already failed")

	// ErrUnknownPaymentStatus is returned when the existing state of a
	// payment is not recognized.
	ErrUnknownPaymentStatus = errors.New("unknown payment status")

	// ErrPaymentTerminal is returned on an attempt to alter a payment that
	// has already reached a terminal condition.
	ErrPaymentTerminal = errors.New("payment has reached terminal " +
		"condition")

	// ErrAttemptAlreadySettled is returned on an attempt to alter an HTLC
	// attempt that is already settled.
	ErrAttemptAlreadySettled = errors.New("attempt already settled")

	// ErrAttemptAlreadyFailed is returned on an attempt to alter an HTLC
	// attempt that has already failed.
	ErrAttemptAlreadyFailed = errors.New("attempt already failed")

	// ErrValueMismatch is returned when registering a non-MPP attempt
	// whose amount differs from the payment amount.
	ErrValueMismatch = errors.New("attempted value doesn't match payment " +
		"amount")

	// ErrValueExceedsAmt is returned when registering an attempt that
	// would push the total sent amount above the payment amount.
	ErrValueExceedsAmt = errors.New("attempted value exceeds payment " +
		"amount")

	// ErrNonMPPayment is returned when registering an MPP attempt for a
	// payment that already has a non-MPP attempt registered.
	ErrNonMPPayment = errors.New("payment has non-MPP attempts")

	// ErrMPPayment is returned when registering a non-MPP attempt for a
	// payment that already has an MPP attempt registered.
	ErrMPPayment = errors.New("payment has MPP attempts")

	// ErrMPPRecordInBlindedPayment is returned when registering an attempt
	// that carries an MPP record for a payment to a blinded path.
	ErrMPPRecordInBlindedPayment = errors.New("blinded payment cannot " +
		"contain MPP records")

	// ErrBlindedPaymentTotalAmountMismatch is returned when registering an
	// HTLC shard to a blinded route whose total amount differs from that
	// of the existing shards.
	ErrBlindedPaymentTotalAmountMismatch = errors.New("blinded path " +
		"total amount mismatch")

	// ErrMPPPaymentAddrMismatch is returned when registering an MPP shard
	// whose payment address differs from that of the existing shards.
	ErrMPPPaymentAddrMismatch = errors.New("payment address mismatch")

	// ErrMPPTotalAmountMismatch is returned when registering an MPP shard
	// whose total amount differs from that of the existing shards.
	ErrMPPTotalAmountMismatch = errors.New("mp payment total amount " +
		"mismatch")

	// ErrPaymentPendingSettled is returned when adding a new attempt to a
	// payment that has at least one of its HTLCs settled.
	ErrPaymentPendingSettled = errors.New("payment has settled htlcs")

	// ErrPaymentPendingFailed is returned when adding a new attempt to a
	// payment that already has a failure reason.
	ErrPaymentPendingFailed = errors.New("payment has failure reason")

	// ErrSentExceedsTotal is returned when the payment's current total
	// sent amount exceeds the total amount.
	ErrSentExceedsTotal = errors.New("total sent exceeds total amount")

	// errNoAttemptInfo is returned when no attempt info has been stored
	// yet.
	errNoAttemptInfo = errors.New("unable to find attempt info for " +
		"inflight payment")

	// errNoSequenceNrIndex is returned when a payment index lookup is made
	// for a sequence number that is not indexed.
	errNoSequenceNrIndex = errors.New("payment sequence number index " +
		"does not exist")
)
132

133
// PaymentControl implements persistence for payments and payment attempts.
134
type PaymentControl struct {
135
        paymentSeqMx     sync.Mutex
136
        currPaymentSeq   uint64
137
        storedPaymentSeq uint64
138
        db               *DB
139
}
140

141
// NewPaymentControl creates a new instance of the PaymentControl.
142
func NewPaymentControl(db *DB) *PaymentControl {
3✔
143
        return &PaymentControl{
3✔
144
                db: db,
3✔
145
        }
3✔
146
}
3✔
147

148
// InitPayment checks or records the given PaymentCreationInfo with the DB,
149
// making sure it does not already exist as an in-flight payment. When this
150
// method returns successfully, the payment is guaranteed to be in the InFlight
151
// state.
152
func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash,
153
        info *PaymentCreationInfo) error {
3✔
154

3✔
155
        // Obtain a new sequence number for this payment. This is used
3✔
156
        // to sort the payments in order of creation, and also acts as
3✔
157
        // a unique identifier for each payment.
3✔
158
        sequenceNum, err := p.nextPaymentSequence()
3✔
159
        if err != nil {
3✔
160
                return err
×
161
        }
×
162

163
        var b bytes.Buffer
3✔
164
        if err := serializePaymentCreationInfo(&b, info); err != nil {
3✔
165
                return err
×
166
        }
×
167
        infoBytes := b.Bytes()
3✔
168

3✔
169
        var updateErr error
3✔
170
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
171
                // Reset the update error, to avoid carrying over an error
3✔
172
                // from a previous execution of the batched db transaction.
3✔
173
                updateErr = nil
3✔
174

3✔
175
                prefetchPayment(tx, paymentHash)
3✔
176
                bucket, err := createPaymentBucket(tx, paymentHash)
3✔
177
                if err != nil {
3✔
178
                        return err
×
179
                }
×
180

181
                // Get the existing status of this payment, if any.
182
                paymentStatus, err := fetchPaymentStatus(bucket)
3✔
183

3✔
184
                switch {
3✔
185
                // If no error is returned, it means we already have this
186
                // payment. We'll check the status to decide whether we allow
187
                // retrying the payment or return a specific error.
188
                case err == nil:
3✔
189
                        if err := paymentStatus.initializable(); err != nil {
6✔
190
                                updateErr = err
3✔
191
                                return nil
3✔
192
                        }
3✔
193

194
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
195
                // we'll return the error.
196
                case !errors.Is(err, ErrPaymentNotInitiated):
×
197
                        return err
×
198
                }
199

200
                // Before we set our new sequence number, we check whether this
201
                // payment has a previously set sequence number and remove its
202
                // index entry if it exists. This happens in the case where we
203
                // have a previously attempted payment which was left in a state
204
                // where we can retry.
205
                seqBytes := bucket.Get(paymentSequenceKey)
3✔
206
                if seqBytes != nil {
6✔
207
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
208
                        if err := indexBucket.Delete(seqBytes); err != nil {
3✔
209
                                return err
×
210
                        }
×
211
                }
212

213
                // Once we have obtained a sequence number, we add an entry
214
                // to our index bucket which will map the sequence number to
215
                // our payment identifier.
216
                err = createPaymentIndexEntry(
3✔
217
                        tx, sequenceNum, info.PaymentIdentifier,
3✔
218
                )
3✔
219
                if err != nil {
3✔
220
                        return err
×
221
                }
×
222

223
                err = bucket.Put(paymentSequenceKey, sequenceNum)
3✔
224
                if err != nil {
3✔
225
                        return err
×
226
                }
×
227

228
                // Add the payment info to the bucket, which contains the
229
                // static information for this payment
230
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
3✔
231
                if err != nil {
3✔
232
                        return err
×
233
                }
×
234

235
                // We'll delete any lingering HTLCs to start with, in case we
236
                // are initializing a payment that was attempted earlier, but
237
                // left in a state where we could retry.
238
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
3✔
239
                if err != nil && err != kvdb.ErrBucketNotFound {
3✔
240
                        return err
×
241
                }
×
242

243
                // Also delete any lingering failure info now that we are
244
                // re-attempting.
245
                return bucket.Delete(paymentFailInfoKey)
3✔
246
        })
247
        if err != nil {
3✔
248
                return fmt.Errorf("unable to init payment: %w", err)
×
249
        }
×
250

251
        return updateErr
3✔
252
}
253

254
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
255
// by the PaymentControl db.
256
func (p *PaymentControl) DeleteFailedAttempts(hash lntypes.Hash) error {
3✔
257
        if !p.db.keepFailedPaymentAttempts {
3✔
UNCOV
258
                const failedHtlcsOnly = true
×
UNCOV
259
                err := p.db.DeletePayment(hash, failedHtlcsOnly)
×
UNCOV
260
                if err != nil {
×
UNCOV
261
                        return err
×
UNCOV
262
                }
×
263
        }
264
        return nil
3✔
265
}
266

267
// paymentIndexType identifies the type of a payment index entry. It is
// serialized as the first element of every payment index entry.
type paymentIndexType uint8

// paymentIndexTypeHash is a payment index type which indicates that we have
// created an index of payment sequence number to payment hash.
const paymentIndexTypeHash paymentIndexType = 0
274

275
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
276
// index produced contains a payment index type (which can be used in future to
277
// signal different payment index types) and the payment identifier.
278
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
279
        id lntypes.Hash) error {
3✔
280

3✔
281
        var b bytes.Buffer
3✔
282
        if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
3✔
283
                return err
×
284
        }
×
285

286
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
287
        return indexes.Put(sequenceNumber, b.Bytes())
3✔
288
}
289

290
// deserializePaymentIndex deserializes a payment index entry. This function
291
// currently only supports deserialization of payment hash indexes, and will
292
// fail for other types.
293
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
3✔
294
        var (
3✔
295
                indexType   paymentIndexType
3✔
296
                paymentHash []byte
3✔
297
        )
3✔
298

3✔
299
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
3✔
300
                return lntypes.Hash{}, err
×
301
        }
×
302

303
        // While we only have on payment index type, we do not need to use our
304
        // index type to deserialize the index. However, we sanity check that
305
        // this type is as expected, since we had to read it out anyway.
306
        if indexType != paymentIndexTypeHash {
3✔
307
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
308
                        "type: %v", indexType)
×
309
        }
×
310

311
        hash, err := lntypes.MakeHash(paymentHash)
3✔
312
        if err != nil {
3✔
313
                return lntypes.Hash{}, err
×
314
        }
×
315

316
        return hash, nil
3✔
317
}
318

319
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
320
// DB.
321
func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash,
322
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
3✔
323

3✔
324
        // Serialize the information before opening the db transaction.
3✔
325
        var a bytes.Buffer
3✔
326
        err := serializeHTLCAttemptInfo(&a, attempt)
3✔
327
        if err != nil {
3✔
328
                return nil, err
×
329
        }
×
330
        htlcInfoBytes := a.Bytes()
3✔
331

3✔
332
        htlcIDBytes := make([]byte, 8)
3✔
333
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
3✔
334

3✔
335
        var payment *MPPayment
3✔
336
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
337
                prefetchPayment(tx, paymentHash)
3✔
338
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
339
                if err != nil {
3✔
340
                        return err
×
341
                }
×
342

343
                payment, err = fetchPayment(bucket)
3✔
344
                if err != nil {
3✔
345
                        return err
×
346
                }
×
347

348
                // Check if registering a new attempt is allowed.
349
                if err := payment.Registrable(); err != nil {
3✔
UNCOV
350
                        return err
×
UNCOV
351
                }
×
352

353
                // If the final hop has encrypted data, then we know this is a
354
                // blinded payment. In blinded payments, MPP records are not set
355
                // for split payments and the recipient is responsible for using
356
                // a consistent PathID across the various encrypted data
357
                // payloads that we received from them for this payment. All we
358
                // need to check is that the total amount field for each HTLC
359
                // in the split payment is correct.
360
                isBlinded := len(attempt.Route.FinalHop().EncryptedData) != 0
3✔
361

3✔
362
                // Make sure any existing shards match the new one with regards
3✔
363
                // to MPP options.
3✔
364
                mpp := attempt.Route.FinalHop().MPP
3✔
365

3✔
366
                // MPP records should not be set for attempts to blinded paths.
3✔
367
                if isBlinded && mpp != nil {
3✔
368
                        return ErrMPPRecordInBlindedPayment
×
369
                }
×
370

371
                for _, h := range payment.InFlightHTLCs() {
6✔
372
                        hMpp := h.Route.FinalHop().MPP
3✔
373

3✔
374
                        // If this is a blinded payment, then no existing HTLCs
3✔
375
                        // should have MPP records.
3✔
376
                        if isBlinded && hMpp != nil {
3✔
377
                                return ErrMPPRecordInBlindedPayment
×
378
                        }
×
379

380
                        // If this is a blinded payment, then we just need to
381
                        // check that the TotalAmtMsat field for this shard
382
                        // is equal to that of any other shard in the same
383
                        // payment.
384
                        if isBlinded {
6✔
385
                                if attempt.Route.FinalHop().TotalAmtMsat !=
3✔
386
                                        h.Route.FinalHop().TotalAmtMsat {
3✔
387

×
388
                                        //nolint:ll
×
389
                                        return ErrBlindedPaymentTotalAmountMismatch
×
390
                                }
×
391

392
                                continue
3✔
393
                        }
394

395
                        switch {
3✔
396
                        // We tried to register a non-MPP attempt for a MPP
397
                        // payment.
UNCOV
398
                        case mpp == nil && hMpp != nil:
×
UNCOV
399
                                return ErrMPPayment
×
400

401
                        // We tried to register a MPP shard for a non-MPP
402
                        // payment.
UNCOV
403
                        case mpp != nil && hMpp == nil:
×
UNCOV
404
                                return ErrNonMPPayment
×
405

406
                        // Non-MPP payment, nothing more to validate.
407
                        case mpp == nil:
×
408
                                continue
×
409
                        }
410

411
                        // Check that MPP options match.
412
                        if mpp.PaymentAddr() != hMpp.PaymentAddr() {
3✔
UNCOV
413
                                return ErrMPPPaymentAddrMismatch
×
UNCOV
414
                        }
×
415

416
                        if mpp.TotalMsat() != hMpp.TotalMsat() {
3✔
UNCOV
417
                                return ErrMPPTotalAmountMismatch
×
UNCOV
418
                        }
×
419
                }
420

421
                // If this is a non-MPP attempt, it must match the total amount
422
                // exactly. Note that a blinded payment is considered an MPP
423
                // attempt.
424
                amt := attempt.Route.ReceiverAmt()
3✔
425
                if !isBlinded && mpp == nil && amt != payment.Info.Value {
3✔
426
                        return ErrValueMismatch
×
427
                }
×
428

429
                // Ensure we aren't sending more than the total payment amount.
430
                sentAmt, _ := payment.SentAmt()
3✔
431
                if sentAmt+amt > payment.Info.Value {
3✔
UNCOV
432
                        return fmt.Errorf("%w: attempted=%v, payment amount="+
×
UNCOV
433
                                "%v", ErrValueExceedsAmt, sentAmt+amt,
×
UNCOV
434
                                payment.Info.Value)
×
UNCOV
435
                }
×
436

437
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
3✔
438
                        paymentHtlcsBucket,
3✔
439
                )
3✔
440
                if err != nil {
3✔
441
                        return err
×
442
                }
×
443

444
                err = htlcsBucket.Put(
3✔
445
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
3✔
446
                        htlcInfoBytes,
3✔
447
                )
3✔
448
                if err != nil {
3✔
449
                        return err
×
450
                }
×
451

452
                // Retrieve attempt info for the notification.
453
                payment, err = fetchPayment(bucket)
3✔
454
                return err
3✔
455
        })
456
        if err != nil {
3✔
UNCOV
457
                return nil, err
×
UNCOV
458
        }
×
459

460
        return payment, err
3✔
461
}
462

463
// SettleAttempt marks the given attempt settled with the preimage. If this is
464
// a multi shard payment, this might implicitly mean that the full payment
465
// succeeded.
466
//
467
// After invoking this method, InitPayment should always return an error to
468
// prevent us from making duplicate payments to the same payment hash. The
469
// provided preimage is atomically saved to the DB for record keeping.
470
func (p *PaymentControl) SettleAttempt(hash lntypes.Hash,
471
        attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
3✔
472

3✔
473
        var b bytes.Buffer
3✔
474
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
3✔
475
                return nil, err
×
476
        }
×
477
        settleBytes := b.Bytes()
3✔
478

3✔
479
        return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
3✔
480
}
481

482
// FailAttempt marks the given payment attempt failed.
483
func (p *PaymentControl) FailAttempt(hash lntypes.Hash,
484
        attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
3✔
485

3✔
486
        var b bytes.Buffer
3✔
487
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
3✔
488
                return nil, err
×
489
        }
×
490
        failBytes := b.Bytes()
3✔
491

3✔
492
        return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
3✔
493
}
494

495
// updateHtlcKey updates a database key for the specified htlc.
496
func (p *PaymentControl) updateHtlcKey(paymentHash lntypes.Hash,
497
        attemptID uint64, key, value []byte) (*MPPayment, error) {
3✔
498

3✔
499
        aid := make([]byte, 8)
3✔
500
        binary.BigEndian.PutUint64(aid, attemptID)
3✔
501

3✔
502
        var payment *MPPayment
3✔
503
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
504
                payment = nil
3✔
505

3✔
506
                prefetchPayment(tx, paymentHash)
3✔
507
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
508
                if err != nil {
3✔
UNCOV
509
                        return err
×
UNCOV
510
                }
×
511

512
                p, err := fetchPayment(bucket)
3✔
513
                if err != nil {
3✔
514
                        return err
×
515
                }
×
516

517
                // We can only update keys of in-flight payments. We allow
518
                // updating keys even if the payment has reached a terminal
519
                // condition, since the HTLC outcomes must still be updated.
520
                if err := p.Status.updatable(); err != nil {
3✔
521
                        return err
×
522
                }
×
523

524
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
3✔
525
                if htlcsBucket == nil {
3✔
526
                        return fmt.Errorf("htlcs bucket not found")
×
527
                }
×
528

529
                if htlcsBucket.Get(htlcBucketKey(htlcAttemptInfoKey, aid)) == nil {
3✔
530
                        return fmt.Errorf("HTLC with ID %v not registered",
×
531
                                attemptID)
×
532
                }
×
533

534
                // Make sure the shard is not already failed or settled.
535
                if htlcsBucket.Get(htlcBucketKey(htlcFailInfoKey, aid)) != nil {
3✔
536
                        return ErrAttemptAlreadyFailed
×
537
                }
×
538

539
                if htlcsBucket.Get(htlcBucketKey(htlcSettleInfoKey, aid)) != nil {
3✔
540
                        return ErrAttemptAlreadySettled
×
541
                }
×
542

543
                // Add or update the key for this htlc.
544
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
3✔
545
                if err != nil {
3✔
546
                        return err
×
547
                }
×
548

549
                // Retrieve attempt info for the notification.
550
                payment, err = fetchPayment(bucket)
3✔
551
                return err
3✔
552
        })
553
        if err != nil {
3✔
UNCOV
554
                return nil, err
×
UNCOV
555
        }
×
556

557
        return payment, err
3✔
558
}
559

560
// Fail transitions a payment into the Failed state, and records the reason the
561
// payment failed. After invoking this method, InitPayment should return nil on
562
// its next call for this payment hash, allowing the switch to make a
563
// subsequent payment.
564
func (p *PaymentControl) Fail(paymentHash lntypes.Hash,
565
        reason FailureReason) (*MPPayment, error) {
3✔
566

3✔
567
        var (
3✔
568
                updateErr error
3✔
569
                payment   *MPPayment
3✔
570
        )
3✔
571
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
572
                // Reset the update error, to avoid carrying over an error
3✔
573
                // from a previous execution of the batched db transaction.
3✔
574
                updateErr = nil
3✔
575
                payment = nil
3✔
576

3✔
577
                prefetchPayment(tx, paymentHash)
3✔
578
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
579
                if err == ErrPaymentNotInitiated {
3✔
UNCOV
580
                        updateErr = ErrPaymentNotInitiated
×
UNCOV
581
                        return nil
×
582
                } else if err != nil {
3✔
583
                        return err
×
584
                }
×
585

586
                // We mark the payment as failed as long as it is known. This
587
                // lets the last attempt to fail with a terminal write its
588
                // failure to the PaymentControl without synchronizing with
589
                // other attempts.
590
                _, err = fetchPaymentStatus(bucket)
3✔
591
                if errors.Is(err, ErrPaymentNotInitiated) {
3✔
592
                        updateErr = ErrPaymentNotInitiated
×
593
                        return nil
×
594
                } else if err != nil {
3✔
595
                        return err
×
596
                }
×
597

598
                // Put the failure reason in the bucket for record keeping.
599
                v := []byte{byte(reason)}
3✔
600
                err = bucket.Put(paymentFailInfoKey, v)
3✔
601
                if err != nil {
3✔
602
                        return err
×
603
                }
×
604

605
                // Retrieve attempt info for the notification, if available.
606
                payment, err = fetchPayment(bucket)
3✔
607
                if err != nil {
3✔
608
                        return err
×
609
                }
×
610

611
                return nil
3✔
612
        })
613
        if err != nil {
3✔
614
                return nil, err
×
615
        }
×
616

617
        return payment, updateErr
3✔
618
}
619

620
// FetchPayment returns information about a payment from the database.
621
func (p *PaymentControl) FetchPayment(paymentHash lntypes.Hash) (
622
        *MPPayment, error) {
3✔
623

3✔
624
        var payment *MPPayment
3✔
625
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
626
                prefetchPayment(tx, paymentHash)
3✔
627
                bucket, err := fetchPaymentBucket(tx, paymentHash)
3✔
628
                if err != nil {
3✔
UNCOV
629
                        return err
×
UNCOV
630
                }
×
631

632
                payment, err = fetchPayment(bucket)
3✔
633

3✔
634
                return err
3✔
635
        }, func() {
3✔
636
                payment = nil
3✔
637
        })
3✔
638
        if err != nil {
3✔
UNCOV
639
                return nil, err
×
UNCOV
640
        }
×
641

642
        return payment, nil
3✔
643
}
644

645
// prefetchPayment attempts to prefetch as much of the payment as possible to
646
// reduce DB roundtrips.
647
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
3✔
648
        rb := kvdb.RootBucket(tx)
3✔
649
        kvdb.Prefetch(
3✔
650
                rb,
3✔
651
                []string{
3✔
652
                        // Prefetch all keys in the payment's bucket.
3✔
653
                        string(paymentsRootBucket),
3✔
654
                        string(paymentHash[:]),
3✔
655
                },
3✔
656
                []string{
3✔
657
                        // Prefetch all keys in the payment's htlc bucket.
3✔
658
                        string(paymentsRootBucket),
3✔
659
                        string(paymentHash[:]),
3✔
660
                        string(paymentHtlcsBucket),
3✔
661
                },
3✔
662
        )
3✔
663
}
3✔
664

665
// createPaymentBucket creates or fetches the sub-bucket assigned to this
666
// payment hash.
667
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
668
        kvdb.RwBucket, error) {
3✔
669

3✔
670
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
3✔
671
        if err != nil {
3✔
672
                return nil, err
×
673
        }
×
674

675
        return payments.CreateBucketIfNotExists(paymentHash[:])
3✔
676
}
677

678
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
679
// the bucket does not exist, it returns ErrPaymentNotInitiated.
680
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
681
        kvdb.RBucket, error) {
3✔
682

3✔
683
        payments := tx.ReadBucket(paymentsRootBucket)
3✔
684
        if payments == nil {
3✔
UNCOV
685
                return nil, ErrPaymentNotInitiated
×
UNCOV
686
        }
×
687

688
        bucket := payments.NestedReadBucket(paymentHash[:])
3✔
689
        if bucket == nil {
3✔
690
                return nil, ErrPaymentNotInitiated
×
691
        }
×
692

693
        return bucket, nil
3✔
694

695
}
696

697
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
698
// bucket that can be written to.
699
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
700
        kvdb.RwBucket, error) {
3✔
701

3✔
702
        payments := tx.ReadWriteBucket(paymentsRootBucket)
3✔
703
        if payments == nil {
3✔
UNCOV
704
                return nil, ErrPaymentNotInitiated
×
UNCOV
705
        }
×
706

707
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
3✔
708
        if bucket == nil {
3✔
709
                return nil, ErrPaymentNotInitiated
×
710
        }
×
711

712
        return bucket, nil
3✔
713
}
714

715
// nextPaymentSequence returns the next sequence number to store for a new
716
// payment.
717
func (p *PaymentControl) nextPaymentSequence() ([]byte, error) {
3✔
718
        p.paymentSeqMx.Lock()
3✔
719
        defer p.paymentSeqMx.Unlock()
3✔
720

3✔
721
        // Set a new upper bound in the DB every 1000 payments to avoid
3✔
722
        // conflicts on the sequence when using etcd.
3✔
723
        if p.currPaymentSeq == p.storedPaymentSeq {
6✔
724
                var currPaymentSeq, newUpperBound uint64
3✔
725
                if err := kvdb.Update(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
726
                        paymentsBucket, err := tx.CreateTopLevelBucket(
3✔
727
                                paymentsRootBucket,
3✔
728
                        )
3✔
729
                        if err != nil {
3✔
730
                                return err
×
731
                        }
×
732

733
                        currPaymentSeq = paymentsBucket.Sequence()
3✔
734
                        newUpperBound = currPaymentSeq + paymentSeqBlockSize
3✔
735
                        return paymentsBucket.SetSequence(newUpperBound)
3✔
736
                }, func() {}); err != nil {
3✔
737
                        return nil, err
×
738
                }
×
739

740
                // We lazy initialize the cached currPaymentSeq here using the
741
                // first nextPaymentSequence() call. This if statement will auto
742
                // initialize our stored currPaymentSeq, since by default both
743
                // this variable and storedPaymentSeq are zero which in turn
744
                // will have us fetch the current values from the DB.
745
                if p.currPaymentSeq == 0 {
6✔
746
                        p.currPaymentSeq = currPaymentSeq
3✔
747
                }
3✔
748

749
                p.storedPaymentSeq = newUpperBound
3✔
750
        }
751

752
        p.currPaymentSeq++
3✔
753
        b := make([]byte, 8)
3✔
754
        binary.BigEndian.PutUint64(b, p.currPaymentSeq)
3✔
755

3✔
756
        return b, nil
3✔
757
}
758

759
// fetchPaymentStatus fetches the payment status of the payment. If the payment
760
// isn't found, it will return error `ErrPaymentNotInitiated`.
761
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
3✔
762
        // Creation info should be set for all payments, regardless of state.
3✔
763
        // If not, it is unknown.
3✔
764
        if bucket.Get(paymentCreationInfoKey) == nil {
6✔
765
                return 0, ErrPaymentNotInitiated
3✔
766
        }
3✔
767

768
        payment, err := fetchPayment(bucket)
3✔
769
        if err != nil {
3✔
770
                return 0, err
×
771
        }
×
772

773
        return payment.Status, nil
3✔
774
}
775

776
// FetchInFlightPayments returns all payments with status InFlight.
777
func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) {
3✔
778
        var (
3✔
779
                inFlights      []*MPPayment
3✔
780
                start          = time.Now()
3✔
781
                lastLogTime    = time.Now()
3✔
782
                processedCount int
3✔
783
        )
3✔
784

3✔
785
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
786
                payments := tx.ReadBucket(paymentsRootBucket)
3✔
787
                if payments == nil {
6✔
788
                        return nil
3✔
789
                }
3✔
790

791
                return payments.ForEach(func(k, _ []byte) error {
6✔
792
                        bucket := payments.NestedReadBucket(k)
3✔
793
                        if bucket == nil {
3✔
794
                                return fmt.Errorf("non bucket element")
×
795
                        }
×
796

797
                        p, err := fetchPayment(bucket)
3✔
798
                        if err != nil {
3✔
799
                                return err
×
800
                        }
×
801

802
                        processedCount++
3✔
803
                        if time.Since(lastLogTime) >=
3✔
804
                                paymentProgressLogInterval {
3✔
805

×
806
                                log.Debugf("Scanning inflight payments "+
×
807
                                        "(in progress), processed %d, last "+
×
808
                                        "processed payment: %v", processedCount,
×
809
                                        p.Info)
×
810

×
811
                                lastLogTime = time.Now()
×
812
                        }
×
813

814
                        // Skip the payment if it's terminated.
815
                        if p.Terminated() {
6✔
816
                                return nil
3✔
817
                        }
3✔
818

819
                        inFlights = append(inFlights, p)
3✔
820
                        return nil
3✔
821
                })
822
        }, func() {
3✔
823
                inFlights = nil
3✔
824
        })
3✔
825
        if err != nil {
3✔
826
                return nil, err
×
827
        }
×
828

829
        elapsed := time.Since(start)
3✔
830
        log.Debugf("Completed scanning for inflight payments: "+
3✔
831
                "total_processed=%d, found_inflight=%d, elapsed=%v",
3✔
832
                processedCount, len(inFlights),
3✔
833
                elapsed.Round(time.Millisecond))
3✔
834

3✔
835
        return inFlights, nil
3✔
836
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc