• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16693114906

02 Aug 2025 11:25AM UTC coverage: 67.036% (-0.01%) from 67.047%
16693114906

Pull #9826

github

web-flow
Merge 2b3b27f5a into 37523b6cb
Pull Request #9826: Refactor Payment PR 2

1002 of 1343 new or added lines in 5 files covered. (74.61%)

83 existing lines in 18 files now uncovered.

135561 of 202222 relevant lines covered (67.04%)

21699.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.47
/channeldb/payments_kv_store.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "encoding/binary"
6
        "errors"
7
        "fmt"
8
        "io"
9
        "sort"
10
        "sync"
11
        "time"
12

13
        "github.com/btcsuite/btcd/btcec/v2"
14
        "github.com/btcsuite/btcd/wire"
15
        "github.com/lightningnetwork/lnd/kvdb"
16
        "github.com/lightningnetwork/lnd/lntypes"
17
        "github.com/lightningnetwork/lnd/lnwire"
18
        "github.com/lightningnetwork/lnd/record"
19
        "github.com/lightningnetwork/lnd/routing/route"
20
        "github.com/lightningnetwork/lnd/tlv"
21
)
22

23
const (
24
        // paymentSeqBlockSize is the block size used when we batch allocate
25
        // payment sequences for future payments.
26
        paymentSeqBlockSize = 1000
27

28
        // paymentProgressLogInterval is the interval we use to limit the
29
        // logging output of payment processing.
30
        paymentProgressLogInterval = 30 * time.Second
31
)
32

33
var (
34
        // ErrAlreadyPaid signals we have already paid this payment hash.
35
        ErrAlreadyPaid = errors.New("invoice is already paid")
36

37
        // ErrPaymentInFlight signals that payment for this payment hash is
38
        // already "in flight" on the network.
39
        ErrPaymentInFlight = errors.New("payment is in transition")
40

41
        // ErrPaymentExists is returned when we try to initialize an already
42
        // existing payment that is not failed.
43
        ErrPaymentExists = errors.New("payment already exists")
44

45
        // ErrPaymentInternal is returned when performing the payment has a
46
        // conflicting state, such as,
47
        // - payment has StatusSucceeded but remaining amount is not zero.
48
        // - payment has StatusInitiated but remaining amount is zero.
49
        // - payment has StatusFailed but remaining amount is zero.
50
        ErrPaymentInternal = errors.New("internal error")
51

52
        // ErrPaymentNotInitiated is returned if the payment wasn't initiated.
53
        ErrPaymentNotInitiated = errors.New("payment isn't initiated")
54

55
        // ErrPaymentAlreadySucceeded is returned in the event we attempt to
56
        // change the status of a payment already succeeded.
57
        ErrPaymentAlreadySucceeded = errors.New("payment is already succeeded")
58

59
        // ErrPaymentAlreadyFailed is returned in the event we attempt to alter
60
        // a failed payment.
61
        ErrPaymentAlreadyFailed = errors.New("payment has already failed")
62

63
        // ErrUnknownPaymentStatus is returned when we do not recognize the
64
        // existing state of a payment.
65
        ErrUnknownPaymentStatus = errors.New("unknown payment status")
66

67
        // ErrPaymentTerminal is returned if we attempt to alter a payment that
68
        // already has reached a terminal condition.
69
        ErrPaymentTerminal = errors.New("payment has reached terminal " +
70
                "condition")
71

72
        // ErrAttemptAlreadySettled is returned if we try to alter an already
73
        // settled HTLC attempt.
74
        ErrAttemptAlreadySettled = errors.New("attempt already settled")
75

76
        // ErrAttemptAlreadyFailed is returned if we try to alter an already
77
        // failed HTLC attempt.
78
        ErrAttemptAlreadyFailed = errors.New("attempt already failed")
79

80
        // ErrValueMismatch is returned if we try to register a non-MPP attempt
81
        // with an amount that doesn't match the payment amount.
82
        ErrValueMismatch = errors.New("attempted value doesn't match payment " +
83
                "amount")
84

85
        // ErrValueExceedsAmt is returned if we try to register an attempt that
86
        // would take the total sent amount above the payment amount.
87
        ErrValueExceedsAmt = errors.New("attempted value exceeds payment " +
88
                "amount")
89

90
        // ErrNonMPPayment is returned if we try to register an MPP attempt for
91
        // a payment that already has a non-MPP attempt registered.
92
        ErrNonMPPayment = errors.New("payment has non-MPP attempts")
93

94
        // ErrMPPayment is returned if we try to register a non-MPP attempt for
95
        // a payment that already has an MPP attempt registered.
96
        ErrMPPayment = errors.New("payment has MPP attempts")
97

98
        // ErrMPPRecordInBlindedPayment is returned if we try to register an
99
        // attempt with an MPP record for a payment to a blinded path.
100
        ErrMPPRecordInBlindedPayment = errors.New("blinded payment cannot " +
101
                "contain MPP records")
102

103
        // ErrBlindedPaymentTotalAmountMismatch is returned if we try to
104
        // register an HTLC shard to a blinded route where the total amount
105
        // doesn't match existing shards.
106
        ErrBlindedPaymentTotalAmountMismatch = errors.New("blinded path " +
107
                "total amount mismatch")
108

109
        // ErrMPPPaymentAddrMismatch is returned if we try to register an MPP
110
        // shard where the payment address doesn't match existing shards.
111
        ErrMPPPaymentAddrMismatch = errors.New("payment address mismatch")
112

113
        // ErrMPPTotalAmountMismatch is returned if we try to register an MPP
114
        // shard where the total amount doesn't match existing shards.
115
        ErrMPPTotalAmountMismatch = errors.New("mp payment total amount " +
116
                "mismatch")
117

118
        // ErrPaymentPendingSettled is returned when we try to add a new
119
        // attempt to a payment that has at least one of its HTLCs settled.
120
        ErrPaymentPendingSettled = errors.New("payment has settled htlcs")
121

122
        // ErrPaymentPendingFailed is returned when we try to add a new attempt
123
        // to a payment that already has a failure reason.
124
        ErrPaymentPendingFailed = errors.New("payment has failure reason")
125

126
        // ErrSentExceedsTotal is returned if the payment's current total sent
127
        // amount exceeds the total amount.
128
        ErrSentExceedsTotal = errors.New("total sent exceeds total amount")
129

130
        // errNoAttemptInfo is returned when no attempt info is stored yet.
131
        errNoAttemptInfo = errors.New("unable to find attempt info for " +
132
                "inflight payment")
133

134
        // errNoSequenceNrIndex is returned when an attempt to lookup a payment
135
        // index is made for a sequence number that is not indexed.
136
        errNoSequenceNrIndex = errors.New("payment sequence number index " +
137
                "does not exist")
138
)
139

140
//nolint:ll
141
var (
142
        // paymentsRootBucket is the name of the top-level bucket within the
143
        // database that stores all data related to payments. Within this
144
        // bucket, each payment has its own sub-bucket keyed by its payment
145
        // hash.
146
        //
147
        // Bucket hierarchy:
148
        //
149
        // root-bucket
150
        //      |
151
        //      |-- <paymenthash>
152
        //      |        |--sequence-key: <sequence number>
153
        //      |        |--creation-info-key: <creation info>
154
        //      |        |--fail-info-key: <(optional) fail info>
155
        //      |        |
156
        //      |        |--payment-htlcs-bucket (shard-bucket)
157
        //      |        |        |
158
        //      |        |        |-- ai<htlc attempt ID>: <htlc attempt info>
159
        //      |        |        |-- si<htlc attempt ID>: <(optional) settle info>
160
        //      |        |        |-- fi<htlc attempt ID>: <(optional) fail info>
161
        //      |        |        |
162
        //      |        |       ...
163
        //      |        |
164
        //      |        |
165
        //      |        |--duplicate-bucket (only for old, completed payments)
166
        //      |                 |
167
        //      |                 |-- <seq-num>
168
        //      |                 |       |--sequence-key: <sequence number>
169
        //      |                 |       |--creation-info-key: <creation info>
170
        //      |                 |       |--ai: <attempt info>
171
        //      |                 |       |--si: <settle info>
172
        //      |                 |       |--fi: <fail info>
173
        //      |                 |
174
        //      |                 |-- <seq-num>
175
        //      |                 |       |
176
        //      |                ...     ...
177
        //      |
178
        //      |-- <paymenthash>
179
        //      |        |
180
        //      |       ...
181
        //     ...
182
        //
183
        paymentsRootBucket = []byte("payments-root-bucket")
184

185
        // paymentSequenceKey is a key used in the payment's sub-bucket to
186
        // store the sequence number of the payment.
187
        paymentSequenceKey = []byte("payment-sequence-key")
188

189
        // paymentCreationInfoKey is a key used in the payment's sub-bucket to
190
        // store the creation info of the payment.
191
        paymentCreationInfoKey = []byte("payment-creation-info")
192

193
        // paymentHtlcsBucket is a bucket where we'll store the information
194
        // about the HTLCs that were attempted for a payment.
195
        paymentHtlcsBucket = []byte("payment-htlcs-bucket")
196

197
        // htlcAttemptInfoKey is the key used as the prefix of an HTLC attempt
198
        // to store the info about the attempt that was done for the HTLC in
199
        // question. The HTLC attempt ID is concatenated at the end.
200
        htlcAttemptInfoKey = []byte("ai")
201

202
        // htlcSettleInfoKey is the key used as the prefix of an HTLC attempt
203
        // settle info, if any. The HTLC attempt ID is concatenated at the end.
204
        htlcSettleInfoKey = []byte("si")
205

206
        // htlcFailInfoKey is the key used as the prefix of an HTLC attempt
207
        // failure information, if any. The HTLC attempt ID is concatenated at
208
        // the end.
209
        htlcFailInfoKey = []byte("fi")
210

211
        // paymentFailInfoKey is a key used in the payment's sub-bucket to
212
        // store information about the reason a payment failed.
213
        paymentFailInfoKey = []byte("payment-fail-info")
214

215
        // paymentsIndexBucket is the name of the top-level bucket within the
216
        // database that stores an index of payment sequence numbers to its
217
        // payment hash.
218
        // payments-sequence-index-bucket
219
        //         |--<sequence-number>: <payment hash>
220
        //         |--...
221
        //         |--<sequence-number>: <payment hash>
222
        paymentsIndexBucket = []byte("payments-index-bucket")
223
)
224

225
var (
226
        // ErrNoSequenceNumber is returned if we look up a payment which does
227
        // not have a sequence number.
228
        ErrNoSequenceNumber = errors.New("sequence number not found")
229

230
        // ErrDuplicateNotFound is returned when we lookup a payment by its
231
        // index and cannot find a payment with a matching sequence number.
232
        ErrDuplicateNotFound = errors.New("duplicate payment not found")
233

234
        // ErrNoDuplicateBucket is returned when we expect to find duplicates
235
        // when looking up a payment from its index, but the payment does not
236
        // have any.
237
        ErrNoDuplicateBucket = errors.New("expected duplicate bucket")
238

239
        // ErrNoDuplicateNestedBucket is returned if we do not find duplicate
240
        // payments in their own sub-bucket.
241
        ErrNoDuplicateNestedBucket = errors.New("nested duplicate bucket not " +
242
                "found")
243
)
244

245
// KVPaymentsDB implements persistence for payments and payment attempts.
246
type KVPaymentsDB struct {
247
        paymentSeqMx     sync.Mutex
248
        currPaymentSeq   uint64
249
        storedPaymentSeq uint64
250
        db               *DB
251
}
252

253
// NewKVPaymentsDB creates a new instance of the KVPaymentsDB.
254
func NewKVPaymentsDB(db *DB) *KVPaymentsDB {
45✔
255
        return &KVPaymentsDB{
45✔
256
                db: db,
45✔
257
        }
45✔
258
}
45✔
259

260
// InitPayment checks or records the given PaymentCreationInfo with the DB,
261
// making sure it does not already exist as an in-flight payment. When this
262
// method returns successfully, the payment is guaranteed to be in the InFlight
263
// state.
264
func (p *KVPaymentsDB) InitPayment(paymentHash lntypes.Hash,
265
        info *PaymentCreationInfo) error {
152✔
266

152✔
267
        // Obtain a new sequence number for this payment. This is used
152✔
268
        // to sort the payments in order of creation, and also acts as
152✔
269
        // a unique identifier for each payment.
152✔
270
        sequenceNum, err := p.nextPaymentSequence()
152✔
271
        if err != nil {
152✔
NEW
272
                return err
×
NEW
273
        }
×
274

275
        var b bytes.Buffer
152✔
276
        if err := serializePaymentCreationInfo(&b, info); err != nil {
152✔
NEW
277
                return err
×
NEW
278
        }
×
279
        infoBytes := b.Bytes()
152✔
280

152✔
281
        var updateErr error
152✔
282
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
304✔
283
                // Reset the update error, to avoid carrying over an error
152✔
284
                // from a previous execution of the batched db transaction.
152✔
285
                updateErr = nil
152✔
286

152✔
287
                prefetchPayment(tx, paymentHash)
152✔
288
                bucket, err := createPaymentBucket(tx, paymentHash)
152✔
289
                if err != nil {
152✔
NEW
290
                        return err
×
NEW
291
                }
×
292

293
                // Get the existing status of this payment, if any.
294
                paymentStatus, err := fetchPaymentStatus(bucket)
152✔
295

152✔
296
                switch {
152✔
297
                // If no error is returned, it means we already have this
298
                // payment. We'll check the status to decide whether we allow
299
                // retrying the payment or return a specific error.
300
                case err == nil:
8✔
301
                        if err := paymentStatus.initializable(); err != nil {
15✔
302
                                updateErr = err
7✔
303
                                return nil
7✔
304
                        }
7✔
305

306
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
307
                // we'll return the error.
NEW
308
                case !errors.Is(err, ErrPaymentNotInitiated):
×
NEW
309
                        return err
×
310
                }
311

312
                // Before we set our new sequence number, we check whether this
313
                // payment has a previously set sequence number and remove its
314
                // index entry if it exists. This happens in the case where we
315
                // have a previously attempted payment which was left in a state
316
                // where we can retry.
317
                seqBytes := bucket.Get(paymentSequenceKey)
148✔
318
                if seqBytes != nil {
152✔
319
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
4✔
320
                        if err := indexBucket.Delete(seqBytes); err != nil {
4✔
NEW
321
                                return err
×
NEW
322
                        }
×
323
                }
324

325
                // Once we have obtained a sequence number, we add an entry
326
                // to our index bucket which will map the sequence number to
327
                // our payment identifier.
328
                err = createPaymentIndexEntry(
148✔
329
                        tx, sequenceNum, info.PaymentIdentifier,
148✔
330
                )
148✔
331
                if err != nil {
148✔
NEW
332
                        return err
×
NEW
333
                }
×
334

335
                err = bucket.Put(paymentSequenceKey, sequenceNum)
148✔
336
                if err != nil {
148✔
NEW
337
                        return err
×
NEW
338
                }
×
339

340
                // Add the payment info to the bucket, which contains the
341
                // static information for this payment
342
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
148✔
343
                if err != nil {
148✔
NEW
344
                        return err
×
NEW
345
                }
×
346

347
                // We'll delete any lingering HTLCs to start with, in case we
348
                // are initializing a payment that was attempted earlier, but
349
                // left in a state where we could retry.
350
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
148✔
351
                if err != nil && !errors.Is(err, kvdb.ErrBucketNotFound) {
148✔
NEW
352
                        return err
×
NEW
353
                }
×
354

355
                // Also delete any lingering failure info now that we are
356
                // re-attempting.
357
                return bucket.Delete(paymentFailInfoKey)
148✔
358
        })
359
        if err != nil {
152✔
NEW
360
                return fmt.Errorf("unable to init payment: %w", err)
×
NEW
361
        }
×
362

363
        return updateErr
152✔
364
}
365

366
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
367
// by the KVPaymentsDB db.
368
func (p *KVPaymentsDB) DeleteFailedAttempts(hash lntypes.Hash) error {
11✔
369
        if !p.db.keepFailedPaymentAttempts {
15✔
370
                const failedHtlcsOnly = true
4✔
371
                err := p.DeletePayment(hash, failedHtlcsOnly)
4✔
372
                if err != nil {
6✔
373
                        return err
2✔
374
                }
2✔
375
        }
376

377
        return nil
9✔
378
}
379

380
// paymentIndexType indicates the type of index we have created for a payment.
381
// The type is serialized at the start of every payment index entry.
382
type paymentIndexType uint8
383

384
// paymentIndexTypeHash is a payment index type which indicates that we have
385
// created an index of payment sequence number to payment hash.
386
const paymentIndexTypeHash paymentIndexType = 0
387

388
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
389
// index produced contains a payment index type (which can be used in future to
390
// signal different payment index types) and the payment identifier.
391
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
392
        id lntypes.Hash) error {
169✔
393

169✔
394
        var b bytes.Buffer
169✔
395
        if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
169✔
NEW
396
                return err
×
NEW
397
        }
×
398

399
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
169✔
400

169✔
401
        return indexes.Put(sequenceNumber, b.Bytes())
169✔
402
}
403

404
// deserializePaymentIndex deserializes a payment index entry. This function
405
// currently only supports deserialization of payment hash indexes, and will
406
// fail for other types.
407
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
62✔
408
        var (
62✔
409
                indexType   paymentIndexType
62✔
410
                paymentHash []byte
62✔
411
        )
62✔
412

62✔
413
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
62✔
NEW
414
                return lntypes.Hash{}, err
×
NEW
415
        }
×
416

417
        // While we only have on payment index type, we do not need to use our
418
        // index type to deserialize the index. However, we sanity check that
419
        // this type is as expected, since we had to read it out anyway.
420
        if indexType != paymentIndexTypeHash {
62✔
NEW
421
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
NEW
422
                        "type: %v", indexType)
×
NEW
423
        }
×
424

425
        hash, err := lntypes.MakeHash(paymentHash)
62✔
426
        if err != nil {
62✔
NEW
427
                return lntypes.Hash{}, err
×
NEW
428
        }
×
429

430
        return hash, nil
62✔
431
}
432

433
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
434
// DB.
435
func (p *KVPaymentsDB) RegisterAttempt(paymentHash lntypes.Hash,
436
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
73✔
437

73✔
438
        // Serialize the information before opening the db transaction.
73✔
439
        var a bytes.Buffer
73✔
440
        err := serializeHTLCAttemptInfo(&a, attempt)
73✔
441
        if err != nil {
73✔
NEW
442
                return nil, err
×
NEW
443
        }
×
444
        htlcInfoBytes := a.Bytes()
73✔
445

73✔
446
        htlcIDBytes := make([]byte, 8)
73✔
447
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
73✔
448

73✔
449
        var payment *MPPayment
73✔
450
        err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
162✔
451
                prefetchPayment(tx, paymentHash)
89✔
452
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
89✔
453
                if err != nil {
89✔
NEW
454
                        return err
×
NEW
455
                }
×
456

457
                payment, err = fetchPayment(bucket)
89✔
458
                if err != nil {
89✔
NEW
459
                        return err
×
NEW
460
                }
×
461

462
                // Check if registering a new attempt is allowed.
463
                if err := payment.Registrable(); err != nil {
105✔
464
                        return err
16✔
465
                }
16✔
466

467
                // If the final hop has encrypted data, then we know this is a
468
                // blinded payment. In blinded payments, MPP records are not set
469
                // for split payments and the recipient is responsible for using
470
                // a consistent PathID across the various encrypted data
471
                // payloads that we received from them for this payment. All we
472
                // need to check is that the total amount field for each HTLC
473
                // in the split payment is correct.
474
                isBlinded := len(attempt.Route.FinalHop().EncryptedData) != 0
73✔
475

73✔
476
                // Make sure any existing shards match the new one with regards
73✔
477
                // to MPP options.
73✔
478
                mpp := attempt.Route.FinalHop().MPP
73✔
479

73✔
480
                // MPP records should not be set for attempts to blinded paths.
73✔
481
                if isBlinded && mpp != nil {
73✔
NEW
482
                        return ErrMPPRecordInBlindedPayment
×
NEW
483
                }
×
484

485
                for _, h := range payment.InFlightHTLCs() {
120✔
486
                        hMpp := h.Route.FinalHop().MPP
47✔
487

47✔
488
                        // If this is a blinded payment, then no existing HTLCs
47✔
489
                        // should have MPP records.
47✔
490
                        if isBlinded && hMpp != nil {
47✔
NEW
491
                                return ErrMPPRecordInBlindedPayment
×
NEW
492
                        }
×
493

494
                        // If this is a blinded payment, then we just need to
495
                        // check that the TotalAmtMsat field for this shard
496
                        // is equal to that of any other shard in the same
497
                        // payment.
498
                        if isBlinded {
50✔
499
                                if attempt.Route.FinalHop().TotalAmtMsat !=
3✔
500
                                        h.Route.FinalHop().TotalAmtMsat {
3✔
NEW
501

×
NEW
502
                                        //nolint:ll
×
NEW
503
                                        return ErrBlindedPaymentTotalAmountMismatch
×
NEW
504
                                }
×
505

506
                                continue
3✔
507
                        }
508

509
                        switch {
47✔
510
                        // We tried to register a non-MPP attempt for a MPP
511
                        // payment.
512
                        case mpp == nil && hMpp != nil:
2✔
513
                                return ErrMPPayment
2✔
514

515
                        // We tried to register a MPP shard for a non-MPP
516
                        // payment.
517
                        case mpp != nil && hMpp == nil:
2✔
518
                                return ErrNonMPPayment
2✔
519

520
                        // Non-MPP payment, nothing more to validate.
NEW
521
                        case mpp == nil:
×
NEW
522
                                continue
×
523
                        }
524

525
                        // Check that MPP options match.
526
                        if mpp.PaymentAddr() != hMpp.PaymentAddr() {
45✔
527
                                return ErrMPPPaymentAddrMismatch
2✔
528
                        }
2✔
529

530
                        if mpp.TotalMsat() != hMpp.TotalMsat() {
43✔
531
                                return ErrMPPTotalAmountMismatch
2✔
532
                        }
2✔
533
                }
534

535
                // If this is a non-MPP attempt, it must match the total amount
536
                // exactly. Note that a blinded payment is considered an MPP
537
                // attempt.
538
                amt := attempt.Route.ReceiverAmt()
65✔
539
                if !isBlinded && mpp == nil && amt != payment.Info.Value {
65✔
NEW
540
                        return ErrValueMismatch
×
NEW
541
                }
×
542

543
                // Ensure we aren't sending more than the total payment amount.
544
                sentAmt, _ := payment.SentAmt()
65✔
545
                if sentAmt+amt > payment.Info.Value {
73✔
546
                        return fmt.Errorf("%w: attempted=%v, payment amount="+
8✔
547
                                "%v", ErrValueExceedsAmt, sentAmt+amt,
8✔
548
                                payment.Info.Value)
8✔
549
                }
8✔
550

551
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
57✔
552
                        paymentHtlcsBucket,
57✔
553
                )
57✔
554
                if err != nil {
57✔
NEW
555
                        return err
×
NEW
556
                }
×
557

558
                err = htlcsBucket.Put(
57✔
559
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
57✔
560
                        htlcInfoBytes,
57✔
561
                )
57✔
562
                if err != nil {
57✔
NEW
563
                        return err
×
NEW
564
                }
×
565

566
                // Retrieve attempt info for the notification.
567
                payment, err = fetchPayment(bucket)
57✔
568

57✔
569
                return err
57✔
570
        })
571
        if err != nil {
89✔
572
                return nil, err
16✔
573
        }
16✔
574

575
        return payment, err
57✔
576
}
577

578
// SettleAttempt marks the given attempt settled with the preimage. If this is
579
// a multi shard payment, this might implicitly mean that the full payment
580
// succeeded.
581
//
582
// After invoking this method, InitPayment should always return an error to
583
// prevent us from making duplicate payments to the same payment hash. The
584
// provided preimage is atomically saved to the DB for record keeping.
585
func (p *KVPaymentsDB) SettleAttempt(hash lntypes.Hash,
586
        attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
19✔
587

19✔
588
        var b bytes.Buffer
19✔
589
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
19✔
NEW
590
                return nil, err
×
NEW
591
        }
×
592
        settleBytes := b.Bytes()
19✔
593

19✔
594
        return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
19✔
595
}
596

597
// FailAttempt marks the given payment attempt failed.
598
func (p *KVPaymentsDB) FailAttempt(hash lntypes.Hash,
599
        attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
34✔
600

34✔
601
        var b bytes.Buffer
34✔
602
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
34✔
NEW
603
                return nil, err
×
NEW
604
        }
×
605
        failBytes := b.Bytes()
34✔
606

34✔
607
        return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
34✔
608
}
609

610
// updateHtlcKey updates a database key for the specified htlc.
611
func (p *KVPaymentsDB) updateHtlcKey(paymentHash lntypes.Hash,
612
        attemptID uint64, key, value []byte) (*MPPayment, error) {
50✔
613

50✔
614
        aid := make([]byte, 8)
50✔
615
        binary.BigEndian.PutUint64(aid, attemptID)
50✔
616

50✔
617
        var payment *MPPayment
50✔
618
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
101✔
619
                payment = nil
51✔
620

51✔
621
                prefetchPayment(tx, paymentHash)
51✔
622
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
51✔
623
                if err != nil {
53✔
624
                        return err
2✔
625
                }
2✔
626

627
                p, err := fetchPayment(bucket)
49✔
628
                if err != nil {
49✔
NEW
629
                        return err
×
NEW
630
                }
×
631

632
                // We can only update keys of in-flight payments. We allow
633
                // updating keys even if the payment has reached a terminal
634
                // condition, since the HTLC outcomes must still be updated.
635
                if err := p.Status.updatable(); err != nil {
49✔
NEW
636
                        return err
×
NEW
637
                }
×
638

639
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
49✔
640
                if htlcsBucket == nil {
49✔
NEW
641
                        return fmt.Errorf("htlcs bucket not found")
×
NEW
642
                }
×
643

644
                attemptKey := htlcBucketKey(htlcAttemptInfoKey, aid)
49✔
645
                if htlcsBucket.Get(attemptKey) == nil {
49✔
NEW
646
                        return fmt.Errorf("HTLC with ID %v not registered",
×
NEW
647
                                attemptID)
×
NEW
648
                }
×
649

650
                // Make sure the shard is not already failed or settled.
651
                failKey := htlcBucketKey(htlcFailInfoKey, aid)
49✔
652
                if htlcsBucket.Get(failKey) != nil {
49✔
NEW
653
                        return ErrAttemptAlreadyFailed
×
NEW
654
                }
×
655

656
                settleKey := htlcBucketKey(htlcSettleInfoKey, aid)
49✔
657
                if htlcsBucket.Get(settleKey) != nil {
49✔
NEW
658
                        return ErrAttemptAlreadySettled
×
NEW
659
                }
×
660

661
                // Add or update the key for this htlc.
662
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
49✔
663
                if err != nil {
49✔
NEW
664
                        return err
×
NEW
665
                }
×
666

667
                // Retrieve attempt info for the notification.
668
                payment, err = fetchPayment(bucket)
49✔
669

49✔
670
                return err
49✔
671
        })
672
        if err != nil {
51✔
673
                return nil, err
1✔
674
        }
1✔
675

676
        return payment, err
49✔
677
}
678

679
// Fail transitions a payment into the Failed state, and records the reason the
680
// payment failed. After invoking this method, InitPayment should return nil on
681
// its next call for this payment hash, allowing the switch to make a
682
// subsequent payment.
683
func (p *KVPaymentsDB) Fail(paymentHash lntypes.Hash,
684
        reason FailureReason) (*MPPayment, error) {
19✔
685

19✔
686
        var (
19✔
687
                updateErr error
19✔
688
                payment   *MPPayment
19✔
689
        )
19✔
690
        err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
38✔
691
                // Reset the update error, to avoid carrying over an error
19✔
692
                // from a previous execution of the batched db transaction.
19✔
693
                updateErr = nil
19✔
694
                payment = nil
19✔
695

19✔
696
                prefetchPayment(tx, paymentHash)
19✔
697
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
19✔
698
                if errors.Is(err, ErrPaymentNotInitiated) {
20✔
699
                        updateErr = ErrPaymentNotInitiated
1✔
700
                        return nil
1✔
701
                } else if err != nil {
19✔
NEW
702
                        return err
×
NEW
703
                }
×
704

705
                // We mark the payment as failed as long as it is known. This
706
                // lets the last attempt to fail with a terminal write its
707
                // failure to the KVPaymentsDB without synchronizing with
708
                // other attempts.
709
                _, err = fetchPaymentStatus(bucket)
18✔
710
                if errors.Is(err, ErrPaymentNotInitiated) {
18✔
NEW
711
                        updateErr = ErrPaymentNotInitiated
×
NEW
712
                        return nil
×
713
                } else if err != nil {
18✔
NEW
714
                        return err
×
NEW
715
                }
×
716

717
                // Put the failure reason in the bucket for record keeping.
718
                v := []byte{byte(reason)}
18✔
719
                err = bucket.Put(paymentFailInfoKey, v)
18✔
720
                if err != nil {
18✔
NEW
721
                        return err
×
NEW
722
                }
×
723

724
                // Retrieve attempt info for the notification, if available.
725
                payment, err = fetchPayment(bucket)
18✔
726
                if err != nil {
18✔
NEW
727
                        return err
×
NEW
728
                }
×
729

730
                return nil
18✔
731
        })
732
        if err != nil {
19✔
NEW
733
                return nil, err
×
NEW
734
        }
×
735

736
        return payment, updateErr
19✔
737
}
738

739
// FetchPayment returns information about a payment from the database.
740
func (p *KVPaymentsDB) FetchPayment(paymentHash lntypes.Hash) (
741
        *MPPayment, error) {
154✔
742

154✔
743
        var payment *MPPayment
154✔
744
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
308✔
745
                prefetchPayment(tx, paymentHash)
154✔
746
                bucket, err := fetchPaymentBucket(tx, paymentHash)
154✔
747
                if err != nil {
155✔
748
                        return err
1✔
749
                }
1✔
750

751
                payment, err = fetchPayment(bucket)
153✔
752

153✔
753
                return err
153✔
754
        }, func() {
154✔
755
                payment = nil
154✔
756
        })
154✔
757
        if err != nil {
155✔
758
                return nil, err
1✔
759
        }
1✔
760

761
        return payment, nil
153✔
762
}
763

764
// prefetchPayment attempts to prefetch as much of the payment as possible to
765
// reduce DB roundtrips.
766
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
453✔
767
        rb := kvdb.RootBucket(tx)
453✔
768
        kvdb.Prefetch(
453✔
769
                rb,
453✔
770
                []string{
453✔
771
                        // Prefetch all keys in the payment's bucket.
453✔
772
                        string(paymentsRootBucket),
453✔
773
                        string(paymentHash[:]),
453✔
774
                },
453✔
775
                []string{
453✔
776
                        // Prefetch all keys in the payment's htlc bucket.
453✔
777
                        string(paymentsRootBucket),
453✔
778
                        string(paymentHash[:]),
453✔
779
                        string(paymentHtlcsBucket),
453✔
780
                },
453✔
781
        )
453✔
782
}
453✔
783

784
// createPaymentBucket creates or fetches the sub-bucket assigned to this
785
// payment hash.
786
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
787
        kvdb.RwBucket, error) {
152✔
788

152✔
789
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
152✔
790
        if err != nil {
152✔
NEW
791
                return nil, err
×
NEW
792
        }
×
793

794
        return payments.CreateBucketIfNotExists(paymentHash[:])
152✔
795
}
796

797
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
798
// the bucket does not exist, it returns ErrPaymentNotInitiated.
799
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
800
        kvdb.RBucket, error) {
212✔
801

212✔
802
        payments := tx.ReadBucket(paymentsRootBucket)
212✔
803
        if payments == nil {
213✔
804
                return nil, ErrPaymentNotInitiated
1✔
805
        }
1✔
806

807
        bucket := payments.NestedReadBucket(paymentHash[:])
211✔
808
        if bucket == nil {
211✔
NEW
809
                return nil, ErrPaymentNotInitiated
×
NEW
810
        }
×
811

812
        return bucket, nil
211✔
813
}
814

815
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
816
// bucket that can be written to.
817
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
818
        kvdb.RwBucket, error) {
174✔
819

174✔
820
        payments := tx.ReadWriteBucket(paymentsRootBucket)
174✔
821
        if payments == nil {
177✔
822
                return nil, ErrPaymentNotInitiated
3✔
823
        }
3✔
824

825
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
171✔
826
        if bucket == nil {
171✔
NEW
827
                return nil, ErrPaymentNotInitiated
×
NEW
828
        }
×
829

830
        return bucket, nil
171✔
831
}
832

833
// nextPaymentSequence returns the next sequence number to store for a new
834
// payment.
835
func (p *KVPaymentsDB) nextPaymentSequence() ([]byte, error) {
152✔
836
        p.paymentSeqMx.Lock()
152✔
837
        defer p.paymentSeqMx.Unlock()
152✔
838

152✔
839
        // Set a new upper bound in the DB every 1000 payments to avoid
152✔
840
        // conflicts on the sequence when using etcd.
152✔
841
        if p.currPaymentSeq == p.storedPaymentSeq {
194✔
842
                var currPaymentSeq, newUpperBound uint64
42✔
843
                if err := kvdb.Update(p.db.Backend, func(tx kvdb.RwTx) error {
84✔
844
                        paymentsBucket, err := tx.CreateTopLevelBucket(
42✔
845
                                paymentsRootBucket,
42✔
846
                        )
42✔
847
                        if err != nil {
42✔
NEW
848
                                return err
×
NEW
849
                        }
×
850

851
                        currPaymentSeq = paymentsBucket.Sequence()
42✔
852
                        newUpperBound = currPaymentSeq + paymentSeqBlockSize
42✔
853

42✔
854
                        return paymentsBucket.SetSequence(newUpperBound)
42✔
855
                }, func() {}); err != nil {
42✔
NEW
856
                        return nil, err
×
NEW
857
                }
×
858

859
                // We lazy initialize the cached currPaymentSeq here using the
860
                // first nextPaymentSequence() call. This if statement will auto
861
                // initialize our stored currPaymentSeq, since by default both
862
                // this variable and storedPaymentSeq are zero which in turn
863
                // will have us fetch the current values from the DB.
864
                if p.currPaymentSeq == 0 {
84✔
865
                        p.currPaymentSeq = currPaymentSeq
42✔
866
                }
42✔
867

868
                p.storedPaymentSeq = newUpperBound
42✔
869
        }
870

871
        p.currPaymentSeq++
152✔
872
        b := make([]byte, 8)
152✔
873
        binary.BigEndian.PutUint64(b, p.currPaymentSeq)
152✔
874

152✔
875
        return b, nil
152✔
876
}
877

878
// fetchPaymentStatus fetches the payment status of the payment. If the payment
879
// isn't found, it will return error `ErrPaymentNotInitiated`.
880
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
195✔
881
        // Creation info should be set for all payments, regardless of state.
195✔
882
        // If not, it is unknown.
195✔
883
        if bucket.Get(paymentCreationInfoKey) == nil {
342✔
884
                return 0, ErrPaymentNotInitiated
147✔
885
        }
147✔
886

887
        payment, err := fetchPayment(bucket)
51✔
888
        if err != nil {
51✔
NEW
889
                return 0, err
×
NEW
890
        }
×
891

892
        return payment.Status, nil
51✔
893
}
894

895
// FetchInFlightPayments returns all payments with status InFlight.
896
func (p *KVPaymentsDB) FetchInFlightPayments() ([]*MPPayment, error) {
7✔
897
        var (
7✔
898
                inFlights      []*MPPayment
7✔
899
                start          = time.Now()
7✔
900
                lastLogTime    = time.Now()
7✔
901
                processedCount int
7✔
902
        )
7✔
903

7✔
904
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
14✔
905
                payments := tx.ReadBucket(paymentsRootBucket)
7✔
906
                if payments == nil {
12✔
907
                        return nil
5✔
908
                }
5✔
909

910
                return payments.ForEach(func(k, _ []byte) error {
10✔
911
                        bucket := payments.NestedReadBucket(k)
5✔
912
                        if bucket == nil {
5✔
NEW
913
                                return fmt.Errorf("non bucket element")
×
NEW
914
                        }
×
915

916
                        p, err := fetchPayment(bucket)
5✔
917
                        if err != nil {
5✔
NEW
918
                                return err
×
NEW
919
                        }
×
920

921
                        processedCount++
5✔
922
                        if time.Since(lastLogTime) >=
5✔
923
                                paymentProgressLogInterval {
5✔
NEW
924

×
NEW
925
                                log.Debugf("Scanning inflight payments "+
×
NEW
926
                                        "(in progress), processed %d, last "+
×
NEW
927
                                        "processed payment: %v", processedCount,
×
NEW
928
                                        p.Info)
×
NEW
929

×
NEW
930
                                lastLogTime = time.Now()
×
NEW
931
                        }
×
932

933
                        // Skip the payment if it's terminated.
934
                        if p.Terminated() {
8✔
935
                                return nil
3✔
936
                        }
3✔
937

938
                        inFlights = append(inFlights, p)
5✔
939

5✔
940
                        return nil
5✔
941
                })
942
        }, func() {
7✔
943
                inFlights = nil
7✔
944
        })
7✔
945
        if err != nil {
7✔
NEW
946
                return nil, err
×
NEW
947
        }
×
948

949
        elapsed := time.Since(start)
7✔
950
        log.Debugf("Completed scanning for inflight payments: "+
7✔
951
                "total_processed=%d, found_inflight=%d, elapsed=%v",
7✔
952
                processedCount, len(inFlights),
7✔
953
                elapsed.Round(time.Millisecond))
7✔
954

7✔
955
        return inFlights, nil
7✔
956
}
957

958
// htlcBucketKey builds a composite key by concatenating prefix and id into a
// freshly allocated slice, leaving both inputs untouched.
func htlcBucketKey(prefix, id []byte) []byte {
	key := make([]byte, 0, len(prefix)+len(id))
	key = append(key, prefix...)

	return append(key, id...)
}
268✔
967

968
// FetchPayments returns all sent payments found in the DB.
969
func (p *KVPaymentsDB) FetchPayments() ([]*MPPayment, error) {
41✔
970
        var payments []*MPPayment
41✔
971

41✔
972
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
82✔
973
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
41✔
974
                if paymentsBucket == nil {
41✔
NEW
975
                        return nil
×
NEW
976
                }
×
977

978
                return paymentsBucket.ForEach(func(k, v []byte) error {
189✔
979
                        bucket := paymentsBucket.NestedReadBucket(k)
148✔
980
                        if bucket == nil {
148✔
NEW
981
                                // We only expect sub-buckets to be found in
×
NEW
982
                                // this top-level bucket.
×
NEW
983
                                return fmt.Errorf("non bucket element in " +
×
NEW
984
                                        "payments bucket")
×
NEW
985
                        }
×
986

987
                        p, err := fetchPayment(bucket)
148✔
988
                        if err != nil {
148✔
NEW
989
                                return err
×
NEW
990
                        }
×
991

992
                        payments = append(payments, p)
148✔
993

148✔
994
                        // For older versions of lnd, duplicate payments to a
148✔
995
                        // payment has was possible. These will be found in a
148✔
996
                        // sub-bucket indexed by their sequence number if
148✔
997
                        // available.
148✔
998
                        duplicatePayments, err := fetchDuplicatePayments(bucket)
148✔
999
                        if err != nil {
148✔
NEW
1000
                                return err
×
NEW
1001
                        }
×
1002

1003
                        payments = append(payments, duplicatePayments...)
148✔
1004

148✔
1005
                        return nil
148✔
1006
                })
1007
        }, func() {
41✔
1008
                payments = nil
41✔
1009
        })
41✔
1010
        if err != nil {
41✔
NEW
1011
                return nil, err
×
NEW
1012
        }
×
1013

1014
        // Before returning, sort the payments by their sequence number.
1015
        sort.Slice(payments, func(i, j int) bool {
291✔
1016
                return payments[i].SequenceNum < payments[j].SequenceNum
250✔
1017
        })
250✔
1018

1019
        return payments, nil
41✔
1020
}
1021

1022
func fetchCreationInfo(bucket kvdb.RBucket) (*PaymentCreationInfo, error) {
643✔
1023
        b := bucket.Get(paymentCreationInfoKey)
643✔
1024
        if b == nil {
643✔
NEW
1025
                return nil, fmt.Errorf("creation info not found")
×
NEW
1026
        }
×
1027

1028
        r := bytes.NewReader(b)
643✔
1029

643✔
1030
        return deserializePaymentCreationInfo(r)
643✔
1031
}
1032

1033
func fetchPayment(bucket kvdb.RBucket) (*MPPayment, error) {
643✔
1034
        seqBytes := bucket.Get(paymentSequenceKey)
643✔
1035
        if seqBytes == nil {
643✔
NEW
1036
                return nil, fmt.Errorf("sequence number not found")
×
NEW
1037
        }
×
1038

1039
        sequenceNum := binary.BigEndian.Uint64(seqBytes)
643✔
1040

643✔
1041
        // Get the PaymentCreationInfo.
643✔
1042
        creationInfo, err := fetchCreationInfo(bucket)
643✔
1043
        if err != nil {
643✔
NEW
1044
                return nil, err
×
NEW
1045
        }
×
1046

1047
        var htlcs []HTLCAttempt
643✔
1048
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
643✔
1049
        if htlcsBucket != nil {
1,032✔
1050
                // Get the payment attempts. This can be empty.
389✔
1051
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
389✔
1052
                if err != nil {
389✔
NEW
1053
                        return nil, err
×
NEW
1054
                }
×
1055
        }
1056

1057
        // Get failure reason if available.
1058
        var failureReason *FailureReason
643✔
1059
        b := bucket.Get(paymentFailInfoKey)
643✔
1060
        if b != nil {
719✔
1061
                reason := FailureReason(b[0])
76✔
1062
                failureReason = &reason
76✔
1063
        }
76✔
1064

1065
        // Create a new payment.
1066
        payment := &MPPayment{
643✔
1067
                SequenceNum:   sequenceNum,
643✔
1068
                Info:          creationInfo,
643✔
1069
                HTLCs:         htlcs,
643✔
1070
                FailureReason: failureReason,
643✔
1071
        }
643✔
1072

643✔
1073
        // Set its state and status.
643✔
1074
        if err := payment.setState(); err != nil {
643✔
NEW
1075
                return nil, err
×
NEW
1076
        }
×
1077

1078
        return payment, nil
643✔
1079
}
1080

1081
// fetchHtlcAttempts retrieves all htlc attempts made for the payment found in
1082
// the given bucket.
1083
func fetchHtlcAttempts(bucket kvdb.RBucket) ([]HTLCAttempt, error) {
396✔
1084
        htlcsMap := make(map[uint64]*HTLCAttempt)
396✔
1085

396✔
1086
        attemptInfoCount := 0
396✔
1087
        err := bucket.ForEach(func(k, v []byte) error {
1,494✔
1088
                aid := byteOrder.Uint64(k[len(k)-8:])
1,098✔
1089

1,098✔
1090
                if _, ok := htlcsMap[aid]; !ok {
1,802✔
1091
                        htlcsMap[aid] = &HTLCAttempt{}
704✔
1092
                }
704✔
1093

1094
                var err error
1,098✔
1095
                switch {
1,098✔
1096
                case bytes.HasPrefix(k, htlcAttemptInfoKey):
704✔
1097
                        attemptInfo, err := readHtlcAttemptInfo(v)
704✔
1098
                        if err != nil {
704✔
NEW
1099
                                return err
×
NEW
1100
                        }
×
1101

1102
                        attemptInfo.AttemptID = aid
704✔
1103
                        htlcsMap[aid].HTLCAttemptInfo = *attemptInfo
704✔
1104
                        attemptInfoCount++
704✔
1105

1106
                case bytes.HasPrefix(k, htlcSettleInfoKey):
92✔
1107
                        htlcsMap[aid].Settle, err = readHtlcSettleInfo(v)
92✔
1108
                        if err != nil {
92✔
NEW
1109
                                return err
×
NEW
1110
                        }
×
1111

1112
                case bytes.HasPrefix(k, htlcFailInfoKey):
308✔
1113
                        htlcsMap[aid].Failure, err = readHtlcFailInfo(v)
308✔
1114
                        if err != nil {
308✔
NEW
1115
                                return err
×
NEW
1116
                        }
×
1117

NEW
1118
                default:
×
NEW
1119
                        return fmt.Errorf("unknown htlc attempt key")
×
1120
                }
1121

1122
                return nil
1,098✔
1123
        })
1124
        if err != nil {
396✔
NEW
1125
                return nil, err
×
NEW
1126
        }
×
1127

1128
        // Sanity check that all htlcs have an attempt info.
1129
        if attemptInfoCount != len(htlcsMap) {
396✔
NEW
1130
                return nil, errNoAttemptInfo
×
NEW
1131
        }
×
1132

1133
        keys := make([]uint64, len(htlcsMap))
396✔
1134
        i := 0
396✔
1135
        for k := range htlcsMap {
1,100✔
1136
                keys[i] = k
704✔
1137
                i++
704✔
1138
        }
704✔
1139

1140
        // Sort HTLC attempts by their attempt ID. This is needed because in the
1141
        // DB we store the attempts with keys prefixed by their status which
1142
        // changes order (groups them together by status).
1143
        sort.Slice(keys, func(i, j int) bool {
739✔
1144
                return keys[i] < keys[j]
343✔
1145
        })
343✔
1146

1147
        htlcs := make([]HTLCAttempt, len(htlcsMap))
396✔
1148
        for i, key := range keys {
1,100✔
1149
                htlcs[i] = *htlcsMap[key]
704✔
1150
        }
704✔
1151

1152
        return htlcs, nil
396✔
1153
}
1154

1155
// readHtlcAttemptInfo reads the payment attempt info for this htlc.
1156
func readHtlcAttemptInfo(b []byte) (*HTLCAttemptInfo, error) {
704✔
1157
        r := bytes.NewReader(b)
704✔
1158
        return deserializeHTLCAttemptInfo(r)
704✔
1159
}
704✔
1160

1161
// readHtlcSettleInfo reads the settle info for the htlc. If the htlc isn't
1162
// settled, nil is returned.
1163
func readHtlcSettleInfo(b []byte) (*HTLCSettleInfo, error) {
92✔
1164
        r := bytes.NewReader(b)
92✔
1165
        return deserializeHTLCSettleInfo(r)
92✔
1166
}
92✔
1167

1168
// readHtlcFailInfo reads the failure info for the htlc. If the htlc hasn't
1169
// failed, nil is returned.
1170
func readHtlcFailInfo(b []byte) (*HTLCFailInfo, error) {
308✔
1171
        r := bytes.NewReader(b)
308✔
1172
        return deserializeHTLCFailInfo(r)
308✔
1173
}
308✔
1174

1175
// fetchFailedHtlcKeys retrieves the bucket keys of all failed HTLCs of a
1176
// payment bucket.
1177
func fetchFailedHtlcKeys(bucket kvdb.RBucket) ([][]byte, error) {
7✔
1178
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
7✔
1179

7✔
1180
        var htlcs []HTLCAttempt
7✔
1181
        var err error
7✔
1182
        if htlcsBucket != nil {
14✔
1183
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
7✔
1184
                if err != nil {
7✔
NEW
1185
                        return nil, err
×
NEW
1186
                }
×
1187
        }
1188

1189
        // Now iterate though them and save the bucket keys for the failed
1190
        // HTLCs.
1191
        var htlcKeys [][]byte
7✔
1192
        for _, h := range htlcs {
19✔
1193
                if h.Failure == nil {
15✔
1194
                        continue
3✔
1195
                }
1196

1197
                htlcKeyBytes := make([]byte, 8)
9✔
1198
                binary.BigEndian.PutUint64(htlcKeyBytes, h.AttemptID)
9✔
1199

9✔
1200
                htlcKeys = append(htlcKeys, htlcKeyBytes)
9✔
1201
        }
1202

1203
        return htlcKeys, nil
7✔
1204
}
1205

1206
// QueryPayments is a query to the payments database which is restricted
1207
// to a subset of payments by the payments query, containing an offset
1208
// index and a maximum number of returned payments.
1209
func (p *KVPaymentsDB) QueryPayments(query PaymentsQuery) (PaymentsResponse,
1210
        error) {
39✔
1211

39✔
1212
        var resp PaymentsResponse
39✔
1213

39✔
1214
        if err := kvdb.View(p.db, func(tx kvdb.RTx) error {
78✔
1215
                // Get the root payments bucket.
39✔
1216
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
39✔
1217
                if paymentsBucket == nil {
60✔
1218
                        return nil
21✔
1219
                }
21✔
1220

1221
                // Get the index bucket which maps sequence number -> payment
1222
                // hash and duplicate bool. If we have a payments bucket, we
1223
                // should have an indexes bucket as well.
1224
                indexes := tx.ReadBucket(paymentsIndexBucket)
21✔
1225
                if indexes == nil {
21✔
NEW
1226
                        return fmt.Errorf("index bucket does not exist")
×
NEW
1227
                }
×
1228

1229
                // accumulatePayments gets payments with the sequence number
1230
                // and hash provided and adds them to our list of payments if
1231
                // they meet the criteria of our query. It returns the number
1232
                // of payments that were added.
1233
                accumulatePayments := func(sequenceKey, hash []byte) (bool,
21✔
1234
                        error) {
76✔
1235

55✔
1236
                        r := bytes.NewReader(hash)
55✔
1237
                        paymentHash, err := deserializePaymentIndex(r)
55✔
1238
                        if err != nil {
55✔
NEW
1239
                                return false, err
×
NEW
1240
                        }
×
1241

1242
                        payment, err := fetchPaymentWithSequenceNumber(
55✔
1243
                                tx, paymentHash, sequenceKey,
55✔
1244
                        )
55✔
1245
                        if err != nil {
55✔
NEW
1246
                                return false, err
×
NEW
1247
                        }
×
1248

1249
                        // To keep compatibility with the old API, we only
1250
                        // return non-succeeded payments if requested.
1251
                        if payment.Status != StatusSucceeded &&
55✔
1252
                                !query.IncludeIncomplete {
60✔
1253

5✔
1254
                                return false, err
5✔
1255
                        }
5✔
1256

1257
                        // Get the creation time in Unix seconds, this always
1258
                        // rounds down the nanoseconds to full seconds.
1259
                        createTime := payment.Info.CreationTime.Unix()
50✔
1260

50✔
1261
                        // Skip any payments that were created before the
50✔
1262
                        // specified time.
50✔
1263
                        if createTime < query.CreationDateStart {
62✔
1264
                                return false, nil
12✔
1265
                        }
12✔
1266

1267
                        // Skip any payments that were created after the
1268
                        // specified time.
1269
                        if query.CreationDateEnd != 0 &&
41✔
1270
                                createTime > query.CreationDateEnd {
46✔
1271

5✔
1272
                                return false, nil
5✔
1273
                        }
5✔
1274

1275
                        // At this point, we've exhausted the offset, so we'll
1276
                        // begin collecting invoices found within the range.
1277
                        resp.Payments = append(resp.Payments, payment)
39✔
1278

39✔
1279
                        return true, nil
39✔
1280
                }
1281

1282
                // Create a paginator which reads from our sequence index bucket
1283
                // with the parameters provided by the payments query.
1284
                paginator := newPaginator(
21✔
1285
                        indexes.ReadCursor(), query.Reversed, query.IndexOffset,
21✔
1286
                        query.MaxPayments,
21✔
1287
                )
21✔
1288

21✔
1289
                // Run a paginated query, adding payments to our response.
21✔
1290
                if err := paginator.query(accumulatePayments); err != nil {
21✔
NEW
1291
                        return err
×
NEW
1292
                }
×
1293

1294
                // Counting the total number of payments is expensive, since we
1295
                // literally have to traverse the cursor linearly, which can
1296
                // take quite a while. So it's an optional query parameter.
1297
                if query.CountTotal {
21✔
NEW
1298
                        var (
×
NEW
1299
                                totalPayments uint64
×
NEW
1300
                                err           error
×
NEW
1301
                        )
×
NEW
1302
                        countFn := func(_, _ []byte) error {
×
NEW
1303
                                totalPayments++
×
NEW
1304

×
NEW
1305
                                return nil
×
NEW
1306
                        }
×
1307

1308
                        // In non-boltdb database backends, there's a faster
1309
                        // ForAll query that allows for batch fetching items.
NEW
1310
                        fastBucket, ok := indexes.(kvdb.ExtendedRBucket)
×
NEW
1311
                        if ok {
×
NEW
1312
                                err = fastBucket.ForAll(countFn)
×
NEW
1313
                        } else {
×
NEW
1314
                                err = indexes.ForEach(countFn)
×
NEW
1315
                        }
×
NEW
1316
                        if err != nil {
×
NEW
1317
                                return fmt.Errorf("error counting payments: %w",
×
NEW
1318
                                        err)
×
NEW
1319
                        }
×
1320

NEW
1321
                        resp.TotalCount = totalPayments
×
1322
                }
1323

1324
                return nil
21✔
1325
        }, func() {
39✔
1326
                resp = PaymentsResponse{}
39✔
1327
        }); err != nil {
39✔
NEW
1328
                return resp, err
×
NEW
1329
        }
×
1330

1331
        // Need to swap the payments slice order if reversed order.
1332
        if query.Reversed {
55✔
1333
                for l, r := 0, len(resp.Payments)-1; l < r; l, r = l+1, r-1 {
24✔
1334
                        resp.Payments[l], resp.Payments[r] =
8✔
1335
                                resp.Payments[r], resp.Payments[l]
8✔
1336
                }
8✔
1337
        }
1338

1339
        // Set the first and last index of the returned payments so that the
1340
        // caller can resume from this point later on.
1341
        if len(resp.Payments) > 0 {
58✔
1342
                resp.FirstIndexOffset = resp.Payments[0].SequenceNum
19✔
1343
                resp.LastIndexOffset =
19✔
1344
                        resp.Payments[len(resp.Payments)-1].SequenceNum
19✔
1345
        }
19✔
1346

1347
        return resp, nil
39✔
1348
}
1349

1350
// fetchPaymentWithSequenceNumber get the payment which matches the payment hash
1351
// *and* sequence number provided from the database. This is required because
1352
// we previously had more than one payment per hash, so we have multiple indexes
1353
// pointing to a single payment; we want to retrieve the correct one.
1354
func fetchPaymentWithSequenceNumber(tx kvdb.RTx, paymentHash lntypes.Hash,
1355
        sequenceNumber []byte) (*MPPayment, error) {
61✔
1356

61✔
1357
        // We can now lookup the payment keyed by its hash in
61✔
1358
        // the payments root bucket.
61✔
1359
        bucket, err := fetchPaymentBucket(tx, paymentHash)
61✔
1360
        if err != nil {
61✔
NEW
1361
                return nil, err
×
NEW
1362
        }
×
1363

1364
        // A single payment hash can have multiple payments associated with it.
1365
        // We lookup our sequence number first, to determine whether this is
1366
        // the payment we are actually looking for.
1367
        seqBytes := bucket.Get(paymentSequenceKey)
61✔
1368
        if seqBytes == nil {
61✔
NEW
1369
                return nil, ErrNoSequenceNumber
×
NEW
1370
        }
×
1371

1372
        // If this top level payment has the sequence number we are looking for,
1373
        // return it.
1374
        if bytes.Equal(seqBytes, sequenceNumber) {
109✔
1375
                return fetchPayment(bucket)
48✔
1376
        }
48✔
1377

1378
        // If we were not looking for the top level payment, we are looking for
1379
        // one of our duplicate payments. We need to iterate through the seq
1380
        // numbers in this bucket to find the correct payments. If we do not
1381
        // find a duplicate payments bucket here, something is wrong.
1382
        dup := bucket.NestedReadBucket(duplicatePaymentsBucket)
13✔
1383
        if dup == nil {
14✔
1384
                return nil, ErrNoDuplicateBucket
1✔
1385
        }
1✔
1386

1387
        var duplicatePayment *MPPayment
12✔
1388
        err = dup.ForEach(func(k, v []byte) error {
27✔
1389
                subBucket := dup.NestedReadBucket(k)
15✔
1390
                if subBucket == nil {
15✔
NEW
1391
                        // We one bucket for each duplicate to be found.
×
NEW
1392
                        return ErrNoDuplicateNestedBucket
×
NEW
1393
                }
×
1394

1395
                seqBytes := subBucket.Get(duplicatePaymentSequenceKey)
15✔
1396
                if seqBytes == nil {
15✔
NEW
1397
                        return err
×
NEW
1398
                }
×
1399

1400
                // If this duplicate payment is not the sequence number we are
1401
                // looking for, we can continue.
1402
                if !bytes.Equal(seqBytes, sequenceNumber) {
19✔
1403
                        return nil
4✔
1404
                }
4✔
1405

1406
                duplicatePayment, err = fetchDuplicatePayment(subBucket)
11✔
1407
                if err != nil {
11✔
NEW
1408
                        return err
×
NEW
1409
                }
×
1410

1411
                return nil
11✔
1412
        })
1413
        if err != nil {
12✔
NEW
1414
                return nil, err
×
NEW
1415
        }
×
1416

1417
        // If none of the duplicate payments matched our sequence number, we
1418
        // failed to find the payment with this sequence number; something is
1419
        // wrong.
1420
        if duplicatePayment == nil {
13✔
1421
                return nil, ErrDuplicateNotFound
1✔
1422
        }
1✔
1423

1424
        return duplicatePayment, nil
11✔
1425
}
1426

1427
// DeletePayment deletes a payment from the DB given its payment hash. If
1428
// failedHtlcsOnly is set, only failed HTLC attempts of the payment will be
1429
// deleted.
1430
func (p *KVPaymentsDB) DeletePayment(paymentHash lntypes.Hash,
1431
        failedHtlcsOnly bool) error {
11✔
1432

11✔
1433
        return kvdb.Update(p.db, func(tx kvdb.RwTx) error {
22✔
1434
                payments := tx.ReadWriteBucket(paymentsRootBucket)
11✔
1435
                if payments == nil {
11✔
NEW
1436
                        return nil
×
NEW
1437
                }
×
1438

1439
                bucket := payments.NestedReadWriteBucket(paymentHash[:])
11✔
1440
                if bucket == nil {
12✔
1441
                        return fmt.Errorf("non bucket element in payments " +
1✔
1442
                                "bucket")
1✔
1443
                }
1✔
1444

1445
                // If the status is InFlight, we cannot safely delete
1446
                // the payment information, so we return early.
1447
                paymentStatus, err := fetchPaymentStatus(bucket)
10✔
1448
                if err != nil {
10✔
NEW
1449
                        return err
×
NEW
1450
                }
×
1451

1452
                // If the payment has inflight HTLCs, we cannot safely delete
1453
                // the payment information, so we return an error.
1454
                if err := paymentStatus.removable(); err != nil {
13✔
1455
                        return fmt.Errorf("payment '%v' has inflight HTLCs"+
3✔
1456
                                "and therefore cannot be deleted: %w",
3✔
1457
                                paymentHash.String(), err)
3✔
1458
                }
3✔
1459

1460
                // Delete the failed HTLC attempts we found.
1461
                if failedHtlcsOnly {
11✔
1462
                        toDelete, err := fetchFailedHtlcKeys(bucket)
4✔
1463
                        if err != nil {
4✔
NEW
1464
                                return err
×
NEW
1465
                        }
×
1466

1467
                        htlcsBucket := bucket.NestedReadWriteBucket(
4✔
1468
                                paymentHtlcsBucket,
4✔
1469
                        )
4✔
1470

4✔
1471
                        for _, htlcID := range toDelete {
10✔
1472
                                err = htlcsBucket.Delete(
6✔
1473
                                        htlcBucketKey(
6✔
1474
                                                htlcAttemptInfoKey, htlcID,
6✔
1475
                                        ),
6✔
1476
                                )
6✔
1477
                                if err != nil {
6✔
NEW
1478
                                        return err
×
NEW
1479
                                }
×
1480

1481
                                err = htlcsBucket.Delete(
6✔
1482
                                        htlcBucketKey(htlcFailInfoKey, htlcID),
6✔
1483
                                )
6✔
1484
                                if err != nil {
6✔
NEW
1485
                                        return err
×
NEW
1486
                                }
×
1487

1488
                                err = htlcsBucket.Delete(
6✔
1489
                                        htlcBucketKey(
6✔
1490
                                                htlcSettleInfoKey, htlcID,
6✔
1491
                                        ),
6✔
1492
                                )
6✔
1493
                                if err != nil {
6✔
NEW
1494
                                        return err
×
NEW
1495
                                }
×
1496
                        }
1497

1498
                        return nil
4✔
1499
                }
1500

1501
                seqNrs, err := fetchSequenceNumbers(bucket)
3✔
1502
                if err != nil {
3✔
NEW
1503
                        return err
×
NEW
1504
                }
×
1505

1506
                err = payments.DeleteNestedBucket(paymentHash[:])
3✔
1507
                if err != nil {
3✔
NEW
1508
                        return err
×
NEW
1509
                }
×
1510

1511
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
1512
                for _, k := range seqNrs {
6✔
1513
                        if err := indexBucket.Delete(k); err != nil {
3✔
NEW
1514
                                return err
×
NEW
1515
                        }
×
1516
                }
1517

1518
                return nil
3✔
1519
        }, func() {})
11✔
1520
}
1521

1522
// DeletePayments deletes all completed and failed payments from the DB. If
1523
// failedOnly is set, only failed payments will be considered for deletion. If
1524
// failedHtlcsOnly is set, the payment itself won't be deleted, only failed HTLC
1525
// attempts. The method returns the number of deleted payments, which is always
1526
// 0 if failedHtlcsOnly is set.
1527
func (p *KVPaymentsDB) DeletePayments(failedOnly,
1528
        failedHtlcsOnly bool) (int, error) {
9✔
1529

9✔
1530
        var numPayments int
9✔
1531
        err := kvdb.Update(p.db, func(tx kvdb.RwTx) error {
18✔
1532
                payments := tx.ReadWriteBucket(paymentsRootBucket)
9✔
1533
                if payments == nil {
9✔
NEW
1534
                        return nil
×
NEW
1535
                }
×
1536

1537
                var (
9✔
1538
                        // deleteBuckets is the set of payment buckets we need
9✔
1539
                        // to delete.
9✔
1540
                        deleteBuckets [][]byte
9✔
1541

9✔
1542
                        // deleteIndexes is the set of indexes pointing to these
9✔
1543
                        // payments that need to be deleted.
9✔
1544
                        deleteIndexes [][]byte
9✔
1545

9✔
1546
                        // deleteHtlcs maps a payment hash to the HTLC IDs we
9✔
1547
                        // want to delete for that payment.
9✔
1548
                        deleteHtlcs = make(map[lntypes.Hash][][]byte)
9✔
1549
                )
9✔
1550
                err := payments.ForEach(func(k, _ []byte) error {
30✔
1551
                        bucket := payments.NestedReadBucket(k)
21✔
1552
                        if bucket == nil {
21✔
NEW
1553
                                // We only expect sub-buckets to be found in
×
NEW
1554
                                // this top-level bucket.
×
NEW
1555
                                return fmt.Errorf("non bucket element in " +
×
NEW
1556
                                        "payments bucket")
×
NEW
1557
                        }
×
1558

1559
                        // If the status is InFlight, we cannot safely delete
1560
                        // the payment information, so we return early.
1561
                        paymentStatus, err := fetchPaymentStatus(bucket)
21✔
1562
                        if err != nil {
21✔
NEW
1563
                                return err
×
NEW
1564
                        }
×
1565

1566
                        // If the payment has inflight HTLCs, we cannot safely
1567
                        // delete the payment information, so we return an nil
1568
                        // to skip it.
1569
                        if err := paymentStatus.removable(); err != nil {
27✔
1570
                                return nil
6✔
1571
                        }
6✔
1572

1573
                        // If we requested to only delete failed payments, we
1574
                        // can return if this one is not.
1575
                        if failedOnly && paymentStatus != StatusFailed {
19✔
1576
                                return nil
4✔
1577
                        }
4✔
1578

1579
                        // If we are only deleting failed HTLCs, fetch them.
1580
                        if failedHtlcsOnly {
14✔
1581
                                toDelete, err := fetchFailedHtlcKeys(bucket)
3✔
1582
                                if err != nil {
3✔
NEW
1583
                                        return err
×
NEW
1584
                                }
×
1585

1586
                                hash, err := lntypes.MakeHash(k)
3✔
1587
                                if err != nil {
3✔
NEW
1588
                                        return err
×
NEW
1589
                                }
×
1590

1591
                                deleteHtlcs[hash] = toDelete
3✔
1592

3✔
1593
                                // We return, we are only deleting attempts.
3✔
1594
                                return nil
3✔
1595
                        }
1596

1597
                        // Add the bucket to the set of buckets we can delete.
1598
                        deleteBuckets = append(deleteBuckets, k)
8✔
1599

8✔
1600
                        // Get all the sequence number associated with the
8✔
1601
                        // payment, including duplicates.
8✔
1602
                        seqNrs, err := fetchSequenceNumbers(bucket)
8✔
1603
                        if err != nil {
8✔
NEW
1604
                                return err
×
NEW
1605
                        }
×
1606

1607
                        deleteIndexes = append(deleteIndexes, seqNrs...)
8✔
1608
                        numPayments++
8✔
1609

8✔
1610
                        return nil
8✔
1611
                })
1612
                if err != nil {
9✔
NEW
1613
                        return err
×
NEW
1614
                }
×
1615

1616
                // Delete the failed HTLC attempts we found.
1617
                for hash, htlcIDs := range deleteHtlcs {
12✔
1618
                        bucket := payments.NestedReadWriteBucket(hash[:])
3✔
1619
                        htlcsBucket := bucket.NestedReadWriteBucket(
3✔
1620
                                paymentHtlcsBucket,
3✔
1621
                        )
3✔
1622

3✔
1623
                        for _, aid := range htlcIDs {
6✔
1624
                                if err := htlcsBucket.Delete(
3✔
1625
                                        htlcBucketKey(htlcAttemptInfoKey, aid),
3✔
1626
                                ); err != nil {
3✔
NEW
1627
                                        return err
×
NEW
1628
                                }
×
1629

1630
                                if err := htlcsBucket.Delete(
3✔
1631
                                        htlcBucketKey(htlcFailInfoKey, aid),
3✔
1632
                                ); err != nil {
3✔
NEW
1633
                                        return err
×
NEW
1634
                                }
×
1635

1636
                                if err := htlcsBucket.Delete(
3✔
1637
                                        htlcBucketKey(htlcSettleInfoKey, aid),
3✔
1638
                                ); err != nil {
3✔
NEW
1639
                                        return err
×
NEW
1640
                                }
×
1641
                        }
1642
                }
1643

1644
                for _, k := range deleteBuckets {
17✔
1645
                        if err := payments.DeleteNestedBucket(k); err != nil {
8✔
NEW
1646
                                return err
×
NEW
1647
                        }
×
1648
                }
1649

1650
                // Get our index bucket and delete all indexes pointing to the
1651
                // payments we are deleting.
1652
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
9✔
1653
                for _, k := range deleteIndexes {
18✔
1654
                        if err := indexBucket.Delete(k); err != nil {
9✔
NEW
1655
                                return err
×
NEW
1656
                        }
×
1657
                }
1658

1659
                return nil
9✔
1660
        }, func() {
9✔
1661
                numPayments = 0
9✔
1662
        })
9✔
1663
        if err != nil {
9✔
NEW
1664
                return 0, err
×
NEW
1665
        }
×
1666

1667
        return numPayments, nil
9✔
1668
}
1669

1670
// fetchSequenceNumbers fetches all the sequence numbers associated with a
1671
// payment, including those belonging to any duplicate payments.
1672
func fetchSequenceNumbers(paymentBucket kvdb.RBucket) ([][]byte, error) {
11✔
1673
        seqNum := paymentBucket.Get(paymentSequenceKey)
11✔
1674
        if seqNum == nil {
11✔
NEW
1675
                return nil, errors.New("expected sequence number")
×
NEW
1676
        }
×
1677

1678
        sequenceNumbers := [][]byte{seqNum}
11✔
1679

11✔
1680
        // Get the duplicate payments bucket, if it has no duplicates, just
11✔
1681
        // return early with the payment sequence number.
11✔
1682
        duplicates := paymentBucket.NestedReadBucket(duplicatePaymentsBucket)
11✔
1683
        if duplicates == nil {
21✔
1684
                return sequenceNumbers, nil
10✔
1685
        }
10✔
1686

1687
        // If we do have duplicated, they are keyed by sequence number, so we
1688
        // iterate through the duplicates bucket and add them to our set of
1689
        // sequence numbers.
1690
        if err := duplicates.ForEach(func(k, v []byte) error {
2✔
1691
                sequenceNumbers = append(sequenceNumbers, k)
1✔
1692
                return nil
1✔
1693
        }); err != nil {
1✔
NEW
1694
                return nil, err
×
NEW
1695
        }
×
1696

1697
        return sequenceNumbers, nil
1✔
1698
}
1699

1700
func serializePaymentCreationInfo(w io.Writer, c *PaymentCreationInfo) error {
154✔
1701
        var scratch [8]byte
154✔
1702

154✔
1703
        if _, err := w.Write(c.PaymentIdentifier[:]); err != nil {
154✔
NEW
1704
                return err
×
NEW
1705
        }
×
1706

1707
        byteOrder.PutUint64(scratch[:], uint64(c.Value))
154✔
1708
        if _, err := w.Write(scratch[:]); err != nil {
154✔
NEW
1709
                return err
×
NEW
1710
        }
×
1711

1712
        if err := serializeTime(w, c.CreationTime); err != nil {
154✔
NEW
1713
                return err
×
NEW
1714
        }
×
1715

1716
        byteOrder.PutUint32(scratch[:4], uint32(len(c.PaymentRequest)))
154✔
1717
        if _, err := w.Write(scratch[:4]); err != nil {
154✔
NEW
1718
                return err
×
NEW
1719
        }
×
1720

1721
        if _, err := w.Write(c.PaymentRequest); err != nil {
154✔
NEW
1722
                return err
×
NEW
1723
        }
×
1724

1725
        // Any remaining bytes are TLV encoded records. Currently, these are
1726
        // only the custom records provided by the user to be sent to the first
1727
        // hop. But this can easily be extended with further records by merging
1728
        // the records into a single TLV stream.
1729
        err := c.FirstHopCustomRecords.SerializeTo(w)
154✔
1730
        if err != nil {
154✔
NEW
1731
                return err
×
NEW
1732
        }
×
1733

1734
        return nil
154✔
1735
}
1736

1737
func deserializePaymentCreationInfo(r io.Reader) (*PaymentCreationInfo,
1738
        error) {
645✔
1739

645✔
1740
        var scratch [8]byte
645✔
1741

645✔
1742
        c := &PaymentCreationInfo{}
645✔
1743

645✔
1744
        if _, err := io.ReadFull(r, c.PaymentIdentifier[:]); err != nil {
645✔
NEW
1745
                return nil, err
×
NEW
1746
        }
×
1747

1748
        if _, err := io.ReadFull(r, scratch[:]); err != nil {
645✔
NEW
1749
                return nil, err
×
NEW
1750
        }
×
1751
        c.Value = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:]))
645✔
1752

645✔
1753
        creationTime, err := deserializeTime(r)
645✔
1754
        if err != nil {
645✔
NEW
1755
                return nil, err
×
NEW
1756
        }
×
1757
        c.CreationTime = creationTime
645✔
1758

645✔
1759
        if _, err := io.ReadFull(r, scratch[:4]); err != nil {
645✔
NEW
1760
                return nil, err
×
NEW
1761
        }
×
1762

1763
        reqLen := byteOrder.Uint32(scratch[:4])
645✔
1764
        payReq := make([]byte, reqLen)
645✔
1765
        if reqLen > 0 {
1,290✔
1766
                if _, err := io.ReadFull(r, payReq); err != nil {
645✔
NEW
1767
                        return nil, err
×
NEW
1768
                }
×
1769
        }
1770
        c.PaymentRequest = payReq
645✔
1771

645✔
1772
        // Any remaining bytes are TLV encoded records. Currently, these are
645✔
1773
        // only the custom records provided by the user to be sent to the first
645✔
1774
        // hop. But this can easily be extended with further records by merging
645✔
1775
        // the records into a single TLV stream.
645✔
1776
        c.FirstHopCustomRecords, err = lnwire.ParseCustomRecordsFrom(r)
645✔
1777
        if err != nil {
645✔
NEW
1778
                return nil, err
×
NEW
1779
        }
×
1780

1781
        return c, nil
645✔
1782
}
1783

1784
func serializeHTLCAttemptInfo(w io.Writer, a *HTLCAttemptInfo) error {
75✔
1785
        if err := WriteElements(w, a.sessionKey); err != nil {
75✔
NEW
1786
                return err
×
NEW
1787
        }
×
1788

1789
        if err := SerializeRoute(w, a.Route); err != nil {
75✔
NEW
1790
                return err
×
NEW
1791
        }
×
1792

1793
        if err := serializeTime(w, a.AttemptTime); err != nil {
75✔
NEW
1794
                return err
×
NEW
1795
        }
×
1796

1797
        // If the hash is nil we can just return.
1798
        if a.Hash == nil {
75✔
NEW
1799
                return nil
×
NEW
1800
        }
×
1801

1802
        if _, err := w.Write(a.Hash[:]); err != nil {
75✔
NEW
1803
                return err
×
NEW
1804
        }
×
1805

1806
        // Merge the fixed/known records together with the custom records to
1807
        // serialize them as a single blob. We can't do this in SerializeRoute
1808
        // because we're in the middle of the byte stream there. We can only do
1809
        // TLV serialization at the end of the stream, since EOF is allowed for
1810
        // a stream if no more data is expected.
1811
        producers := []tlv.RecordProducer{
75✔
1812
                &a.Route.FirstHopAmount,
75✔
1813
        }
75✔
1814
        tlvData, err := lnwire.MergeAndEncode(
75✔
1815
                producers, nil, a.Route.FirstHopWireCustomRecords,
75✔
1816
        )
75✔
1817
        if err != nil {
75✔
NEW
1818
                return err
×
NEW
1819
        }
×
1820

1821
        if _, err := w.Write(tlvData); err != nil {
75✔
NEW
1822
                return err
×
NEW
1823
        }
×
1824

1825
        return nil
75✔
1826
}
1827

1828
func deserializeHTLCAttemptInfo(r io.Reader) (*HTLCAttemptInfo, error) {
706✔
1829
        a := &HTLCAttemptInfo{}
706✔
1830
        err := ReadElements(r, &a.sessionKey)
706✔
1831
        if err != nil {
706✔
NEW
1832
                return nil, err
×
NEW
1833
        }
×
1834

1835
        a.Route, err = DeserializeRoute(r)
706✔
1836
        if err != nil {
706✔
NEW
1837
                return nil, err
×
NEW
1838
        }
×
1839

1840
        a.AttemptTime, err = deserializeTime(r)
706✔
1841
        if err != nil {
706✔
NEW
1842
                return nil, err
×
NEW
1843
        }
×
1844

1845
        hash := lntypes.Hash{}
706✔
1846
        _, err = io.ReadFull(r, hash[:])
706✔
1847

706✔
1848
        switch {
706✔
1849
        // Older payment attempts wouldn't have the hash set, in which case we
1850
        // can just return.
NEW
1851
        case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
×
NEW
1852
                return a, nil
×
1853

NEW
1854
        case err != nil:
×
NEW
1855
                return nil, err
×
1856

1857
        default:
706✔
1858
        }
1859

1860
        a.Hash = &hash
706✔
1861

706✔
1862
        // Read any remaining data (if any) and parse it into the known records
706✔
1863
        // and custom records.
706✔
1864
        extraData, err := io.ReadAll(r)
706✔
1865
        if err != nil {
706✔
NEW
1866
                return nil, err
×
NEW
1867
        }
×
1868

1869
        customRecords, _, _, err := lnwire.ParseAndExtractCustomRecords(
706✔
1870
                extraData, &a.Route.FirstHopAmount,
706✔
1871
        )
706✔
1872
        if err != nil {
706✔
NEW
1873
                return nil, err
×
NEW
1874
        }
×
1875

1876
        a.Route.FirstHopWireCustomRecords = customRecords
706✔
1877

706✔
1878
        return a, nil
706✔
1879
}
1880

1881
func serializeHop(w io.Writer, h *route.Hop) error {
152✔
1882
        if err := WriteElements(w,
152✔
1883
                h.PubKeyBytes[:],
152✔
1884
                h.ChannelID,
152✔
1885
                h.OutgoingTimeLock,
152✔
1886
                h.AmtToForward,
152✔
1887
        ); err != nil {
152✔
NEW
1888
                return err
×
NEW
1889
        }
×
1890

1891
        if err := binary.Write(w, byteOrder, h.LegacyPayload); err != nil {
152✔
NEW
1892
                return err
×
NEW
1893
        }
×
1894

1895
        // For legacy payloads, we don't need to write any TLV records, so
1896
        // we'll write a zero indicating the our serialized TLV map has no
1897
        // records.
1898
        if h.LegacyPayload {
232✔
1899
                return WriteElements(w, uint32(0))
80✔
1900
        }
80✔
1901

1902
        // Gather all non-primitive TLV records so that they can be serialized
1903
        // as a single blob.
1904
        //
1905
        // TODO(conner): add migration to unify all fields in a single TLV
1906
        // blobs. The split approach will cause headaches down the road as more
1907
        // fields are added, which we can avoid by having a single TLV stream
1908
        // for all payload fields.
1909
        var records []tlv.Record
72✔
1910
        if h.MPP != nil {
139✔
1911
                records = append(records, h.MPP.Record())
67✔
1912
        }
67✔
1913

1914
        // Add blinding point and encrypted data if present.
1915
        if h.EncryptedData != nil {
77✔
1916
                records = append(records, record.NewEncryptedDataRecord(
5✔
1917
                        &h.EncryptedData,
5✔
1918
                ))
5✔
1919
        }
5✔
1920

1921
        if h.BlindingPoint != nil {
76✔
1922
                records = append(records, record.NewBlindingPointRecord(
4✔
1923
                        &h.BlindingPoint,
4✔
1924
                ))
4✔
1925
        }
4✔
1926

1927
        if h.AMP != nil {
75✔
1928
                records = append(records, h.AMP.Record())
3✔
1929
        }
3✔
1930

1931
        if h.Metadata != nil {
139✔
1932
                records = append(records, record.NewMetadataRecord(&h.Metadata))
67✔
1933
        }
67✔
1934

1935
        if h.TotalAmtMsat != 0 {
76✔
1936
                totalMsatInt := uint64(h.TotalAmtMsat)
4✔
1937
                records = append(
4✔
1938
                        records, record.NewTotalAmtMsatBlinded(&totalMsatInt),
4✔
1939
                )
4✔
1940
        }
4✔
1941

1942
        // Final sanity check to absolutely rule out custom records that are not
1943
        // custom and write into the standard range.
1944
        if err := h.CustomRecords.Validate(); err != nil {
72✔
NEW
1945
                return err
×
NEW
1946
        }
×
1947

1948
        // Convert custom records to tlv and add to the record list.
1949
        // MapToRecords sorts the list, so adding it here will keep the list
1950
        // canonical.
1951
        tlvRecords := tlv.MapToRecords(h.CustomRecords)
72✔
1952
        records = append(records, tlvRecords...)
72✔
1953

72✔
1954
        // Otherwise, we'll transform our slice of records into a map of the
72✔
1955
        // raw bytes, then serialize them in-line with a length (number of
72✔
1956
        // elements) prefix.
72✔
1957
        mapRecords, err := tlv.RecordsToMap(records)
72✔
1958
        if err != nil {
72✔
NEW
1959
                return err
×
NEW
1960
        }
×
1961

1962
        numRecords := uint32(len(mapRecords))
72✔
1963
        if err := WriteElements(w, numRecords); err != nil {
72✔
NEW
1964
                return err
×
NEW
1965
        }
×
1966

1967
        for recordType, rawBytes := range mapRecords {
342✔
1968
                if err := WriteElements(w, recordType); err != nil {
270✔
NEW
1969
                        return err
×
NEW
1970
                }
×
1971

1972
                if err := wire.WriteVarBytes(w, 0, rawBytes); err != nil {
270✔
NEW
1973
                        return err
×
NEW
1974
                }
×
1975
        }
1976

1977
        return nil
72✔
1978
}
1979

1980
// maxOnionPayloadSize is the largest Sphinx payload possible, so we don't need
1981
// to read/write a TLV stream larger than this.
1982
const maxOnionPayloadSize = 1300
1983

1984
func deserializeHop(r io.Reader) (*route.Hop, error) {
1,414✔
1985
        h := &route.Hop{}
1,414✔
1986

1,414✔
1987
        var pub []byte
1,414✔
1988
        if err := ReadElements(r, &pub); err != nil {
1,414✔
NEW
1989
                return nil, err
×
NEW
1990
        }
×
1991
        copy(h.PubKeyBytes[:], pub)
1,414✔
1992

1,414✔
1993
        if err := ReadElements(r,
1,414✔
1994
                &h.ChannelID, &h.OutgoingTimeLock, &h.AmtToForward,
1,414✔
1995
        ); err != nil {
1,414✔
NEW
1996
                return nil, err
×
NEW
1997
        }
×
1998

1999
        // TODO(roasbeef): change field to allow LegacyPayload false to be the
2000
        // legacy default?
2001
        err := binary.Read(r, byteOrder, &h.LegacyPayload)
1,414✔
2002
        if err != nil {
1,414✔
NEW
2003
                return nil, err
×
NEW
2004
        }
×
2005

2006
        var numElements uint32
1,414✔
2007
        if err := ReadElements(r, &numElements); err != nil {
1,414✔
NEW
2008
                return nil, err
×
NEW
2009
        }
×
2010

2011
        // If there're no elements, then we can return early.
2012
        if numElements == 0 {
2,149✔
2013
                return h, nil
735✔
2014
        }
735✔
2015

2016
        tlvMap := make(map[uint64][]byte)
682✔
2017
        for i := uint32(0); i < numElements; i++ {
3,391✔
2018
                var tlvType uint64
2,709✔
2019
                if err := ReadElements(r, &tlvType); err != nil {
2,709✔
NEW
2020
                        return nil, err
×
NEW
2021
                }
×
2022

2023
                rawRecordBytes, err := wire.ReadVarBytes(
2,709✔
2024
                        r, 0, maxOnionPayloadSize, "tlv",
2,709✔
2025
                )
2,709✔
2026
                if err != nil {
2,709✔
NEW
2027
                        return nil, err
×
NEW
2028
                }
×
2029

2030
                tlvMap[tlvType] = rawRecordBytes
2,709✔
2031
        }
2032

2033
        // If the MPP type is present, remove it from the generic TLV map and
2034
        // parse it back into a proper MPP struct.
2035
        //
2036
        // TODO(conner): add migration to unify all fields in a single TLV
2037
        // blobs. The split approach will cause headaches down the road as more
2038
        // fields are added, which we can avoid by having a single TLV stream
2039
        // for all payload fields.
2040
        mppType := uint64(record.MPPOnionType)
682✔
2041
        if mppBytes, ok := tlvMap[mppType]; ok {
1,358✔
2042
                delete(tlvMap, mppType)
676✔
2043

676✔
2044
                var (
676✔
2045
                        mpp    = &record.MPP{}
676✔
2046
                        mppRec = mpp.Record()
676✔
2047
                        r      = bytes.NewReader(mppBytes)
676✔
2048
                )
676✔
2049
                err := mppRec.Decode(r, uint64(len(mppBytes)))
676✔
2050
                if err != nil {
676✔
NEW
2051
                        return nil, err
×
NEW
2052
                }
×
2053
                h.MPP = mpp
676✔
2054
        }
2055

2056
        // If encrypted data or blinding key are present, remove them from
2057
        // the TLV map and parse into proper types.
2058
        encryptedDataType := uint64(record.EncryptedDataOnionType)
682✔
2059
        if data, ok := tlvMap[encryptedDataType]; ok {
687✔
2060
                delete(tlvMap, encryptedDataType)
5✔
2061
                h.EncryptedData = data
5✔
2062
        }
5✔
2063

2064
        blindingType := uint64(record.BlindingPointOnionType)
682✔
2065
        if blindingPoint, ok := tlvMap[blindingType]; ok {
686✔
2066
                delete(tlvMap, blindingType)
4✔
2067

4✔
2068
                h.BlindingPoint, err = btcec.ParsePubKey(blindingPoint)
4✔
2069
                if err != nil {
4✔
NEW
2070
                        return nil, fmt.Errorf("invalid blinding point: %w",
×
NEW
2071
                                err)
×
NEW
2072
                }
×
2073
        }
2074

2075
        ampType := uint64(record.AMPOnionType)
682✔
2076
        if ampBytes, ok := tlvMap[ampType]; ok {
685✔
2077
                delete(tlvMap, ampType)
3✔
2078

3✔
2079
                var (
3✔
2080
                        amp    = &record.AMP{}
3✔
2081
                        ampRec = amp.Record()
3✔
2082
                        r      = bytes.NewReader(ampBytes)
3✔
2083
                )
3✔
2084
                err := ampRec.Decode(r, uint64(len(ampBytes)))
3✔
2085
                if err != nil {
3✔
NEW
2086
                        return nil, err
×
NEW
2087
                }
×
2088
                h.AMP = amp
3✔
2089
        }
2090

2091
        // If the metadata type is present, remove it from the tlv map and
2092
        // populate directly on the hop.
2093
        metadataType := uint64(record.MetadataOnionType)
682✔
2094
        if metadata, ok := tlvMap[metadataType]; ok {
1,359✔
2095
                delete(tlvMap, metadataType)
677✔
2096

677✔
2097
                h.Metadata = metadata
677✔
2098
        }
677✔
2099

2100
        totalAmtMsatType := uint64(record.TotalAmtMsatBlindedType)
682✔
2101
        if totalAmtMsat, ok := tlvMap[totalAmtMsatType]; ok {
686✔
2102
                delete(tlvMap, totalAmtMsatType)
4✔
2103

4✔
2104
                var (
4✔
2105
                        totalAmtMsatInt uint64
4✔
2106
                        buf             [8]byte
4✔
2107
                )
4✔
2108
                if err := tlv.DTUint64(
4✔
2109
                        bytes.NewReader(totalAmtMsat),
4✔
2110
                        &totalAmtMsatInt,
4✔
2111
                        &buf,
4✔
2112
                        uint64(len(totalAmtMsat)),
4✔
2113
                ); err != nil {
4✔
NEW
2114
                        return nil, err
×
NEW
2115
                }
×
2116

2117
                h.TotalAmtMsat = lnwire.MilliSatoshi(totalAmtMsatInt)
4✔
2118
        }
2119

2120
        h.CustomRecords = tlvMap
682✔
2121

682✔
2122
        return h, nil
682✔
2123
}
2124

2125
// SerializeRoute serializes a route.
2126
func SerializeRoute(w io.Writer, r route.Route) error {
77✔
2127
        if err := WriteElements(w,
77✔
2128
                r.TotalTimeLock, r.TotalAmount, r.SourcePubKey[:],
77✔
2129
        ); err != nil {
77✔
NEW
2130
                return err
×
NEW
2131
        }
×
2132

2133
        if err := WriteElements(w, uint32(len(r.Hops))); err != nil {
77✔
NEW
2134
                return err
×
NEW
2135
        }
×
2136

2137
        for _, h := range r.Hops {
229✔
2138
                if err := serializeHop(w, h); err != nil {
152✔
NEW
2139
                        return err
×
NEW
2140
                }
×
2141
        }
2142

2143
        // Any new/extra TLV data is encoded in serializeHTLCAttemptInfo!
2144

2145
        return nil
77✔
2146
}
2147

2148
// DeserializeRoute deserializes a route.
2149
func DeserializeRoute(r io.Reader) (route.Route, error) {
708✔
2150
        rt := route.Route{}
708✔
2151
        if err := ReadElements(r,
708✔
2152
                &rt.TotalTimeLock, &rt.TotalAmount,
708✔
2153
        ); err != nil {
708✔
NEW
2154
                return rt, err
×
NEW
2155
        }
×
2156

2157
        var pub []byte
708✔
2158
        if err := ReadElements(r, &pub); err != nil {
708✔
NEW
2159
                return rt, err
×
NEW
2160
        }
×
2161
        copy(rt.SourcePubKey[:], pub)
708✔
2162

708✔
2163
        var numHops uint32
708✔
2164
        if err := ReadElements(r, &numHops); err != nil {
708✔
NEW
2165
                return rt, err
×
NEW
2166
        }
×
2167

2168
        var hops []*route.Hop
708✔
2169
        for i := uint32(0); i < numHops; i++ {
2,122✔
2170
                hop, err := deserializeHop(r)
1,414✔
2171
                if err != nil {
1,414✔
NEW
2172
                        return rt, err
×
NEW
2173
                }
×
2174
                hops = append(hops, hop)
1,414✔
2175
        }
2176
        rt.Hops = hops
708✔
2177

708✔
2178
        // Any new/extra TLV data is decoded in deserializeHTLCAttemptInfo!
708✔
2179

708✔
2180
        return rt, nil
708✔
2181
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc