• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lightningnetwork / lnd / 16948521526

13 Aug 2025 08:27PM UTC coverage: 54.877% (-12.1%) from 66.929%
16948521526

Pull #10155

github

web-flow
Merge 61c0fecf6 into c6a9116e3
Pull Request #10155: Add missing invoice index for native sql

108941 of 198518 relevant lines covered (54.88%)

22023.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.97
/channeldb/payments_kv_store.go
1
package channeldb
2

3
import (
4
        "bytes"
5
        "context"
6
        "encoding/binary"
7
        "errors"
8
        "fmt"
9
        "io"
10
        "sort"
11
        "sync"
12
        "time"
13

14
        "github.com/btcsuite/btcd/btcec/v2"
15
        "github.com/btcsuite/btcd/wire"
16
        "github.com/lightningnetwork/lnd/kvdb"
17
        "github.com/lightningnetwork/lnd/lntypes"
18
        "github.com/lightningnetwork/lnd/lnwire"
19
        paymentsdb "github.com/lightningnetwork/lnd/payments/db"
20
        "github.com/lightningnetwork/lnd/record"
21
        "github.com/lightningnetwork/lnd/routing/route"
22
        "github.com/lightningnetwork/lnd/tlv"
23
)
24

25
const (
26
        // paymentSeqBlockSize is the block size used when we batch allocate
27
        // payment sequences for future payments.
28
        paymentSeqBlockSize = 1000
29

30
        // paymentProgressLogInterval is the interval we use to limit the
31
        // logging output of payment processing.
32
        paymentProgressLogInterval = 30 * time.Second
33
)
34

35
//nolint:ll
36
var (
37
        // paymentsRootBucket is the name of the top-level bucket within the
38
        // database that stores all data related to payments. Within this
39
        // bucket, each payment has its own sub-bucket keyed by its payment
40
        // hash.
41
        //
42
        // Bucket hierarchy:
43
        //
44
        // root-bucket
45
        //      |
46
        //      |-- <paymenthash>
47
        //      |        |--sequence-key: <sequence number>
48
        //      |        |--creation-info-key: <creation info>
49
        //      |        |--fail-info-key: <(optional) fail info>
50
        //      |        |
51
        //      |        |--payment-htlcs-bucket (shard-bucket)
52
        //      |        |        |
53
        //      |        |        |-- ai<htlc attempt ID>: <htlc attempt info>
54
        //      |        |        |-- si<htlc attempt ID>: <(optional) settle info>
55
        //      |        |        |-- fi<htlc attempt ID>: <(optional) fail info>
56
        //      |        |        |
57
        //      |        |       ...
58
        //      |        |
59
        //      |        |
60
        //      |        |--duplicate-bucket (only for old, completed payments)
61
        //      |                 |
62
        //      |                 |-- <seq-num>
63
        //      |                 |       |--sequence-key: <sequence number>
64
        //      |                 |       |--creation-info-key: <creation info>
65
        //      |                 |       |--ai: <attempt info>
66
        //      |                 |       |--si: <settle info>
67
        //      |                 |       |--fi: <fail info>
68
        //      |                 |
69
        //      |                 |-- <seq-num>
70
        //      |                 |       |
71
        //      |                ...     ...
72
        //      |
73
        //      |-- <paymenthash>
74
        //      |        |
75
        //      |       ...
76
        //     ...
77
        //
78
        paymentsRootBucket = []byte("payments-root-bucket")
79

80
        // paymentSequenceKey is a key used in the payment's sub-bucket to
81
        // store the sequence number of the payment.
82
        paymentSequenceKey = []byte("payment-sequence-key")
83

84
        // paymentCreationInfoKey is a key used in the payment's sub-bucket to
85
        // store the creation info of the payment.
86
        paymentCreationInfoKey = []byte("payment-creation-info")
87

88
        // paymentHtlcsBucket is a bucket where we'll store the information
89
        // about the HTLCs that were attempted for a payment.
90
        paymentHtlcsBucket = []byte("payment-htlcs-bucket")
91

92
        // htlcAttemptInfoKey is the key used as the prefix of an HTLC attempt
93
        // to store the info about the attempt that was done for the HTLC in
94
        // question. The HTLC attempt ID is concatenated at the end.
95
        htlcAttemptInfoKey = []byte("ai")
96

97
        // htlcSettleInfoKey is the key used as the prefix of an HTLC attempt
98
        // settle info, if any. The HTLC attempt ID is concatenated at the end.
99
        htlcSettleInfoKey = []byte("si")
100

101
        // htlcFailInfoKey is the key used as the prefix of an HTLC attempt
102
        // failure information, if any. The HTLC attempt ID is concatenated at
103
        // the end.
104
        htlcFailInfoKey = []byte("fi")
105

106
        // paymentFailInfoKey is a key used in the payment's sub-bucket to
107
        // store information about the reason a payment failed.
108
        paymentFailInfoKey = []byte("payment-fail-info")
109

110
        // paymentsIndexBucket is the name of the top-level bucket within the
111
        // database that maps each payment sequence number to its
112
        // payment hash.
113
        // payments-sequence-index-bucket
114
        //         |--<sequence-number>: <payment hash>
115
        //         |--...
116
        //         |--<sequence-number>: <payment hash>
117
        paymentsIndexBucket = []byte("payments-index-bucket")
118
)
119

120
// KVPaymentsDB implements persistence for payments and payment attempts.
121
type KVPaymentsDB struct {
122
        // Sequence management for the kv store.
123
        seqMu     sync.Mutex
124
        currSeq   uint64
125
        storedSeq uint64
126

127
        // db is the underlying database implementation.
128
        db kvdb.Backend
129

130
        keepFailedPaymentAttempts bool
131
}
132

133
// defaultKVStoreOptions returns the default options for the KV store.
134
func defaultKVStoreOptions() *paymentsdb.StoreOptions {
42✔
135
        return &paymentsdb.StoreOptions{
42✔
136
                KeepFailedPaymentAttempts: false,
42✔
137
        }
42✔
138
}
42✔
139

140
// NewKVPaymentsDB creates a new KVStore for payments.
141
func NewKVPaymentsDB(db kvdb.Backend,
142
        options ...paymentsdb.OptionModifier) (*KVPaymentsDB, error) {
42✔
143

42✔
144
        opts := defaultKVStoreOptions()
42✔
145
        for _, applyOption := range options {
52✔
146
                applyOption(opts)
10✔
147
        }
10✔
148

149
        if !opts.NoMigration {
84✔
150
                if err := initKVStore(db); err != nil {
42✔
151
                        return nil, err
×
152
                }
×
153
        }
154

155
        return &KVPaymentsDB{
42✔
156
                db:                        db,
42✔
157
                keepFailedPaymentAttempts: opts.KeepFailedPaymentAttempts,
42✔
158
        }, nil
42✔
159
}
160

161
var paymentsTopLevelBuckets = [][]byte{
162
        paymentsRootBucket,
163
        paymentsIndexBucket,
164
}
165

166
// initKVStore creates and initializes the top-level buckets for the payment db.
167
func initKVStore(db kvdb.Backend) error {
42✔
168
        err := kvdb.Update(db, func(tx kvdb.RwTx) error {
84✔
169
                for _, tlb := range paymentsTopLevelBuckets {
126✔
170
                        if _, err := tx.CreateTopLevelBucket(tlb); err != nil {
84✔
171
                                return err
×
172
                        }
×
173
                }
174

175
                return nil
42✔
176
        }, func() {})
42✔
177
        if err != nil {
42✔
178
                return fmt.Errorf("unable to create new payments db: %w", err)
×
179
        }
×
180

181
        return nil
42✔
182
}
183

184
// InitPayment checks or records the given PaymentCreationInfo with the DB,
185
// making sure it does not already exist as an in-flight payment. When this
186
// method returns successfully, the payment is guaranteed to be in the InFlight
187
// state.
188
func (p *KVPaymentsDB) InitPayment(paymentHash lntypes.Hash,
189
        info *PaymentCreationInfo) error {
149✔
190

149✔
191
        // Obtain a new sequence number for this payment. This is used
149✔
192
        // to sort the payments in order of creation, and also acts as
149✔
193
        // a unique identifier for each payment.
149✔
194
        sequenceNum, err := p.nextPaymentSequence()
149✔
195
        if err != nil {
149✔
196
                return err
×
197
        }
×
198

199
        var b bytes.Buffer
149✔
200
        if err := serializePaymentCreationInfo(&b, info); err != nil {
149✔
201
                return err
×
202
        }
×
203
        infoBytes := b.Bytes()
149✔
204

149✔
205
        var updateErr error
149✔
206
        err = kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
298✔
207
                // Reset the update error, to avoid carrying over an error
149✔
208
                // from a previous execution of the batched db transaction.
149✔
209
                updateErr = nil
149✔
210

149✔
211
                prefetchPayment(tx, paymentHash)
149✔
212
                bucket, err := createPaymentBucket(tx, paymentHash)
149✔
213
                if err != nil {
149✔
214
                        return err
×
215
                }
×
216

217
                // Get the existing status of this payment, if any.
218
                paymentStatus, err := fetchPaymentStatus(bucket)
149✔
219

149✔
220
                switch {
149✔
221
                // If no error is returned, it means we already have this
222
                // payment. We'll check the status to decide whether we allow
223
                // retrying the payment or return a specific error.
224
                case err == nil:
5✔
225
                        if err := paymentStatus.initializable(); err != nil {
9✔
226
                                updateErr = err
4✔
227
                                return nil
4✔
228
                        }
4✔
229

230
                // Otherwise, if the error is not `ErrPaymentNotInitiated`,
231
                // we'll return the error.
232
                case !errors.Is(err, paymentsdb.ErrPaymentNotInitiated):
×
233
                        return err
×
234
                }
235

236
                // Before we set our new sequence number, we check whether this
237
                // payment has a previously set sequence number and remove its
238
                // index entry if it exists. This happens in the case where we
239
                // have a previously attempted payment which was left in a state
240
                // where we can retry.
241
                seqBytes := bucket.Get(paymentSequenceKey)
145✔
242
                if seqBytes != nil {
146✔
243
                        indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
1✔
244
                        if err := indexBucket.Delete(seqBytes); err != nil {
1✔
245
                                return err
×
246
                        }
×
247
                }
248

249
                // Once we have obtained a sequence number, we add an entry
250
                // to our index bucket which will map the sequence number to
251
                // our payment identifier.
252
                err = createPaymentIndexEntry(
145✔
253
                        tx, sequenceNum, info.PaymentIdentifier,
145✔
254
                )
145✔
255
                if err != nil {
145✔
256
                        return err
×
257
                }
×
258

259
                err = bucket.Put(paymentSequenceKey, sequenceNum)
145✔
260
                if err != nil {
145✔
261
                        return err
×
262
                }
×
263

264
                // Add the payment info to the bucket, which contains the
265
                // static information for this payment
266
                err = bucket.Put(paymentCreationInfoKey, infoBytes)
145✔
267
                if err != nil {
145✔
268
                        return err
×
269
                }
×
270

271
                // We'll delete any lingering HTLCs to start with, in case we
272
                // are initializing a payment that was attempted earlier, but
273
                // left in a state where we could retry.
274
                err = bucket.DeleteNestedBucket(paymentHtlcsBucket)
145✔
275
                if err != nil && !errors.Is(err, kvdb.ErrBucketNotFound) {
145✔
276
                        return err
×
277
                }
×
278

279
                // Also delete any lingering failure info now that we are
280
                // re-attempting.
281
                return bucket.Delete(paymentFailInfoKey)
145✔
282
        })
283
        if err != nil {
149✔
284
                return fmt.Errorf("unable to init payment: %w", err)
×
285
        }
×
286

287
        return updateErr
149✔
288
}
289

290
// DeleteFailedAttempts deletes all failed htlcs for a payment if configured
291
// by the KVPaymentsDB db.
292
func (p *KVPaymentsDB) DeleteFailedAttempts(hash lntypes.Hash) error {
8✔
293
        if !p.keepFailedPaymentAttempts {
12✔
294
                const failedHtlcsOnly = true
4✔
295
                err := p.DeletePayment(hash, failedHtlcsOnly)
4✔
296
                if err != nil {
6✔
297
                        return err
2✔
298
                }
2✔
299
        }
300

301
        return nil
6✔
302
}
303

304
// paymentIndexType identifies the kind of entry stored in the payments index
305
// bucket, so that different index types can be signalled in future.
306
type paymentIndexType uint8
307

308
// paymentIndexTypeHash is a payment index type which indicates that we have
309
// created an index of payment sequence number to payment hash.
310
const paymentIndexTypeHash paymentIndexType = 0
311

312
// createPaymentIndexEntry creates a payment hash typed index for a payment. The
313
// index produced contains a payment index type (which can be used in future to
314
// signal different payment index types) and the payment identifier.
315
func createPaymentIndexEntry(tx kvdb.RwTx, sequenceNumber []byte,
316
        id lntypes.Hash) error {
166✔
317

166✔
318
        var b bytes.Buffer
166✔
319
        if err := WriteElements(&b, paymentIndexTypeHash, id[:]); err != nil {
166✔
320
                return err
×
321
        }
×
322

323
        indexes := tx.ReadWriteBucket(paymentsIndexBucket)
166✔
324

166✔
325
        return indexes.Put(sequenceNumber, b.Bytes())
166✔
326
}
327

328
// deserializePaymentIndex deserializes a payment index entry. This function
329
// currently only supports deserialization of payment hash indexes, and will
330
// fail for other types.
331
func deserializePaymentIndex(r io.Reader) (lntypes.Hash, error) {
59✔
332
        var (
59✔
333
                indexType   paymentIndexType
59✔
334
                paymentHash []byte
59✔
335
        )
59✔
336

59✔
337
        if err := ReadElements(r, &indexType, &paymentHash); err != nil {
59✔
338
                return lntypes.Hash{}, err
×
339
        }
×
340

341
        // While we only have one payment index type, we do not need to use our
342
        // index type to deserialize the index. However, we sanity check that
343
        // this type is as expected, since we had to read it out anyway.
344
        if indexType != paymentIndexTypeHash {
59✔
345
                return lntypes.Hash{}, fmt.Errorf("unknown payment index "+
×
346
                        "type: %v", indexType)
×
347
        }
×
348

349
        hash, err := lntypes.MakeHash(paymentHash)
59✔
350
        if err != nil {
59✔
351
                return lntypes.Hash{}, err
×
352
        }
×
353

354
        return hash, nil
59✔
355
}
356

357
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
358
// DB.
359
func (p *KVPaymentsDB) RegisterAttempt(paymentHash lntypes.Hash,
360
        attempt *HTLCAttemptInfo) (*MPPayment, error) {
70✔
361

70✔
362
        // Serialize the information before opening the db transaction.
70✔
363
        var a bytes.Buffer
70✔
364
        err := serializeHTLCAttemptInfo(&a, attempt)
70✔
365
        if err != nil {
70✔
366
                return nil, err
×
367
        }
×
368
        htlcInfoBytes := a.Bytes()
70✔
369

70✔
370
        htlcIDBytes := make([]byte, 8)
70✔
371
        binary.BigEndian.PutUint64(htlcIDBytes, attempt.AttemptID)
70✔
372

70✔
373
        var payment *MPPayment
70✔
374
        err = kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
140✔
375
                prefetchPayment(tx, paymentHash)
70✔
376
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
70✔
377
                if err != nil {
70✔
378
                        return err
×
379
                }
×
380

381
                payment, err = fetchPayment(bucket)
70✔
382
                if err != nil {
70✔
383
                        return err
×
384
                }
×
385

386
                // Check if registering a new attempt is allowed.
387
                if err := payment.Registrable(); err != nil {
78✔
388
                        return err
8✔
389
                }
8✔
390

391
                // If the final hop has encrypted data, then we know this is a
392
                // blinded payment. In blinded payments, MPP records are not set
393
                // for split payments and the recipient is responsible for using
394
                // a consistent PathID across the various encrypted data
395
                // payloads that we received from them for this payment. All we
396
                // need to check is that the total amount field for each HTLC
397
                // in the split payment is correct.
398
                isBlinded := len(attempt.Route.FinalHop().EncryptedData) != 0
62✔
399

62✔
400
                // Make sure any existing shards match the new one with regards
62✔
401
                // to MPP options.
62✔
402
                mpp := attempt.Route.FinalHop().MPP
62✔
403

62✔
404
                // MPP records should not be set for attempts to blinded paths.
62✔
405
                if isBlinded && mpp != nil {
62✔
406
                        return paymentsdb.ErrMPPRecordInBlindedPayment
×
407
                }
×
408

409
                for _, h := range payment.InFlightHTLCs() {
90✔
410
                        hMpp := h.Route.FinalHop().MPP
28✔
411

28✔
412
                        // If this is a blinded payment, then no existing HTLCs
28✔
413
                        // should have MPP records.
28✔
414
                        if isBlinded && hMpp != nil {
28✔
415
                                return paymentsdb.ErrMPPRecordInBlindedPayment
×
416
                        }
×
417

418
                        // If this is a blinded payment, then we just need to
419
                        // check that the TotalAmtMsat field for this shard
420
                        // is equal to that of any other shard in the same
421
                        // payment.
422
                        if isBlinded {
28✔
423
                                if attempt.Route.FinalHop().TotalAmtMsat !=
×
424
                                        h.Route.FinalHop().TotalAmtMsat {
×
425

×
426
                                        //nolint:ll
×
427
                                        return paymentsdb.ErrBlindedPaymentTotalAmountMismatch
×
428
                                }
×
429

430
                                continue
×
431
                        }
432

433
                        switch {
28✔
434
                        // We tried to register a non-MPP attempt for a MPP
435
                        // payment.
436
                        case mpp == nil && hMpp != nil:
1✔
437
                                return paymentsdb.ErrMPPayment
1✔
438

439
                        // We tried to register a MPP shard for a non-MPP
440
                        // payment.
441
                        case mpp != nil && hMpp == nil:
1✔
442
                                return paymentsdb.ErrNonMPPayment
1✔
443

444
                        // Non-MPP payment, nothing more to validate.
445
                        case mpp == nil:
×
446
                                continue
×
447
                        }
448

449
                        // Check that MPP options match.
450
                        if mpp.PaymentAddr() != hMpp.PaymentAddr() {
27✔
451
                                return paymentsdb.ErrMPPPaymentAddrMismatch
1✔
452
                        }
1✔
453

454
                        if mpp.TotalMsat() != hMpp.TotalMsat() {
26✔
455
                                return paymentsdb.ErrMPPTotalAmountMismatch
1✔
456
                        }
1✔
457
                }
458

459
                // If this is a non-MPP attempt, it must match the total amount
460
                // exactly. Note that a blinded payment is considered an MPP
461
                // attempt.
462
                amt := attempt.Route.ReceiverAmt()
58✔
463
                if !isBlinded && mpp == nil && amt != payment.Info.Value {
58✔
464
                        return paymentsdb.ErrValueMismatch
×
465
                }
×
466

467
                // Ensure we aren't sending more than the total payment amount.
468
                sentAmt, _ := payment.SentAmt()
58✔
469
                if sentAmt+amt > payment.Info.Value {
62✔
470
                        return fmt.Errorf("%w: attempted=%v, payment amount="+
4✔
471
                                "%v", paymentsdb.ErrValueExceedsAmt,
4✔
472
                                sentAmt+amt, payment.Info.Value)
4✔
473
                }
4✔
474

475
                htlcsBucket, err := bucket.CreateBucketIfNotExists(
54✔
476
                        paymentHtlcsBucket,
54✔
477
                )
54✔
478
                if err != nil {
54✔
479
                        return err
×
480
                }
×
481

482
                err = htlcsBucket.Put(
54✔
483
                        htlcBucketKey(htlcAttemptInfoKey, htlcIDBytes),
54✔
484
                        htlcInfoBytes,
54✔
485
                )
54✔
486
                if err != nil {
54✔
487
                        return err
×
488
                }
×
489

490
                // Retrieve attempt info for the notification.
491
                payment, err = fetchPayment(bucket)
54✔
492

54✔
493
                return err
54✔
494
        })
495
        if err != nil {
86✔
496
                return nil, err
16✔
497
        }
16✔
498

499
        return payment, err
54✔
500
}
501

502
// SettleAttempt marks the given attempt settled with the preimage. If this is
503
// a multi shard payment, this might implicitly mean that the full payment
504
// succeeded.
505
//
506
// After invoking this method, InitPayment should always return an error to
507
// prevent us from making duplicate payments to the same payment hash. The
508
// provided preimage is atomically saved to the DB for record keeping.
509
func (p *KVPaymentsDB) SettleAttempt(hash lntypes.Hash,
510
        attemptID uint64, settleInfo *HTLCSettleInfo) (*MPPayment, error) {
16✔
511

16✔
512
        var b bytes.Buffer
16✔
513
        if err := serializeHTLCSettleInfo(&b, settleInfo); err != nil {
16✔
514
                return nil, err
×
515
        }
×
516
        settleBytes := b.Bytes()
16✔
517

16✔
518
        return p.updateHtlcKey(hash, attemptID, htlcSettleInfoKey, settleBytes)
16✔
519
}
520

521
// FailAttempt marks the given payment attempt failed.
522
func (p *KVPaymentsDB) FailAttempt(hash lntypes.Hash,
523
        attemptID uint64, failInfo *HTLCFailInfo) (*MPPayment, error) {
31✔
524

31✔
525
        var b bytes.Buffer
31✔
526
        if err := serializeHTLCFailInfo(&b, failInfo); err != nil {
31✔
527
                return nil, err
×
528
        }
×
529
        failBytes := b.Bytes()
31✔
530

31✔
531
        return p.updateHtlcKey(hash, attemptID, htlcFailInfoKey, failBytes)
31✔
532
}
533

534
// updateHtlcKey updates a database key for the specified htlc.
535
func (p *KVPaymentsDB) updateHtlcKey(paymentHash lntypes.Hash,
536
        attemptID uint64, key, value []byte) (*MPPayment, error) {
47✔
537

47✔
538
        aid := make([]byte, 8)
47✔
539
        binary.BigEndian.PutUint64(aid, attemptID)
47✔
540

47✔
541
        var payment *MPPayment
47✔
542
        err := kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
94✔
543
                payment = nil
47✔
544

47✔
545
                prefetchPayment(tx, paymentHash)
47✔
546
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
47✔
547
                if err != nil {
48✔
548
                        return err
1✔
549
                }
1✔
550

551
                p, err := fetchPayment(bucket)
46✔
552
                if err != nil {
46✔
553
                        return err
×
554
                }
×
555

556
                // We can only update keys of in-flight payments. We allow
557
                // updating keys even if the payment has reached a terminal
558
                // condition, since the HTLC outcomes must still be updated.
559
                if err := p.Status.updatable(); err != nil {
46✔
560
                        return err
×
561
                }
×
562

563
                htlcsBucket := bucket.NestedReadWriteBucket(paymentHtlcsBucket)
46✔
564
                if htlcsBucket == nil {
46✔
565
                        return fmt.Errorf("htlcs bucket not found")
×
566
                }
×
567

568
                attemptKey := htlcBucketKey(htlcAttemptInfoKey, aid)
46✔
569
                if htlcsBucket.Get(attemptKey) == nil {
46✔
570
                        return fmt.Errorf("HTLC with ID %v not registered",
×
571
                                attemptID)
×
572
                }
×
573

574
                // Make sure the shard is not already failed or settled.
575
                failKey := htlcBucketKey(htlcFailInfoKey, aid)
46✔
576
                if htlcsBucket.Get(failKey) != nil {
46✔
577
                        return paymentsdb.ErrAttemptAlreadyFailed
×
578
                }
×
579

580
                settleKey := htlcBucketKey(htlcSettleInfoKey, aid)
46✔
581
                if htlcsBucket.Get(settleKey) != nil {
46✔
582
                        return paymentsdb.ErrAttemptAlreadySettled
×
583
                }
×
584

585
                // Add or update the key for this htlc.
586
                err = htlcsBucket.Put(htlcBucketKey(key, aid), value)
46✔
587
                if err != nil {
46✔
588
                        return err
×
589
                }
×
590

591
                // Retrieve attempt info for the notification.
592
                payment, err = fetchPayment(bucket)
46✔
593

46✔
594
                return err
46✔
595
        })
596
        if err != nil {
48✔
597
                return nil, err
1✔
598
        }
1✔
599

600
        return payment, err
46✔
601
}
602

603
// Fail transitions a payment into the Failed state, and records the reason the
604
// payment failed. After invoking this method, InitPayment should return nil on
605
// its next call for this payment hash, allowing the switch to make a
606
// subsequent payment.
607
func (p *KVPaymentsDB) Fail(paymentHash lntypes.Hash,
608
        reason FailureReason) (*MPPayment, error) {
16✔
609

16✔
610
        var (
16✔
611
                updateErr error
16✔
612
                payment   *MPPayment
16✔
613
        )
16✔
614
        err := kvdb.Batch(p.db, func(tx kvdb.RwTx) error {
32✔
615
                // Reset the update error, to avoid carrying over an error
16✔
616
                // from a previous execution of the batched db transaction.
16✔
617
                updateErr = nil
16✔
618
                payment = nil
16✔
619

16✔
620
                prefetchPayment(tx, paymentHash)
16✔
621
                bucket, err := fetchPaymentBucketUpdate(tx, paymentHash)
16✔
622
                if errors.Is(err, paymentsdb.ErrPaymentNotInitiated) {
17✔
623
                        updateErr = paymentsdb.ErrPaymentNotInitiated
1✔
624
                        return nil
1✔
625
                } else if err != nil {
16✔
626
                        return err
×
627
                }
×
628

629
                // We mark the payment as failed as long as it is known. This
630
                // lets the last attempt that fails with a terminal error write its
631
                // failure to the KVPaymentsDB without synchronizing with
632
                // other attempts.
633
                _, err = fetchPaymentStatus(bucket)
15✔
634
                if errors.Is(err, paymentsdb.ErrPaymentNotInitiated) {
15✔
635
                        updateErr = paymentsdb.ErrPaymentNotInitiated
×
636
                        return nil
×
637
                } else if err != nil {
15✔
638
                        return err
×
639
                }
×
640

641
                // Put the failure reason in the bucket for record keeping.
642
                v := []byte{byte(reason)}
15✔
643
                err = bucket.Put(paymentFailInfoKey, v)
15✔
644
                if err != nil {
15✔
645
                        return err
×
646
                }
×
647

648
                // Retrieve attempt info for the notification, if available.
649
                payment, err = fetchPayment(bucket)
15✔
650
                if err != nil {
15✔
651
                        return err
×
652
                }
×
653

654
                return nil
15✔
655
        })
656
        if err != nil {
16✔
657
                return nil, err
×
658
        }
×
659

660
        return payment, updateErr
16✔
661
}
662

663
// FetchPayment returns information about a payment from the database.
664
func (p *KVPaymentsDB) FetchPayment(paymentHash lntypes.Hash) (
665
        *MPPayment, error) {
151✔
666

151✔
667
        var payment *MPPayment
151✔
668
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
302✔
669
                prefetchPayment(tx, paymentHash)
151✔
670
                bucket, err := fetchPaymentBucket(tx, paymentHash)
151✔
671
                if err != nil {
152✔
672
                        return err
1✔
673
                }
1✔
674

675
                payment, err = fetchPayment(bucket)
150✔
676

150✔
677
                return err
150✔
678
        }, func() {
151✔
679
                payment = nil
151✔
680
        })
151✔
681
        if err != nil {
152✔
682
                return nil, err
1✔
683
        }
1✔
684

685
        return payment, nil
150✔
686
}
687

688
// prefetchPayment attempts to prefetch as much of the payment as possible to
689
// reduce DB roundtrips.
690
func prefetchPayment(tx kvdb.RTx, paymentHash lntypes.Hash) {
433✔
691
        rb := kvdb.RootBucket(tx)
433✔
692
        kvdb.Prefetch(
433✔
693
                rb,
433✔
694
                []string{
433✔
695
                        // Prefetch all keys in the payment's bucket.
433✔
696
                        string(paymentsRootBucket),
433✔
697
                        string(paymentHash[:]),
433✔
698
                },
433✔
699
                []string{
433✔
700
                        // Prefetch all keys in the payment's htlc bucket.
433✔
701
                        string(paymentsRootBucket),
433✔
702
                        string(paymentHash[:]),
433✔
703
                        string(paymentHtlcsBucket),
433✔
704
                },
433✔
705
        )
433✔
706
}
433✔
707

708
// createPaymentBucket creates or fetches the sub-bucket assigned to this
709
// payment hash.
710
func createPaymentBucket(tx kvdb.RwTx, paymentHash lntypes.Hash) (
711
        kvdb.RwBucket, error) {
149✔
712

149✔
713
        payments, err := tx.CreateTopLevelBucket(paymentsRootBucket)
149✔
714
        if err != nil {
149✔
715
                return nil, err
×
716
        }
×
717

718
        return payments.CreateBucketIfNotExists(paymentHash[:])
149✔
719
}
720

721
// fetchPaymentBucket fetches the sub-bucket assigned to this payment hash. If
722
// the bucket does not exist, it returns ErrPaymentNotInitiated.
723
func fetchPaymentBucket(tx kvdb.RTx, paymentHash lntypes.Hash) (
724
        kvdb.RBucket, error) {
209✔
725

209✔
726
        payments := tx.ReadBucket(paymentsRootBucket)
209✔
727
        if payments == nil {
209✔
728
                return nil, paymentsdb.ErrPaymentNotInitiated
×
729
        }
×
730

731
        bucket := payments.NestedReadBucket(paymentHash[:])
209✔
732
        if bucket == nil {
210✔
733
                return nil, paymentsdb.ErrPaymentNotInitiated
1✔
734
        }
1✔
735

736
        return bucket, nil
208✔
737
}
738

739
// fetchPaymentBucketUpdate is identical to fetchPaymentBucket, but it returns a
740
// bucket that can be written to.
741
func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
742
        kvdb.RwBucket, error) {
154✔
743

154✔
744
        payments := tx.ReadWriteBucket(paymentsRootBucket)
154✔
745
        if payments == nil {
154✔
746
                return nil, paymentsdb.ErrPaymentNotInitiated
×
747
        }
×
748

749
        bucket := payments.NestedReadWriteBucket(paymentHash[:])
154✔
750
        if bucket == nil {
156✔
751
                return nil, paymentsdb.ErrPaymentNotInitiated
2✔
752
        }
2✔
753

754
        return bucket, nil
152✔
755
}
756

757
// nextPaymentSequence returns the next sequence number to store for a new
758
// payment.
759
func (p *KVPaymentsDB) nextPaymentSequence() ([]byte, error) {
149✔
760
        p.seqMu.Lock()
149✔
761
        defer p.seqMu.Unlock()
149✔
762

149✔
763
        // Set a new upper bound in the DB every 1000 payments to avoid
149✔
764
        // conflicts on the sequence when using etcd.
149✔
765
        if p.currSeq == p.storedSeq {
188✔
766
                var currPaymentSeq, newUpperBound uint64
39✔
767
                if err := kvdb.Update(p.db, func(tx kvdb.RwTx) error {
78✔
768
                        paymentsBucket, err := tx.CreateTopLevelBucket(
39✔
769
                                paymentsRootBucket,
39✔
770
                        )
39✔
771
                        if err != nil {
39✔
772
                                return err
×
773
                        }
×
774

775
                        currPaymentSeq = paymentsBucket.Sequence()
39✔
776
                        newUpperBound = currPaymentSeq + paymentSeqBlockSize
39✔
777

39✔
778
                        return paymentsBucket.SetSequence(newUpperBound)
39✔
779
                }, func() {}); err != nil {
39✔
780
                        return nil, err
×
781
                }
×
782

783
                // We lazy initialize the cached currPaymentSeq here using the
784
                // first nextPaymentSequence() call. This if statement will auto
785
                // initialize our stored currPaymentSeq, since by default both
786
                // this variable and storedPaymentSeq are zero which in turn
787
                // will have us fetch the current values from the DB.
788
                if p.currSeq == 0 {
78✔
789
                        p.currSeq = currPaymentSeq
39✔
790
                }
39✔
791

792
                p.storedSeq = newUpperBound
39✔
793
        }
794

795
        p.currSeq++
149✔
796
        b := make([]byte, 8)
149✔
797
        binary.BigEndian.PutUint64(b, p.currSeq)
149✔
798

149✔
799
        return b, nil
149✔
800
}
801

802
// fetchPaymentStatus fetches the payment status of the payment. If the payment
803
// isn't found, it will return error `ErrPaymentNotInitiated`.
804
func fetchPaymentStatus(bucket kvdb.RBucket) (PaymentStatus, error) {
192✔
805
        // Creation info should be set for all payments, regardless of state.
192✔
806
        // If not, it is unknown.
192✔
807
        if bucket.Get(paymentCreationInfoKey) == nil {
336✔
808
                return 0, paymentsdb.ErrPaymentNotInitiated
144✔
809
        }
144✔
810

811
        payment, err := fetchPayment(bucket)
48✔
812
        if err != nil {
48✔
813
                return 0, err
×
814
        }
×
815

816
        return payment.Status, nil
48✔
817
}
818

819
// FetchInFlightPayments returns all payments with status InFlight.
820
func (p *KVPaymentsDB) FetchInFlightPayments() ([]*MPPayment, error) {
4✔
821
        var (
4✔
822
                inFlights      []*MPPayment
4✔
823
                start          = time.Now()
4✔
824
                lastLogTime    = time.Now()
4✔
825
                processedCount int
4✔
826
        )
4✔
827

4✔
828
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
8✔
829
                payments := tx.ReadBucket(paymentsRootBucket)
4✔
830
                if payments == nil {
4✔
831
                        return nil
×
832
                }
×
833

834
                return payments.ForEach(func(k, _ []byte) error {
6✔
835
                        bucket := payments.NestedReadBucket(k)
2✔
836
                        if bucket == nil {
2✔
837
                                return fmt.Errorf("non bucket element")
×
838
                        }
×
839

840
                        p, err := fetchPayment(bucket)
2✔
841
                        if err != nil {
2✔
842
                                return err
×
843
                        }
×
844

845
                        processedCount++
2✔
846
                        if time.Since(lastLogTime) >=
2✔
847
                                paymentProgressLogInterval {
2✔
848

×
849
                                log.Debugf("Scanning inflight payments "+
×
850
                                        "(in progress), processed %d, last "+
×
851
                                        "processed payment: %v", processedCount,
×
852
                                        p.Info)
×
853

×
854
                                lastLogTime = time.Now()
×
855
                        }
×
856

857
                        // Skip the payment if it's terminated.
858
                        if p.Terminated() {
2✔
859
                                return nil
×
860
                        }
×
861

862
                        inFlights = append(inFlights, p)
2✔
863

2✔
864
                        return nil
2✔
865
                })
866
        }, func() {
4✔
867
                inFlights = nil
4✔
868
        })
4✔
869
        if err != nil {
4✔
870
                return nil, err
×
871
        }
×
872

873
        elapsed := time.Since(start)
4✔
874
        log.Debugf("Completed scanning for inflight payments: "+
4✔
875
                "total_processed=%d, found_inflight=%d, elapsed=%v",
4✔
876
                processedCount, len(inFlights),
4✔
877
                elapsed.Round(time.Millisecond))
4✔
878

4✔
879
        return inFlights, nil
4✔
880
}
881

882
// htlcBucketKey builds a composite key by concatenating prefix and id into a
// newly allocated slice. Neither input is modified or aliased by the result.
func htlcBucketKey(prefix, id []byte) []byte {
	composite := make([]byte, 0, len(prefix)+len(id))
	composite = append(composite, prefix...)

	return append(composite, id...)
}
265✔
891

892
// FetchPayments returns all sent payments found in the DB.
893
func (p *KVPaymentsDB) FetchPayments() ([]*MPPayment, error) {
41✔
894
        var payments []*MPPayment
41✔
895

41✔
896
        err := kvdb.View(p.db, func(tx kvdb.RTx) error {
82✔
897
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
41✔
898
                if paymentsBucket == nil {
41✔
899
                        return nil
×
900
                }
×
901

902
                return paymentsBucket.ForEach(func(k, v []byte) error {
189✔
903
                        bucket := paymentsBucket.NestedReadBucket(k)
148✔
904
                        if bucket == nil {
148✔
905
                                // We only expect sub-buckets to be found in
×
906
                                // this top-level bucket.
×
907
                                return fmt.Errorf("non bucket element in " +
×
908
                                        "payments bucket")
×
909
                        }
×
910

911
                        p, err := fetchPayment(bucket)
148✔
912
                        if err != nil {
148✔
913
                                return err
×
914
                        }
×
915

916
                        payments = append(payments, p)
148✔
917

148✔
918
                        // For older versions of lnd, duplicate payments to a
148✔
919
                        // payment has was possible. These will be found in a
148✔
920
                        // sub-bucket indexed by their sequence number if
148✔
921
                        // available.
148✔
922
                        duplicatePayments, err := fetchDuplicatePayments(bucket)
148✔
923
                        if err != nil {
148✔
924
                                return err
×
925
                        }
×
926

927
                        payments = append(payments, duplicatePayments...)
148✔
928

148✔
929
                        return nil
148✔
930
                })
931
        }, func() {
41✔
932
                payments = nil
41✔
933
        })
41✔
934
        if err != nil {
41✔
935
                return nil, err
×
936
        }
×
937

938
        // Before returning, sort the payments by their sequence number.
939
        sort.Slice(payments, func(i, j int) bool {
276✔
940
                return payments[i].SequenceNum < payments[j].SequenceNum
235✔
941
        })
235✔
942

943
        return payments, nil
41✔
944
}
945

946
func fetchCreationInfo(bucket kvdb.RBucket) (*PaymentCreationInfo, error) {
624✔
947
        b := bucket.Get(paymentCreationInfoKey)
624✔
948
        if b == nil {
624✔
949
                return nil, fmt.Errorf("creation info not found")
×
950
        }
×
951

952
        r := bytes.NewReader(b)
624✔
953

624✔
954
        return deserializePaymentCreationInfo(r)
624✔
955
}
956

957
func fetchPayment(bucket kvdb.RBucket) (*MPPayment, error) {
624✔
958
        seqBytes := bucket.Get(paymentSequenceKey)
624✔
959
        if seqBytes == nil {
624✔
960
                return nil, fmt.Errorf("sequence number not found")
×
961
        }
×
962

963
        sequenceNum := binary.BigEndian.Uint64(seqBytes)
624✔
964

624✔
965
        // Get the PaymentCreationInfo.
624✔
966
        creationInfo, err := fetchCreationInfo(bucket)
624✔
967
        if err != nil {
624✔
968
                return nil, err
×
969
        }
×
970

971
        var htlcs []HTLCAttempt
624✔
972
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
624✔
973
        if htlcsBucket != nil {
994✔
974
                // Get the payment attempts. This can be empty.
370✔
975
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
370✔
976
                if err != nil {
370✔
977
                        return nil, err
×
978
                }
×
979
        }
980

981
        // Get failure reason if available.
982
        var failureReason *FailureReason
624✔
983
        b := bucket.Get(paymentFailInfoKey)
624✔
984
        if b != nil {
692✔
985
                reason := FailureReason(b[0])
68✔
986
                failureReason = &reason
68✔
987
        }
68✔
988

989
        // Create a new payment.
990
        payment := &MPPayment{
624✔
991
                SequenceNum:   sequenceNum,
624✔
992
                Info:          creationInfo,
624✔
993
                HTLCs:         htlcs,
624✔
994
                FailureReason: failureReason,
624✔
995
        }
624✔
996

624✔
997
        // Set its state and status.
624✔
998
        if err := payment.setState(); err != nil {
624✔
999
                return nil, err
×
1000
        }
×
1001

1002
        return payment, nil
624✔
1003
}
1004

1005
// fetchHtlcAttempts retrieves all htlc attempts made for the payment found in
1006
// the given bucket.
1007
func fetchHtlcAttempts(bucket kvdb.RBucket) ([]HTLCAttempt, error) {
377✔
1008
        htlcsMap := make(map[uint64]*HTLCAttempt)
377✔
1009

377✔
1010
        attemptInfoCount := 0
377✔
1011
        err := bucket.ForEach(func(k, v []byte) error {
1,412✔
1012
                aid := byteOrder.Uint64(k[len(k)-8:])
1,035✔
1013

1,035✔
1014
                if _, ok := htlcsMap[aid]; !ok {
1,696✔
1015
                        htlcsMap[aid] = &HTLCAttempt{}
661✔
1016
                }
661✔
1017

1018
                var err error
1,035✔
1019
                switch {
1,035✔
1020
                case bytes.HasPrefix(k, htlcAttemptInfoKey):
661✔
1021
                        attemptInfo, err := readHtlcAttemptInfo(v)
661✔
1022
                        if err != nil {
661✔
1023
                                return err
×
1024
                        }
×
1025

1026
                        attemptInfo.AttemptID = aid
661✔
1027
                        htlcsMap[aid].HTLCAttemptInfo = *attemptInfo
661✔
1028
                        attemptInfoCount++
661✔
1029

1030
                case bytes.HasPrefix(k, htlcSettleInfoKey):
83✔
1031
                        htlcsMap[aid].Settle, err = readHtlcSettleInfo(v)
83✔
1032
                        if err != nil {
83✔
1033
                                return err
×
1034
                        }
×
1035

1036
                case bytes.HasPrefix(k, htlcFailInfoKey):
291✔
1037
                        htlcsMap[aid].Failure, err = readHtlcFailInfo(v)
291✔
1038
                        if err != nil {
291✔
1039
                                return err
×
1040
                        }
×
1041

1042
                default:
×
1043
                        return fmt.Errorf("unknown htlc attempt key")
×
1044
                }
1045

1046
                return nil
1,035✔
1047
        })
1048
        if err != nil {
377✔
1049
                return nil, err
×
1050
        }
×
1051

1052
        // Sanity check that all htlcs have an attempt info.
1053
        if attemptInfoCount != len(htlcsMap) {
377✔
1054
                return nil, paymentsdb.ErrNoAttemptInfo
×
1055
        }
×
1056

1057
        keys := make([]uint64, len(htlcsMap))
377✔
1058
        i := 0
377✔
1059
        for k := range htlcsMap {
1,038✔
1060
                keys[i] = k
661✔
1061
                i++
661✔
1062
        }
661✔
1063

1064
        // Sort HTLC attempts by their attempt ID. This is needed because in the
1065
        // DB we store the attempts with keys prefixed by their status which
1066
        // changes order (groups them together by status).
1067
        sort.Slice(keys, func(i, j int) bool {
689✔
1068
                return keys[i] < keys[j]
312✔
1069
        })
312✔
1070

1071
        htlcs := make([]HTLCAttempt, len(htlcsMap))
377✔
1072
        for i, key := range keys {
1,038✔
1073
                htlcs[i] = *htlcsMap[key]
661✔
1074
        }
661✔
1075

1076
        return htlcs, nil
377✔
1077
}
1078

1079
// readHtlcAttemptInfo reads the payment attempt info for this htlc.
1080
func readHtlcAttemptInfo(b []byte) (*HTLCAttemptInfo, error) {
661✔
1081
        r := bytes.NewReader(b)
661✔
1082
        return deserializeHTLCAttemptInfo(r)
661✔
1083
}
661✔
1084

1085
// readHtlcSettleInfo reads the settle info for the htlc. If the htlc isn't
1086
// settled, nil is returned.
1087
func readHtlcSettleInfo(b []byte) (*HTLCSettleInfo, error) {
83✔
1088
        r := bytes.NewReader(b)
83✔
1089
        return deserializeHTLCSettleInfo(r)
83✔
1090
}
83✔
1091

1092
// readHtlcFailInfo reads the failure info for the htlc. If the htlc hasn't
1093
// failed, nil is returned.
1094
func readHtlcFailInfo(b []byte) (*HTLCFailInfo, error) {
291✔
1095
        r := bytes.NewReader(b)
291✔
1096
        return deserializeHTLCFailInfo(r)
291✔
1097
}
291✔
1098

1099
// fetchFailedHtlcKeys retrieves the bucket keys of all failed HTLCs of a
1100
// payment bucket.
1101
func fetchFailedHtlcKeys(bucket kvdb.RBucket) ([][]byte, error) {
7✔
1102
        htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket)
7✔
1103

7✔
1104
        var htlcs []HTLCAttempt
7✔
1105
        var err error
7✔
1106
        if htlcsBucket != nil {
14✔
1107
                htlcs, err = fetchHtlcAttempts(htlcsBucket)
7✔
1108
                if err != nil {
7✔
1109
                        return nil, err
×
1110
                }
×
1111
        }
1112

1113
        // Now iterate though them and save the bucket keys for the failed
1114
        // HTLCs.
1115
        var htlcKeys [][]byte
7✔
1116
        for _, h := range htlcs {
19✔
1117
                if h.Failure == nil {
15✔
1118
                        continue
3✔
1119
                }
1120

1121
                htlcKeyBytes := make([]byte, 8)
9✔
1122
                binary.BigEndian.PutUint64(htlcKeyBytes, h.AttemptID)
9✔
1123

9✔
1124
                htlcKeys = append(htlcKeys, htlcKeyBytes)
9✔
1125
        }
1126

1127
        return htlcKeys, nil
7✔
1128
}
1129

1130
// QueryPayments is a query to the payments database which is restricted
1131
// to a subset of payments by the payments query, containing an offset
1132
// index and a maximum number of returned payments.
1133
func (p *KVPaymentsDB) QueryPayments(_ context.Context,
1134
        query PaymentsQuery) (PaymentsResponse, error) {
36✔
1135

36✔
1136
        var resp PaymentsResponse
36✔
1137

36✔
1138
        if err := kvdb.View(p.db, func(tx kvdb.RTx) error {
72✔
1139
                // Get the root payments bucket.
36✔
1140
                paymentsBucket := tx.ReadBucket(paymentsRootBucket)
36✔
1141
                if paymentsBucket == nil {
36✔
1142
                        return nil
×
1143
                }
×
1144

1145
                // Get the index bucket which maps sequence number -> payment
1146
                // hash and duplicate bool. If we have a payments bucket, we
1147
                // should have an indexes bucket as well.
1148
                indexes := tx.ReadBucket(paymentsIndexBucket)
36✔
1149
                if indexes == nil {
36✔
1150
                        return fmt.Errorf("index bucket does not exist")
×
1151
                }
×
1152

1153
                // accumulatePayments gets payments with the sequence number
1154
                // and hash provided and adds them to our list of payments if
1155
                // they meet the criteria of our query. It returns the number
1156
                // of payments that were added.
1157
                accumulatePayments := func(sequenceKey, hash []byte) (bool,
36✔
1158
                        error) {
88✔
1159

52✔
1160
                        r := bytes.NewReader(hash)
52✔
1161
                        paymentHash, err := deserializePaymentIndex(r)
52✔
1162
                        if err != nil {
52✔
1163
                                return false, err
×
1164
                        }
×
1165

1166
                        payment, err := fetchPaymentWithSequenceNumber(
52✔
1167
                                tx, paymentHash, sequenceKey,
52✔
1168
                        )
52✔
1169
                        if err != nil {
52✔
1170
                                return false, err
×
1171
                        }
×
1172

1173
                        // To keep compatibility with the old API, we only
1174
                        // return non-succeeded payments if requested.
1175
                        if payment.Status != StatusSucceeded &&
52✔
1176
                                !query.IncludeIncomplete {
57✔
1177

5✔
1178
                                return false, err
5✔
1179
                        }
5✔
1180

1181
                        // Get the creation time in Unix seconds, this always
1182
                        // rounds down the nanoseconds to full seconds.
1183
                        createTime := payment.Info.CreationTime.Unix()
47✔
1184

47✔
1185
                        // Skip any payments that were created before the
47✔
1186
                        // specified time.
47✔
1187
                        if createTime < query.CreationDateStart {
56✔
1188
                                return false, nil
9✔
1189
                        }
9✔
1190

1191
                        // Skip any payments that were created after the
1192
                        // specified time.
1193
                        if query.CreationDateEnd != 0 &&
38✔
1194
                                createTime > query.CreationDateEnd {
40✔
1195

2✔
1196
                                return false, nil
2✔
1197
                        }
2✔
1198

1199
                        // At this point, we've exhausted the offset, so we'll
1200
                        // begin collecting invoices found within the range.
1201
                        resp.Payments = append(resp.Payments, payment)
36✔
1202

36✔
1203
                        return true, nil
36✔
1204
                }
1205

1206
                // Create a paginator which reads from our sequence index bucket
1207
                // with the parameters provided by the payments query.
1208
                paginator := newPaginator(
36✔
1209
                        indexes.ReadCursor(), query.Reversed, query.IndexOffset,
36✔
1210
                        query.MaxPayments,
36✔
1211
                )
36✔
1212

36✔
1213
                // Run a paginated query, adding payments to our response.
36✔
1214
                if err := paginator.query(accumulatePayments); err != nil {
36✔
1215
                        return err
×
1216
                }
×
1217

1218
                // Counting the total number of payments is expensive, since we
1219
                // literally have to traverse the cursor linearly, which can
1220
                // take quite a while. So it's an optional query parameter.
1221
                if query.CountTotal {
36✔
1222
                        var (
×
1223
                                totalPayments uint64
×
1224
                                err           error
×
1225
                        )
×
1226
                        countFn := func(_, _ []byte) error {
×
1227
                                totalPayments++
×
1228

×
1229
                                return nil
×
1230
                        }
×
1231

1232
                        // In non-boltdb database backends, there's a faster
1233
                        // ForAll query that allows for batch fetching items.
1234
                        fastBucket, ok := indexes.(kvdb.ExtendedRBucket)
×
1235
                        if ok {
×
1236
                                err = fastBucket.ForAll(countFn)
×
1237
                        } else {
×
1238
                                err = indexes.ForEach(countFn)
×
1239
                        }
×
1240
                        if err != nil {
×
1241
                                return fmt.Errorf("error counting payments: %w",
×
1242
                                        err)
×
1243
                        }
×
1244

1245
                        resp.TotalCount = totalPayments
×
1246
                }
1247

1248
                return nil
36✔
1249
        }, func() {
36✔
1250
                resp = PaymentsResponse{}
36✔
1251
        }); err != nil {
36✔
1252
                return resp, err
×
1253
        }
×
1254

1255
        // Need to swap the payments slice order if reversed order.
1256
        if query.Reversed {
52✔
1257
                for l, r := 0, len(resp.Payments)-1; l < r; l, r = l+1, r-1 {
24✔
1258
                        resp.Payments[l], resp.Payments[r] =
8✔
1259
                                resp.Payments[r], resp.Payments[l]
8✔
1260
                }
8✔
1261
        }
1262

1263
        // Set the first and last index of the returned payments so that the
1264
        // caller can resume from this point later on.
1265
        if len(resp.Payments) > 0 {
52✔
1266
                resp.FirstIndexOffset = resp.Payments[0].SequenceNum
16✔
1267
                resp.LastIndexOffset =
16✔
1268
                        resp.Payments[len(resp.Payments)-1].SequenceNum
16✔
1269
        }
16✔
1270

1271
        return resp, nil
36✔
1272
}
1273

1274
// fetchPaymentWithSequenceNumber get the payment which matches the payment hash
1275
// *and* sequence number provided from the database. This is required because
1276
// we previously had more than one payment per hash, so we have multiple indexes
1277
// pointing to a single payment; we want to retrieve the correct one.
1278
func fetchPaymentWithSequenceNumber(tx kvdb.RTx, paymentHash lntypes.Hash,
1279
        sequenceNumber []byte) (*MPPayment, error) {
58✔
1280

58✔
1281
        // We can now lookup the payment keyed by its hash in
58✔
1282
        // the payments root bucket.
58✔
1283
        bucket, err := fetchPaymentBucket(tx, paymentHash)
58✔
1284
        if err != nil {
58✔
1285
                return nil, err
×
1286
        }
×
1287

1288
        // A single payment hash can have multiple payments associated with it.
1289
        // We lookup our sequence number first, to determine whether this is
1290
        // the payment we are actually looking for.
1291
        seqBytes := bucket.Get(paymentSequenceKey)
58✔
1292
        if seqBytes == nil {
58✔
1293
                return nil, paymentsdb.ErrNoSequenceNumber
×
1294
        }
×
1295

1296
        // If this top level payment has the sequence number we are looking for,
1297
        // return it.
1298
        if bytes.Equal(seqBytes, sequenceNumber) {
103✔
1299
                return fetchPayment(bucket)
45✔
1300
        }
45✔
1301

1302
        // If we were not looking for the top level payment, we are looking for
1303
        // one of our duplicate payments. We need to iterate through the seq
1304
        // numbers in this bucket to find the correct payments. If we do not
1305
        // find a duplicate payments bucket here, something is wrong.
1306
        dup := bucket.NestedReadBucket(duplicatePaymentsBucket)
13✔
1307
        if dup == nil {
14✔
1308
                return nil, paymentsdb.ErrNoDuplicateBucket
1✔
1309
        }
1✔
1310

1311
        var duplicatePayment *MPPayment
12✔
1312
        err = dup.ForEach(func(k, v []byte) error {
27✔
1313
                subBucket := dup.NestedReadBucket(k)
15✔
1314
                if subBucket == nil {
15✔
1315
                        // We one bucket for each duplicate to be found.
×
1316
                        return paymentsdb.ErrNoDuplicateNestedBucket
×
1317
                }
×
1318

1319
                seqBytes := subBucket.Get(duplicatePaymentSequenceKey)
15✔
1320
                if seqBytes == nil {
15✔
1321
                        return err
×
1322
                }
×
1323

1324
                // If this duplicate payment is not the sequence number we are
1325
                // looking for, we can continue.
1326
                if !bytes.Equal(seqBytes, sequenceNumber) {
19✔
1327
                        return nil
4✔
1328
                }
4✔
1329

1330
                duplicatePayment, err = fetchDuplicatePayment(subBucket)
11✔
1331
                if err != nil {
11✔
1332
                        return err
×
1333
                }
×
1334

1335
                return nil
11✔
1336
        })
1337
        if err != nil {
12✔
1338
                return nil, err
×
1339
        }
×
1340

1341
        // If none of the duplicate payments matched our sequence number, we
1342
        // failed to find the payment with this sequence number; something is
1343
        // wrong.
1344
        if duplicatePayment == nil {
13✔
1345
                return nil, paymentsdb.ErrDuplicateNotFound
1✔
1346
        }
1✔
1347

1348
        return duplicatePayment, nil
11✔
1349
}
1350

1351
// DeletePayment deletes a payment from the DB given its payment hash. If
1352
// failedHtlcsOnly is set, only failed HTLC attempts of the payment will be
1353
// deleted.
1354
func (p *KVPaymentsDB) DeletePayment(paymentHash lntypes.Hash,
1355
        failedHtlcsOnly bool) error {
11✔
1356

11✔
1357
        return kvdb.Update(p.db, func(tx kvdb.RwTx) error {
22✔
1358
                payments := tx.ReadWriteBucket(paymentsRootBucket)
11✔
1359
                if payments == nil {
11✔
1360
                        return nil
×
1361
                }
×
1362

1363
                bucket := payments.NestedReadWriteBucket(paymentHash[:])
11✔
1364
                if bucket == nil {
12✔
1365
                        return fmt.Errorf("non bucket element in payments " +
1✔
1366
                                "bucket")
1✔
1367
                }
1✔
1368

1369
                // If the status is InFlight, we cannot safely delete
1370
                // the payment information, so we return early.
1371
                paymentStatus, err := fetchPaymentStatus(bucket)
10✔
1372
                if err != nil {
10✔
1373
                        return err
×
1374
                }
×
1375

1376
                // If the payment has inflight HTLCs, we cannot safely delete
1377
                // the payment information, so we return an error.
1378
                if err := paymentStatus.removable(); err != nil {
13✔
1379
                        return fmt.Errorf("payment '%v' has inflight HTLCs"+
3✔
1380
                                "and therefore cannot be deleted: %w",
3✔
1381
                                paymentHash.String(), err)
3✔
1382
                }
3✔
1383

1384
                // Delete the failed HTLC attempts we found.
1385
                if failedHtlcsOnly {
11✔
1386
                        toDelete, err := fetchFailedHtlcKeys(bucket)
4✔
1387
                        if err != nil {
4✔
1388
                                return err
×
1389
                        }
×
1390

1391
                        htlcsBucket := bucket.NestedReadWriteBucket(
4✔
1392
                                paymentHtlcsBucket,
4✔
1393
                        )
4✔
1394

4✔
1395
                        for _, htlcID := range toDelete {
10✔
1396
                                err = htlcsBucket.Delete(
6✔
1397
                                        htlcBucketKey(
6✔
1398
                                                htlcAttemptInfoKey, htlcID,
6✔
1399
                                        ),
6✔
1400
                                )
6✔
1401
                                if err != nil {
6✔
1402
                                        return err
×
1403
                                }
×
1404

1405
                                err = htlcsBucket.Delete(
6✔
1406
                                        htlcBucketKey(htlcFailInfoKey, htlcID),
6✔
1407
                                )
6✔
1408
                                if err != nil {
6✔
1409
                                        return err
×
1410
                                }
×
1411

1412
                                err = htlcsBucket.Delete(
6✔
1413
                                        htlcBucketKey(
6✔
1414
                                                htlcSettleInfoKey, htlcID,
6✔
1415
                                        ),
6✔
1416
                                )
6✔
1417
                                if err != nil {
6✔
1418
                                        return err
×
1419
                                }
×
1420
                        }
1421

1422
                        return nil
4✔
1423
                }
1424

1425
                seqNrs, err := fetchSequenceNumbers(bucket)
3✔
1426
                if err != nil {
3✔
1427
                        return err
×
1428
                }
×
1429

1430
                err = payments.DeleteNestedBucket(paymentHash[:])
3✔
1431
                if err != nil {
3✔
1432
                        return err
×
1433
                }
×
1434

1435
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
3✔
1436
                for _, k := range seqNrs {
6✔
1437
                        if err := indexBucket.Delete(k); err != nil {
3✔
1438
                                return err
×
1439
                        }
×
1440
                }
1441

1442
                return nil
3✔
1443
        }, func() {})
11✔
1444
}
1445

1446
// DeletePayments deletes all completed and failed payments from the DB. If
1447
// failedOnly is set, only failed payments will be considered for deletion. If
1448
// failedHtlcsOnly is set, the payment itself won't be deleted, only failed HTLC
1449
// attempts. The method returns the number of deleted payments, which is always
1450
// 0 if failedHtlcsOnly is set.
1451
func (p *KVPaymentsDB) DeletePayments(failedOnly,
1452
        failedHtlcsOnly bool) (int, error) {
6✔
1453

6✔
1454
        var numPayments int
6✔
1455
        err := kvdb.Update(p.db, func(tx kvdb.RwTx) error {
12✔
1456
                payments := tx.ReadWriteBucket(paymentsRootBucket)
6✔
1457
                if payments == nil {
6✔
1458
                        return nil
×
1459
                }
×
1460

1461
                var (
6✔
1462
                        // deleteBuckets is the set of payment buckets we need
6✔
1463
                        // to delete.
6✔
1464
                        deleteBuckets [][]byte
6✔
1465

6✔
1466
                        // deleteIndexes is the set of indexes pointing to these
6✔
1467
                        // payments that need to be deleted.
6✔
1468
                        deleteIndexes [][]byte
6✔
1469

6✔
1470
                        // deleteHtlcs maps a payment hash to the HTLC IDs we
6✔
1471
                        // want to delete for that payment.
6✔
1472
                        deleteHtlcs = make(map[lntypes.Hash][][]byte)
6✔
1473
                )
6✔
1474
                err := payments.ForEach(func(k, _ []byte) error {
24✔
1475
                        bucket := payments.NestedReadBucket(k)
18✔
1476
                        if bucket == nil {
18✔
1477
                                // We only expect sub-buckets to be found in
×
1478
                                // this top-level bucket.
×
1479
                                return fmt.Errorf("non bucket element in " +
×
1480
                                        "payments bucket")
×
1481
                        }
×
1482

1483
                        // If the status is InFlight, we cannot safely delete
1484
                        // the payment information, so we return early.
1485
                        paymentStatus, err := fetchPaymentStatus(bucket)
18✔
1486
                        if err != nil {
18✔
1487
                                return err
×
1488
                        }
×
1489

1490
                        // If the payment has inflight HTLCs, we cannot safely
1491
                        // delete the payment information, so we return an nil
1492
                        // to skip it.
1493
                        if err := paymentStatus.removable(); err != nil {
24✔
1494
                                return nil
6✔
1495
                        }
6✔
1496

1497
                        // If we requested to only delete failed payments, we
1498
                        // can return if this one is not.
1499
                        if failedOnly && paymentStatus != StatusFailed {
16✔
1500
                                return nil
4✔
1501
                        }
4✔
1502

1503
                        // If we are only deleting failed HTLCs, fetch them.
1504
                        if failedHtlcsOnly {
11✔
1505
                                toDelete, err := fetchFailedHtlcKeys(bucket)
3✔
1506
                                if err != nil {
3✔
1507
                                        return err
×
1508
                                }
×
1509

1510
                                hash, err := lntypes.MakeHash(k)
3✔
1511
                                if err != nil {
3✔
1512
                                        return err
×
1513
                                }
×
1514

1515
                                deleteHtlcs[hash] = toDelete
3✔
1516

3✔
1517
                                // We return, we are only deleting attempts.
3✔
1518
                                return nil
3✔
1519
                        }
1520

1521
                        // Add the bucket to the set of buckets we can delete.
1522
                        deleteBuckets = append(deleteBuckets, k)
5✔
1523

5✔
1524
                        // Get all the sequence number associated with the
5✔
1525
                        // payment, including duplicates.
5✔
1526
                        seqNrs, err := fetchSequenceNumbers(bucket)
5✔
1527
                        if err != nil {
5✔
1528
                                return err
×
1529
                        }
×
1530

1531
                        deleteIndexes = append(deleteIndexes, seqNrs...)
5✔
1532
                        numPayments++
5✔
1533

5✔
1534
                        return nil
5✔
1535
                })
1536
                if err != nil {
6✔
1537
                        return err
×
1538
                }
×
1539

1540
                // Delete the failed HTLC attempts we found.
1541
                for hash, htlcIDs := range deleteHtlcs {
9✔
1542
                        bucket := payments.NestedReadWriteBucket(hash[:])
3✔
1543
                        htlcsBucket := bucket.NestedReadWriteBucket(
3✔
1544
                                paymentHtlcsBucket,
3✔
1545
                        )
3✔
1546

3✔
1547
                        for _, aid := range htlcIDs {
6✔
1548
                                if err := htlcsBucket.Delete(
3✔
1549
                                        htlcBucketKey(htlcAttemptInfoKey, aid),
3✔
1550
                                ); err != nil {
3✔
1551
                                        return err
×
1552
                                }
×
1553

1554
                                if err := htlcsBucket.Delete(
3✔
1555
                                        htlcBucketKey(htlcFailInfoKey, aid),
3✔
1556
                                ); err != nil {
3✔
1557
                                        return err
×
1558
                                }
×
1559

1560
                                if err := htlcsBucket.Delete(
3✔
1561
                                        htlcBucketKey(htlcSettleInfoKey, aid),
3✔
1562
                                ); err != nil {
3✔
1563
                                        return err
×
1564
                                }
×
1565
                        }
1566
                }
1567

1568
                for _, k := range deleteBuckets {
11✔
1569
                        if err := payments.DeleteNestedBucket(k); err != nil {
5✔
1570
                                return err
×
1571
                        }
×
1572
                }
1573

1574
                // Get our index bucket and delete all indexes pointing to the
1575
                // payments we are deleting.
1576
                indexBucket := tx.ReadWriteBucket(paymentsIndexBucket)
6✔
1577
                for _, k := range deleteIndexes {
12✔
1578
                        if err := indexBucket.Delete(k); err != nil {
6✔
1579
                                return err
×
1580
                        }
×
1581
                }
1582

1583
                return nil
6✔
1584
        }, func() {
6✔
1585
                numPayments = 0
6✔
1586
        })
6✔
1587
        if err != nil {
6✔
1588
                return 0, err
×
1589
        }
×
1590

1591
        return numPayments, nil
6✔
1592
}
1593

1594
// fetchSequenceNumbers fetches all the sequence numbers associated with a
1595
// payment, including those belonging to any duplicate payments.
1596
func fetchSequenceNumbers(paymentBucket kvdb.RBucket) ([][]byte, error) {
8✔
1597
        seqNum := paymentBucket.Get(paymentSequenceKey)
8✔
1598
        if seqNum == nil {
8✔
1599
                return nil, errors.New("expected sequence number")
×
1600
        }
×
1601

1602
        sequenceNumbers := [][]byte{seqNum}
8✔
1603

8✔
1604
        // Get the duplicate payments bucket, if it has no duplicates, just
8✔
1605
        // return early with the payment sequence number.
8✔
1606
        duplicates := paymentBucket.NestedReadBucket(duplicatePaymentsBucket)
8✔
1607
        if duplicates == nil {
15✔
1608
                return sequenceNumbers, nil
7✔
1609
        }
7✔
1610

1611
        // If we do have duplicated, they are keyed by sequence number, so we
1612
        // iterate through the duplicates bucket and add them to our set of
1613
        // sequence numbers.
1614
        if err := duplicates.ForEach(func(k, v []byte) error {
2✔
1615
                sequenceNumbers = append(sequenceNumbers, k)
1✔
1616
                return nil
1✔
1617
        }); err != nil {
1✔
1618
                return nil, err
×
1619
        }
×
1620

1621
        return sequenceNumbers, nil
1✔
1622
}
1623

1624
func serializePaymentCreationInfo(w io.Writer, c *PaymentCreationInfo) error {
151✔
1625
        var scratch [8]byte
151✔
1626

151✔
1627
        if _, err := w.Write(c.PaymentIdentifier[:]); err != nil {
151✔
1628
                return err
×
1629
        }
×
1630

1631
        byteOrder.PutUint64(scratch[:], uint64(c.Value))
151✔
1632
        if _, err := w.Write(scratch[:]); err != nil {
151✔
1633
                return err
×
1634
        }
×
1635

1636
        if err := serializeTime(w, c.CreationTime); err != nil {
151✔
1637
                return err
×
1638
        }
×
1639

1640
        byteOrder.PutUint32(scratch[:4], uint32(len(c.PaymentRequest)))
151✔
1641
        if _, err := w.Write(scratch[:4]); err != nil {
151✔
1642
                return err
×
1643
        }
×
1644

1645
        if _, err := w.Write(c.PaymentRequest); err != nil {
151✔
1646
                return err
×
1647
        }
×
1648

1649
        // Any remaining bytes are TLV encoded records. Currently, these are
1650
        // only the custom records provided by the user to be sent to the first
1651
        // hop. But this can easily be extended with further records by merging
1652
        // the records into a single TLV stream.
1653
        err := c.FirstHopCustomRecords.SerializeTo(w)
151✔
1654
        if err != nil {
151✔
1655
                return err
×
1656
        }
×
1657

1658
        return nil
151✔
1659
}
1660

1661
func deserializePaymentCreationInfo(r io.Reader) (*PaymentCreationInfo,
1662
        error) {
626✔
1663

626✔
1664
        var scratch [8]byte
626✔
1665

626✔
1666
        c := &PaymentCreationInfo{}
626✔
1667

626✔
1668
        if _, err := io.ReadFull(r, c.PaymentIdentifier[:]); err != nil {
626✔
1669
                return nil, err
×
1670
        }
×
1671

1672
        if _, err := io.ReadFull(r, scratch[:]); err != nil {
626✔
1673
                return nil, err
×
1674
        }
×
1675
        c.Value = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:]))
626✔
1676

626✔
1677
        creationTime, err := deserializeTime(r)
626✔
1678
        if err != nil {
626✔
1679
                return nil, err
×
1680
        }
×
1681
        c.CreationTime = creationTime
626✔
1682

626✔
1683
        if _, err := io.ReadFull(r, scratch[:4]); err != nil {
626✔
1684
                return nil, err
×
1685
        }
×
1686

1687
        reqLen := byteOrder.Uint32(scratch[:4])
626✔
1688
        payReq := make([]byte, reqLen)
626✔
1689
        if reqLen > 0 {
1,252✔
1690
                if _, err := io.ReadFull(r, payReq); err != nil {
626✔
1691
                        return nil, err
×
1692
                }
×
1693
        }
1694
        c.PaymentRequest = payReq
626✔
1695

626✔
1696
        // Any remaining bytes are TLV encoded records. Currently, these are
626✔
1697
        // only the custom records provided by the user to be sent to the first
626✔
1698
        // hop. But this can easily be extended with further records by merging
626✔
1699
        // the records into a single TLV stream.
626✔
1700
        c.FirstHopCustomRecords, err = lnwire.ParseCustomRecordsFrom(r)
626✔
1701
        if err != nil {
626✔
1702
                return nil, err
×
1703
        }
×
1704

1705
        return c, nil
626✔
1706
}
1707

1708
func serializeHTLCAttemptInfo(w io.Writer, a *HTLCAttemptInfo) error {
72✔
1709
        if err := WriteElements(w, a.sessionKey); err != nil {
72✔
1710
                return err
×
1711
        }
×
1712

1713
        if err := SerializeRoute(w, a.Route); err != nil {
72✔
1714
                return err
×
1715
        }
×
1716

1717
        if err := serializeTime(w, a.AttemptTime); err != nil {
72✔
1718
                return err
×
1719
        }
×
1720

1721
        // If the hash is nil we can just return.
1722
        if a.Hash == nil {
72✔
1723
                return nil
×
1724
        }
×
1725

1726
        if _, err := w.Write(a.Hash[:]); err != nil {
72✔
1727
                return err
×
1728
        }
×
1729

1730
        // Merge the fixed/known records together with the custom records to
1731
        // serialize them as a single blob. We can't do this in SerializeRoute
1732
        // because we're in the middle of the byte stream there. We can only do
1733
        // TLV serialization at the end of the stream, since EOF is allowed for
1734
        // a stream if no more data is expected.
1735
        producers := []tlv.RecordProducer{
72✔
1736
                &a.Route.FirstHopAmount,
72✔
1737
        }
72✔
1738
        tlvData, err := lnwire.MergeAndEncode(
72✔
1739
                producers, nil, a.Route.FirstHopWireCustomRecords,
72✔
1740
        )
72✔
1741
        if err != nil {
72✔
1742
                return err
×
1743
        }
×
1744

1745
        if _, err := w.Write(tlvData); err != nil {
72✔
1746
                return err
×
1747
        }
×
1748

1749
        return nil
72✔
1750
}
1751

1752
func deserializeHTLCAttemptInfo(r io.Reader) (*HTLCAttemptInfo, error) {
663✔
1753
        a := &HTLCAttemptInfo{}
663✔
1754
        err := ReadElements(r, &a.sessionKey)
663✔
1755
        if err != nil {
663✔
1756
                return nil, err
×
1757
        }
×
1758

1759
        a.Route, err = DeserializeRoute(r)
663✔
1760
        if err != nil {
663✔
1761
                return nil, err
×
1762
        }
×
1763

1764
        a.AttemptTime, err = deserializeTime(r)
663✔
1765
        if err != nil {
663✔
1766
                return nil, err
×
1767
        }
×
1768

1769
        hash := lntypes.Hash{}
663✔
1770
        _, err = io.ReadFull(r, hash[:])
663✔
1771

663✔
1772
        switch {
663✔
1773
        // Older payment attempts wouldn't have the hash set, in which case we
1774
        // can just return.
1775
        case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
×
1776
                return a, nil
×
1777

1778
        case err != nil:
×
1779
                return nil, err
×
1780

1781
        default:
663✔
1782
        }
1783

1784
        a.Hash = &hash
663✔
1785

663✔
1786
        // Read any remaining data (if any) and parse it into the known records
663✔
1787
        // and custom records.
663✔
1788
        extraData, err := io.ReadAll(r)
663✔
1789
        if err != nil {
663✔
1790
                return nil, err
×
1791
        }
×
1792

1793
        customRecords, _, _, err := lnwire.ParseAndExtractCustomRecords(
663✔
1794
                extraData, &a.Route.FirstHopAmount,
663✔
1795
        )
663✔
1796
        if err != nil {
663✔
1797
                return nil, err
×
1798
        }
×
1799

1800
        a.Route.FirstHopWireCustomRecords = customRecords
663✔
1801

663✔
1802
        return a, nil
663✔
1803
}
1804

1805
func serializeHop(w io.Writer, h *route.Hop) error {
149✔
1806
        if err := WriteElements(w,
149✔
1807
                h.PubKeyBytes[:],
149✔
1808
                h.ChannelID,
149✔
1809
                h.OutgoingTimeLock,
149✔
1810
                h.AmtToForward,
149✔
1811
        ); err != nil {
149✔
1812
                return err
×
1813
        }
×
1814

1815
        if err := binary.Write(w, byteOrder, h.LegacyPayload); err != nil {
149✔
1816
                return err
×
1817
        }
×
1818

1819
        // For legacy payloads, we don't need to write any TLV records, so
1820
        // we'll write a zero indicating the our serialized TLV map has no
1821
        // records.
1822
        if h.LegacyPayload {
229✔
1823
                return WriteElements(w, uint32(0))
80✔
1824
        }
80✔
1825

1826
        // Gather all non-primitive TLV records so that they can be serialized
1827
        // as a single blob.
1828
        //
1829
        // TODO(conner): add migration to unify all fields in a single TLV
1830
        // blobs. The split approach will cause headaches down the road as more
1831
        // fields are added, which we can avoid by having a single TLV stream
1832
        // for all payload fields.
1833
        var records []tlv.Record
69✔
1834
        if h.MPP != nil {
133✔
1835
                records = append(records, h.MPP.Record())
64✔
1836
        }
64✔
1837

1838
        // Add blinding point and encrypted data if present.
1839
        if h.EncryptedData != nil {
71✔
1840
                records = append(records, record.NewEncryptedDataRecord(
2✔
1841
                        &h.EncryptedData,
2✔
1842
                ))
2✔
1843
        }
2✔
1844

1845
        if h.BlindingPoint != nil {
70✔
1846
                records = append(records, record.NewBlindingPointRecord(
1✔
1847
                        &h.BlindingPoint,
1✔
1848
                ))
1✔
1849
        }
1✔
1850

1851
        if h.AMP != nil {
69✔
1852
                records = append(records, h.AMP.Record())
×
1853
        }
×
1854

1855
        if h.Metadata != nil {
136✔
1856
                records = append(records, record.NewMetadataRecord(&h.Metadata))
67✔
1857
        }
67✔
1858

1859
        if h.TotalAmtMsat != 0 {
70✔
1860
                totalMsatInt := uint64(h.TotalAmtMsat)
1✔
1861
                records = append(
1✔
1862
                        records, record.NewTotalAmtMsatBlinded(&totalMsatInt),
1✔
1863
                )
1✔
1864
        }
1✔
1865

1866
        // Final sanity check to absolutely rule out custom records that are not
1867
        // custom and write into the standard range.
1868
        if err := h.CustomRecords.Validate(); err != nil {
69✔
1869
                return err
×
1870
        }
×
1871

1872
        // Convert custom records to tlv and add to the record list.
1873
        // MapToRecords sorts the list, so adding it here will keep the list
1874
        // canonical.
1875
        tlvRecords := tlv.MapToRecords(h.CustomRecords)
69✔
1876
        records = append(records, tlvRecords...)
69✔
1877

69✔
1878
        // Otherwise, we'll transform our slice of records into a map of the
69✔
1879
        // raw bytes, then serialize them in-line with a length (number of
69✔
1880
        // elements) prefix.
69✔
1881
        mapRecords, err := tlv.RecordsToMap(records)
69✔
1882
        if err != nil {
69✔
1883
                return err
×
1884
        }
×
1885

1886
        numRecords := uint32(len(mapRecords))
69✔
1887
        if err := WriteElements(w, numRecords); err != nil {
69✔
1888
                return err
×
1889
        }
×
1890

1891
        for recordType, rawBytes := range mapRecords {
336✔
1892
                if err := WriteElements(w, recordType); err != nil {
267✔
1893
                        return err
×
1894
                }
×
1895

1896
                if err := wire.WriteVarBytes(w, 0, rawBytes); err != nil {
267✔
1897
                        return err
×
1898
                }
×
1899
        }
1900

1901
        return nil
69✔
1902
}
1903

1904
// maxOnionPayloadSize is the largest Sphinx payload possible, so we don't need
1905
// to read/write a TLV stream larger than this.
1906
const maxOnionPayloadSize = 1300
1907

1908
func deserializeHop(r io.Reader) (*route.Hop, error) {
1,331✔
1909
        h := &route.Hop{}
1,331✔
1910

1,331✔
1911
        var pub []byte
1,331✔
1912
        if err := ReadElements(r, &pub); err != nil {
1,331✔
1913
                return nil, err
×
1914
        }
×
1915
        copy(h.PubKeyBytes[:], pub)
1,331✔
1916

1,331✔
1917
        if err := ReadElements(r,
1,331✔
1918
                &h.ChannelID, &h.OutgoingTimeLock, &h.AmtToForward,
1,331✔
1919
        ); err != nil {
1,331✔
1920
                return nil, err
×
1921
        }
×
1922

1923
        // TODO(roasbeef): change field to allow LegacyPayload false to be the
1924
        // legacy default?
1925
        err := binary.Read(r, byteOrder, &h.LegacyPayload)
1,331✔
1926
        if err != nil {
1,331✔
1927
                return nil, err
×
1928
        }
×
1929

1930
        var numElements uint32
1,331✔
1931
        if err := ReadElements(r, &numElements); err != nil {
1,331✔
1932
                return nil, err
×
1933
        }
×
1934

1935
        // If there're no elements, then we can return early.
1936
        if numElements == 0 {
2,023✔
1937
                return h, nil
692✔
1938
        }
692✔
1939

1940
        tlvMap := make(map[uint64][]byte)
639✔
1941
        for i := uint32(0); i < numElements; i++ {
3,186✔
1942
                var tlvType uint64
2,547✔
1943
                if err := ReadElements(r, &tlvType); err != nil {
2,547✔
1944
                        return nil, err
×
1945
                }
×
1946

1947
                rawRecordBytes, err := wire.ReadVarBytes(
2,547✔
1948
                        r, 0, maxOnionPayloadSize, "tlv",
2,547✔
1949
                )
2,547✔
1950
                if err != nil {
2,547✔
1951
                        return nil, err
×
1952
                }
×
1953

1954
                tlvMap[tlvType] = rawRecordBytes
2,547✔
1955
        }
1956

1957
        // If the MPP type is present, remove it from the generic TLV map and
1958
        // parse it back into a proper MPP struct.
1959
        //
1960
        // TODO(conner): add migration to unify all fields in a single TLV
1961
        // blobs. The split approach will cause headaches down the road as more
1962
        // fields are added, which we can avoid by having a single TLV stream
1963
        // for all payload fields.
1964
        mppType := uint64(record.MPPOnionType)
639✔
1965
        if mppBytes, ok := tlvMap[mppType]; ok {
1,273✔
1966
                delete(tlvMap, mppType)
634✔
1967

634✔
1968
                var (
634✔
1969
                        mpp    = &record.MPP{}
634✔
1970
                        mppRec = mpp.Record()
634✔
1971
                        r      = bytes.NewReader(mppBytes)
634✔
1972
                )
634✔
1973
                err := mppRec.Decode(r, uint64(len(mppBytes)))
634✔
1974
                if err != nil {
634✔
1975
                        return nil, err
×
1976
                }
×
1977
                h.MPP = mpp
634✔
1978
        }
1979

1980
        // If encrypted data or blinding key are present, remove them from
1981
        // the TLV map and parse into proper types.
1982
        encryptedDataType := uint64(record.EncryptedDataOnionType)
639✔
1983
        if data, ok := tlvMap[encryptedDataType]; ok {
641✔
1984
                delete(tlvMap, encryptedDataType)
2✔
1985
                h.EncryptedData = data
2✔
1986
        }
2✔
1987

1988
        blindingType := uint64(record.BlindingPointOnionType)
639✔
1989
        if blindingPoint, ok := tlvMap[blindingType]; ok {
640✔
1990
                delete(tlvMap, blindingType)
1✔
1991

1✔
1992
                h.BlindingPoint, err = btcec.ParsePubKey(blindingPoint)
1✔
1993
                if err != nil {
1✔
1994
                        return nil, fmt.Errorf("invalid blinding point: %w",
×
1995
                                err)
×
1996
                }
×
1997
        }
1998

1999
        ampType := uint64(record.AMPOnionType)
639✔
2000
        if ampBytes, ok := tlvMap[ampType]; ok {
639✔
2001
                delete(tlvMap, ampType)
×
2002

×
2003
                var (
×
2004
                        amp    = &record.AMP{}
×
2005
                        ampRec = amp.Record()
×
2006
                        r      = bytes.NewReader(ampBytes)
×
2007
                )
×
2008
                err := ampRec.Decode(r, uint64(len(ampBytes)))
×
2009
                if err != nil {
×
2010
                        return nil, err
×
2011
                }
×
2012
                h.AMP = amp
×
2013
        }
2014

2015
        // If the metadata type is present, remove it from the tlv map and
2016
        // populate directly on the hop.
2017
        metadataType := uint64(record.MetadataOnionType)
639✔
2018
        if metadata, ok := tlvMap[metadataType]; ok {
1,276✔
2019
                delete(tlvMap, metadataType)
637✔
2020

637✔
2021
                h.Metadata = metadata
637✔
2022
        }
637✔
2023

2024
        totalAmtMsatType := uint64(record.TotalAmtMsatBlindedType)
639✔
2025
        if totalAmtMsat, ok := tlvMap[totalAmtMsatType]; ok {
640✔
2026
                delete(tlvMap, totalAmtMsatType)
1✔
2027

1✔
2028
                var (
1✔
2029
                        totalAmtMsatInt uint64
1✔
2030
                        buf             [8]byte
1✔
2031
                )
1✔
2032
                if err := tlv.DTUint64(
1✔
2033
                        bytes.NewReader(totalAmtMsat),
1✔
2034
                        &totalAmtMsatInt,
1✔
2035
                        &buf,
1✔
2036
                        uint64(len(totalAmtMsat)),
1✔
2037
                ); err != nil {
1✔
2038
                        return nil, err
×
2039
                }
×
2040

2041
                h.TotalAmtMsat = lnwire.MilliSatoshi(totalAmtMsatInt)
1✔
2042
        }
2043

2044
        h.CustomRecords = tlvMap
639✔
2045

639✔
2046
        return h, nil
639✔
2047
}
2048

2049
// SerializeRoute serializes a route.
2050
func SerializeRoute(w io.Writer, r route.Route) error {
74✔
2051
        if err := WriteElements(w,
74✔
2052
                r.TotalTimeLock, r.TotalAmount, r.SourcePubKey[:],
74✔
2053
        ); err != nil {
74✔
2054
                return err
×
2055
        }
×
2056

2057
        if err := WriteElements(w, uint32(len(r.Hops))); err != nil {
74✔
2058
                return err
×
2059
        }
×
2060

2061
        for _, h := range r.Hops {
223✔
2062
                if err := serializeHop(w, h); err != nil {
149✔
2063
                        return err
×
2064
                }
×
2065
        }
2066

2067
        // Any new/extra TLV data is encoded in serializeHTLCAttemptInfo!
2068

2069
        return nil
74✔
2070
}
2071

2072
// DeserializeRoute deserializes a route.
2073
func DeserializeRoute(r io.Reader) (route.Route, error) {
665✔
2074
        rt := route.Route{}
665✔
2075
        if err := ReadElements(r,
665✔
2076
                &rt.TotalTimeLock, &rt.TotalAmount,
665✔
2077
        ); err != nil {
665✔
2078
                return rt, err
×
2079
        }
×
2080

2081
        var pub []byte
665✔
2082
        if err := ReadElements(r, &pub); err != nil {
665✔
2083
                return rt, err
×
2084
        }
×
2085
        copy(rt.SourcePubKey[:], pub)
665✔
2086

665✔
2087
        var numHops uint32
665✔
2088
        if err := ReadElements(r, &numHops); err != nil {
665✔
2089
                return rt, err
×
2090
        }
×
2091

2092
        var hops []*route.Hop
665✔
2093
        for i := uint32(0); i < numHops; i++ {
1,996✔
2094
                hop, err := deserializeHop(r)
1,331✔
2095
                if err != nil {
1,331✔
2096
                        return rt, err
×
2097
                }
×
2098
                hops = append(hops, hop)
1,331✔
2099
        }
2100
        rt.Hops = hops
665✔
2101

665✔
2102
        // Any new/extra TLV data is decoded in deserializeHTLCAttemptInfo!
665✔
2103

665✔
2104
        return rt, nil
665✔
2105
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc