• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 15145941929

20 May 2025 07:14PM UTC coverage: 69.008% (+0.01%) from 68.996%
15145941929

Pull #9844

github

web-flow
Merge daced8b33 into c52a6ddeb
Pull Request #9844: Refactor Payment PR 3

385 of 494 new or added lines in 5 files covered. (77.94%)

66 existing lines in 19 files now uncovered.

133966 of 194131 relevant lines covered (69.01%)

22018.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.8
/channeldb/payments.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "sort"
11
        "sync"
12
        "time"
13

14
        "github.com/btcsuite/btcd/btcec/v2"
15
        "github.com/btcsuite/btcd/wire"
16
        "github.com/lightningnetwork/lnd/kvdb"
17
        "github.com/lightningnetwork/lnd/lntypes"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        "github.com/lightningnetwork/lnd/record"
20
        "github.com/lightningnetwork/lnd/routing/route"
21
        "github.com/lightningnetwork/lnd/tlv"
22
)
23

24
var (
	// paymentsRootBucket is the name of the top-level bucket within the
	// database that stores all data related to payments. Within this
	// bucket, each payment has its own sub-bucket keyed by its payment
	// hash.
	//
	// Bucket hierarchy:
	//
	// root-bucket
	//      |
	//      |-- <paymenthash>
	//      |        |--sequence-key: <sequence number>
	//      |        |--creation-info-key: <creation info>
	//      |        |--fail-info-key: <(optional) fail info>
	//      |        |
	//      |        |--payment-htlcs-bucket (shard-bucket)
	//      |        |        |
	//      |        |        |-- ai<htlc attempt ID>: <htlc attempt info>
	//      |        |        |-- si<htlc attempt ID>: <(optional) settle info>
	//      |        |        |-- fi<htlc attempt ID>: <(optional) fail info>
	//      |        |        |
	//      |        |       ...
	//      |        |
	//      |        |
	//      |        |--duplicate-bucket (only for old, completed payments)
	//      |                 |
	//      |                 |-- <seq-num>
	//      |                 |       |--sequence-key: <sequence number>
	//      |                 |       |--creation-info-key: <creation info>
	//      |                 |       |--ai: <attempt info>
	//      |                 |       |--si: <settle info>
	//      |                 |       |--fi: <fail info>
	//      |                 |
	//      |                 |-- <seq-num>
	//      |                 |       |
	//      |                ...     ...
	//      |
	//      |-- <paymenthash>
	//      |        |
	//      |       ...
	//     ...
	//
	paymentsRootBucket = []byte("payments-root-bucket")

	// paymentSequenceKey is a key used in the payment's sub-bucket to
	// store the sequence number of the payment.
	paymentSequenceKey = []byte("payment-sequence-key")

	// paymentCreationInfoKey is a key used in the payment's sub-bucket to
	// store the creation info of the payment.
	paymentCreationInfoKey = []byte("payment-creation-info")

	// paymentHtlcsBucket is a bucket where we'll store the information
	// about the HTLCs that were attempted for a payment.
	paymentHtlcsBucket = []byte("payment-htlcs-bucket")

	// htlcAttemptInfoKey is the key used as the prefix of an HTLC attempt
	// to store the info about the attempt that was done for the HTLC in
	// question. The HTLC attempt ID is concatenated at the end.
	htlcAttemptInfoKey = []byte("ai")

	// htlcSettleInfoKey is the key used as the prefix of an HTLC attempt
	// settle info, if any. The HTLC attempt ID is concatenated at the end.
	htlcSettleInfoKey = []byte("si")

	// htlcFailInfoKey is the key used as the prefix of an HTLC attempt
	// failure information, if any. The HTLC attempt ID is concatenated at
	// the end.
	htlcFailInfoKey = []byte("fi")

	// paymentFailInfoKey is a key used in the payment's sub-bucket to
	// store information about the reason a payment failed.
	paymentFailInfoKey = []byte("payment-fail-info")

	// paymentsIndexBucket is the name of the top-level bucket within the
	// database that stores an index mapping each payment sequence number
	// to its payment hash.
	//
	// payments-index-bucket
	//         |--<sequence-number>: <payment hash>
	//         |--...
	//         |--<sequence-number>: <payment hash>
	paymentsIndexBucket = []byte("payments-index-bucket")
)
107

108
var (
	// ErrNoSequenceNumber is returned if we look up a payment which does
	// not have a sequence number.
	ErrNoSequenceNumber = errors.New("sequence number not found")

	// ErrDuplicateNotFound is returned when we look up a payment by its
	// index and cannot find a payment with a matching sequence number.
	ErrDuplicateNotFound = errors.New("duplicate payment not found")

	// ErrNoDuplicateBucket is returned when we expect to find duplicates
	// when looking up a payment from its index, but the payment does not
	// have any.
	ErrNoDuplicateBucket = errors.New("expected duplicate bucket")

	// ErrNoDuplicateNestedBucket is returned if we do not find duplicate
	// payments in their own sub-bucket.
	ErrNoDuplicateNestedBucket = errors.New("nested duplicate bucket not " +
		"found")
)
127

128
// Payment operations related errors.
var (
	// ErrAlreadyPaid signals we have already paid this payment hash.
	ErrAlreadyPaid = errors.New("invoice is already paid")

	// ErrPaymentInFlight signals that payment for this payment hash is
	// already "in flight" on the network.
	ErrPaymentInFlight = errors.New("payment is in transition")

	// ErrPaymentExists is returned when we try to initialize an already
	// existing payment that is not failed.
	ErrPaymentExists = errors.New("payment already exists")

	// ErrPaymentInternal is returned when performing the payment has a
	// conflicting state, such as,
	// - payment has StatusSucceeded but remaining amount is not zero.
	// - payment has StatusInitiated but remaining amount is zero.
	// - payment has StatusFailed but remaining amount is zero.
	ErrPaymentInternal = errors.New("internal error")

	// ErrPaymentNotInitiated is returned if the payment wasn't initiated.
	ErrPaymentNotInitiated = errors.New("payment isn't initiated")

	// ErrPaymentAlreadySucceeded is returned in the event we attempt to
	// change the status of a payment that has already succeeded.
	ErrPaymentAlreadySucceeded = errors.New("payment is already succeeded")

	// ErrPaymentAlreadyFailed is returned in the event we attempt to alter
	// a failed payment.
	ErrPaymentAlreadyFailed = errors.New("payment has already failed")

	// ErrUnknownPaymentStatus is returned when we do not recognize the
	// existing state of a payment.
	ErrUnknownPaymentStatus = errors.New("unknown payment status")

	// ErrPaymentTerminal is returned if we attempt to alter a payment that
	// already has reached a terminal condition.
	ErrPaymentTerminal = errors.New("payment has reached terminal " +
		"condition")

	// ErrAttemptAlreadySettled is returned if we try to alter an already
	// settled HTLC attempt.
	ErrAttemptAlreadySettled = errors.New("attempt already settled")

	// ErrAttemptAlreadyFailed is returned if we try to alter an already
	// failed HTLC attempt.
	ErrAttemptAlreadyFailed = errors.New("attempt already failed")

	// ErrValueMismatch is returned if we try to register a non-MPP attempt
	// with an amount that doesn't match the payment amount.
	ErrValueMismatch = errors.New("attempted value doesn't match payment " +
		"amount")

	// ErrValueExceedsAmt is returned if we try to register an attempt that
	// would take the total sent amount above the payment amount.
	ErrValueExceedsAmt = errors.New("attempted value exceeds payment " +
		"amount")

	// ErrNonMPPayment is returned if we try to register an MPP attempt for
	// a payment that already has a non-MPP attempt registered.
	ErrNonMPPayment = errors.New("payment has non-MPP attempts")

	// ErrMPPayment is returned if we try to register a non-MPP attempt for
	// a payment that already has an MPP attempt registered.
	ErrMPPayment = errors.New("payment has MPP attempts")

	// ErrMPPRecordInBlindedPayment is returned if we try to register an
	// attempt with an MPP record for a payment to a blinded path.
	ErrMPPRecordInBlindedPayment = errors.New("blinded payment cannot " +
		"contain MPP records")

	// ErrBlindedPaymentTotalAmountMismatch is returned if we try to
	// register an HTLC shard to a blinded route where the total amount
	// doesn't match existing shards.
	ErrBlindedPaymentTotalAmountMismatch = errors.New("blinded path " +
		"total amount mismatch")

	// ErrMPPPaymentAddrMismatch is returned if we try to register an MPP
	// shard where the payment address doesn't match existing shards.
	ErrMPPPaymentAddrMismatch = errors.New("payment address mismatch")

	// ErrMPPTotalAmountMismatch is returned if we try to register an MPP
	// shard where the total amount doesn't match existing shards.
	ErrMPPTotalAmountMismatch = errors.New("mp payment total amount " +
		"mismatch")

	// ErrPaymentPendingSettled is returned when we try to add a new
	// attempt to a payment that has at least one of its HTLCs settled.
	ErrPaymentPendingSettled = errors.New("payment has settled htlcs")

	// ErrPaymentPendingFailed is returned when we try to add a new attempt
	// to a payment that already has a failure reason.
	ErrPaymentPendingFailed = errors.New("payment has failure reason")

	// ErrSentExceedsTotal is returned if the payment's current total sent
	// amount exceeds the total amount.
	ErrSentExceedsTotal = errors.New("total sent exceeds total amount")

	// errNoAttemptInfo is returned when no attempt info is stored yet.
	errNoAttemptInfo = errors.New("unable to find attempt info for " +
		"inflight payment")

	// errNoSequenceNrIndex is returned when an attempt to look up a
	// payment index is made for a sequence number that is not indexed.
	errNoSequenceNrIndex = errors.New("payment sequence number index " +
		"does not exist")
)
235

236
// Payment operations related constants.
const (
	// paymentSeqBlockSize is the block size used when we batch allocate
	// payment sequences for future payments.
	paymentSeqBlockSize = 1000

	// paymentProgressLogInterval is the interval we use to limit the
	// logging output of payment processing.
	paymentProgressLogInterval = 30 * time.Second
)
246

247
// FailureReason encodes the reason a payment ultimately failed.
type FailureReason byte

const (
	// FailureReasonTimeout indicates that the payment did timeout before a
	// successful payment attempt was made.
	FailureReasonTimeout FailureReason = 0

	// FailureReasonNoRoute indicates no successful route to the
	// destination was found during path finding.
	FailureReasonNoRoute FailureReason = 1

	// FailureReasonError indicates that an unexpected error happened during
	// payment.
	FailureReasonError FailureReason = 2

	// FailureReasonPaymentDetails indicates that either the hash is unknown
	// or the final cltv delta or amount is incorrect.
	FailureReasonPaymentDetails FailureReason = 3

	// FailureReasonInsufficientBalance indicates that we didn't have enough
	// balance to complete the payment.
	FailureReasonInsufficientBalance FailureReason = 4

	// FailureReasonCanceled indicates that the payment was canceled by the
	// user.
	FailureReasonCanceled FailureReason = 5

	// TODO(joostjager): Add failure reasons for:
	// LocalLiquidityInsufficient, RemoteCapacityInsufficient.
)

// Error returns a human-readable error string for the FailureReason. It is
// identical to String, and lets a FailureReason be used as an error value.
func (r FailureReason) Error() string {
	return r.String()
}

// String returns a human-readable FailureReason. Values that do not match
// one of the declared FailureReason constants are rendered as "unknown".
func (r FailureReason) String() string {
	switch r {
	case FailureReasonTimeout:
		return "timeout"

	case FailureReasonNoRoute:
		return "no_route"

	case FailureReasonError:
		return "error"

	case FailureReasonPaymentDetails:
		return "incorrect_payment_details"

	case FailureReasonInsufficientBalance:
		return "insufficient_balance"

	case FailureReasonCanceled:
		return "canceled"
	}

	return "unknown"
}
303

304
// KVPaymentsDB implements persistence for payments and payment attempts.
305
type KVPaymentsDB struct {
306
        paymentSeqMx     sync.Mutex
307
        currPaymentSeq   uint64
308
        storedPaymentSeq uint64
309
        db               *DB
310
}
311

312
// NewKVPaymentsDB creates a new instance of the KVPaymentsDB.
313
func NewKVPaymentsDB(db *DB) *KVPaymentsDB {
45✔
314
        return &KVPaymentsDB{
45✔
315
                db: db,
45✔
316
        }
45✔
317
}
45✔
318

319
// InitPayment checks or records the given PaymentCreationInfo with the DB,
320
// making sure it does not already exist as an in-flight payment. When this
321
// method returns successfully, the payment is guaranteed to be in the InFlight
322
// state.
323
func (p *KVPaymentsDB) InitPayment(paymentHash lntypes.Hash,
324
        info *PaymentCreationInfo) error {
152✔
325

152✔
326
        // Obtain a new sequence number for this payment. This is used
152✔
327
        // to sort the payments in order of creation, and also acts as
152✔
328
        // a unique identifier for each payment.
152✔
329
        sequenceNum, err := p.nextPaymentSequence()
152✔
330
        if err != nil {
152✔
NEW
331
                return err
×
NEW
332
        }
×
333

334
        var b bytes.Buffer
152✔
335
        if err := serializePaymentCreationInfo(&b, info); err != nil {
152✔
NEW
336
                return err
×
NEW
337
        }
×
338
        infoBytes := b.Bytes()
152✔
339

152✔
340
        var updateErr error
152✔
341
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
304✔
342
                // Reset the update error, to avoid carrying over an error
152✔
343
                // from a previous execution of the batched db transaction.
152✔
344
                updateErr = nil
152✔
345

152✔
346
                prefetchPayment(tx, paymentHash)
152✔
347
                bucket, err := createPaymentBucket(tx, paymentHash)
152✔
348
                if err != nil {
152✔
NEW
349
                        return err
×
NEW
350
                }
×
351

352
                // Get the existing status of this payment, if any.
353
                paymentStatus, err := fetchPaymentStatus(bucket)
152✔
354

152✔
355
                switch {
152✔
356
                // If no error is returned, it means we already have this
357
                // payment. We'll check the status to decide whether we allow
358
                // retrying the payment or return a specific error.
359
                case err == nil:
8✔
360
                        if err := paymentStatus.initializable(); err != nil {
15✔
361
                                updateErr = err
7✔
362
                                return nil
7✔
363
                        }
7✔
364

365
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
366
                // we'll return the error.
NEW
367
                case !errors.Is(err, ErrPaymentNotInitiated):
×
NEW
368
                        return err
×
369
                }
370

371
                // Before we set our new sequence number, we check whether this
372
                // payment has a previously set sequence number and remove its
373
                // index entry if it exists. This happens in the case where we
374
                // have a previously attempted payment which was left in a state
375
                // where we can retry.
376
                seqBytes := bucket.Get(paymentSequenceKey)
148✔
377
                if seqBytes != nil {
152✔
378
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
4✔
379
                        if err := indexBucket.Delete(seqBytes); err != nil {
4✔
NEW
380
                                return err
×
NEW
381
                        }
×
382
                }
383

384
                // Once we have obtained a sequence number, we add an entry
385
                // to our index bucket which will map the sequence number to
386
                // our payment identifier.
387
                err = createPaymentIndexEntry(
148✔
388
                        tx, sequenceNum, info.PaymentIdentifier,
148✔
389
                )
148✔
390
                if err != nil {
148✔
NEW
391
                        return err
×
NEW
392
                }
×
393

394
                err = bucket.Put(paymentSequenceKey, sequenceNum)
148✔
395
                if err != nil {
148✔
NEW
396
                        return err
×
NEW
397
                }
×
398

399
                // Add the payment info to the bucket, which contains the
400
                // static information for this payment
401
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
148✔
402
                if err != nil {
148✔
NEW
403
                        return err
×
NEW
404
                }
×
405

406
                // We'll delete any lingering HTLCs to start with, in case we
407
                // are initializing a payment that was attempted earlier, but
408
                // left in a state where we could retry.
409
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
148✔
410
                if err != nil && !errors.Is(err, kvdb.ErrBucketNotFound) {
148✔
NEW
411
                        return err
×
NEW
412
                }
×
413

414
                // Also delete any lingering failure info now that we are
415
                // re-attempting.
416
                return bucket.Delete(paymentFailInfoKey)
148✔
417
        })
418
        if err != nil {
152✔
NEW
419
                return fmt.Errorf("unable to init payment: %w", err)
×
NEW
420
        }
×
421

422
        return updateErr
152✔
423
}
424

425
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
426
// by the KVPaymentsDB db.
427
func (p *KVPaymentsDB) DeleteFailedAttempts(hash lntypes.Hash) error {
11✔
428
        if !p.db.keepFailedPaymentAttempts {
15✔
429
                const failedHtlcsOnly = true
4✔
430
                err := p.DeletePayment(hash, failedHtlcsOnly)
4✔
431
                if err != nil {
6✔
432
                        return err
2✔
433
                }
2✔
434
        }
435

436
        return nil
9✔
437
}
438

439
// paymentIndexType identifies the kind of entry stored in the payments index
// bucket. It is serialized as the first element of every index entry.
type paymentIndexType uint8

// paymentIndexTypeHash is a payment index type which indicates that we have
// created an index of payment sequence number to payment hash.
const paymentIndexTypeHash paymentIndexType = 0
446

447
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
448
// index produced contains a payment index type (which can be used in future to
449
// signal different payment index types) and the payment identifier.
450
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
451
        id lntypes.Hash) error {
169✔
452

169✔
453
        var b bytes.Buffer
169✔
454
        if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
169✔
NEW
455
                return err
×
NEW
456
        }
×
457

458
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
169✔
459

169✔
460
        return indexes.Put(sequenceNumber, b.Bytes())
169✔
461
}
462

463
// deserializePaymentIndex deserializes a payment index entry. This function
464
// currently only supports deserialization of payment hash indexes, and will
465
// fail for other types.
466
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
62✔
467
        var (
62✔
468
                indexType   paymentIndexType
62✔
469
                paymentHash []byte
62✔
470
        )
62✔
471

62✔
472
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
62✔
NEW
473
                return lntypes.Hash{}, err
×
NEW
474
        }
×
475

476
        // While we only have on payment index type, we do not need to use our
477
        // index type to deserialize the index. However, we sanity check that
478
        // this type is as expected, since we had to read it out anyway.
479
        if indexType != paymentIndexTypeHash {
62✔
NEW
480
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
NEW
481
                        "type: %v", indexType)
×
NEW
482
        }
×
483

484
        hash, err := lntypes.MakeHash(paymentHash)
62✔
485
        if err != nil {
62✔
NEW
486
                return lntypes.Hash{}, err
×
NEW
487
        }
×
488

489
        return hash, nil
62✔
490
}
491

492
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
493
// DB.
494
func (p *KVPaymentsDB) RegisterAttempt(paymentHash lntypes.Hash,
495
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
73✔
496

73✔
497
        // Serialize the information before opening the db transaction.
73✔
498
        var a bytes.Buffer
73✔
499
        err := serializeHTLCAttemptInfo(&a, attempt)
73✔
500
        if err != nil {
73✔
NEW
501
                return nil, err
×
NEW
502
        }
×
503
        htlcInfoBytes := a.Bytes()
73✔
504

73✔
505
        htlcIDBytes := make([]byte, 8)
73✔
506
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
73✔
507

73✔
508
        var payment *MPPayment
73✔
509
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
162✔
510
                prefetchPayment(tx, paymentHash)
89✔
511
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
89✔
512
                if err != nil {
89✔
NEW
513
                        return err
×
NEW
514
                }
×
515

516
                payment, err = fetchPayment(bucket)
89✔
517
                if err != nil {
89✔
NEW
518
                        return err
×
NEW
519
                }
×
520

521
                // Check if registering a new attempt is allowed.
522
                if err := payment.Registrable(); err != nil {
105✔
523
                        return err
16✔
524
                }
16✔
525

526
                // If the final hop has encrypted data, then we know this is a
527
                // blinded payment. In blinded payments, MPP records are not set
528
                // for split payments and the recipient is responsible for using
529
                // a consistent PathID across the various encrypted data
530
                // payloads that we received from them for this payment. All we
531
                // need to check is that the total amount field for each HTLC
532
                // in the split payment is correct.
533
                isBlinded := len(attempt.Route.FinalHop().EncryptedData) != 0
73✔
534

73✔
535
                // Make sure any existing shards match the new one with regards
73✔
536
                // to MPP options.
73✔
537
                mpp := attempt.Route.FinalHop().MPP
73✔
538

73✔
539
                // MPP records should not be set for attempts to blinded paths.
73✔
540
                if isBlinded && mpp != nil {
73✔
NEW
541
                        return ErrMPPRecordInBlindedPayment
×
NEW
542
                }
×
543

544
                for _, h := range payment.InFlightHTLCs() {
120✔
545
                        hMpp := h.Route.FinalHop().MPP
47✔
546

47✔
547
                        // If this is a blinded payment, then no existing HTLCs
47✔
548
                        // should have MPP records.
47✔
549
                        if isBlinded && hMpp != nil {
47✔
NEW
550
                                return ErrMPPRecordInBlindedPayment
×
NEW
551
                        }
×
552

553
                        // If this is a blinded payment, then we just need to
554
                        // check that the TotalAmtMsat field for this shard
555
                        // is equal to that of any other shard in the same
556
                        // payment.
557
                        if isBlinded {
50✔
558
                                if attempt.Route.FinalHop().TotalAmtMsat !=
3✔
559
                                        h.Route.FinalHop().TotalAmtMsat {
3✔
NEW
560

×
NEW
561
                                        //nolint:ll
×
NEW
562
                                        return ErrBlindedPaymentTotalAmountMismatch
×
NEW
563
                                }
×
564

565
                                continue
3✔
566
                        }
567

568
                        switch {
47✔
569
                        // We tried to register a non-MPP attempt for a MPP
570
                        // payment.
571
                        case mpp == nil && hMpp != nil:
2✔
572
                                return ErrMPPayment
2✔
573

574
                        // We tried to register a MPP shard for a non-MPP
575
                        // payment.
576
                        case mpp != nil && hMpp == nil:
2✔
577
                                return ErrNonMPPayment
2✔
578

579
                        // Non-MPP payment, nothing more to validate.
NEW
580
                        case mpp == nil:
×
NEW
581
                                continue
×
582
                        }
583

584
                        // Check that MPP options match.
585
                        if mpp.PaymentAddr() != hMpp.PaymentAddr() {
45✔
586
                                return ErrMPPPaymentAddrMismatch
2✔
587
                        }
2✔
588

589
                        if mpp.TotalMsat() != hMpp.TotalMsat() {
43✔
590
                                return ErrMPPTotalAmountMismatch
2✔
591
                        }
2✔
592
                }
593

594
                // If this is a non-MPP attempt, it must match the total amount
595
                // exactly. Note that a blinded payment is considered an MPP
596
                // attempt.
597
                amt := attempt.Route.ReceiverAmt()
65✔
598
                if !isBlinded && mpp == nil && amt != payment.Info.Value {
65✔
NEW
599
                        return ErrValueMismatch
×
NEW
600
                }
×
601

602
                // Ensure we aren't sending more than the total payment amount.
603
                sentAmt, _ := payment.SentAmt()
65✔
604
                if sentAmt+amt > payment.Info.Value {
73✔
605
                        return fmt.Errorf("%w: attempted=%v, payment amount="+
8✔
606
                                "%v", ErrValueExceedsAmt, sentAmt+amt,
8✔
607
                                payment.Info.Value)
8✔
608
                }
8✔
609

610
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
57✔
611
                        paymentHtlcsBucket,
57✔
612
                )
57✔
613
                if err != nil {
57✔
NEW
614
                        return err
×
NEW
615
                }
×
616

617
                err = htlcsBucket.Put(
57✔
618
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
57✔
619
                        htlcInfoBytes,
57✔
620
                )
57✔
621
                if err != nil {
57✔
NEW
622
                        return err
×
NEW
623
                }
×
624

625
                // Retrieve attempt info for the notification.
626
                payment, err = fetchPayment(bucket)
57✔
627

57✔
628
                return err
57✔
629
        })
630
        if err != nil {
89✔
631
                return nil, err
16✔
632
        }
16✔
633

634
        return payment, err
57✔
635
}
636

637
// SettleAttempt marks the given attempt settled with the preimage. If this is
638
// a multi shard payment, this might implicitly mean that the full payment
639
// succeeded.
640
//
641
// After invoking this method, InitPayment should always return an error to
642
// prevent us from making duplicate payments to the same payment hash. The
643
// provided preimage is atomically saved to the DB for record keeping.
644
func (p *KVPaymentsDB) SettleAttempt(hash lntypes.Hash,
645
        attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
19✔
646

19✔
647
        var b bytes.Buffer
19✔
648
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
19✔
NEW
649
                return nil, err
×
NEW
650
        }
×
651
        settleBytes := b.Bytes()
19✔
652

19✔
653
        return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
19✔
654
}
655

656
// FailAttempt marks the given payment attempt failed.
657
func (p *KVPaymentsDB) FailAttempt(hash lntypes.Hash,
658
        attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
34✔
659

34✔
660
        var b bytes.Buffer
34✔
661
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
34✔
NEW
662
                return nil, err
×
NEW
663
        }
×
664
        failBytes := b.Bytes()
34✔
665

34✔
666
        return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
34✔
667
}
668

669
// updateHtlcKey updates a database key for the specified htlc.
670
func (p *KVPaymentsDB) updateHtlcKey(paymentHash lntypes.Hash,
671
        attemptID uint64, key, value []byte) (*MPPayment, error) {
50✔
672

50✔
673
        aid := make([]byte, 8)
50✔
674
        binary.BigEndian.PutUint64(aid, attemptID)
50✔
675

50✔
676
        var payment *MPPayment
50✔
677
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
101✔
678
                payment = nil
51✔
679

51✔
680
                prefetchPayment(tx, paymentHash)
51✔
681
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
51✔
682
                if err != nil {
53✔
683
                        return err
2✔
684
                }
2✔
685

686
                p, err := fetchPayment(bucket)
49✔
687
                if err != nil {
49✔
NEW
688
                        return err
×
NEW
689
                }
×
690

691
                // We can only update keys of in-flight payments. We allow
692
                // updating keys even if the payment has reached a terminal
693
                // condition, since the HTLC outcomes must still be updated.
694
                if err := p.Status.updatable(); err != nil {
49✔
NEW
695
                        return err
×
NEW
696
                }
×
697

698
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
49✔
699
                if htlcsBucket == nil {
49✔
NEW
700
                        return fmt.Errorf("htlcs bucket not found")
×
NEW
701
                }
×
702

703
                attemptInfo := htlcsBucket.Get(
49✔
704
                        htlcBucketKey(htlcAttemptInfoKey, aid),
49✔
705
                )
49✔
706
                if attemptInfo == nil {
49✔
NEW
707
                        return fmt.Errorf("HTLC with ID %v not registered",
×
NEW
708
                                attemptID)
×
NEW
709
                }
×
710

711
                failInfo := htlcsBucket.Get(
49✔
712
                        htlcBucketKey(htlcFailInfoKey, aid),
49✔
713
                )
49✔
714
                if failInfo != nil {
49✔
NEW
715
                        return ErrAttemptAlreadyFailed
×
NEW
716
                }
×
717

718
                settleInfo := htlcsBucket.Get(
49✔
719
                        htlcBucketKey(htlcSettleInfoKey, aid),
49✔
720
                )
49✔
721
                if settleInfo != nil {
49✔
NEW
722
                        return ErrAttemptAlreadySettled
×
NEW
723
                }
×
724

725
                // Add or update the key for this htlc.
726
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
49✔
727
                if err != nil {
49✔
NEW
728
                        return err
×
NEW
729
                }
×
730

731
                // Retrieve attempt info for the notification.
732
                payment, err = fetchPayment(bucket)
49✔
733

49✔
734
                return err
49✔
735
        })
736
        if err != nil {
51✔
737
                return nil, err
1✔
738
        }
1✔
739

740
        return payment, err
49✔
741
}
742

743
// Fail transitions a payment into the Failed state, and records the reason the
744
// payment failed. After invoking this method, InitPayment should return nil on
745
// its next call for this payment hash, allowing the switch to make a
746
// subsequent payment.
747
func (p *KVPaymentsDB) Fail(paymentHash lntypes.Hash,
748
        reason FailureReason) (*MPPayment, error) {
19✔
749

19✔
750
        var (
19✔
751
                updateErr error
19✔
752
                payment   *MPPayment
19✔
753
        )
19✔
754
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
38✔
755
                // Reset the update error, to avoid carrying over an error
19✔
756
                // from a previous execution of the batched db transaction.
19✔
757
                updateErr = nil
19✔
758
                payment = nil
19✔
759

19✔
760
                prefetchPayment(tx, paymentHash)
19✔
761
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
19✔
762
                if errors.Is(err, ErrPaymentNotInitiated) {
20✔
763
                        updateErr = ErrPaymentNotInitiated
1✔
764
                        return nil
1✔
765
                } else if err != nil {
19✔
NEW
766
                        return err
×
NEW
767
                }
×
768

769
                // We mark the payment as failed as long as it is known. This
770
                // lets the last attempt to fail with a terminal write its
771
                // failure to the KVPaymentsDB without synchronizing with
772
                // other attempts.
773
                _, err = fetchPaymentStatus(bucket)
18✔
774
                if errors.Is(err, ErrPaymentNotInitiated) {
18✔
NEW
775
                        updateErr = ErrPaymentNotInitiated
×
NEW
776
                        return nil
×
777
                } else if err != nil {
18✔
NEW
778
                        return err
×
NEW
779
                }
×
780

781
                // Put the failure reason in the bucket for record keeping.
782
                v := []byte{byte(reason)}
18✔
783
                err = bucket.Put(paymentFailInfoKey, v)
18✔
784
                if err != nil {
18✔
NEW
785
                        return err
×
NEW
786
                }
×
787

788
                // Retrieve attempt info for the notification, if available.
789
                payment, err = fetchPayment(bucket)
18✔
790
                if err != nil {
18✔
NEW
791
                        return err
×
NEW
792
                }
×
793

794
                return nil
18✔
795
        })
796
        if err != nil {
19✔
NEW
797
                return nil, err
×
NEW
798
        }
×
799

800
        return payment, updateErr
19✔
801
}
802

803
// FetchPayment returns information about a payment from the database.
804
func (p *KVPaymentsDB) FetchPayment(paymentHash lntypes.Hash) (
805
        *MPPayment, error) {
154✔
806

154✔
807
        var payment *MPPayment
154✔
808
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
308✔
809
                prefetchPayment(tx, paymentHash)
154✔
810
                bucket, err := fetchPaymentBucket(tx, paymentHash)
154✔
811
                if err != nil {
155✔
812
                        return err
1✔
813
                }
1✔
814

815
                payment, err = fetchPayment(bucket)
153✔
816

153✔
817
                return err
153✔
818
        }, func() {
154✔
819
                payment = nil
154✔
820
        })
154✔
821
        if err != nil {
155✔
822
                return nil, err
1✔
823
        }
1✔
824

825
        return payment, nil
153✔
826
}
827

828
// prefetchPayment attempts to prefetch as much of the payment as possible to
829
// reduce DB roundtrips.
830
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
453✔
831
        rb := kvdb.RootBucket(tx)
453✔
832
        kvdb.Prefetch(
453✔
833
                rb,
453✔
834
                []string{
453✔
835
                        // Prefetch all keys in the payment's bucket.
453✔
836
                        string(paymentsRootBucket),
453✔
837
                        string(paymentHash[:]),
453✔
838
                },
453✔
839
                []string{
453✔
840
                        // Prefetch all keys in the payment's htlc bucket.
453✔
841
                        string(paymentsRootBucket),
453✔
842
                        string(paymentHash[:]),
453✔
843
                        string(paymentHtlcsBucket),
453✔
844
                },
453✔
845
        )
453✔
846
}
453✔
847

848
// createPaymentBucket creates or fetches the sub-bucket assigned to this
849
// payment hash.
850
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
851
        kvdb.RwBucket, error) {
152✔
852

152✔
853
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
152✔
854
        if err != nil {
152✔
NEW
855
                return nil, err
×
NEW
856
        }
×
857

858
        return payments.CreateBucketIfNotExists(paymentHash[:])
152✔
859
}
860

861
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
862
// the bucket does not exist, it returns ErrPaymentNotInitiated.
863
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
864
        kvdb.RBucket, error) {
212✔
865

212✔
866
        payments := tx.ReadBucket(paymentsRootBucket)
212✔
867
        if payments == nil {
213✔
868
                return nil, ErrPaymentNotInitiated
1✔
869
        }
1✔
870

871
        bucket := payments.NestedReadBucket(paymentHash[:])
211✔
872
        if bucket == nil {
211✔
NEW
873
                return nil, ErrPaymentNotInitiated
×
NEW
874
        }
×
875

876
        return bucket, nil
211✔
877
}
878

879
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
880
// bucket that can be written to.
881
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
882
        kvdb.RwBucket, error) {
174✔
883

174✔
884
        payments := tx.ReadWriteBucket(paymentsRootBucket)
174✔
885
        if payments == nil {
177✔
886
                return nil, ErrPaymentNotInitiated
3✔
887
        }
3✔
888

889
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
171✔
890
        if bucket == nil {
171✔
NEW
891
                return nil, ErrPaymentNotInitiated
×
NEW
892
        }
×
893

894
        return bucket, nil
171✔
895
}
896

897
// nextPaymentSequence returns the next sequence number to store for a new
898
// payment.
899
func (p *KVPaymentsDB) nextPaymentSequence() ([]byte, error) {
152✔
900
        p.paymentSeqMx.Lock()
152✔
901
        defer p.paymentSeqMx.Unlock()
152✔
902

152✔
903
        // Set a new upper bound in the DB every 1000 payments to avoid
152✔
904
        // conflicts on the sequence when using etcd.
152✔
905
        if p.currPaymentSeq == p.storedPaymentSeq {
194✔
906
                var currPaymentSeq, newUpperBound uint64
42✔
907
                if err := kvdb.Update(p.db.Backend, func(tx kvdb.RwTx) error {
84✔
908
                        paymentsBucket, err := tx.CreateTopLevelBucket(
42✔
909
                                paymentsRootBucket,
42✔
910
                        )
42✔
911
                        if err != nil {
42✔
NEW
912
                                return err
×
NEW
913
                        }
×
914

915
                        currPaymentSeq = paymentsBucket.Sequence()
42✔
916
                        newUpperBound = currPaymentSeq + paymentSeqBlockSize
42✔
917

42✔
918
                        return paymentsBucket.SetSequence(newUpperBound)
42✔
919
                }, func() {}); err != nil {
42✔
NEW
920
                        return nil, err
×
NEW
921
                }
×
922

923
                // We lazy initialize the cached currPaymentSeq here using the
924
                // first nextPaymentSequence() call. This if statement will auto
925
                // initialize our stored currPaymentSeq, since by default both
926
                // this variable and storedPaymentSeq are zero which in turn
927
                // will have us fetch the current values from the DB.
928
                if p.currPaymentSeq == 0 {
84✔
929
                        p.currPaymentSeq = currPaymentSeq
42✔
930
                }
42✔
931

932
                p.storedPaymentSeq = newUpperBound
42✔
933
        }
934

935
        p.currPaymentSeq++
152✔
936
        b := make([]byte, 8)
152✔
937
        binary.BigEndian.PutUint64(b, p.currPaymentSeq)
152✔
938

152✔
939
        return b, nil
152✔
940
}
941

942
// fetchPaymentStatus fetches the payment status of the payment. If the payment
943
// isn't found, it will return error `ErrPaymentNotInitiated`.
944
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
195✔
945
        // Creation info should be set for all payments, regardless of state.
195✔
946
        // If not, it is unknown.
195✔
947
        if bucket.Get(paymentCreationInfoKey) == nil {
342✔
948
                return 0, ErrPaymentNotInitiated
147✔
949
        }
147✔
950

951
        payment, err := fetchPayment(bucket)
51✔
952
        if err != nil {
51✔
NEW
953
                return 0, err
×
NEW
954
        }
×
955

956
        return payment.Status, nil
51✔
957
}
958

959
// FetchInFlightPayments returns all payments with status InFlight.
960
func (p *KVPaymentsDB) FetchInFlightPayments() ([]*MPPayment, error) {
7✔
961
        var (
7✔
962
                inFlights      []*MPPayment
7✔
963
                start          = time.Now()
7✔
964
                lastLogTime    = time.Now()
7✔
965
                processedCount int
7✔
966
        )
7✔
967

7✔
968
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
14✔
969
                payments := tx.ReadBucket(paymentsRootBucket)
7✔
970
                if payments == nil {
12✔
971
                        return nil
5✔
972
                }
5✔
973

974
                return payments.ForEach(func(k, _ []byte) error {
10✔
975
                        bucket := payments.NestedReadBucket(k)
5✔
976
                        if bucket == nil {
5✔
NEW
977
                                return fmt.Errorf("non bucket element")
×
NEW
978
                        }
×
979

980
                        p, err := fetchPayment(bucket)
5✔
981
                        if err != nil {
5✔
NEW
982
                                return err
×
NEW
983
                        }
×
984

985
                        processedCount++
5✔
986
                        if time.Since(lastLogTime) >=
5✔
987
                                paymentProgressLogInterval {
5✔
NEW
988

×
NEW
989
                                log.Debugf("Scanning inflight payments "+
×
NEW
990
                                        "(in progress), processed %d, last "+
×
NEW
991
                                        "processed payment: %v", processedCount,
×
NEW
992
                                        p.Info)
×
NEW
993

×
NEW
994
                                lastLogTime = time.Now()
×
NEW
995
                        }
×
996

997
                        // Skip the payment if it's terminated.
998
                        if p.Terminated() {
8✔
999
                                return nil
3✔
1000
                        }
3✔
1001

1002
                        inFlights = append(inFlights, p)
5✔
1003

5✔
1004
                        return nil
5✔
1005
                })
1006
        }, func() {
7✔
1007
                inFlights = nil
7✔
1008
        })
7✔
1009
        if err != nil {
7✔
NEW
1010
                return nil, err
×
NEW
1011
        }
×
1012

1013
        elapsed := time.Since(start)
7✔
1014
        log.Debugf("Completed scanning for inflight payments: "+
7✔
1015
                "total_processed=%d, found_inflight=%d, elapsed=%v",
7✔
1016
                processedCount, len(inFlights),
7✔
1017
                elapsed.Round(time.Millisecond))
7✔
1018

7✔
1019
        return inFlights, nil
7✔
1020
}
1021

1022
// PaymentCreationInfo is the information necessary to have ready when
// initiating a payment, moving it into state InFlight.
type PaymentCreationInfo struct {
	// PaymentIdentifier is the hash this payment is paying to in case of
	// non-AMP payments, and the SetID for AMP payments.
	PaymentIdentifier lntypes.Hash

	// Value is the amount we are paying, expressed in millisatoshis.
	Value lnwire.MilliSatoshi

	// CreationTime is the time when this payment was initiated.
	CreationTime time.Time

	// PaymentRequest is the full payment request, if any.
	PaymentRequest []byte

	// FirstHopCustomRecords are the TLV records that are to be sent to the
	// first hop of this payment. These records will be transmitted via the
	// wire message only and therefore do not affect the onion payload size.
	FirstHopCustomRecords lnwire.CustomRecords
}
1043

1044
// String returns a human-readable description of the payment creation info.
1045
func (p *PaymentCreationInfo) String() string {
8✔
1046
        return fmt.Sprintf("payment_id=%v, amount=%v, created_at=%v",
8✔
1047
                p.PaymentIdentifier, p.Value, p.CreationTime)
8✔
1048
}
8✔
1049

1050
// htlcBucketKey builds a composite key by concatenating prefix and id. The
// result is a freshly allocated slice that does not alias either input.
func htlcBucketKey(prefix, id []byte) []byte {
	key := make([]byte, 0, len(prefix)+len(id))
	key = append(key, prefix...)

	return append(key, id...)
}
268✔
1058

1059
// FetchPayments returns all sent payments found in the DB.
1060
//
1061
// nolint: dupl
1062
func (p *KVPaymentsDB) FetchPayments() ([]*MPPayment, error) {
41✔
1063
        var payments []*MPPayment
41✔
1064

41✔
1065
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
82✔
1066
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
41✔
1067
                if paymentsBucket == nil {
41✔
1068
                        return nil
×
1069
                }
×
1070

1071
                return paymentsBucket.ForEach(func(k, v []byte) error {
189✔
1072
                        bucket := paymentsBucket.NestedReadBucket(k)
148✔
1073
                        if bucket == nil {
148✔
1074
                                // We only expect sub-buckets to be found in
×
1075
                                // this top-level bucket.
×
1076
                                return fmt.Errorf("non bucket element in " +
×
1077
                                        "payments bucket")
×
1078
                        }
×
1079

1080
                        p, err := fetchPayment(bucket)
148✔
1081
                        if err != nil {
148✔
1082
                                return err
×
1083
                        }
×
1084

1085
                        payments = append(payments, p)
148✔
1086

148✔
1087
                        // For older versions of lnd, duplicate payments to a
148✔
1088
                        // payment has was possible. These will be found in a
148✔
1089
                        // sub-bucket indexed by their sequence number if
148✔
1090
                        // available.
148✔
1091
                        duplicatePayments, err := fetchDuplicatePayments(bucket)
148✔
1092
                        if err != nil {
148✔
1093
                                return err
×
1094
                        }
×
1095

1096
                        payments = append(payments, duplicatePayments...)
148✔
1097
                        return nil
148✔
1098
                })
1099
        }, func() {
41✔
1100
                payments = nil
41✔
1101
        })
41✔
1102
        if err != nil {
41✔
1103
                return nil, err
×
1104
        }
×
1105

1106
        // Before returning, sort the payments by their sequence number.
1107
        sort.Slice(payments, func(i, j int) bool {
303✔
1108
                return payments[i].SequenceNum < payments[j].SequenceNum
262✔
1109
        })
262✔
1110

1111
        return payments, nil
41✔
1112
}
1113

1114
func fetchCreationInfo(bucket kvdb.RBucket) (*PaymentCreationInfo, error) {
643✔
1115
        b := bucket.Get(paymentCreationInfoKey)
643✔
1116
        if b == nil {
643✔
1117
                return nil, fmt.Errorf("creation info not found")
×
1118
        }
×
1119

1120
        r := bytes.NewReader(b)
643✔
1121
        return deserializePaymentCreationInfo(r)
643✔
1122
}
1123

1124
func fetchPayment(bucket kvdb.RBucket) (*MPPayment, error) {
643✔
1125
        seqBytes := bucket.Get(paymentSequenceKey)
643✔
1126
        if seqBytes == nil {
643✔
1127
                return nil, fmt.Errorf("sequence number not found")
×
1128
        }
×
1129

1130
        sequenceNum := binary.BigEndian.Uint64(seqBytes)
643✔
1131

643✔
1132
        // Get the PaymentCreationInfo.
643✔
1133
        creationInfo, err := fetchCreationInfo(bucket)
643✔
1134
        if err != nil {
643✔
1135
                return nil, err
×
1136
        }
×
1137

1138
        var htlcs []HTLCAttempt
643✔
1139
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
643✔
1140
        if htlcsBucket != nil {
1,032✔
1141
                // Get the payment attempts. This can be empty.
389✔
1142
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
389✔
1143
                if err != nil {
389✔
1144
                        return nil, err
×
1145
                }
×
1146
        }
1147

1148
        // Get failure reason if available.
1149
        var failureReason *FailureReason
643✔
1150
        b := bucket.Get(paymentFailInfoKey)
643✔
1151
        if b != nil {
719✔
1152
                reason := FailureReason(b[0])
76✔
1153
                failureReason = &reason
76✔
1154
        }
76✔
1155

1156
        // Create a new payment.
1157
        payment := &MPPayment{
643✔
1158
                SequenceNum:   sequenceNum,
643✔
1159
                Info:          creationInfo,
643✔
1160
                HTLCs:         htlcs,
643✔
1161
                FailureReason: failureReason,
643✔
1162
        }
643✔
1163

643✔
1164
        // Set its state and status.
643✔
1165
        if err := payment.setState(); err != nil {
643✔
1166
                return nil, err
×
1167
        }
×
1168

1169
        return payment, nil
643✔
1170
}
1171

1172
// fetchHtlcAttempts retrieves all htlc attempts made for the payment found in
1173
// the given bucket.
1174
func fetchHtlcAttempts(bucket kvdb.RBucket) ([]HTLCAttempt, error) {
396✔
1175
        htlcsMap := make(map[uint64]*HTLCAttempt)
396✔
1176

396✔
1177
        attemptInfoCount := 0
396✔
1178
        err := bucket.ForEach(func(k, v []byte) error {
1,494✔
1179
                aid := byteOrder.Uint64(k[len(k)-8:])
1,098✔
1180

1,098✔
1181
                if _, ok := htlcsMap[aid]; !ok {
1,802✔
1182
                        htlcsMap[aid] = &HTLCAttempt{}
704✔
1183
                }
704✔
1184

1185
                var err error
1,098✔
1186
                switch {
1,098✔
1187
                case bytes.HasPrefix(k, htlcAttemptInfoKey):
704✔
1188
                        attemptInfo, err := readHtlcAttemptInfo(v)
704✔
1189
                        if err != nil {
704✔
1190
                                return err
×
1191
                        }
×
1192

1193
                        attemptInfo.AttemptID = aid
704✔
1194
                        htlcsMap[aid].HTLCAttemptInfo = *attemptInfo
704✔
1195
                        attemptInfoCount++
704✔
1196

1197
                case bytes.HasPrefix(k, htlcSettleInfoKey):
92✔
1198
                        htlcsMap[aid].Settle, err = readHtlcSettleInfo(v)
92✔
1199
                        if err != nil {
92✔
1200
                                return err
×
1201
                        }
×
1202

1203
                case bytes.HasPrefix(k, htlcFailInfoKey):
308✔
1204
                        htlcsMap[aid].Failure, err = readHtlcFailInfo(v)
308✔
1205
                        if err != nil {
308✔
1206
                                return err
×
1207
                        }
×
1208

1209
                default:
×
1210
                        return fmt.Errorf("unknown htlc attempt key")
×
1211
                }
1212

1213
                return nil
1,098✔
1214
        })
1215
        if err != nil {
396✔
1216
                return nil, err
×
1217
        }
×
1218

1219
        // Sanity check that all htlcs have an attempt info.
1220
        if attemptInfoCount != len(htlcsMap) {
396✔
1221
                return nil, errNoAttemptInfo
×
1222
        }
×
1223

1224
        keys := make([]uint64, len(htlcsMap))
396✔
1225
        i := 0
396✔
1226
        for k := range htlcsMap {
1,100✔
1227
                keys[i] = k
704✔
1228
                i++
704✔
1229
        }
704✔
1230

1231
        // Sort HTLC attempts by their attempt ID. This is needed because in the
1232
        // DB we store the attempts with keys prefixed by their status which
1233
        // changes order (groups them together by status).
1234
        sort.Slice(keys, func(i, j int) bool {
741✔
1235
                return keys[i] < keys[j]
345✔
1236
        })
345✔
1237

1238
        htlcs := make([]HTLCAttempt, len(htlcsMap))
396✔
1239
        for i, key := range keys {
1,100✔
1240
                htlcs[i] = *htlcsMap[key]
704✔
1241
        }
704✔
1242

1243
        return htlcs, nil
396✔
1244
}
1245

1246
// readHtlcAttemptInfo reads the payment attempt info for this htlc.
1247
func readHtlcAttemptInfo(b []byte) (*HTLCAttemptInfo, error) {
704✔
1248
        r := bytes.NewReader(b)
704✔
1249
        return deserializeHTLCAttemptInfo(r)
704✔
1250
}
704✔
1251

1252
// readHtlcSettleInfo reads the settle info for the htlc. If the htlc isn't
1253
// settled, nil is returned.
1254
func readHtlcSettleInfo(b []byte) (*HTLCSettleInfo, error) {
92✔
1255
        r := bytes.NewReader(b)
92✔
1256
        return deserializeHTLCSettleInfo(r)
92✔
1257
}
92✔
1258

1259
// readHtlcFailInfo reads the failure info for the htlc. If the htlc hasn't
1260
// failed, nil is returned.
1261
func readHtlcFailInfo(b []byte) (*HTLCFailInfo, error) {
308✔
1262
        r := bytes.NewReader(b)
308✔
1263
        return deserializeHTLCFailInfo(r)
308✔
1264
}
308✔
1265

1266
// fetchFailedHtlcKeys retrieves the bucket keys of all failed HTLCs of a
1267
// payment bucket.
1268
func fetchFailedHtlcKeys(bucket kvdb.RBucket) ([][]byte, error) {
7✔
1269
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
7✔
1270

7✔
1271
        var htlcs []HTLCAttempt
7✔
1272
        var err error
7✔
1273
        if htlcsBucket != nil {
14✔
1274
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
7✔
1275
                if err != nil {
7✔
1276
                        return nil, err
×
1277
                }
×
1278
        }
1279

1280
        // Now iterate though them and save the bucket keys for the failed
1281
        // HTLCs.
1282
        var htlcKeys [][]byte
7✔
1283
        for _, h := range htlcs {
19✔
1284
                if h.Failure == nil {
15✔
1285
                        continue
3✔
1286
                }
1287

1288
                htlcKeyBytes := make([]byte, 8)
9✔
1289
                binary.BigEndian.PutUint64(htlcKeyBytes, h.AttemptID)
9✔
1290

9✔
1291
                htlcKeys = append(htlcKeys, htlcKeyBytes)
9✔
1292
        }
1293

1294
        return htlcKeys, nil
7✔
1295
}
1296

1297
// PaymentsQuery represents a query to the payments database starting or ending
// at a certain offset index. The number of retrieved records can be limited.
type PaymentsQuery struct {
	// IndexOffset determines the starting point of the payments query and
	// is always exclusive. In normal order, the query starts at the next
	// higher (available) index compared to IndexOffset. In reversed order,
	// the query ends at the next lower (available) index compared to the
	// IndexOffset. In the case of a zero IndexOffset, the query will start
	// with the oldest payment when paginating forwards, or will end with
	// the most recent payment when paginating backwards.
	IndexOffset uint64

	// MaxPayments is the maximal number of payments returned in the
	// payments query.
	MaxPayments uint64

	// Reversed gives a meaning to the IndexOffset. If reversed is set to
	// true, the query will fetch payments with indices lower than the
	// IndexOffset, otherwise, it will return payments with indices greater
	// than the IndexOffset.
	Reversed bool

	// If IncludeIncomplete is true, then return payments that have not yet
	// fully completed. This means that pending payments, as well as failed
	// payments will show up if this field is set to true.
	IncludeIncomplete bool

	// CountTotal indicates that all payments currently present in the
	// payment index (complete and incomplete) should be counted.
	CountTotal bool

	// CreationDateStart, expressed in Unix seconds, if set, filters out
	// all payments with a creation date greater than or equal to it.
	//
	// NOTE(review): "filters out" presumably means "selects" here, i.e.
	// only payments created at or after this time are returned. Confirm
	// against the query implementation.
	CreationDateStart int64

	// CreationDateEnd, expressed in Unix seconds, if set, filters out all
	// payments with a creation date less than or equal to it.
	//
	// NOTE(review): as above, presumably only payments created at or
	// before this time are returned. Confirm against the query
	// implementation.
	CreationDateEnd int64
}
1336

1337
// PaymentsResponse contains the result of a query to the payments database.
// It includes the set of payments that match the query and integers which
// represent the index of the first and last item returned in the series of
// payments. These integers allow callers to resume their query in the event
// that the query's response exceeds the max number of returnable events.
type PaymentsResponse struct {
	// Payments is the set of payments returned from the database for the
	// PaymentsQuery.
	Payments []*MPPayment

	// FirstIndexOffset is the index of the first element in the set of
	// returned MPPayments. Callers can use this to resume their query
	// in the event that the slice has too many events to fit into a single
	// response. The offset can be used to continue reverse pagination.
	FirstIndexOffset uint64

	// LastIndexOffset is the index of the last element in the set of
	// returned MPPayments. Callers can use this to resume their query
	// in the event that the slice has too many events to fit into a single
	// response. The offset can be used to continue forward pagination.
	LastIndexOffset uint64

	// TotalCount represents the total number of payments that are currently
	// stored in the payment database. This will only be set if the
	// CountTotal field in the query was set to true.
	TotalCount uint64
}
1364

1365
// QueryPayments is a query to the payments database which is restricted
1366
// to a subset of payments by the payments query, containing an offset
1367
// index and a maximum number of returned payments.
1368
func (p *KVPaymentsDB) QueryPayments(_ context.Context,
1369
        query PaymentsQuery) (PaymentsResponse,
1370
        error) {
39✔
1371

39✔
1372
        var resp PaymentsResponse
39✔
1373

39✔
1374
        if err := kvdb.View(p.db, func(tx kvdb.RTx) error {
78✔
1375
                // Get the root payments bucket.
39✔
1376
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
39✔
1377
                if paymentsBucket == nil {
60✔
1378
                        return nil
21✔
1379
                }
21✔
1380

1381
                // Get the index bucket which maps sequence number -> payment
1382
                // hash and duplicate bool. If we have a payments bucket, we
1383
                // should have an indexes bucket as well.
1384
                indexes := tx.ReadBucket(paymentsIndexBucket)
21✔
1385
                if indexes == nil {
21✔
1386
                        return fmt.Errorf("index bucket does not exist")
×
1387
                }
×
1388

1389
                // accumulatePayments gets payments with the sequence number
1390
                // and hash provided and adds them to our list of payments if
1391
                // they meet the criteria of our query. It returns the number
1392
                // of payments that were added.
1393
                accumulatePayments := func(sequenceKey, hash []byte) (bool,
21✔
1394
                        error) {
76✔
1395

55✔
1396
                        r := bytes.NewReader(hash)
55✔
1397
                        paymentHash, err := deserializePaymentIndex(r)
55✔
1398
                        if err != nil {
55✔
1399
                                return false, err
×
1400
                        }
×
1401

1402
                        payment, err := fetchPaymentWithSequenceNumber(
55✔
1403
                                tx, paymentHash, sequenceKey,
55✔
1404
                        )
55✔
1405
                        if err != nil {
55✔
1406
                                return false, err
×
1407
                        }
×
1408

1409
                        // To keep compatibility with the old API, we only
1410
                        // return non-succeeded payments if requested.
1411
                        if payment.Status != StatusSucceeded &&
55✔
1412
                                !query.IncludeIncomplete {
60✔
1413

5✔
1414
                                return false, err
5✔
1415
                        }
5✔
1416

1417
                        // Get the creation time in Unix seconds, this always
1418
                        // rounds down the nanoseconds to full seconds.
1419
                        createTime := payment.Info.CreationTime.Unix()
50✔
1420

50✔
1421
                        // Skip any payments that were created before the
50✔
1422
                        // specified time.
50✔
1423
                        if createTime < query.CreationDateStart {
62✔
1424
                                return false, nil
12✔
1425
                        }
12✔
1426

1427
                        // Skip any payments that were created after the
1428
                        // specified time.
1429
                        if query.CreationDateEnd != 0 &&
41✔
1430
                                createTime > query.CreationDateEnd {
46✔
1431

5✔
1432
                                return false, nil
5✔
1433
                        }
5✔
1434

1435
                        // At this point, we've exhausted the offset, so we'll
1436
                        // begin collecting invoices found within the range.
1437
                        resp.Payments = append(resp.Payments, payment)
39✔
1438
                        return true, nil
39✔
1439
                }
1440

1441
                // Create a paginator which reads from our sequence index bucket
1442
                // with the parameters provided by the payments query.
1443
                paginator := newPaginator(
21✔
1444
                        indexes.ReadCursor(), query.Reversed, query.IndexOffset,
21✔
1445
                        query.MaxPayments,
21✔
1446
                )
21✔
1447

21✔
1448
                // Run a paginated query, adding payments to our response.
21✔
1449
                if err := paginator.query(accumulatePayments); err != nil {
21✔
1450
                        return err
×
1451
                }
×
1452

1453
                // Counting the total number of payments is expensive, since we
1454
                // literally have to traverse the cursor linearly, which can
1455
                // take quite a while. So it's an optional query parameter.
1456
                if query.CountTotal {
21✔
1457
                        var (
×
1458
                                totalPayments uint64
×
1459
                                err           error
×
1460
                        )
×
1461
                        countFn := func(_, _ []byte) error {
×
1462
                                totalPayments++
×
1463

×
1464
                                return nil
×
1465
                        }
×
1466

1467
                        // In non-boltdb database backends, there's a faster
1468
                        // ForAll query that allows for batch fetching items.
1469
                        if fastBucket, ok := indexes.(kvdb.ExtendedRBucket); ok {
×
1470
                                err = fastBucket.ForAll(countFn)
×
1471
                        } else {
×
1472
                                err = indexes.ForEach(countFn)
×
1473
                        }
×
1474
                        if err != nil {
×
1475
                                return fmt.Errorf("error counting payments: %w",
×
1476
                                        err)
×
1477
                        }
×
1478

1479
                        resp.TotalCount = totalPayments
×
1480
                }
1481

1482
                return nil
21✔
1483
        }, func() {
39✔
1484
                resp = PaymentsResponse{}
39✔
1485
        }); err != nil {
39✔
1486
                return resp, err
×
1487
        }
×
1488

1489
        // Need to swap the payments slice order if reversed order.
1490
        if query.Reversed {
55✔
1491
                for l, r := 0, len(resp.Payments)-1; l < r; l, r = l+1, r-1 {
24✔
1492
                        resp.Payments[l], resp.Payments[r] =
8✔
1493
                                resp.Payments[r], resp.Payments[l]
8✔
1494
                }
8✔
1495
        }
1496

1497
        // Set the first and last index of the returned payments so that the
1498
        // caller can resume from this point later on.
1499
        if len(resp.Payments) > 0 {
58✔
1500
                resp.FirstIndexOffset = resp.Payments[0].SequenceNum
19✔
1501
                resp.LastIndexOffset =
19✔
1502
                        resp.Payments[len(resp.Payments)-1].SequenceNum
19✔
1503
        }
19✔
1504

1505
        return resp, nil
39✔
1506
}
1507

1508
// fetchPaymentWithSequenceNumber get the payment which matches the payment hash
1509
// *and* sequence number provided from the database. This is required because
1510
// we previously had more than one payment per hash, so we have multiple indexes
1511
// pointing to a single payment; we want to retrieve the correct one.
1512
func fetchPaymentWithSequenceNumber(tx kvdb.RTx, paymentHash lntypes.Hash,
1513
        sequenceNumber []byte) (*MPPayment, error) {
61✔
1514

61✔
1515
        // We can now lookup the payment keyed by its hash in
61✔
1516
        // the payments root bucket.
61✔
1517
        bucket, err := fetchPaymentBucket(tx, paymentHash)
61✔
1518
        if err != nil {
61✔
1519
                return nil, err
×
1520
        }
×
1521

1522
        // A single payment hash can have multiple payments associated with it.
1523
        // We lookup our sequence number first, to determine whether this is
1524
        // the payment we are actually looking for.
1525
        seqBytes := bucket.Get(paymentSequenceKey)
61✔
1526
        if seqBytes == nil {
61✔
1527
                return nil, ErrNoSequenceNumber
×
1528
        }
×
1529

1530
        // If this top level payment has the sequence number we are looking for,
1531
        // return it.
1532
        if bytes.Equal(seqBytes, sequenceNumber) {
109✔
1533
                return fetchPayment(bucket)
48✔
1534
        }
48✔
1535

1536
        // If we were not looking for the top level payment, we are looking for
1537
        // one of our duplicate payments. We need to iterate through the seq
1538
        // numbers in this bucket to find the correct payments. If we do not
1539
        // find a duplicate payments bucket here, something is wrong.
1540
        dup := bucket.NestedReadBucket(duplicatePaymentsBucket)
13✔
1541
        if dup == nil {
14✔
1542
                return nil, ErrNoDuplicateBucket
1✔
1543
        }
1✔
1544

1545
        var duplicatePayment *MPPayment
12✔
1546
        err = dup.ForEach(func(k, v []byte) error {
27✔
1547
                subBucket := dup.NestedReadBucket(k)
15✔
1548
                if subBucket == nil {
15✔
1549
                        // We one bucket for each duplicate to be found.
×
1550
                        return ErrNoDuplicateNestedBucket
×
1551
                }
×
1552

1553
                seqBytes := subBucket.Get(duplicatePaymentSequenceKey)
15✔
1554
                if seqBytes == nil {
15✔
1555
                        return err
×
1556
                }
×
1557

1558
                // If this duplicate payment is not the sequence number we are
1559
                // looking for, we can continue.
1560
                if !bytes.Equal(seqBytes, sequenceNumber) {
19✔
1561
                        return nil
4✔
1562
                }
4✔
1563

1564
                duplicatePayment, err = fetchDuplicatePayment(subBucket)
11✔
1565
                if err != nil {
11✔
1566
                        return err
×
1567
                }
×
1568

1569
                return nil
11✔
1570
        })
1571
        if err != nil {
12✔
1572
                return nil, err
×
1573
        }
×
1574

1575
        // If none of the duplicate payments matched our sequence number, we
1576
        // failed to find the payment with this sequence number; something is
1577
        // wrong.
1578
        if duplicatePayment == nil {
13✔
1579
                return nil, ErrDuplicateNotFound
1✔
1580
        }
1✔
1581

1582
        return duplicatePayment, nil
11✔
1583
}
1584

1585
// DeletePayment deletes a payment from the DB given its payment hash. If
1586
// failedHtlcsOnly is set, only failed HTLC attempts of the payment will be
1587
// deleted.
1588
func (p *KVPaymentsDB) DeletePayment(paymentHash lntypes.Hash,
1589
        failedHtlcsOnly bool) error {
11✔
1590

11✔
1591
        return kvdb.Update(p.db, func(tx kvdb.RwTx) error {
22✔
1592
                payments := tx.ReadWriteBucket(paymentsRootBucket)
11✔
1593
                if payments == nil {
11✔
1594
                        return nil
×
1595
                }
×
1596

1597
                bucket := payments.NestedReadWriteBucket(paymentHash[:])
11✔
1598
                if bucket == nil {
12✔
1599
                        return fmt.Errorf("non bucket element in payments " +
1✔
1600
                                "bucket")
1✔
1601
                }
1✔
1602

1603
                // If the status is InFlight, we cannot safely delete
1604
                // the payment information, so we return early.
1605
                paymentStatus, err := fetchPaymentStatus(bucket)
10✔
1606
                if err != nil {
10✔
1607
                        return err
×
1608
                }
×
1609

1610
                // If the payment has inflight HTLCs, we cannot safely delete
1611
                // the payment information, so we return an error.
1612
                if err := paymentStatus.removable(); err != nil {
13✔
1613
                        return fmt.Errorf("payment '%v' has inflight HTLCs"+
3✔
1614
                                "and therefore cannot be deleted: %w",
3✔
1615
                                paymentHash.String(), err)
3✔
1616
                }
3✔
1617

1618
                // Delete the failed HTLC attempts we found.
1619
                if failedHtlcsOnly {
11✔
1620
                        toDelete, err := fetchFailedHtlcKeys(bucket)
4✔
1621
                        if err != nil {
4✔
1622
                                return err
×
1623
                        }
×
1624

1625
                        htlcsBucket := bucket.NestedReadWriteBucket(
4✔
1626
                                paymentHtlcsBucket,
4✔
1627
                        )
4✔
1628

4✔
1629
                        for _, htlcID := range toDelete {
10✔
1630
                                err = htlcsBucket.Delete(
6✔
1631
                                        htlcBucketKey(htlcAttemptInfoKey, htlcID),
6✔
1632
                                )
6✔
1633
                                if err != nil {
6✔
1634
                                        return err
×
1635
                                }
×
1636

1637
                                err = htlcsBucket.Delete(
6✔
1638
                                        htlcBucketKey(htlcFailInfoKey, htlcID),
6✔
1639
                                )
6✔
1640
                                if err != nil {
6✔
1641
                                        return err
×
1642
                                }
×
1643

1644
                                err = htlcsBucket.Delete(
6✔
1645
                                        htlcBucketKey(htlcSettleInfoKey, htlcID),
6✔
1646
                                )
6✔
1647
                                if err != nil {
6✔
1648
                                        return err
×
1649
                                }
×
1650
                        }
1651

1652
                        return nil
4✔
1653
                }
1654

1655
                seqNrs, err := fetchSequenceNumbers(bucket)
3✔
1656
                if err != nil {
3✔
1657
                        return err
×
1658
                }
×
1659

1660
                if err := payments.DeleteNestedBucket(paymentHash[:]); err != nil {
3✔
1661
                        return err
×
1662
                }
×
1663

1664
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
1665
                for _, k := range seqNrs {
6✔
1666
                        if err := indexBucket.Delete(k); err != nil {
3✔
1667
                                return err
×
1668
                        }
×
1669
                }
1670

1671
                return nil
3✔
1672
        }, func() {})
11✔
1673
}
1674

1675
// DeletePayments deletes all completed and failed payments from the DB. If
1676
// failedOnly is set, only failed payments will be considered for deletion. If
1677
// failedHtlcsOnly is set, the payment itself won't be deleted, only failed HTLC
1678
// attempts. The method returns the number of deleted payments, which is always
1679
// 0 if failedHtlcsOnly is set.
1680
func (p *KVPaymentsDB) DeletePayments(failedOnly, failedHtlcsOnly bool) (int,
1681
        error) {
9✔
1682

9✔
1683
        var numPayments int
9✔
1684
        err := kvdb.Update(p.db, func(tx kvdb.RwTx) error {
18✔
1685
                payments := tx.ReadWriteBucket(paymentsRootBucket)
9✔
1686
                if payments == nil {
9✔
1687
                        return nil
×
1688
                }
×
1689

1690
                var (
9✔
1691
                        // deleteBuckets is the set of payment buckets we need
9✔
1692
                        // to delete.
9✔
1693
                        deleteBuckets [][]byte
9✔
1694

9✔
1695
                        // deleteIndexes is the set of indexes pointing to these
9✔
1696
                        // payments that need to be deleted.
9✔
1697
                        deleteIndexes [][]byte
9✔
1698

9✔
1699
                        // deleteHtlcs maps a payment hash to the HTLC IDs we
9✔
1700
                        // want to delete for that payment.
9✔
1701
                        deleteHtlcs = make(map[lntypes.Hash][][]byte)
9✔
1702
                )
9✔
1703
                err := payments.ForEach(func(k, _ []byte) error {
30✔
1704
                        bucket := payments.NestedReadBucket(k)
21✔
1705
                        if bucket == nil {
21✔
1706
                                // We only expect sub-buckets to be found in
×
1707
                                // this top-level bucket.
×
1708
                                return fmt.Errorf("non bucket element in " +
×
1709
                                        "payments bucket")
×
1710
                        }
×
1711

1712
                        // If the status is InFlight, we cannot safely delete
1713
                        // the payment information, so we return early.
1714
                        paymentStatus, err := fetchPaymentStatus(bucket)
21✔
1715
                        if err != nil {
21✔
1716
                                return err
×
1717
                        }
×
1718

1719
                        // If the payment has inflight HTLCs, we cannot safely
1720
                        // delete the payment information, so we return an nil
1721
                        // to skip it.
1722
                        if err := paymentStatus.removable(); err != nil {
27✔
1723
                                return nil
6✔
1724
                        }
6✔
1725

1726
                        // If we requested to only delete failed payments, we
1727
                        // can return if this one is not.
1728
                        if failedOnly && paymentStatus != StatusFailed {
19✔
1729
                                return nil
4✔
1730
                        }
4✔
1731

1732
                        // If we are only deleting failed HTLCs, fetch them.
1733
                        if failedHtlcsOnly {
14✔
1734
                                toDelete, err := fetchFailedHtlcKeys(bucket)
3✔
1735
                                if err != nil {
3✔
1736
                                        return err
×
1737
                                }
×
1738

1739
                                hash, err := lntypes.MakeHash(k)
3✔
1740
                                if err != nil {
3✔
1741
                                        return err
×
1742
                                }
×
1743

1744
                                deleteHtlcs[hash] = toDelete
3✔
1745

3✔
1746
                                // We return, we are only deleting attempts.
3✔
1747
                                return nil
3✔
1748
                        }
1749

1750
                        // Add the bucket to the set of buckets we can delete.
1751
                        deleteBuckets = append(deleteBuckets, k)
8✔
1752

8✔
1753
                        // Get all the sequence number associated with the
8✔
1754
                        // payment, including duplicates.
8✔
1755
                        seqNrs, err := fetchSequenceNumbers(bucket)
8✔
1756
                        if err != nil {
8✔
1757
                                return err
×
1758
                        }
×
1759

1760
                        deleteIndexes = append(deleteIndexes, seqNrs...)
8✔
1761
                        numPayments++
8✔
1762
                        return nil
8✔
1763
                })
1764
                if err != nil {
9✔
1765
                        return err
×
1766
                }
×
1767

1768
                // Delete the failed HTLC attempts we found.
1769
                for hash, htlcIDs := range deleteHtlcs {
12✔
1770
                        bucket := payments.NestedReadWriteBucket(hash[:])
3✔
1771
                        htlcsBucket := bucket.NestedReadWriteBucket(
3✔
1772
                                paymentHtlcsBucket,
3✔
1773
                        )
3✔
1774

3✔
1775
                        for _, aid := range htlcIDs {
6✔
1776
                                if err := htlcsBucket.Delete(
3✔
1777
                                        htlcBucketKey(htlcAttemptInfoKey, aid),
3✔
1778
                                ); err != nil {
3✔
1779
                                        return err
×
1780
                                }
×
1781

1782
                                if err := htlcsBucket.Delete(
3✔
1783
                                        htlcBucketKey(htlcFailInfoKey, aid),
3✔
1784
                                ); err != nil {
3✔
1785
                                        return err
×
1786
                                }
×
1787

1788
                                if err := htlcsBucket.Delete(
3✔
1789
                                        htlcBucketKey(htlcSettleInfoKey, aid),
3✔
1790
                                ); err != nil {
3✔
1791
                                        return err
×
1792
                                }
×
1793
                        }
1794
                }
1795

1796
                for _, k := range deleteBuckets {
17✔
1797
                        if err := payments.DeleteNestedBucket(k); err != nil {
8✔
1798
                                return err
×
1799
                        }
×
1800
                }
1801

1802
                // Get our index bucket and delete all indexes pointing to the
1803
                // payments we are deleting.
1804
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
9✔
1805
                for _, k := range deleteIndexes {
18✔
1806
                        if err := indexBucket.Delete(k); err != nil {
9✔
1807
                                return err
×
1808
                        }
×
1809
                }
1810

1811
                return nil
9✔
1812
        }, func() {
9✔
1813
                numPayments = 0
9✔
1814
        })
9✔
1815
        if err != nil {
9✔
1816
                return 0, err
×
1817
        }
×
1818

1819
        return numPayments, nil
9✔
1820
}
1821

1822
// fetchSequenceNumbers fetches all the sequence numbers associated with a
1823
// payment, including those belonging to any duplicate payments.
1824
func fetchSequenceNumbers(paymentBucket kvdb.RBucket) ([][]byte, error) {
11✔
1825
        seqNum := paymentBucket.Get(paymentSequenceKey)
11✔
1826
        if seqNum == nil {
11✔
1827
                return nil, errors.New("expected sequence number")
×
1828
        }
×
1829

1830
        sequenceNumbers := [][]byte{seqNum}
11✔
1831

11✔
1832
        // Get the duplicate payments bucket, if it has no duplicates, just
11✔
1833
        // return early with the payment sequence number.
11✔
1834
        duplicates := paymentBucket.NestedReadBucket(duplicatePaymentsBucket)
11✔
1835
        if duplicates == nil {
21✔
1836
                return sequenceNumbers, nil
10✔
1837
        }
10✔
1838

1839
        // If we do have duplicated, they are keyed by sequence number, so we
1840
        // iterate through the duplicates bucket and add them to our set of
1841
        // sequence numbers.
1842
        if err := duplicates.ForEach(func(k, v []byte) error {
2✔
1843
                sequenceNumbers = append(sequenceNumbers, k)
1✔
1844
                return nil
1✔
1845
        }); err != nil {
1✔
1846
                return nil, err
×
1847
        }
×
1848

1849
        return sequenceNumbers, nil
1✔
1850
}
1851

1852
// nolint: dupl
1853
func serializePaymentCreationInfo(w io.Writer, c *PaymentCreationInfo) error {
154✔
1854
        var scratch [8]byte
154✔
1855

154✔
1856
        if _, err := w.Write(c.PaymentIdentifier[:]); err != nil {
154✔
1857
                return err
×
1858
        }
×
1859

1860
        byteOrder.PutUint64(scratch[:], uint64(c.Value))
154✔
1861
        if _, err := w.Write(scratch[:]); err != nil {
154✔
1862
                return err
×
1863
        }
×
1864

1865
        if err := serializeTime(w, c.CreationTime); err != nil {
154✔
1866
                return err
×
1867
        }
×
1868

1869
        byteOrder.PutUint32(scratch[:4], uint32(len(c.PaymentRequest)))
154✔
1870
        if _, err := w.Write(scratch[:4]); err != nil {
154✔
1871
                return err
×
1872
        }
×
1873

1874
        if _, err := w.Write(c.PaymentRequest[:]); err != nil {
154✔
1875
                return err
×
1876
        }
×
1877

1878
        // Any remaining bytes are TLV encoded records. Currently, these are
1879
        // only the custom records provided by the user to be sent to the first
1880
        // hop. But this can easily be extended with further records by merging
1881
        // the records into a single TLV stream.
1882
        err := c.FirstHopCustomRecords.SerializeTo(w)
154✔
1883
        if err != nil {
154✔
1884
                return err
×
1885
        }
×
1886

1887
        return nil
154✔
1888
}
1889

1890
func deserializePaymentCreationInfo(r io.Reader) (*PaymentCreationInfo,
1891
        error) {
645✔
1892

645✔
1893
        var scratch [8]byte
645✔
1894

645✔
1895
        c := &PaymentCreationInfo{}
645✔
1896

645✔
1897
        if _, err := io.ReadFull(r, c.PaymentIdentifier[:]); err != nil {
645✔
1898
                return nil, err
×
1899
        }
×
1900

1901
        if _, err := io.ReadFull(r, scratch[:]); err != nil {
645✔
1902
                return nil, err
×
1903
        }
×
1904
        c.Value = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:]))
645✔
1905

645✔
1906
        creationTime, err := deserializeTime(r)
645✔
1907
        if err != nil {
645✔
1908
                return nil, err
×
1909
        }
×
1910
        c.CreationTime = creationTime
645✔
1911

645✔
1912
        if _, err := io.ReadFull(r, scratch[:4]); err != nil {
645✔
1913
                return nil, err
×
1914
        }
×
1915

1916
        reqLen := uint32(byteOrder.Uint32(scratch[:4]))
645✔
1917
        payReq := make([]byte, reqLen)
645✔
1918
        if reqLen > 0 {
1,290✔
1919
                if _, err := io.ReadFull(r, payReq); err != nil {
645✔
1920
                        return nil, err
×
1921
                }
×
1922
        }
1923
        c.PaymentRequest = payReq
645✔
1924

645✔
1925
        // Any remaining bytes are TLV encoded records. Currently, these are
645✔
1926
        // only the custom records provided by the user to be sent to the first
645✔
1927
        // hop. But this can easily be extended with further records by merging
645✔
1928
        // the records into a single TLV stream.
645✔
1929
        c.FirstHopCustomRecords, err = lnwire.ParseCustomRecordsFrom(r)
645✔
1930
        if err != nil {
645✔
1931
                return nil, err
×
1932
        }
×
1933

1934
        return c, nil
645✔
1935
}
1936

1937
func serializeHTLCAttemptInfo(w io.Writer, a *HTLCAttemptInfo) error {
75✔
1938
        if err := WriteElements(w, a.sessionKey); err != nil {
75✔
1939
                return err
×
1940
        }
×
1941

1942
        if err := SerializeRoute(w, a.Route); err != nil {
75✔
1943
                return err
×
1944
        }
×
1945

1946
        if err := serializeTime(w, a.AttemptTime); err != nil {
75✔
1947
                return err
×
1948
        }
×
1949

1950
        // If the hash is nil we can just return.
1951
        if a.Hash == nil {
75✔
1952
                return nil
×
1953
        }
×
1954

1955
        if _, err := w.Write(a.Hash[:]); err != nil {
75✔
1956
                return err
×
1957
        }
×
1958

1959
        // Merge the fixed/known records together with the custom records to
1960
        // serialize them as a single blob. We can't do this in SerializeRoute
1961
        // because we're in the middle of the byte stream there. We can only do
1962
        // TLV serialization at the end of the stream, since EOF is allowed for
1963
        // a stream if no more data is expected.
1964
        producers := []tlv.RecordProducer{
75✔
1965
                &a.Route.FirstHopAmount,
75✔
1966
        }
75✔
1967
        tlvData, err := lnwire.MergeAndEncode(
75✔
1968
                producers, nil, a.Route.FirstHopWireCustomRecords,
75✔
1969
        )
75✔
1970
        if err != nil {
75✔
1971
                return err
×
1972
        }
×
1973

1974
        if _, err := w.Write(tlvData); err != nil {
75✔
1975
                return err
×
1976
        }
×
1977

1978
        return nil
75✔
1979
}
1980

1981
func deserializeHTLCAttemptInfo(r io.Reader) (*HTLCAttemptInfo, error) {
706✔
1982
        a := &HTLCAttemptInfo{}
706✔
1983
        err := ReadElements(r, &a.sessionKey)
706✔
1984
        if err != nil {
706✔
1985
                return nil, err
×
1986
        }
×
1987

1988
        a.Route, err = DeserializeRoute(r)
706✔
1989
        if err != nil {
706✔
1990
                return nil, err
×
1991
        }
×
1992

1993
        a.AttemptTime, err = deserializeTime(r)
706✔
1994
        if err != nil {
706✔
1995
                return nil, err
×
1996
        }
×
1997

1998
        hash := lntypes.Hash{}
706✔
1999
        _, err = io.ReadFull(r, hash[:])
706✔
2000

706✔
2001
        switch {
706✔
2002
        // Older payment attempts wouldn't have the hash set, in which case we
2003
        // can just return.
2004
        case err == io.EOF, err == io.ErrUnexpectedEOF:
×
2005
                return a, nil
×
2006

2007
        case err != nil:
×
2008
                return nil, err
×
2009

2010
        default:
706✔
2011
        }
2012

2013
        a.Hash = &hash
706✔
2014

706✔
2015
        // Read any remaining data (if any) and parse it into the known records
706✔
2016
        // and custom records.
706✔
2017
        extraData, err := io.ReadAll(r)
706✔
2018
        if err != nil {
706✔
2019
                return nil, err
×
2020
        }
×
2021

2022
        customRecords, _, _, err := lnwire.ParseAndExtractCustomRecords(
706✔
2023
                extraData, &a.Route.FirstHopAmount,
706✔
2024
        )
706✔
2025
        if err != nil {
706✔
2026
                return nil, err
×
2027
        }
×
2028

2029
        a.Route.FirstHopWireCustomRecords = customRecords
706✔
2030

706✔
2031
        return a, nil
706✔
2032
}
2033

2034
func serializeHop(w io.Writer, h *route.Hop) error {
152✔
2035
        if err := WriteElements(w,
152✔
2036
                h.PubKeyBytes[:],
152✔
2037
                h.ChannelID,
152✔
2038
                h.OutgoingTimeLock,
152✔
2039
                h.AmtToForward,
152✔
2040
        ); err != nil {
152✔
2041
                return err
×
2042
        }
×
2043

2044
        if err := binary.Write(w, byteOrder, h.LegacyPayload); err != nil {
152✔
2045
                return err
×
2046
        }
×
2047

2048
        // For legacy payloads, we don't need to write any TLV records, so
2049
        // we'll write a zero indicating the our serialized TLV map has no
2050
        // records.
2051
        if h.LegacyPayload {
232✔
2052
                return WriteElements(w, uint32(0))
80✔
2053
        }
80✔
2054

2055
        // Gather all non-primitive TLV records so that they can be serialized
2056
        // as a single blob.
2057
        //
2058
        // TODO(conner): add migration to unify all fields in a single TLV
2059
        // blobs. The split approach will cause headaches down the road as more
2060
        // fields are added, which we can avoid by having a single TLV stream
2061
        // for all payload fields.
2062
        var records []tlv.Record
72✔
2063
        if h.MPP != nil {
139✔
2064
                records = append(records, h.MPP.Record())
67✔
2065
        }
67✔
2066

2067
        // Add blinding point and encrypted data if present.
2068
        if h.EncryptedData != nil {
77✔
2069
                records = append(records, record.NewEncryptedDataRecord(
5✔
2070
                        &h.EncryptedData,
5✔
2071
                ))
5✔
2072
        }
5✔
2073

2074
        if h.BlindingPoint != nil {
76✔
2075
                records = append(records, record.NewBlindingPointRecord(
4✔
2076
                        &h.BlindingPoint,
4✔
2077
                ))
4✔
2078
        }
4✔
2079

2080
        if h.AMP != nil {
75✔
2081
                records = append(records, h.AMP.Record())
3✔
2082
        }
3✔
2083

2084
        if h.Metadata != nil {
139✔
2085
                records = append(records, record.NewMetadataRecord(&h.Metadata))
67✔
2086
        }
67✔
2087

2088
        if h.TotalAmtMsat != 0 {
76✔
2089
                totalMsatInt := uint64(h.TotalAmtMsat)
4✔
2090
                records = append(
4✔
2091
                        records, record.NewTotalAmtMsatBlinded(&totalMsatInt),
4✔
2092
                )
4✔
2093
        }
4✔
2094

2095
        // Final sanity check to absolutely rule out custom records that are not
2096
        // custom and write into the standard range.
2097
        if err := h.CustomRecords.Validate(); err != nil {
72✔
2098
                return err
×
2099
        }
×
2100

2101
        // Convert custom records to tlv and add to the record list.
2102
        // MapToRecords sorts the list, so adding it here will keep the list
2103
        // canonical.
2104
        tlvRecords := tlv.MapToRecords(h.CustomRecords)
72✔
2105
        records = append(records, tlvRecords...)
72✔
2106

72✔
2107
        // Otherwise, we'll transform our slice of records into a map of the
72✔
2108
        // raw bytes, then serialize them in-line with a length (number of
72✔
2109
        // elements) prefix.
72✔
2110
        mapRecords, err := tlv.RecordsToMap(records)
72✔
2111
        if err != nil {
72✔
2112
                return err
×
2113
        }
×
2114

2115
        numRecords := uint32(len(mapRecords))
72✔
2116
        if err := WriteElements(w, numRecords); err != nil {
72✔
2117
                return err
×
2118
        }
×
2119

2120
        for recordType, rawBytes := range mapRecords {
342✔
2121
                if err := WriteElements(w, recordType); err != nil {
270✔
2122
                        return err
×
2123
                }
×
2124

2125
                if err := wire.WriteVarBytes(w, 0, rawBytes); err != nil {
270✔
2126
                        return err
×
2127
                }
×
2128
        }
2129

2130
        return nil
72✔
2131
}
2132

2133
// maxOnionPayloadSize bounds the size of a TLV stream we read or write. No
// Sphinx payload can be larger than this, so anything bigger never needs to
// be handled.
const maxOnionPayloadSize = 1300
2136

2137
func deserializeHop(r io.Reader) (*route.Hop, error) {
1,414✔
2138
        h := &route.Hop{}
1,414✔
2139

1,414✔
2140
        var pub []byte
1,414✔
2141
        if err := ReadElements(r, &pub); err != nil {
1,414✔
2142
                return nil, err
×
2143
        }
×
2144
        copy(h.PubKeyBytes[:], pub)
1,414✔
2145

1,414✔
2146
        if err := ReadElements(r,
1,414✔
2147
                &h.ChannelID, &h.OutgoingTimeLock, &h.AmtToForward,
1,414✔
2148
        ); err != nil {
1,414✔
2149
                return nil, err
×
2150
        }
×
2151

2152
        // TODO(roasbeef): change field to allow LegacyPayload false to be the
2153
        // legacy default?
2154
        err := binary.Read(r, byteOrder, &h.LegacyPayload)
1,414✔
2155
        if err != nil {
1,414✔
2156
                return nil, err
×
2157
        }
×
2158

2159
        var numElements uint32
1,414✔
2160
        if err := ReadElements(r, &numElements); err != nil {
1,414✔
2161
                return nil, err
×
2162
        }
×
2163

2164
        // If there're no elements, then we can return early.
2165
        if numElements == 0 {
2,149✔
2166
                return h, nil
735✔
2167
        }
735✔
2168

2169
        tlvMap := make(map[uint64][]byte)
682✔
2170
        for i := uint32(0); i < numElements; i++ {
3,391✔
2171
                var tlvType uint64
2,709✔
2172
                if err := ReadElements(r, &tlvType); err != nil {
2,709✔
2173
                        return nil, err
×
2174
                }
×
2175

2176
                rawRecordBytes, err := wire.ReadVarBytes(
2,709✔
2177
                        r, 0, maxOnionPayloadSize, "tlv",
2,709✔
2178
                )
2,709✔
2179
                if err != nil {
2,709✔
2180
                        return nil, err
×
2181
                }
×
2182

2183
                tlvMap[tlvType] = rawRecordBytes
2,709✔
2184
        }
2185

2186
        // If the MPP type is present, remove it from the generic TLV map and
2187
        // parse it back into a proper MPP struct.
2188
        //
2189
        // TODO(conner): add migration to unify all fields in a single TLV
2190
        // blobs. The split approach will cause headaches down the road as more
2191
        // fields are added, which we can avoid by having a single TLV stream
2192
        // for all payload fields.
2193
        mppType := uint64(record.MPPOnionType)
682✔
2194
        if mppBytes, ok := tlvMap[mppType]; ok {
1,358✔
2195
                delete(tlvMap, mppType)
676✔
2196

676✔
2197
                var (
676✔
2198
                        mpp    = &record.MPP{}
676✔
2199
                        mppRec = mpp.Record()
676✔
2200
                        r      = bytes.NewReader(mppBytes)
676✔
2201
                )
676✔
2202
                err := mppRec.Decode(r, uint64(len(mppBytes)))
676✔
2203
                if err != nil {
676✔
2204
                        return nil, err
×
2205
                }
×
2206
                h.MPP = mpp
676✔
2207
        }
2208

2209
        // If encrypted data or blinding key are present, remove them from
2210
        // the TLV map and parse into proper types.
2211
        encryptedDataType := uint64(record.EncryptedDataOnionType)
682✔
2212
        if data, ok := tlvMap[encryptedDataType]; ok {
687✔
2213
                delete(tlvMap, encryptedDataType)
5✔
2214
                h.EncryptedData = data
5✔
2215
        }
5✔
2216

2217
        blindingType := uint64(record.BlindingPointOnionType)
682✔
2218
        if blindingPoint, ok := tlvMap[blindingType]; ok {
686✔
2219
                delete(tlvMap, blindingType)
4✔
2220

4✔
2221
                h.BlindingPoint, err = btcec.ParsePubKey(blindingPoint)
4✔
2222
                if err != nil {
4✔
2223
                        return nil, fmt.Errorf("invalid blinding point: %w",
×
2224
                                err)
×
2225
                }
×
2226
        }
2227

2228
        ampType := uint64(record.AMPOnionType)
682✔
2229
        if ampBytes, ok := tlvMap[ampType]; ok {
685✔
2230
                delete(tlvMap, ampType)
3✔
2231

3✔
2232
                var (
3✔
2233
                        amp    = &record.AMP{}
3✔
2234
                        ampRec = amp.Record()
3✔
2235
                        r      = bytes.NewReader(ampBytes)
3✔
2236
                )
3✔
2237
                err := ampRec.Decode(r, uint64(len(ampBytes)))
3✔
2238
                if err != nil {
3✔
2239
                        return nil, err
×
2240
                }
×
2241
                h.AMP = amp
3✔
2242
        }
2243

2244
        // If the metadata type is present, remove it from the tlv map and
2245
        // populate directly on the hop.
2246
        metadataType := uint64(record.MetadataOnionType)
682✔
2247
        if metadata, ok := tlvMap[metadataType]; ok {
1,359✔
2248
                delete(tlvMap, metadataType)
677✔
2249

677✔
2250
                h.Metadata = metadata
677✔
2251
        }
677✔
2252

2253
        totalAmtMsatType := uint64(record.TotalAmtMsatBlindedType)
682✔
2254
        if totalAmtMsat, ok := tlvMap[totalAmtMsatType]; ok {
686✔
2255
                delete(tlvMap, totalAmtMsatType)
4✔
2256

4✔
2257
                var (
4✔
2258
                        totalAmtMsatInt uint64
4✔
2259
                        buf             [8]byte
4✔
2260
                )
4✔
2261
                if err := tlv.DTUint64(
4✔
2262
                        bytes.NewReader(totalAmtMsat),
4✔
2263
                        &totalAmtMsatInt,
4✔
2264
                        &buf,
4✔
2265
                        uint64(len(totalAmtMsat)),
4✔
2266
                ); err != nil {
4✔
2267
                        return nil, err
×
2268
                }
×
2269

2270
                h.TotalAmtMsat = lnwire.MilliSatoshi(totalAmtMsatInt)
4✔
2271
        }
2272

2273
        h.CustomRecords = tlvMap
682✔
2274

682✔
2275
        return h, nil
682✔
2276
}
2277

2278
// SerializeRoute serializes a route.
2279
func SerializeRoute(w io.Writer, r route.Route) error {
77✔
2280
        if err := WriteElements(w,
77✔
2281
                r.TotalTimeLock, r.TotalAmount, r.SourcePubKey[:],
77✔
2282
        ); err != nil {
77✔
2283
                return err
×
2284
        }
×
2285

2286
        if err := WriteElements(w, uint32(len(r.Hops))); err != nil {
77✔
2287
                return err
×
2288
        }
×
2289

2290
        for _, h := range r.Hops {
229✔
2291
                if err := serializeHop(w, h); err != nil {
152✔
2292
                        return err
×
2293
                }
×
2294
        }
2295

2296
        // Any new/extra TLV data is encoded in serializeHTLCAttemptInfo!
2297

2298
        return nil
77✔
2299
}
2300

2301
// DeserializeRoute deserializes a route.
2302
func DeserializeRoute(r io.Reader) (route.Route, error) {
708✔
2303
        rt := route.Route{}
708✔
2304
        if err := ReadElements(r,
708✔
2305
                &rt.TotalTimeLock, &rt.TotalAmount,
708✔
2306
        ); err != nil {
708✔
2307
                return rt, err
×
2308
        }
×
2309

2310
        var pub []byte
708✔
2311
        if err := ReadElements(r, &pub); err != nil {
708✔
2312
                return rt, err
×
2313
        }
×
2314
        copy(rt.SourcePubKey[:], pub)
708✔
2315

708✔
2316
        var numHops uint32
708✔
2317
        if err := ReadElements(r, &numHops); err != nil {
708✔
2318
                return rt, err
×
2319
        }
×
2320

2321
        var hops []*route.Hop
708✔
2322
        for i := uint32(0); i < numHops; i++ {
2,122✔
2323
                hop, err := deserializeHop(r)
1,414✔
2324
                if err != nil {
1,414✔
2325
                        return rt, err
×
2326
                }
×
2327
                hops = append(hops, hop)
1,414✔
2328
        }
2329
        rt.Hops = hops
708✔
2330

708✔
2331
        // Any new/extra TLV data is decoded in deserializeHTLCAttemptInfo!
708✔
2332

708✔
2333
        return rt, nil
708✔
2334
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc