• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 13211764208

08 Feb 2025 03:08AM UTC coverage: 49.288% (-9.5%) from 58.815%
13211764208

Pull #9489

github

calvinrzachman
itest: verify switchrpc server enforces send then track

We prevent the rpc server from allowing onion dispatches for
attempt IDs which have already been tracked by rpc clients.

This helps protect the client from leaking a duplicate onion
attempt. NOTE: This is not the only method for solving this
issue! The issue could be addressed via careful client side
programming which accounts for the uncertainty and async
nature of dispatching onions to a remote process via RPC.
This would require some lnd ChannelRouter changes for how
we intend to use these RPCs though.
Pull Request #9489: multi: add BuildOnion, SendOnion, and TrackOnion RPCs

474 of 990 new or added lines in 11 files covered. (47.88%)

27321 existing lines in 435 files now uncovered.

101192 of 205306 relevant lines covered (49.29%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.11
/channeldb/payment_control.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "io"
9
        "sync"
10

11
        "github.com/lightningnetwork/lnd/kvdb"
12
        "github.com/lightningnetwork/lnd/lntypes"
13
)
14

15
// paymentSeqBlockSize is the number of payment sequence numbers that are
// reserved in the database at once when sequences are batch allocated for
// future payments.
const paymentSeqBlockSize = 1000
20

21
var (
        // ErrAlreadyPaid signals we have already paid this payment hash.
        ErrAlreadyPaid = errors.New("invoice is already paid")

        // ErrPaymentInFlight signals that payment for this payment hash is
        // already "in flight" on the network.
        ErrPaymentInFlight = errors.New("payment is in transition")

        // ErrPaymentExists is returned when we try to initialize an already
        // existing payment that is not failed.
        ErrPaymentExists = errors.New("payment already exists")

        // ErrPaymentInternal is returned when the payment is found in a
        // conflicting state, such as:
        // - payment has StatusSucceeded but remaining amount is not zero.
        // - payment has StatusInitiated but remaining amount is zero.
        // - payment has StatusFailed but remaining amount is zero.
        ErrPaymentInternal = errors.New("internal error")

        // ErrPaymentNotInitiated is returned if the payment wasn't initiated.
        ErrPaymentNotInitiated = errors.New("payment isn't initiated")

        // ErrPaymentAlreadySucceeded is returned in the event we attempt to
        // change the status of a payment that has already succeeded.
        ErrPaymentAlreadySucceeded = errors.New("payment is already succeeded")

        // ErrPaymentAlreadyFailed is returned in the event we attempt to alter
        // a failed payment.
        ErrPaymentAlreadyFailed = errors.New("payment has already failed")

        // ErrUnknownPaymentStatus is returned when we do not recognize the
        // existing state of a payment.
        ErrUnknownPaymentStatus = errors.New("unknown payment status")

        // ErrPaymentTerminal is returned if we attempt to alter a payment that
        // has already reached a terminal condition.
        ErrPaymentTerminal = errors.New("payment has reached terminal " +
                "condition")

        // ErrAttemptAlreadySettled is returned if we try to alter an already
        // settled HTLC attempt.
        ErrAttemptAlreadySettled = errors.New("attempt already settled")

        // ErrAttemptAlreadyFailed is returned if we try to alter an already
        // failed HTLC attempt.
        ErrAttemptAlreadyFailed = errors.New("attempt already failed")

        // ErrValueMismatch is returned if we try to register a non-MPP attempt
        // with an amount that doesn't match the payment amount.
        ErrValueMismatch = errors.New("attempted value doesn't match payment " +
                "amount")

        // ErrValueExceedsAmt is returned if we try to register an attempt that
        // would take the total sent amount above the payment amount.
        ErrValueExceedsAmt = errors.New("attempted value exceeds payment " +
                "amount")

        // ErrNonMPPayment is returned if we try to register an MPP attempt for
        // a payment that already has a non-MPP attempt registered.
        ErrNonMPPayment = errors.New("payment has non-MPP attempts")

        // ErrMPPayment is returned if we try to register a non-MPP attempt for
        // a payment that already has an MPP attempt registered.
        ErrMPPayment = errors.New("payment has MPP attempts")

        // ErrMPPRecordInBlindedPayment is returned if we try to register an
        // attempt with an MPP record for a payment to a blinded path.
        ErrMPPRecordInBlindedPayment = errors.New("blinded payment cannot " +
                "contain MPP records")

        // ErrBlindedPaymentTotalAmountMismatch is returned if we try to
        // register an HTLC shard to a blinded route where the total amount
        // doesn't match existing shards.
        ErrBlindedPaymentTotalAmountMismatch = errors.New("blinded path " +
                "total amount mismatch")

        // ErrMPPPaymentAddrMismatch is returned if we try to register an MPP
        // shard where the payment address doesn't match existing shards.
        ErrMPPPaymentAddrMismatch = errors.New("payment address mismatch")

        // ErrMPPTotalAmountMismatch is returned if we try to register an MPP
        // shard where the total amount doesn't match existing shards.
        ErrMPPTotalAmountMismatch = errors.New("mp payment total amount " +
                "mismatch")

        // ErrPaymentPendingSettled is returned when we try to add a new
        // attempt to a payment that has at least one of its HTLCs settled.
        ErrPaymentPendingSettled = errors.New("payment has settled htlcs")

        // ErrPaymentPendingFailed is returned when we try to add a new attempt
        // to a payment that already has a failure reason.
        ErrPaymentPendingFailed = errors.New("payment has failure reason")

        // ErrSentExceedsTotal is returned if the payment's current total sent
        // amount exceeds the total amount.
        ErrSentExceedsTotal = errors.New("total sent exceeds total amount")

        // errNoAttemptInfo is returned when no attempt info is stored yet.
        errNoAttemptInfo = errors.New("unable to find attempt info for " +
                "inflight payment")

        // errNoSequenceNrIndex is returned when an attempt to lookup a payment
        // index is made for a sequence number that is not indexed.
        errNoSequenceNrIndex = errors.New("payment sequence number index " +
                "does not exist")
)
127

128
// PaymentControl implements persistence for payments and payment attempts.
type PaymentControl struct {
        // paymentSeqMx is held by nextPaymentSequence for its whole duration
        // while it works with the two sequence counters below.
        paymentSeqMx     sync.Mutex
        // currPaymentSeq and storedPaymentSeq are compared by
        // nextPaymentSequence: when they are equal, a new block of
        // paymentSeqBlockSize sequence numbers is reserved in the database.
        // NOTE(review): presumably currPaymentSeq is the last sequence handed
        // out and storedPaymentSeq the upper bound persisted in the DB -
        // confirm against the full body of nextPaymentSequence.
        currPaymentSeq   uint64
        storedPaymentSeq uint64
        // db is the database all payment state is read from and written to.
        db               *DB
}
135

136
// NewPaymentControl creates a new instance of the PaymentControl.
137
func NewPaymentControl(db *DB) *PaymentControl {
3✔
138
        return &PaymentControl{
3✔
139
                db: db,
3✔
140
        }
3✔
141
}
3✔
142

143
// InitPayment checks or records the given PaymentCreationInfo with the DB,
144
// making sure it does not already exist as an in-flight payment. When this
145
// method returns successfully, the payment is guaranteed to be in the InFlight
146
// state.
147
func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash,
148
        info *PaymentCreationInfo) error {
3✔
149

3✔
150
        // Obtain a new sequence number for this payment. This is used
3✔
151
        // to sort the payments in order of creation, and also acts as
3✔
152
        // a unique identifier for each payment.
3✔
153
        sequenceNum, err := p.nextPaymentSequence()
3✔
154
        if err != nil {
3✔
155
                return err
×
156
        }
×
157

158
        var b bytes.Buffer
3✔
159
        if err := serializePaymentCreationInfo(&b, info); err != nil {
3✔
160
                return err
×
161
        }
×
162
        infoBytes := b.Bytes()
3✔
163

3✔
164
        var updateErr error
3✔
165
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
166
                // Reset the update error, to avoid carrying over an error
3✔
167
                // from a previous execution of the batched db transaction.
3✔
168
                updateErr = nil
3✔
169

3✔
170
                prefetchPayment(tx, paymentHash)
3✔
171
                bucket, err := createPaymentBucket(tx, paymentHash)
3✔
172
                if err != nil {
3✔
173
                        return err
×
174
                }
×
175

176
                // Get the existing status of this payment, if any.
177
                paymentStatus, err := fetchPaymentStatus(bucket)
3✔
178

3✔
179
                switch {
3✔
180
                // If no error is returned, it means we already have this
181
                // payment. We'll check the status to decide whether we allow
182
                // retrying the payment or return a specific error.
183
                case err == nil:
3✔
184
                        if err := paymentStatus.initializable(); err != nil {
6✔
185
                                updateErr = err
3✔
186
                                return nil
3✔
187
                        }
3✔
188

189
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
190
                // we'll return the error.
191
                case !errors.Is(err, ErrPaymentNotInitiated):
×
192
                        return err
×
193
                }
194

195
                // Before we set our new sequence number, we check whether this
196
                // payment has a previously set sequence number and remove its
197
                // index entry if it exists. This happens in the case where we
198
                // have a previously attempted payment which was left in a state
199
                // where we can retry.
200
                seqBytes := bucket.Get(paymentSequenceKey)
3✔
201
                if seqBytes != nil {
6✔
202
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
203
                        if err := indexBucket.Delete(seqBytes); err != nil {
3✔
204
                                return err
×
205
                        }
×
206
                }
207

208
                // Once we have obtained a sequence number, we add an entry
209
                // to our index bucket which will map the sequence number to
210
                // our payment identifier.
211
                err = createPaymentIndexEntry(
3✔
212
                        tx, sequenceNum, info.PaymentIdentifier,
3✔
213
                )
3✔
214
                if err != nil {
3✔
215
                        return err
×
216
                }
×
217

218
                err = bucket.Put(paymentSequenceKey, sequenceNum)
3✔
219
                if err != nil {
3✔
220
                        return err
×
221
                }
×
222

223
                // Add the payment info to the bucket, which contains the
224
                // static information for this payment
225
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
3✔
226
                if err != nil {
3✔
227
                        return err
×
228
                }
×
229

230
                // We'll delete any lingering HTLCs to start with, in case we
231
                // are initializing a payment that was attempted earlier, but
232
                // left in a state where we could retry.
233
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
3✔
234
                if err != nil && err != kvdb.ErrBucketNotFound {
3✔
235
                        return err
×
236
                }
×
237

238
                // Also delete any lingering failure info now that we are
239
                // re-attempting.
240
                return bucket.Delete(paymentFailInfoKey)
3✔
241
        })
242
        if err != nil {
3✔
243
                return fmt.Errorf("unable to init payment: %w", err)
×
244
        }
×
245

246
        return updateErr
3✔
247
}
248

249
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
250
// by the PaymentControl db.
251
func (p *PaymentControl) DeleteFailedAttempts(hash lntypes.Hash) error {
3✔
252
        if !p.db.keepFailedPaymentAttempts {
3✔
UNCOV
253
                const failedHtlcsOnly = true
×
UNCOV
254
                err := p.db.DeletePayment(hash, failedHtlcsOnly)
×
UNCOV
255
                if err != nil {
×
UNCOV
256
                        return err
×
UNCOV
257
                }
×
258
        }
259
        return nil
3✔
260
}
261

262
// paymentIndexType identifies the kind of entry stored in the payment index.
// It is written as the first element of every index entry so that different
// index types can be told apart in future.
type paymentIndexType uint8

// paymentIndexTypeHash is a payment index type which indicates that we have
// created an index of payment sequence number to payment hash.
const paymentIndexTypeHash paymentIndexType = 0
269

270
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
271
// index produced contains a payment index type (which can be used in future to
272
// signal different payment index types) and the payment identifier.
273
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
274
        id lntypes.Hash) error {
3✔
275

3✔
276
        var b bytes.Buffer
3✔
277
        if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
3✔
278
                return err
×
279
        }
×
280

281
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
282
        return indexes.Put(sequenceNumber, b.Bytes())
3✔
283
}
284

285
// deserializePaymentIndex deserializes a payment index entry. This function
286
// currently only supports deserialization of payment hash indexes, and will
287
// fail for other types.
288
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
3✔
289
        var (
3✔
290
                indexType   paymentIndexType
3✔
291
                paymentHash []byte
3✔
292
        )
3✔
293

3✔
294
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
3✔
295
                return lntypes.Hash{}, err
×
296
        }
×
297

298
        // While we only have on payment index type, we do not need to use our
299
        // index type to deserialize the index. However, we sanity check that
300
        // this type is as expected, since we had to read it out anyway.
301
        if indexType != paymentIndexTypeHash {
3✔
302
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
303
                        "type: %v", indexType)
×
304
        }
×
305

306
        hash, err := lntypes.MakeHash(paymentHash)
3✔
307
        if err != nil {
3✔
308
                return lntypes.Hash{}, err
×
309
        }
×
310

311
        return hash, nil
3✔
312
}
313

314
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
315
// DB.
316
func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash,
317
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
3✔
318

3✔
319
        // Serialize the information before opening the db transaction.
3✔
320
        var a bytes.Buffer
3✔
321
        err := serializeHTLCAttemptInfo(&a, attempt)
3✔
322
        if err != nil {
3✔
323
                return nil, err
×
324
        }
×
325
        htlcInfoBytes := a.Bytes()
3✔
326

3✔
327
        htlcIDBytes := make([]byte, 8)
3✔
328
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
3✔
329

3✔
330
        var payment *MPPayment
3✔
331
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
332
                prefetchPayment(tx, paymentHash)
3✔
333
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
334
                if err != nil {
3✔
335
                        return err
×
336
                }
×
337

338
                payment, err = fetchPayment(bucket)
3✔
339
                if err != nil {
3✔
340
                        return err
×
341
                }
×
342

343
                // Check if registering a new attempt is allowed.
344
                if err := payment.Registrable(); err != nil {
3✔
UNCOV
345
                        return err
×
UNCOV
346
                }
×
347

348
                // If the final hop has encrypted data, then we know this is a
349
                // blinded payment. In blinded payments, MPP records are not set
350
                // for split payments and the recipient is responsible for using
351
                // a consistent PathID across the various encrypted data
352
                // payloads that we received from them for this payment. All we
353
                // need to check is that the total amount field for each HTLC
354
                // in the split payment is correct.
355
                isBlinded := len(attempt.Route.FinalHop().EncryptedData) != 0
3✔
356

3✔
357
                // Make sure any existing shards match the new one with regards
3✔
358
                // to MPP options.
3✔
359
                mpp := attempt.Route.FinalHop().MPP
3✔
360

3✔
361
                // MPP records should not be set for attempts to blinded paths.
3✔
362
                if isBlinded && mpp != nil {
3✔
363
                        return ErrMPPRecordInBlindedPayment
×
364
                }
×
365

366
                for _, h := range payment.InFlightHTLCs() {
6✔
367
                        hMpp := h.Route.FinalHop().MPP
3✔
368

3✔
369
                        // If this is a blinded payment, then no existing HTLCs
3✔
370
                        // should have MPP records.
3✔
371
                        if isBlinded && hMpp != nil {
3✔
372
                                return ErrMPPRecordInBlindedPayment
×
373
                        }
×
374

375
                        // If this is a blinded payment, then we just need to
376
                        // check that the TotalAmtMsat field for this shard
377
                        // is equal to that of any other shard in the same
378
                        // payment.
379
                        if isBlinded {
6✔
380
                                if attempt.Route.FinalHop().TotalAmtMsat !=
3✔
381
                                        h.Route.FinalHop().TotalAmtMsat {
3✔
382

×
383
                                        //nolint:ll
×
384
                                        return ErrBlindedPaymentTotalAmountMismatch
×
385
                                }
×
386

387
                                continue
3✔
388
                        }
389

390
                        switch {
3✔
391
                        // We tried to register a non-MPP attempt for a MPP
392
                        // payment.
UNCOV
393
                        case mpp == nil && hMpp != nil:
×
UNCOV
394
                                return ErrMPPayment
×
395

396
                        // We tried to register a MPP shard for a non-MPP
397
                        // payment.
UNCOV
398
                        case mpp != nil && hMpp == nil:
×
UNCOV
399
                                return ErrNonMPPayment
×
400

401
                        // Non-MPP payment, nothing more to validate.
402
                        case mpp == nil:
×
403
                                continue
×
404
                        }
405

406
                        // Check that MPP options match.
407
                        if mpp.PaymentAddr() != hMpp.PaymentAddr() {
3✔
UNCOV
408
                                return ErrMPPPaymentAddrMismatch
×
UNCOV
409
                        }
×
410

411
                        if mpp.TotalMsat() != hMpp.TotalMsat() {
3✔
UNCOV
412
                                return ErrMPPTotalAmountMismatch
×
UNCOV
413
                        }
×
414
                }
415

416
                // If this is a non-MPP attempt, it must match the total amount
417
                // exactly. Note that a blinded payment is considered an MPP
418
                // attempt.
419
                amt := attempt.Route.ReceiverAmt()
3✔
420
                if !isBlinded && mpp == nil && amt != payment.Info.Value {
3✔
421
                        return ErrValueMismatch
×
422
                }
×
423

424
                // Ensure we aren't sending more than the total payment amount.
425
                sentAmt, _ := payment.SentAmt()
3✔
426
                if sentAmt+amt > payment.Info.Value {
3✔
UNCOV
427
                        return fmt.Errorf("%w: attempted=%v, payment amount="+
×
UNCOV
428
                                "%v", ErrValueExceedsAmt, sentAmt+amt,
×
UNCOV
429
                                payment.Info.Value)
×
UNCOV
430
                }
×
431

432
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
3✔
433
                        paymentHtlcsBucket,
3✔
434
                )
3✔
435
                if err != nil {
3✔
436
                        return err
×
437
                }
×
438

439
                err = htlcsBucket.Put(
3✔
440
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
3✔
441
                        htlcInfoBytes,
3✔
442
                )
3✔
443
                if err != nil {
3✔
444
                        return err
×
445
                }
×
446

447
                // Retrieve attempt info for the notification.
448
                payment, err = fetchPayment(bucket)
3✔
449
                return err
3✔
450
        })
451
        if err != nil {
3✔
UNCOV
452
                return nil, err
×
UNCOV
453
        }
×
454

455
        return payment, err
3✔
456
}
457

458
// SettleAttempt marks the given attempt settled with the preimage. If this is
459
// a multi shard payment, this might implicitly mean that the full payment
460
// succeeded.
461
//
462
// After invoking this method, InitPayment should always return an error to
463
// prevent us from making duplicate payments to the same payment hash. The
464
// provided preimage is atomically saved to the DB for record keeping.
465
func (p *PaymentControl) SettleAttempt(hash lntypes.Hash,
466
        attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
3✔
467

3✔
468
        var b bytes.Buffer
3✔
469
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
3✔
470
                return nil, err
×
471
        }
×
472
        settleBytes := b.Bytes()
3✔
473

3✔
474
        return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
3✔
475
}
476

477
// FailAttempt marks the given payment attempt failed.
478
func (p *PaymentControl) FailAttempt(hash lntypes.Hash,
479
        attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
3✔
480

3✔
481
        var b bytes.Buffer
3✔
482
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
3✔
483
                return nil, err
×
484
        }
×
485
        failBytes := b.Bytes()
3✔
486

3✔
487
        return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
3✔
488
}
489

490
// updateHtlcKey updates a database key for the specified htlc.
491
func (p *PaymentControl) updateHtlcKey(paymentHash lntypes.Hash,
492
        attemptID uint64, key, value []byte) (*MPPayment, error) {
3✔
493

3✔
494
        aid := make([]byte, 8)
3✔
495
        binary.BigEndian.PutUint64(aid, attemptID)
3✔
496

3✔
497
        var payment *MPPayment
3✔
498
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
499
                payment = nil
3✔
500

3✔
501
                prefetchPayment(tx, paymentHash)
3✔
502
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
503
                if err != nil {
3✔
UNCOV
504
                        return err
×
UNCOV
505
                }
×
506

507
                p, err := fetchPayment(bucket)
3✔
508
                if err != nil {
3✔
509
                        return err
×
510
                }
×
511

512
                // We can only update keys of in-flight payments. We allow
513
                // updating keys even if the payment has reached a terminal
514
                // condition, since the HTLC outcomes must still be updated.
515
                if err := p.Status.updatable(); err != nil {
3✔
516
                        return err
×
517
                }
×
518

519
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
3✔
520
                if htlcsBucket == nil {
3✔
521
                        return fmt.Errorf("htlcs bucket not found")
×
522
                }
×
523

524
                if htlcsBucket.Get(htlcBucketKey(htlcAttemptInfoKey, aid)) == nil {
3✔
525
                        return fmt.Errorf("HTLC with ID %v not registered",
×
526
                                attemptID)
×
527
                }
×
528

529
                // Make sure the shard is not already failed or settled.
530
                if htlcsBucket.Get(htlcBucketKey(htlcFailInfoKey, aid)) != nil {
3✔
531
                        return ErrAttemptAlreadyFailed
×
532
                }
×
533

534
                if htlcsBucket.Get(htlcBucketKey(htlcSettleInfoKey, aid)) != nil {
3✔
535
                        return ErrAttemptAlreadySettled
×
536
                }
×
537

538
                // Add or update the key for this htlc.
539
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
3✔
540
                if err != nil {
3✔
541
                        return err
×
542
                }
×
543

544
                // Retrieve attempt info for the notification.
545
                payment, err = fetchPayment(bucket)
3✔
546
                return err
3✔
547
        })
548
        if err != nil {
3✔
UNCOV
549
                return nil, err
×
UNCOV
550
        }
×
551

552
        return payment, err
3✔
553
}
554

555
// Fail transitions a payment into the Failed state, and records the reason the
556
// payment failed. After invoking this method, InitPayment should return nil on
557
// its next call for this payment hash, allowing the switch to make a
558
// subsequent payment.
559
func (p *PaymentControl) Fail(paymentHash lntypes.Hash,
560
        reason FailureReason) (*MPPayment, error) {
3✔
561

3✔
562
        var (
3✔
563
                updateErr error
3✔
564
                payment   *MPPayment
3✔
565
        )
3✔
566
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
567
                // Reset the update error, to avoid carrying over an error
3✔
568
                // from a previous execution of the batched db transaction.
3✔
569
                updateErr = nil
3✔
570
                payment = nil
3✔
571

3✔
572
                prefetchPayment(tx, paymentHash)
3✔
573
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
3✔
574
                if err == ErrPaymentNotInitiated {
3✔
UNCOV
575
                        updateErr = ErrPaymentNotInitiated
×
UNCOV
576
                        return nil
×
577
                } else if err != nil {
3✔
578
                        return err
×
579
                }
×
580

581
                // We mark the payment as failed as long as it is known. This
582
                // lets the last attempt to fail with a terminal write its
583
                // failure to the PaymentControl without synchronizing with
584
                // other attempts.
585
                _, err = fetchPaymentStatus(bucket)
3✔
586
                if errors.Is(err, ErrPaymentNotInitiated) {
3✔
587
                        updateErr = ErrPaymentNotInitiated
×
588
                        return nil
×
589
                } else if err != nil {
3✔
590
                        return err
×
591
                }
×
592

593
                // Put the failure reason in the bucket for record keeping.
594
                v := []byte{byte(reason)}
3✔
595
                err = bucket.Put(paymentFailInfoKey, v)
3✔
596
                if err != nil {
3✔
597
                        return err
×
598
                }
×
599

600
                // Retrieve attempt info for the notification, if available.
601
                payment, err = fetchPayment(bucket)
3✔
602
                if err != nil {
3✔
603
                        return err
×
604
                }
×
605

606
                return nil
3✔
607
        })
608
        if err != nil {
3✔
609
                return nil, err
×
610
        }
×
611

612
        return payment, updateErr
3✔
613
}
614

615
// FetchPayment returns information about a payment from the database.
616
func (p *PaymentControl) FetchPayment(paymentHash lntypes.Hash) (
617
        *MPPayment, error) {
3✔
618

3✔
619
        var payment *MPPayment
3✔
620
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
621
                prefetchPayment(tx, paymentHash)
3✔
622
                bucket, err := fetchPaymentBucket(tx, paymentHash)
3✔
623
                if err != nil {
3✔
UNCOV
624
                        return err
×
UNCOV
625
                }
×
626

627
                payment, err = fetchPayment(bucket)
3✔
628

3✔
629
                return err
3✔
630
        }, func() {
3✔
631
                payment = nil
3✔
632
        })
3✔
633
        if err != nil {
3✔
UNCOV
634
                return nil, err
×
UNCOV
635
        }
×
636

637
        return payment, nil
3✔
638
}
639

640
// prefetchPayment attempts to prefetch as much of the payment as possible to
641
// reduce DB roundtrips.
642
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
3✔
643
        rb := kvdb.RootBucket(tx)
3✔
644
        kvdb.Prefetch(
3✔
645
                rb,
3✔
646
                []string{
3✔
647
                        // Prefetch all keys in the payment's bucket.
3✔
648
                        string(paymentsRootBucket),
3✔
649
                        string(paymentHash[:]),
3✔
650
                },
3✔
651
                []string{
3✔
652
                        // Prefetch all keys in the payment's htlc bucket.
3✔
653
                        string(paymentsRootBucket),
3✔
654
                        string(paymentHash[:]),
3✔
655
                        string(paymentHtlcsBucket),
3✔
656
                },
3✔
657
        )
3✔
658
}
3✔
659

660
// createPaymentBucket creates or fetches the sub-bucket assigned to this
661
// payment hash.
662
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
663
        kvdb.RwBucket, error) {
3✔
664

3✔
665
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
3✔
666
        if err != nil {
3✔
667
                return nil, err
×
668
        }
×
669

670
        return payments.CreateBucketIfNotExists(paymentHash[:])
3✔
671
}
672

673
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
674
// the bucket does not exist, it returns ErrPaymentNotInitiated.
675
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
676
        kvdb.RBucket, error) {
3✔
677

3✔
678
        payments := tx.ReadBucket(paymentsRootBucket)
3✔
679
        if payments == nil {
3✔
UNCOV
680
                return nil, ErrPaymentNotInitiated
×
UNCOV
681
        }
×
682

683
        bucket := payments.NestedReadBucket(paymentHash[:])
3✔
684
        if bucket == nil {
3✔
685
                return nil, ErrPaymentNotInitiated
×
686
        }
×
687

688
        return bucket, nil
3✔
689

690
}
691

692
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
693
// bucket that can be written to.
694
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
695
        kvdb.RwBucket, error) {
3✔
696

3✔
697
        payments := tx.ReadWriteBucket(paymentsRootBucket)
3✔
698
        if payments == nil {
3✔
UNCOV
699
                return nil, ErrPaymentNotInitiated
×
UNCOV
700
        }
×
701

702
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
3✔
703
        if bucket == nil {
3✔
704
                return nil, ErrPaymentNotInitiated
×
705
        }
×
706

707
        return bucket, nil
3✔
708
}
709

710
// nextPaymentSequence returns the next sequence number to store for a new
711
// payment.
712
func (p *PaymentControl) nextPaymentSequence() ([]byte, error) {
3✔
713
        p.paymentSeqMx.Lock()
3✔
714
        defer p.paymentSeqMx.Unlock()
3✔
715

3✔
716
        // Set a new upper bound in the DB every 1000 payments to avoid
3✔
717
        // conflicts on the sequence when using etcd.
3✔
718
        if p.currPaymentSeq == p.storedPaymentSeq {
6✔
719
                var currPaymentSeq, newUpperBound uint64
3✔
720
                if err := kvdb.Update(p.db.Backend, func(tx kvdb.RwTx) error {
6✔
721
                        paymentsBucket, err := tx.CreateTopLevelBucket(
3✔
722
                                paymentsRootBucket,
3✔
723
                        )
3✔
724
                        if err != nil {
3✔
725
                                return err
×
726
                        }
×
727

728
                        currPaymentSeq = paymentsBucket.Sequence()
3✔
729
                        newUpperBound = currPaymentSeq + paymentSeqBlockSize
3✔
730
                        return paymentsBucket.SetSequence(newUpperBound)
3✔
731
                }, func() {}); err != nil {
3✔
732
                        return nil, err
×
733
                }
×
734

735
                // We lazy initialize the cached currPaymentSeq here using the
736
                // first nextPaymentSequence() call. This if statement will auto
737
                // initialize our stored currPaymentSeq, since by default both
738
                // this variable and storedPaymentSeq are zero which in turn
739
                // will have us fetch the current values from the DB.
740
                if p.currPaymentSeq == 0 {
6✔
741
                        p.currPaymentSeq = currPaymentSeq
3✔
742
                }
3✔
743

744
                p.storedPaymentSeq = newUpperBound
3✔
745
        }
746

747
        p.currPaymentSeq++
3✔
748
        b := make([]byte, 8)
3✔
749
        binary.BigEndian.PutUint64(b, p.currPaymentSeq)
3✔
750

3✔
751
        return b, nil
3✔
752
}
753

754
// fetchPaymentStatus fetches the payment status of the payment. If the payment
755
// isn't found, it will return error `ErrPaymentNotInitiated`.
756
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
3✔
757
        // Creation info should be set for all payments, regardless of state.
3✔
758
        // If not, it is unknown.
3✔
759
        if bucket.Get(paymentCreationInfoKey) == nil {
6✔
760
                return 0, ErrPaymentNotInitiated
3✔
761
        }
3✔
762

763
        payment, err := fetchPayment(bucket)
3✔
764
        if err != nil {
3✔
765
                return 0, err
×
766
        }
×
767

768
        return payment.Status, nil
3✔
769
}
770

771
// FetchInFlightPayments returns all payments with status InFlight.
772
func (p *PaymentControl) FetchInFlightPayments() ([]*MPPayment, error) {
3✔
773
        var inFlights []*MPPayment
3✔
774
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
6✔
775
                payments := tx.ReadBucket(paymentsRootBucket)
3✔
776
                if payments == nil {
6✔
777
                        return nil
3✔
778
                }
3✔
779

780
                return payments.ForEach(func(k, _ []byte) error {
6✔
781
                        bucket := payments.NestedReadBucket(k)
3✔
782
                        if bucket == nil {
3✔
783
                                return fmt.Errorf("non bucket element")
×
784
                        }
×
785

786
                        p, err := fetchPayment(bucket)
3✔
787
                        if err != nil {
3✔
788
                                return err
×
789
                        }
×
790

791
                        // Skip the payment if it's terminated.
792
                        if p.Terminated() {
6✔
793
                                return nil
3✔
794
                        }
3✔
795

796
                        inFlights = append(inFlights, p)
3✔
797
                        return nil
3✔
798
                })
799
        }, func() {
3✔
800
                inFlights = nil
3✔
801
        })
3✔
802
        if err != nil {
3✔
803
                return nil, err
×
804
        }
×
805

806
        return inFlights, nil
3✔
807
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc